beat.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
import atexit
import math
import shelve
import sys
import threading
import time
from datetime import datetime
from UserDict import UserDict

from celery import conf
from celery import log
from celery import registry
from celery.exceptions import NotRegistered
from celery.log import setup_logger
  13. TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
  14. ("hour", 60 * 60, lambda n: int(math.ceil(n))),
  15. ("minute", 60, lambda n: int(math.ceil(n))),
  16. ("second", 1, lambda n: "%.2d" % n))
  17. def humanize_seconds(secs, prefix=""):
  18. """Show seconds in human form, e.g. 60 is "1 minute", 7200 is "2
  19. hours"."""
  20. for unit, divider, formatter in TIME_UNITS:
  21. if secs >= divider:
  22. w = secs / divider
  23. punit = w > 1 and unit+"s" or unit
  24. return "%s%s %s" % (prefix, formatter(w), punit)
  25. return "now"
  26. class SchedulingError(Exception):
  27. """An error occured while scheduling a task."""
  28. class ScheduleEntry(object):
  29. """An entry in the scheduler.
  30. :param task: The task class.
  31. :keyword last_run_at: The time and date when this task was last run.
  32. :keyword total_run_count: Total number of times this periodic task has
  33. been executed.
  34. """
  35. def __init__(self, name, last_run_at=None, total_run_count=None):
  36. self.name = name
  37. self.last_run_at = last_run_at or datetime.now()
  38. self.total_run_count = total_run_count or 0
  39. def next(self):
  40. """Returns a new instance of the same class, but with
  41. its date and count fields updated."""
  42. return self.__class__(name=self.name,
  43. last_run_at=datetime.now(),
  44. total_run_count=self.total_run_count + 1)
  45. def is_due(self, task):
  46. """See :meth:`celery.task.base.PeriodicTask.is_due`."""
  47. return task.is_due(self.last_run_at)
  48. class Scheduler(UserDict):
  49. """Scheduler for periodic tasks.
  50. :keyword registry: The task registry to use.
  51. :keyword schedule: The schedule dictionary. Default is the global
  52. persistent schedule ``celery.beat.schedule``.
  53. :keyword logger: The logger to use.
  54. :keyword max_interval: Maximum time to sleep between re-checking the
  55. schedule.
  56. """
  57. def __init__(self, **kwargs):
  58. attr_defaults = {"registry": lambda: {},
  59. "schedule": lambda: {},
  60. "logger": log.get_default_logger,
  61. "max_interval": conf.CELERYBEAT_MAX_LOOP_INTERVAL}
  62. for attr_name, attr_default_gen in attr_defaults.items():
  63. if attr_name in kwargs:
  64. attr_value = kwargs[attr_name]
  65. else:
  66. attr_value = attr_default_gen()
  67. setattr(self, attr_name, attr_value)
  68. self.cleanup()
  69. self.schedule_registry()
  70. def tick(self):
  71. """Run a tick, that is one iteration of the scheduler.
  72. Executes all due tasks."""
  73. remaining_times = []
  74. for entry in self.schedule.values():
  75. is_due, next_time_to_run = self.is_due(entry)
  76. if is_due:
  77. self.logger.debug("Scheduler: Sending due task %s" % (
  78. entry.name))
  79. result = self.apply_async(entry)
  80. self.logger.debug("Scheduler: %s sent. id->%s" % (
  81. entry.name, result.task_id))
  82. if next_time_to_run:
  83. remaining_times.append(next_time_to_run)
  84. return min(remaining_times + [self.max_interval])
  85. def get_task(self, name):
  86. try:
  87. return self.registry[name]
  88. except KeyError:
  89. raise NotRegistered(name)
  90. def is_due(self, entry):
  91. return entry.is_due(self.get_task(entry.name))
  92. def apply_async(self, entry):
  93. # Update timestamps and run counts before we actually execute,
  94. # so we have that done if an exception is raised (doesn't schedule
  95. # forever.)
  96. entry = self.schedule[entry.name] = entry.next()
  97. task = self.get_task(entry.name)
  98. try:
  99. result = task.apply_async()
  100. except Exception, exc:
  101. raise SchedulingError(
  102. "Couldn't apply scheduled task %s: %s" % (
  103. task.name, exc))
  104. return result
  105. def schedule_registry(self):
  106. """Add the current contents of the registry to the schedule."""
  107. periodic_tasks = self.registry.get_all_periodic()
  108. for name, task in self.registry.get_all_periodic().items():
  109. if name not in self.schedule:
  110. self.logger.debug(
  111. "Scheduler: Adding periodic task %s to schedule" % (
  112. task.name))
  113. self.schedule.setdefault(name, ScheduleEntry(task.name))
  114. def cleanup(self):
  115. for task_name, entry in self.schedule.items():
  116. if task_name not in self.registry:
  117. self.schedule.pop(task_name, None)
  118. @property
  119. def schedule(self):
  120. return self.data
  121. class ClockService(object):
  122. scheduler_cls = Scheduler
  123. registry = registry.tasks
  124. def __init__(self, logger=None, is_detached=False,
  125. max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
  126. schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
  127. self.logger = logger
  128. self.max_interval = max_interval
  129. self.schedule_filename = schedule_filename
  130. self._shutdown = threading.Event()
  131. self._stopped = threading.Event()
  132. def start(self):
  133. self.logger.info("ClockService: Starting...")
  134. schedule = shelve.open(filename=self.schedule_filename)
  135. #atexit.register(schedule.close)
  136. scheduler = self.scheduler_cls(schedule=schedule,
  137. registry=self.registry,
  138. logger=self.logger,
  139. max_interval=self.max_interval)
  140. self.logger.debug("ClockService: "
  141. "Ticking with max interval->%s, schedule->%s" % (
  142. humanize_seconds(self.max_interval),
  143. self.schedule_filename))
  144. synced = [False]
  145. def _stop():
  146. if not synced[0]:
  147. self.logger.debug("ClockService: Syncing schedule to disk...")
  148. schedule.sync()
  149. schedule.close()
  150. synced[0] = True
  151. self._stopped.set()
  152. try:
  153. while True:
  154. if self._shutdown.isSet():
  155. break
  156. interval = scheduler.tick()
  157. self.logger.debug("ClockService: Waking up %s." % (
  158. humanize_seconds(interval, prefix="in ")))
  159. time.sleep(interval)
  160. except (KeyboardInterrupt, SystemExit):
  161. _stop()
  162. finally:
  163. _stop()
  164. def stop(self, wait=False):
  165. self._shutdown.set()
  166. wait and self._stopped.wait() # block until shutdown done.
  167. class ClockServiceThread(threading.Thread):
  168. def __init__(self, *args, **kwargs):
  169. self.clockservice = ClockService(*args, **kwargs)
  170. threading.Thread.__init__(self)
  171. self.setDaemon(True)
  172. def run(self):
  173. self.clockservice.start()
  174. def stop(self):
  175. self.clockservice.stop(wait=True)