timegen_every.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from __future__ import print_function
  2. import logging
  3. log = logging.getLogger('time-every')
  4. from datetime import datetime, timedelta
  5. from pytimeparse.timeparse import timeparse
  6. def getdate(val):
  7. if type(val) is int:
  8. return datetime.fromtimestamp(val)
  9. return val
  10. class Alarm(object):
  11. def __init__(self):
  12. pass
  13. def next_ring(self, current_time=None):
  14. '''if current_time is None, it is now()
  15. returns the next time it will ring; or None if it will not anymore
  16. '''
  17. raise NotImplementedError()
  18. def has_ring(self, time=None):
  19. '''returns True IFF the alarm will ring exactly at ``time``'''
  20. raise NotImplementedError()
  21. def all_rings(self, current_time=None):
  22. '''
  23. all future rings
  24. this, of course, is an iterator (they could be infinite)
  25. '''
  26. ring = self.next_ring(current_time)
  27. while ring is not None:
  28. yield ring
  29. ring = self.next_ring(ring)
  30. class SingleAlarm(Alarm):
  31. '''
  32. rings a single time
  33. '''
  34. description = 'Only once, at a specified date and time'
  35. def __init__(self, obj):
  36. self.dt = getdate(obj['timestamp'])
  37. def next_ring(self, current_time=None):
  38. '''if current_time is None, it is now()'''
  39. if current_time is None:
  40. current_time = datetime.now()
  41. if current_time >= self.dt:
  42. return None
  43. return self.dt
  44. def has_ring(self, current_time=None):
  45. if current_time is None:
  46. current_time = datetime.now()
  47. return current_time == self.dt
  48. class FrequencyAlarm(Alarm):
  49. '''
  50. rings on {t | exists a k integer >= 0 s.t. t = start+k*t, start<t<end}
  51. '''
  52. description = 'Events at a specified frequency. Example: every 30minutes'
  53. def __init__(self, obj):
  54. self.start = getdate(obj['start'])
  55. try:
  56. self.interval = int(obj['interval'])
  57. except ValueError:
  58. self.interval = timeparse(obj['interval'])
  59. assert type(self.interval) is int
  60. self.end = getdate(obj['end']) if 'end' in obj else None
  61. self.weekdays = [int(x) for x in obj['weekdays']] if \
  62. 'weekdays' in obj else None
  63. if self.weekdays is not None:
  64. for weekday in self.weekdays:
  65. if not 1 <= weekday <= 7:
  66. raise ValueError('Not a valid weekday: {}'
  67. .format(weekday))
  68. def next_ring(self, current_time=None):
  69. '''if current_time is None, it is now()'''
  70. if current_time is None:
  71. current_time = datetime.now()
  72. if self.end is not None and current_time > self.end:
  73. return None
  74. if current_time < self.start:
  75. return self.start
  76. if self.end is not None:
  77. assert self.start <= current_time <= self.end
  78. else:
  79. assert self.start <= current_time
  80. # this "infinite" loop is required by the weekday exclusion: in
  81. # fact, it is necessary to retry until a valid event/weekday is
  82. # found. a "while True" might have been more elegant (and maybe
  83. # fast), but this gives a clear upper bound to the cycle.
  84. for _ in range(60*60*24*7 // self.interval):
  85. n_interval = (
  86. (current_time - self.start).total_seconds() // self.interval
  87. ) + 1
  88. ring = self.start + timedelta(seconds=self.interval * n_interval)
  89. if ring == current_time:
  90. ring += timedelta(seconds=self.interval)
  91. if self.end is not None and ring > self.end:
  92. return None
  93. if self.weekdays is not None \
  94. and ring.isoweekday() not in self.weekdays:
  95. current_time = ring
  96. continue
  97. return ring
  98. log.warning("Can't find a valid time for event; "
  99. "something went wrong")
  100. return None
  101. def has_ring(self, current_time=None):
  102. if current_time is None:
  103. current_time = datetime.now()
  104. if not self.start >= current_time >= self.end:
  105. return False
  106. n_interval = (current_time - self.start).total_seconds() // \
  107. self.interval
  108. expected_time = self.start + \
  109. timedelta(seconds=self.interval * n_interval)
  110. return expected_time == current_time
  111. def __str__(self):
  112. return 'FrequencyAlarm(every %ds)' % self.interval