beat.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import time
  2. import shelve
  3. import threading
  4. from datetime import datetime
  5. from UserDict import UserDict
  6. from celery import log
  7. from celery import conf
  8. from celery import registry as _registry
  9. from celery.utils.info import humanize_seconds
  10. class SchedulingError(Exception):
  11. """An error occured while scheduling a task."""
  12. class ScheduleEntry(object):
  13. """An entry in the scheduler.
  14. :param task: see :attr:`task`.
  15. :keyword last_run_at: see :attr:`last_run_at`.
  16. :keyword total_run_count: see :attr:`total_run_count`.
  17. .. attribute:: task
  18. The task class.
  19. .. attribute:: last_run_at
  20. The time and date of when this task was last run.
  21. .. attribute:: total_run_count
  22. Total number of times this periodic task has been executed.
  23. """
  24. def __init__(self, name, last_run_at=None, total_run_count=None):
  25. self.name = name
  26. self.last_run_at = last_run_at or datetime.now()
  27. self.total_run_count = total_run_count or 0
  28. def next(self):
  29. """Returns a new instance of the same class, but with
  30. its date and count fields updated."""
  31. return self.__class__(self.name,
  32. datetime.now(),
  33. self.total_run_count + 1)
  34. def is_due(self, task):
  35. """See :meth:`celery.task.base.PeriodicTask.is_due`."""
  36. return task.is_due(self.last_run_at)
  37. class Scheduler(UserDict):
  38. """Scheduler for periodic tasks.
  39. :keyword registry: see :attr:`registry`.
  40. :keyword schedule: see :attr:`schedule`.
  41. :keyword logger: see :attr:`logger`.
  42. :keyword max_interval: see :attr:`max_interval`.
  43. .. attribute:: registry
  44. The task registry to use.
  45. .. attribute:: schedule
  46. The schedule dict/shelve.
  47. .. attribute:: logger
  48. The logger to use.
  49. .. attribute:: max_interval
  50. Maximum time to sleep between re-checking the schedule.
  51. """
  52. def __init__(self, registry=None, schedule=None, logger=None,
  53. max_interval=None):
  54. self.registry = registry or _registry.TaskRegistry()
  55. self.data = schedule or {}
  56. self.logger = logger or log.get_default_logger()
  57. self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
  58. self.cleanup()
  59. self.schedule_registry()
  60. def tick(self):
  61. """Run a tick, that is one iteration of the scheduler.
  62. Executes all due tasks."""
  63. debug = self.logger.debug
  64. error = self.logger.error
  65. remaining_times = []
  66. for entry in self.schedule.values():
  67. is_due, next_time_to_run = self.is_due(entry)
  68. if is_due:
  69. debug("Scheduler: Sending due task %s" % entry.name)
  70. try:
  71. result = self.apply_async(entry)
  72. except SchedulingError, exc:
  73. error("Scheduler: %s" % exc)
  74. else:
  75. debug("%s sent. id->%s" % (entry.name, result.task_id))
  76. if next_time_to_run:
  77. remaining_times.append(next_time_to_run)
  78. return min(remaining_times + [self.max_interval])
  79. def get_task(self, name):
  80. return self.registry[name]
  81. def is_due(self, entry):
  82. return entry.is_due(self.get_task(entry.name))
  83. def apply_async(self, entry):
  84. # Update timestamps and run counts before we actually execute,
  85. # so we have that done if an exception is raised (doesn't schedule
  86. # forever.)
  87. entry = self.schedule[entry.name] = entry.next()
  88. task = self.get_task(entry.name)
  89. try:
  90. result = task.apply_async()
  91. except Exception, exc:
  92. raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
  93. task.name, exc))
  94. return result
  95. def schedule_registry(self):
  96. """Add the current contents of the registry to the schedule."""
  97. for name, task in self.registry.periodic().items():
  98. if name not in self.schedule:
  99. self.logger.debug("Scheduler: "
  100. "Added periodic task %s to schedule" % name)
  101. self.schedule.setdefault(name, ScheduleEntry(task.name))
  102. def cleanup(self):
  103. for task_name, entry in self.schedule.items():
  104. if task_name not in self.registry:
  105. self.schedule.pop(task_name, None)
  106. @property
  107. def schedule(self):
  108. return self.data
  109. class ClockService(object):
  110. scheduler_cls = Scheduler
  111. registry = _registry.tasks
  112. open_schedule = lambda self, filename: shelve.open(filename)
  113. def __init__(self, logger=None,
  114. max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
  115. schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
  116. self.logger = logger or log.get_default_logger()
  117. self.max_interval = max_interval
  118. self.schedule_filename = schedule_filename
  119. self._shutdown = threading.Event()
  120. self._stopped = threading.Event()
  121. self._schedule = None
  122. self._scheduler = None
  123. self._in_sync = False
  124. silence = self.max_interval < 60 and 10 or 1
  125. self.debug = log.SilenceRepeated(self.logger.debug,
  126. max_iterations=silence)
  127. def start(self):
  128. self.logger.info("ClockService: Starting...")
  129. self.logger.debug("ClockService: "
  130. "Ticking with max interval->%s, schedule->%s" % (
  131. humanize_seconds(self.max_interval),
  132. self.schedule_filename))
  133. try:
  134. while True:
  135. if self._shutdown.isSet():
  136. break
  137. interval = self.scheduler.tick()
  138. self.debug("ClockService: Waking up %s." % (
  139. humanize_seconds(interval, prefix="in ")))
  140. time.sleep(interval)
  141. except (KeyboardInterrupt, SystemExit):
  142. self.sync()
  143. finally:
  144. self.sync()
  145. def sync(self):
  146. if self._schedule is not None and not self._in_sync:
  147. self.logger.debug("ClockService: Syncing schedule to disk...")
  148. self._schedule.sync()
  149. self._schedule.close()
  150. self._in_sync = True
  151. self._stopped.set()
  152. def stop(self, wait=False):
  153. self._shutdown.set()
  154. wait and self._stopped.wait() # block until shutdown done.
  155. @property
  156. def schedule(self):
  157. if self._schedule is None:
  158. filename = self.schedule_filename
  159. self._schedule = self.open_schedule(filename=filename)
  160. return self._schedule
  161. @property
  162. def scheduler(self):
  163. if self._scheduler is None:
  164. self._scheduler = self.scheduler_cls(schedule=self.schedule,
  165. registry=self.registry,
  166. logger=self.logger,
  167. max_interval=self.max_interval)
  168. return self._scheduler
  169. class ClockServiceThread(threading.Thread):
  170. def __init__(self, *args, **kwargs):
  171. self.clockservice = ClockService(*args, **kwargs)
  172. threading.Thread.__init__(self)
  173. self.setDaemon(True)
  174. def run(self):
  175. self.clockservice.start()
  176. def stop(self):
  177. self.clockservice.stop(wait=True)