timegen_every.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from __future__ import print_function
  2. import logging
  3. log = logging.getLogger('time-every')
  4. from datetime import datetime, timedelta
  5. from pytimeparse.timeparse import timeparse
  6. def getdate(val):
  7. if type(val) is int:
  8. return datetime.fromtimestamp(val)
  9. return val
  10. class Alarm(object):
  11. def __init__(self):
  12. pass
  13. def next_ring(self, current_time=None):
  14. '''if current_time is None, it is now()
  15. returns the next time it will ring; or None if it will not anymore
  16. '''
  17. raise NotImplementedError()
  18. def has_ring(self, time=None):
  19. raise NotImplementedError()
  20. def all_rings(self, current_time=None):
  21. '''
  22. all future rings
  23. this, of course, is an iterator (they could be infinite)
  24. '''
  25. ring = self.next_ring(current_time)
  26. while ring is not None:
  27. yield ring
  28. ring = self.next_ring(ring)
  29. class SingleAlarm(Alarm):
  30. '''
  31. rings a single time
  32. '''
  33. description = 'Only once, at a specified date and time'
  34. def __init__(self, obj):
  35. self.dt = getdate(obj['timestamp'])
  36. def next_ring(self, current_time=None):
  37. '''if current_time is None, it is now()'''
  38. if current_time is None:
  39. current_time = datetime.now()
  40. if current_time >= self.dt:
  41. return None
  42. return self.dt
  43. def has_ring(self, current_time=None):
  44. if current_time is None:
  45. current_time = datetime.now()
  46. return current_time == self.dt
  47. class FrequencyAlarm(Alarm):
  48. '''
  49. rings on {t | exists a k integer >= 0 s.t. t = start+k*t, start<t<end}
  50. '''
  51. description = 'Events at a specified frequency. Example: every 30minutes'
  52. def __init__(self, obj):
  53. self.start = getdate(obj['start'])
  54. try:
  55. self.interval = int(obj['interval'])
  56. except ValueError:
  57. self.interval = timeparse(obj['interval'])
  58. assert type(self.interval) is int
  59. self.end = getdate(obj['end']) if 'end' in obj else None
  60. def next_ring(self, current_time=None):
  61. '''if current_time is None, it is now()'''
  62. if current_time is None:
  63. current_time = datetime.now()
  64. if self.end is not None and current_time > self.end:
  65. return None
  66. if current_time < self.start:
  67. return self.start
  68. if self.end is not None:
  69. assert self.start <= current_time <= self.end
  70. else:
  71. assert self.start <= current_time
  72. n_interval = (
  73. (current_time - self.start).total_seconds() // self.interval
  74. ) + 1
  75. ring = self.start + timedelta(seconds=self.interval * n_interval)
  76. if ring == current_time:
  77. ring += timedelta(seconds=self.interval)
  78. if self.end is not None and ring > self.end:
  79. return None
  80. return ring
  81. def has_ring(self, current_time=None):
  82. if current_time is None:
  83. current_time = datetime.now()
  84. if not self.start >= current_time >= self.end:
  85. return False
  86. n_interval = (current_time - self.start).total_seconds() // \
  87. self.interval
  88. expected_time = self.start + \
  89. timedelta(seconds=self.interval * n_interval)
  90. return expected_time == current_time
  91. def __str__(self):
  92. return 'FrequencyAlarm(every %ds)' % self.interval