# beat.py
  1. """
  2. Periodic Task Scheduler
  3. """
  4. import time
  5. import shelve
  6. import threading
  7. import multiprocessing
  8. from datetime import datetime
  9. from UserDict import UserDict
  10. from celery import log
  11. from celery import conf
  12. from celery import registry as _registry
  13. from celery import platform
  14. from celery.messaging import establish_connection
  15. from celery.utils.info import humanize_seconds
class SchedulingError(Exception):
    """An error occurred while scheduling a task."""
  18. class ScheduleEntry(object):
  19. """An entry in the scheduler.
  20. :param task: see :attr:`task`.
  21. :keyword last_run_at: see :attr:`last_run_at`.
  22. :keyword total_run_count: see :attr:`total_run_count`.
  23. .. attribute:: task
  24. The task class.
  25. .. attribute:: last_run_at
  26. The time and date of when this task was last run.
  27. .. attribute:: total_run_count
  28. Total number of times this periodic task has been executed.
  29. """
  30. def __init__(self, name, last_run_at=None, total_run_count=None):
  31. self.name = name
  32. self.last_run_at = last_run_at or datetime.now()
  33. self.total_run_count = total_run_count or 0
  34. def next(self):
  35. """Returns a new instance of the same class, but with
  36. its date and count fields updated."""
  37. return self.__class__(self.name,
  38. datetime.now(),
  39. self.total_run_count + 1)
  40. def is_due(self, task):
  41. """See :meth:`celery.task.base.PeriodicTask.is_due`."""
  42. return task.is_due(self.last_run_at)
  43. class Scheduler(UserDict):
  44. """Scheduler for periodic tasks.
  45. :keyword registry: see :attr:`registry`.
  46. :keyword schedule: see :attr:`schedule`.
  47. :keyword logger: see :attr:`logger`.
  48. :keyword max_interval: see :attr:`max_interval`.
  49. .. attribute:: registry
  50. The task registry to use.
  51. .. attribute:: schedule
  52. The schedule dict/shelve.
  53. .. attribute:: logger
  54. The logger to use.
  55. .. attribute:: max_interval
  56. Maximum time to sleep between re-checking the schedule.
  57. """
  58. def __init__(self, registry=None, schedule=None, logger=None,
  59. max_interval=None):
  60. self.registry = registry or _registry.TaskRegistry()
  61. self.data = schedule
  62. if self.data is None:
  63. self.data = {}
  64. self.logger = logger or log.get_default_logger()
  65. self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
  66. self.cleanup()
  67. self.schedule_registry()
  68. def tick(self):
  69. """Run a tick, that is one iteration of the scheduler.
  70. Executes all due tasks."""
  71. debug = self.logger.debug
  72. error = self.logger.error
  73. remaining_times = []
  74. connection = establish_connection()
  75. try:
  76. for entry in self.schedule.values():
  77. is_due, next_time_to_run = self.is_due(entry)
  78. if is_due:
  79. debug("Scheduler: Sending due task %s" % entry.name)
  80. try:
  81. result = self.apply_async(entry, connection=connection)
  82. except SchedulingError, exc:
  83. error("Scheduler: %s" % exc)
  84. else:
  85. debug("%s sent. id->%s" % (entry.name, result.task_id))
  86. if next_time_to_run:
  87. remaining_times.append(next_time_to_run)
  88. finally:
  89. connection.close()
  90. return min(remaining_times + [self.max_interval])
  91. def get_task(self, name):
  92. return self.registry[name]
  93. def is_due(self, entry):
  94. return entry.is_due(self.get_task(entry.name))
  95. def apply_async(self, entry, **kwargs):
  96. # Update timestamps and run counts before we actually execute,
  97. # so we have that done if an exception is raised (doesn't schedule
  98. # forever.)
  99. entry = self.schedule[entry.name] = entry.next()
  100. task = self.get_task(entry.name)
  101. try:
  102. result = task.apply_async(**kwargs)
  103. except Exception, exc:
  104. raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
  105. task.name, exc))
  106. return result
  107. def schedule_registry(self):
  108. """Add the current contents of the registry to the schedule."""
  109. for name, task in self.registry.periodic().items():
  110. if name not in self.schedule:
  111. self.logger.debug("Scheduler: "
  112. "Added periodic task %s to schedule" % name)
  113. self.schedule.setdefault(name, ScheduleEntry(task.name))
  114. def cleanup(self):
  115. for task_name, entry in self.schedule.items():
  116. if task_name not in self.registry:
  117. self.schedule.pop(task_name, None)
  118. @property
  119. def schedule(self):
  120. return self.data
  121. class ClockService(object):
  122. scheduler_cls = Scheduler
  123. registry = _registry.tasks
  124. open_schedule = lambda self, filename: shelve.open(filename)
  125. def __init__(self, logger=None,
  126. max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
  127. schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
  128. self.logger = logger or log.get_default_logger()
  129. self.max_interval = max_interval
  130. self.schedule_filename = schedule_filename
  131. self._shutdown = threading.Event()
  132. self._stopped = threading.Event()
  133. self._schedule = None
  134. self._scheduler = None
  135. self._in_sync = False
  136. silence = self.max_interval < 60 and 10 or 1
  137. self.debug = log.SilenceRepeated(self.logger.debug,
  138. max_iterations=silence)
  139. def start(self, embedded_process=False):
  140. self.logger.info("ClockService: Starting...")
  141. self.logger.debug("ClockService: "
  142. "Ticking with max interval->%s, schedule->%s" % (
  143. humanize_seconds(self.max_interval),
  144. self.schedule_filename))
  145. if embedded_process:
  146. platform.set_process_title("celerybeat")
  147. try:
  148. try:
  149. while True:
  150. if self._shutdown.isSet():
  151. break
  152. interval = self.scheduler.tick()
  153. self.debug("ClockService: Waking up %s." % (
  154. humanize_seconds(interval, prefix="in ")))
  155. time.sleep(interval)
  156. except (KeyboardInterrupt, SystemExit):
  157. self.sync()
  158. finally:
  159. self.sync()
  160. def sync(self):
  161. if self._schedule is not None and not self._in_sync:
  162. self.logger.debug("ClockService: Syncing schedule to disk...")
  163. self._schedule.sync()
  164. self._schedule.close()
  165. self._in_sync = True
  166. self._stopped.set()
  167. def stop(self, wait=False):
  168. self.logger.info("ClockService: Shutting down...")
  169. self._shutdown.set()
  170. wait and self._stopped.wait() # block until shutdown done.
  171. @property
  172. def schedule(self):
  173. if self._schedule is None:
  174. filename = self.schedule_filename
  175. self._schedule = self.open_schedule(filename=filename)
  176. return self._schedule
  177. @property
  178. def scheduler(self):
  179. if self._scheduler is None:
  180. self._scheduler = self.scheduler_cls(schedule=self.schedule,
  181. registry=self.registry,
  182. logger=self.logger,
  183. max_interval=self.max_interval)
  184. return self._scheduler
  185. class _Threaded(threading.Thread):
  186. """Embedded clock service using threading."""
  187. def __init__(self, *args, **kwargs):
  188. super(_Threaded, self).__init__()
  189. self.clockservice = ClockService(*args, **kwargs)
  190. self.setDaemon(True)
  191. def run(self):
  192. self.clockservice.start()
  193. def stop(self):
  194. self.clockservice.stop(wait=True)
  195. class _Process(multiprocessing.Process):
  196. """Embedded clock service using multiprocessing."""
  197. def __init__(self, *args, **kwargs):
  198. super(_Process, self).__init__()
  199. self.clockservice = ClockService(*args, **kwargs)
  200. def run(self):
  201. platform.reset_signal("SIGTERM")
  202. self.clockservice.start(embedded_process=True)
  203. def stop(self):
  204. self.clockservice.stop()
  205. self.terminate()
  206. def EmbeddedClockService(*args, **kwargs):
  207. """Return embedded clock service.
  208. :keyword thread: Run threaded instead of as a separate process.
  209. Default is ``False``.
  210. """
  211. if kwargs.pop("thread", False):
  212. # Need short max interval to be able to stop thread
  213. # in reasonable time.
  214. kwargs.setdefault("max_interval", 1)
  215. return _Threaded(*args, **kwargs)
  216. return _Process(*args, **kwargs)