beat.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import time
  2. import shelve
  3. import atexit
  4. import threading
  5. from UserDict import UserDict
  6. from datetime import datetime
  7. from celery import conf
  8. from celery import registry
  9. from celery.log import setup_logger
  10. from celery.exceptions import NotRegistered
  11. class SchedulingError(Exception):
  12. """An error occured while scheduling task."""
  13. class ScheduleEntry(object):
  14. """An entry in the scheduler.
  15. :param task: The task class.
  16. :keyword last_run_at: The time and date when this task was last run.
  17. :keyword total_run_count: Total number of times this periodic task has
  18. been executed.
  19. """
  20. def __init__(self, name, last_run_at=None,
  21. total_run_count=None):
  22. self.name = name
  23. self.last_run_at = last_run_at or datetime.now()
  24. self.total_run_count = total_run_count or 0
  25. def next(self):
  26. return self.__class__(self.name, datetime.now(),
  27. self.total_run_count + 1)
  28. def is_due(self, run_every):
  29. return datetime.now() > (self.last_run_at + run_every)
  30. class Scheduler(UserDict):
  31. """Scheduler for periodic tasks.
  32. :keyword registry: The task registry to use.
  33. :keyword schedule: The schedule dictionary. Default is the global
  34. persistent schedule ``celery.beat.schedule``.
  35. """
  36. interval = 1
  37. def __init__(self, **kwargs):
  38. def _get_default_logger():
  39. import multiprocessing
  40. return multiprocessing.get_logger()
  41. attr_defaults = {"registry": lambda: {},
  42. "schedule": lambda: {},
  43. "interval": lambda: self.interval,
  44. "logger": _get_default_logger}
  45. for attr_name, attr_default_gen in attr_defaults.items():
  46. if attr_name in kwargs:
  47. attr_value = kwargs[attr_name]
  48. else:
  49. attr_value = attr_default_gen()
  50. setattr(self, attr_name, attr_value)
  51. self.cleanup()
  52. self.schedule_registry()
  53. def tick(self):
  54. """Run a tick, that is one iteration of the scheduler.
  55. Executes all due tasks."""
  56. for entry in self.get_due_tasks():
  57. self.logger.debug("Scheduler: Sending due task %s" % (
  58. entry.name))
  59. result = self.apply_async(entry)
  60. self.logger.debug("Scheduler: %s sent. id->%s" % (
  61. entry.name, result.task_id))
  62. def get_due_tasks(self):
  63. """Get all the schedule entries that are due to execution."""
  64. return filter(self.is_due, self.schedule.values())
  65. def get_task(self, name):
  66. try:
  67. return self.registry[name]
  68. except KeyError:
  69. raise NotRegistered(name)
  70. def is_due(self, entry):
  71. return entry.is_due(self.get_task(entry.name).run_every)
  72. def apply_async(self, entry):
  73. # Update timestamps and run counts before we actually execute,
  74. # so we have that done if an exception is raised (doesn't schedule
  75. # forever.)
  76. entry = self.schedule[entry.name] = entry.next()
  77. task = self.get_task(entry.name)
  78. try:
  79. result = task.apply_async()
  80. except Exception, exc:
  81. raise SchedulingError(
  82. "Couldn't apply scheduled task %s: %s" % (
  83. task.name, exc))
  84. return result
  85. def schedule_registry(self):
  86. """Add the current contents of the registry to the schedule."""
  87. periodic_tasks = self.registry.get_all_periodic()
  88. for name, task in self.registry.get_all_periodic().items():
  89. if name not in self.schedule:
  90. self.logger.debug(
  91. "Scheduler: Adding periodic task %s to schedule" % (
  92. task.name))
  93. self.schedule.setdefault(name, ScheduleEntry(task.name))
  94. def cleanup(self):
  95. for task_name, entry in self.schedule.items():
  96. if task_name not in self.registry:
  97. self.schedule.pop(task_name, None)
  98. @property
  99. def schedule(self):
  100. return self.data
  101. class ClockService(object):
  102. scheduler_cls = Scheduler
  103. schedule_filename = conf.CELERYBEAT_SCHEDULE_FILENAME
  104. registry = registry.tasks
  105. def __init__(self, logger=None, is_detached=False):
  106. self.logger = logger
  107. self._shutdown = threading.Event()
  108. self._stopped = threading.Event()
  109. def start(self):
  110. self.logger.info("ClockService: Starting...")
  111. schedule = shelve.open(filename=self.schedule_filename)
  112. #atexit.register(schedule.close)
  113. scheduler = self.scheduler_cls(schedule=schedule,
  114. registry=self.registry,
  115. logger=self.logger)
  116. self.logger.debug(
  117. "ClockService: Ticking with interval->%d, schedule->%s" % (
  118. scheduler.interval, self.schedule_filename))
  119. synced = [False]
  120. def _stop():
  121. if not synced[0]:
  122. self.logger.debug("ClockService: Syncing schedule to disk...")
  123. schedule.sync()
  124. schedule.close()
  125. synced[0] = True
  126. self._stopped.set()
  127. try:
  128. while True:
  129. if self._shutdown.isSet():
  130. break
  131. scheduler.tick()
  132. time.sleep(scheduler.interval)
  133. except (KeyboardInterrupt, SystemExit):
  134. _stop()
  135. finally:
  136. _stop()
  137. def stop(self, wait=False):
  138. self._shutdown.set()
  139. wait and self._stopped.wait() # block until shutdown done.
  140. class ClockServiceThread(threading.Thread):
  141. def __init__(self, *args, **kwargs):
  142. self.clockservice = ClockService(*args, **kwargs)
  143. threading.Thread.__init__(self)
  144. self.setDaemon(True)
  145. def run(self):
  146. self.clockservice.start()
  147. def stop(self):
  148. self.clockservice.stop(wait=True)