schedules.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. from datetime import datetime, timedelta
  2. from dateutil.relativedelta import relativedelta
  3. from pyparsing import (Word, Literal, ZeroOrMore, Optional,
  4. Group, StringEnd, alphas)
  5. from celery.utils import is_iterable
  6. from celery.utils.timeutils import timedelta_seconds, weekday, remaining
  7. class schedule(object):
  8. relative = False
  9. def __init__(self, run_every=None, relative=False):
  10. self.run_every = run_every
  11. self.relative = relative
  12. def remaining_estimate(self, last_run_at):
  13. """Returns when the periodic task should run next as a timedelta."""
  14. return remaining(last_run_at, self.run_every, relative=self.relative)
  15. def is_due(self, last_run_at):
  16. """Returns tuple of two items ``(is_due, next_time_to_run)``,
  17. where next time to run is in seconds.
  18. e.g.
  19. * ``(True, 20)``, means the task should be run now, and the next
  20. time to run is in 20 seconds.
  21. * ``(False, 12)``, means the task should be run in 12 seconds.
  22. You can override this to decide the interval at runtime,
  23. but keep in mind the value of :setting:`CELERYBEAT_MAX_LOOP_INTERVAL`,
  24. which decides the maximum number of seconds celerybeat can sleep
  25. between re-checking the periodic task intervals. So if you
  26. dynamically change the next run at value, and the max interval is
  27. set to 5 minutes, it will take 5 minutes for the change to take
  28. effect, so you may consider lowering the value of
  29. :setting:`CELERYBEAT_MAX_LOOP_INTERVAL` if responsiveness is of
  30. importance to you.
  31. """
  32. rem_delta = self.remaining_estimate(last_run_at)
  33. rem = timedelta_seconds(rem_delta)
  34. if rem == 0:
  35. return True, timedelta_seconds(self.run_every)
  36. return False, rem
  37. def __eq__(self, other):
  38. if isinstance(other, schedule):
  39. return self.run_every == other.run_every
  40. return self.run_every == other
  41. class crontab_parser(object):
  42. """Parser for crontab expressions. Any expression of the form 'groups'
  43. (see BNF grammar below) is accepted and expanded to a set of numbers.
  44. These numbers represent the units of time that the crontab needs to
  45. run on::
  46. digit :: '0'..'9'
  47. dow :: 'a'..'z'
  48. number :: digit+ | dow+
  49. steps :: number
  50. range :: number ( '-' number ) ?
  51. numspec :: '*' | range
  52. expr :: numspec ( '/' steps ) ?
  53. groups :: expr ( ',' expr ) *
  54. The parser is a general purpose one, useful for parsing hours, minutes and
  55. day_of_week expressions. Example usage::
  56. >>> minutes = crontab_parser(60).parse("*/15")
  57. [0, 15, 30, 45]
  58. >>> hours = crontab_parser(24).parse("*/4")
  59. [0, 4, 8, 12, 16, 20]
  60. >>> day_of_week = crontab_parser(7).parse("*")
  61. [0, 1, 2, 3, 4, 5, 6]
  62. """
  63. def __init__(self, max_=60):
  64. # define the grammar structure
  65. digits = "0123456789"
  66. star = Literal('*')
  67. number = Word(digits) | Word(alphas)
  68. steps = number
  69. range_ = number + Optional(Literal('-') + number)
  70. numspec = star | range_
  71. expr = Group(numspec) + Optional(Literal('/') + steps)
  72. extra_groups = ZeroOrMore(Literal(',') + expr)
  73. groups = expr + extra_groups + StringEnd()
  74. # define parse actions
  75. star.setParseAction(self._expand_star)
  76. number.setParseAction(self._expand_number)
  77. range_.setParseAction(self._expand_range)
  78. expr.setParseAction(self._filter_steps)
  79. extra_groups.setParseAction(self._ignore_comma)
  80. groups.setParseAction(self._join_to_set)
  81. self.max_ = max_
  82. self.parser = groups
  83. @staticmethod
  84. def _expand_number(toks):
  85. try:
  86. i = int(toks[0])
  87. except ValueError:
  88. try:
  89. i = weekday(toks[0])
  90. except KeyError:
  91. raise ValueError("Invalid weekday literal '%s'." % toks[0])
  92. return [i]
  93. @staticmethod
  94. def _expand_range(toks):
  95. if len(toks) > 1:
  96. return range(toks[0], int(toks[2]) + 1)
  97. else:
  98. return toks[0]
  99. def _expand_star(self, toks):
  100. return range(self.max_)
  101. @staticmethod
  102. def _filter_steps(toks):
  103. numbers = toks[0]
  104. if len(toks) > 1:
  105. steps = toks[2]
  106. return [n for n in numbers if n % steps == 0]
  107. else:
  108. return numbers
  109. @staticmethod
  110. def _ignore_comma(toks):
  111. return filter(lambda x: x != ',', toks)
  112. @staticmethod
  113. def _join_to_set(toks):
  114. return set(toks.asList())
  115. def parse(self, cronspec):
  116. return self.parser.parseString(cronspec).pop()
  117. class crontab(schedule):
  118. """A crontab can be used as the ``run_every`` value of a
  119. :class:`PeriodicTask` to add cron-like scheduling.
  120. Like a :manpage:`cron` job, you can specify units of time of when
  121. you would like the task to execute. It is a reasonably complete
  122. implementation of cron's features, so it should provide a fair
  123. degree of scheduling needs.
  124. You can specify a minute, an hour, and/or a day of the week in any
  125. of the following formats:
  126. .. attribute:: minute
  127. - A (list of) integers from 0-59 that represent the minutes of
  128. an hour of when execution should occur; or
  129. - A string representing a crontab pattern. This may get pretty
  130. advanced, like `minute="*/15"` (for every quarter) or
  131. `minute="1,13,30-45,50-59/2"`.
  132. .. attribute:: hour
  133. - A (list of) integers from 0-23 that represent the hours of
  134. a day of when execution should occur; or
  135. - A string representing a crontab pattern. This may get pretty
  136. advanced, like `hour="*/3"` (for every three hours) or
  137. `hour="0,8-17/2"` (at midnight, and every two hours during
  138. office hours).
  139. .. attribute:: day_of_week
  140. - A (list of) integers from 0-6, where Sunday = 0 and Saturday =
  141. 6, that represent the days of a week that execution should
  142. occur.
  143. - A string representing a crontab pattern. This may get pretty
  144. advanced, like `day_of_week="mon-fri"` (for weekdays only).
  145. (Beware that `day_of_week="*/2"` does not literally mean
  146. "every two days", but "every day that is divisible by two"!)
  147. """
  148. @staticmethod
  149. def _expand_cronspec(cronspec, max_):
  150. """Takes the given cronspec argument in one of the forms::
  151. int (like 7)
  152. basestring (like '3-5,*/15', '*', or 'monday')
  153. set (like set([0,15,30,45]))
  154. list (like [8-17])
  155. And convert it to an (expanded) set representing all time unit
  156. values on which the crontab triggers. Only in case of the base
  157. type being 'basestring', parsing occurs. (It is fast and
  158. happens only once for each crontab instance, so there is no
  159. significant performance overhead involved.)
  160. For the other base types, merely Python type conversions happen.
  161. The argument `max_` is needed to determine the expansion of '*'.
  162. """
  163. if isinstance(cronspec, int):
  164. result = set([cronspec])
  165. elif isinstance(cronspec, basestring):
  166. result = crontab_parser(max_).parse(cronspec)
  167. elif isinstance(cronspec, set):
  168. result = cronspec
  169. elif is_iterable(cronspec):
  170. result = set(cronspec)
  171. else:
  172. raise TypeError(
  173. "Argument cronspec needs to be of any of the "
  174. "following types: int, basestring, or an iterable type. "
  175. "'%s' was given." % type(cronspec))
  176. # assure the result does not exceed the max
  177. for number in result:
  178. if number >= max_:
  179. raise ValueError(
  180. "Invalid crontab pattern. Valid "
  181. "range is 0-%d. '%d' was found." % (max_ - 1, number))
  182. return result
  183. def __init__(self, minute='*', hour='*', day_of_week='*',
  184. nowfun=datetime.now):
  185. self._orig_minute = minute
  186. self._orig_hour = hour
  187. self._orig_day_of_week = day_of_week
  188. self.hour = self._expand_cronspec(hour, 24)
  189. self.minute = self._expand_cronspec(minute, 60)
  190. self.day_of_week = self._expand_cronspec(day_of_week, 7)
  191. self.nowfun = nowfun
  192. def __repr__(self):
  193. return "<crontab: %s %s %s (m/h/d)>" % (self._orig_minute or "*",
  194. self._orig_hour or "*",
  195. self._orig_day_of_week or "*")
  196. def __reduce__(self):
  197. return (self.__class__, (self._orig_minute,
  198. self._orig_hour,
  199. self._orig_day_of_week), None)
  200. def remaining_estimate(self, last_run_at):
  201. """Returns when the periodic task should run next as a timedelta."""
  202. weekday = last_run_at.isoweekday()
  203. execute_this_hour = (weekday in self.day_of_week and
  204. last_run_at.hour in self.hour and
  205. last_run_at.minute < max(self.minute))
  206. if execute_this_hour:
  207. next_minute = min(minute for minute in self.minute
  208. if minute > last_run_at.minute)
  209. delta = relativedelta(minute=next_minute,
  210. second=0,
  211. microsecond=0)
  212. else:
  213. next_minute = min(self.minute)
  214. execute_today = (weekday in self.day_of_week and
  215. (last_run_at.hour < max(self.hour) or
  216. execute_this_hour))
  217. if execute_today:
  218. next_hour = min(hour for hour in self.hour
  219. if hour > last_run_at.hour)
  220. delta = relativedelta(hour=next_hour,
  221. minute=next_minute,
  222. second=0,
  223. microsecond=0)
  224. else:
  225. next_hour = min(self.hour)
  226. iso_next_day = min([day for day in self.day_of_week
  227. if day > weekday] or
  228. self.day_of_week)
  229. add_week = iso_next_day == weekday
  230. delta = relativedelta(weeks=add_week and 1 or 0,
  231. weekday=(iso_next_day - 1) % 7,
  232. hour=next_hour,
  233. minute=next_minute,
  234. second=0,
  235. microsecond=0)
  236. return remaining(last_run_at, delta, now=self.nowfun())
  237. def is_due(self, last_run_at):
  238. """Returns tuple of two items ``(is_due, next_time_to_run)``,
  239. where next time to run is in seconds.
  240. See :meth:`celery.schedules.schedule.is_due` for more information.
  241. """
  242. rem_delta = self.remaining_estimate(last_run_at)
  243. rem = timedelta_seconds(rem_delta)
  244. due = rem == 0
  245. if due:
  246. rem_delta = self.remaining_estimate(last_run_at=self.nowfun())
  247. rem = timedelta_seconds(rem_delta)
  248. return due, rem
  249. def __eq__(self, other):
  250. if isinstance(other, crontab):
  251. return (other.day_of_week == self.day_of_week and
  252. other.hour == self.hour and
  253. other.minute == self.minute)
  254. return other is self
  255. def maybe_schedule(s, relative=False):
  256. if isinstance(s, int):
  257. s = timedelta(seconds=s)
  258. if isinstance(s, timedelta):
  259. return schedule(s, relative)
  260. return s