# timegen_every.py (4.4 KB)

from __future__ import print_function

import logging
import math
from datetime import datetime, timedelta

from pytimeparse.timeparse import timeparse

log = logging.getLogger('time-every')
  6. def getdate(val):
  7. if type(val) is int:
  8. return datetime.fromtimestamp(val)
  9. return val
  10. class Alarm(object):
  11. def __init__(self):
  12. pass
  13. def next_ring(self, current_time=None):
  14. '''if current_time is None, it is now()
  15. returns the next time it will ring; or None if it will not anymore
  16. '''
  17. raise NotImplementedError()
  18. def has_ring(self, time=None):
  19. '''returns True IFF the alarm will ring exactly at ``time``'''
  20. raise NotImplementedError()
  21. def all_rings(self, current_time=None):
  22. '''
  23. all future rings
  24. this, of course, is an iterator (they could be infinite)
  25. '''
  26. ring = self.next_ring(current_time)
  27. while ring is not None:
  28. yield ring
  29. ring = self.next_ring(ring)
  30. class SingleAlarm(Alarm):
  31. '''
  32. rings a single time
  33. '''
  34. description = 'Only once, at a specified date and time'
  35. def __init__(self, obj):
  36. super().__init__()
  37. self.dt = getdate(obj['timestamp'])
  38. def next_ring(self, current_time=None):
  39. '''if current_time is None, it is now()'''
  40. if current_time is None:
  41. current_time = datetime.now()
  42. if current_time >= self.dt:
  43. return None
  44. return self.dt
  45. def has_ring(self, current_time=None):
  46. if current_time is None:
  47. current_time = datetime.now()
  48. return current_time == self.dt
  49. class FrequencyAlarm(Alarm):
  50. '''
  51. rings on {t | exists a k integer >= 0 s.t. t = start+k*t, start<t<end}
  52. '''
  53. description = 'Events at a specified frequency. Example: every 30minutes'
  54. def __init__(self, obj):
  55. self.start = getdate(obj['start'])
  56. try:
  57. self.interval = int(obj['interval'])
  58. except ValueError:
  59. self.interval = timeparse(obj['interval'])
  60. assert type(self.interval) is int
  61. self.end = getdate(obj['end']) if 'end' in obj else None
  62. self.weekdays = [int(x) for x in obj['weekdays']] if \
  63. 'weekdays' in obj else None
  64. if self.weekdays is not None:
  65. for weekday in self.weekdays:
  66. if not 1 <= weekday <= 7:
  67. raise ValueError('Not a valid weekday: {}'
  68. .format(weekday))
  69. def next_ring(self, current_time=None):
  70. '''if current_time is None, it is now()'''
  71. if current_time is None:
  72. current_time = datetime.now()
  73. if self.end is not None and current_time > self.end:
  74. return None
  75. if current_time < self.start:
  76. return self.start
  77. if self.end is not None:
  78. assert self.start <= current_time <= self.end
  79. else:
  80. assert self.start <= current_time
  81. # this "infinite" loop is required by the weekday exclusion: in
  82. # fact, it is necessary to retry until a valid event/weekday is
  83. # found. a "while True" might have been more elegant (and maybe
  84. # fast), but this gives a clear upper bound to the cycle.
  85. for _ in range(60*60*24*7 // self.interval):
  86. n_interval = (
  87. (current_time - self.start).total_seconds() // self.interval
  88. ) + 1
  89. ring = self.start + timedelta(seconds=self.interval * n_interval)
  90. if ring == current_time:
  91. ring += timedelta(seconds=self.interval)
  92. if self.end is not None and ring > self.end:
  93. return None
  94. if self.weekdays is not None \
  95. and ring.isoweekday() not in self.weekdays:
  96. current_time = ring
  97. continue
  98. return ring
  99. log.warning("Can't find a valid time for event %s; "
  100. "something went wrong", str(self))
  101. return None
  102. def has_ring(self, current_time=None):
  103. if current_time is None:
  104. current_time = datetime.now()
  105. if not self.start >= current_time >= self.end:
  106. return False
  107. n_interval = (current_time - self.start).total_seconds() // \
  108. self.interval
  109. expected_time = self.start + \
  110. timedelta(seconds=self.interval * n_interval)
  111. return expected_time == current_time
  112. def __str__(self):
  113. return 'FrequencyAlarm(every %ds)' % self.interval