timegen_every.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from __future__ import print_function
  2. import logging
  3. log = logging.getLogger("time-every")
  4. from datetime import datetime, timedelta
  5. from pytimeparse.timeparse import timeparse
  def getdate(val):
      """Normalize a timestamp-like value to a ``datetime``.

      Numeric values (``int`` or ``float``) are interpreted as seconds since
      the epoch and converted with ``datetime.fromtimestamp`` (local time,
      naive). Any other value, including ``bool`` and ``None``, is returned
      unchanged; it is presumably already a ``datetime``.

      :param val: epoch seconds, or an object to pass through untouched
      :returns: a ``datetime`` for numeric input, otherwise ``val`` itself
      """
      # bool is a subclass of int; keep passing it through instead of
      # silently turning True/False into a date next to the epoch.
      if isinstance(val, (int, float)) and not isinstance(val, bool):
          return datetime.fromtimestamp(val)
      return val
  class Alarm(object):
      """Base interface for alarms.

      Subclasses provide ``next_ring`` and ``has_ring``; ``all_rings`` is
      built on top of ``next_ring``.
      """

      def __init__(self):
          """Nothing to set up in the base class."""

      def next_ring(self, current_time=None):
          """Return the first ring after ``current_time``.

          ``current_time`` set to None stands for "now". The result is None
          when the alarm will not ring anymore.
          """
          raise NotImplementedError

      def has_ring(self, time=None):
          """Return True if and only if the alarm rings exactly at ``time``."""
          raise NotImplementedError

      def all_rings(self, current_time=None):
          """Lazily yield every future ring after ``current_time``.

          This is a generator because the sequence may be infinite; it stops
          as soon as ``next_ring`` returns None.
          """
          moment = current_time
          while True:
              # Each ring found becomes the reference point for the next one.
              moment = self.next_ring(moment)
              if moment is None:
                  return
              yield moment
  class SingleAlarm(Alarm):
      """
      An alarm that rings exactly once, at a fixed moment.
      """

      description = "Only once, at a specified date and time"

      def __init__(self, obj):
          """Read the ring moment from ``obj["timestamp"]`` (via ``getdate``)."""
          super().__init__()
          self.dt = getdate(obj["timestamp"])

      def next_ring(self, current_time=None):
          """Return the ring moment if it is still ahead of ``current_time``.

          ``current_time`` set to None stands for "now". Returns None once
          the moment has been reached or passed.
          """
          reference = datetime.now() if current_time is None else current_time
          return None if reference >= self.dt else self.dt

      def has_ring(self, current_time=None):
          """Return True only when ``current_time`` (default: now) equals the ring moment."""
          reference = datetime.now() if current_time is None else current_time
          return reference == self.dt
  class FrequencyAlarm(Alarm):
      """
      Rings on {t | there is an integer k >= 0 s.t. t = start + k*interval,
      start <= t <= end}, optionally restricted to some ISO weekdays.

      Recognized keys of the configuration mapping:

      * ``start``: first possible ring (converted with ``getdate``)
      * ``interval``: seconds between rings, either a number or a string
        that ``timeparse`` understands
      * ``end`` (optional): last moment at which a ring is allowed
      * ``weekdays`` (optional): iterable of ISO weekdays, 1 (Monday) to
        7 (Sunday), on which rings are allowed
      """

      description = "Events at a specified frequency. Example: every 30minutes"

      def __init__(self, obj):
          """Build the alarm from the configuration mapping ``obj``.

          :raises ValueError: if the interval is not a positive whole number
              of seconds, or a weekday is outside 1..7
          """
          super().__init__()
          self.start = getdate(obj["start"])
          try:
              self.interval = int(obj["interval"])
          except ValueError:
              # Not a plain number: let timeparse interpret the string.
              self.interval = timeparse(obj["interval"])
          # Explicit validation instead of ``assert``, which is stripped
          # when Python runs with -O.
          if not isinstance(self.interval, int):
              raise ValueError("Not a valid interval: {!r}".format(obj["interval"]))
          if self.interval <= 0:
              raise ValueError("Interval must be positive: {}".format(self.interval))
          self.end = getdate(obj["end"]) if "end" in obj else None
          self.weekdays = [int(x) for x in obj["weekdays"]] if "weekdays" in obj else None
          if self.weekdays is not None:
              for weekday in self.weekdays:
                  if not 1 <= weekday <= 7:
                      raise ValueError("Not a valid weekday: {}".format(weekday))

      def next_ring(self, current_time=None):
          """Return the first valid ring strictly after ``current_time``.

          ``current_time`` set to None stands for "now". A ring is valid
          when it lies on the ``start + k*interval`` grid, is not after
          ``end`` and falls on an allowed weekday. Returns None when no such
          ring exists.
          """
          # Local import keeps this block self-contained.
          from math import gcd

          if current_time is None:
              current_time = datetime.now()
          if self.end is not None and current_time > self.end:
              return None
          if self.weekdays is not None and not self.weekdays:
              # An empty weekday list excludes every day: it can never ring.
              return None

          step = timedelta(seconds=self.interval)
          if current_time < self.start:
              ring = self.start
          else:
              # Exact integer arithmetic: number of whole intervals elapsed.
              elapsed = (current_time - self.start) // step
              ring = self.start + (elapsed + 1) * step

          # The weekday exclusion forces us to try successive rings. Ring
          # times repeat modulo one week with this period, so after that
          # many attempts every reachable weekday/time slot has been seen.
          week = 60 * 60 * 24 * 7
          for _ in range(week // gcd(self.interval, week)):
              if self.end is not None and ring > self.end:
                  return None
              if self.weekdays is None or ring.isoweekday() in self.weekdays:
                  return ring
              try:
                  ring += step
              except OverflowError:
                  # Walked past the largest representable datetime.
                  return None
          log.warning(
              "Can't find a valid time for event %s; " "something went wrong", str(self)
          )
          return None

      def has_ring(self, current_time=None):
          """Return True if and only if the alarm rings exactly at ``current_time``.

          ``current_time`` set to None stands for "now". The moment must be
          inside [start, end], on an allowed weekday and on the interval grid.
          """
          if current_time is None:
              current_time = datetime.now()
          if current_time < self.start:
              return False
          if self.end is not None and current_time > self.end:
              return False
          if self.weekdays is not None and current_time.isoweekday() not in self.weekdays:
              return False
          # On the grid exactly when the elapsed time is a whole number of
          # intervals.
          remainder = (current_time - self.start) % timedelta(seconds=self.interval)
          return remainder == timedelta(0)

      def __str__(self):
          return "FrequencyAlarm(every %ds)" % self.interval