# beat.py (11 KB)
  1. """
  2. Periodic Task Scheduler
  3. """
  4. import time
  5. import shelve
  6. import threading
  7. import multiprocessing
  8. from datetime import datetime, timedelta
  9. from UserDict import UserDict
  10. from celery import log
  11. from celery import conf
  12. from celery import platform
  13. from celery.execute import send_task
  14. from celery.schedules import schedule
  15. from celery.messaging import establish_connection
  16. from celery.utils import instantiate
  17. from celery.utils.info import humanize_seconds
  18. class SchedulingError(Exception):
  19. """An error occured while scheduling a task."""
  20. def maybe_schedule(s, relative=False):
  21. if isinstance(s, int):
  22. s = timedelta(seconds=s)
  23. if isinstance(s, timedelta):
  24. return schedule(s, relative)
  25. return s
  26. class ScheduleEntry(object):
  27. """An entry in the scheduler.
  28. :param name: see :attr:`name`.
  29. :param schedule: see :attr:`schedule`.
  30. :param args: see :attr:`args`.
  31. :param kwargs: see :attr:`kwargs`.
  32. :keyword last_run_at: see :attr:`last_run_at`.
  33. :keyword total_run_count: see :attr:`total_run_count`.
  34. .. attribute:: name
  35. The task name.
  36. .. attribute:: schedule
  37. The schedule (run_every/crontab)
  38. .. attribute:: args
  39. Args to apply.
  40. .. attribute:: kwargs
  41. Keyword arguments to apply.
  42. .. attribute:: last_run_at
  43. The time and date of when this task was last run.
  44. .. attribute:: total_run_count
  45. Total number of times this periodic task has been executed.
  46. """
  47. def __init__(self, name, schedule, args=(), kwargs={},
  48. options={}, last_run_at=None, total_run_count=None,
  49. relative=False):
  50. self.name = name
  51. self.schedule = maybe_schedule(schedule, relative)
  52. self.args = args
  53. self.kwargs = kwargs
  54. self.options = options
  55. self.last_run_at = last_run_at or datetime.now()
  56. self.total_run_count = total_run_count or 0
  57. def next(self):
  58. """Returns a new instance of the same class, but with
  59. its date and count fields updated."""
  60. return self.__class__(self.name,
  61. self.schedule,
  62. self.args,
  63. self.kwargs,
  64. self.options,
  65. datetime.now(),
  66. self.total_run_count + 1)
  67. def update(self, other):
  68. """Update values from another entry.
  69. Does only update "editable" fields (schedule, args,
  70. kwargs, options).
  71. """
  72. self.schedule = other.schedule
  73. self.args = other.args
  74. self.kwargs = other.kwargs
  75. self.options = other.options
  76. def is_due(self):
  77. """See :meth:`celery.task.base.PeriodicTask.is_due`."""
  78. return self.schedule.is_due(self.last_run_at)
  79. def __repr__(self):
  80. return "<Entry: %s(*%s, **%s) {%s}>" % (self.name,
  81. self.args,
  82. self.kwargs,
  83. self.schedule)
  84. class Scheduler(UserDict):
  85. """Scheduler for periodic tasks.
  86. :keyword schedule: see :attr:`schedule`.
  87. :keyword logger: see :attr:`logger`.
  88. :keyword max_interval: see :attr:`max_interval`.
  89. .. attribute:: schedule
  90. The schedule dict/shelve.
  91. .. attribute:: logger
  92. The logger to use.
  93. .. attribute:: max_interval
  94. Maximum time to sleep between re-checking the schedule.
  95. """
  96. Entry = ScheduleEntry
  97. def __init__(self, schedule=None, logger=None, max_interval=None,
  98. **kwargs):
  99. UserDict.__init__(self)
  100. if schedule is None:
  101. schedule = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
  102. self.data = schedule
  103. self.logger = logger or log.get_default_logger("celery.beat")
  104. self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
  105. self.setup_schedule()
  106. def maybe_due(self, entry, connection=None):
  107. is_due, next_time_to_run = entry.is_due()
  108. if is_due:
  109. self.logger.debug("Scheduler: Sending due task %s" % entry.name)
  110. try:
  111. result = self.apply_async(entry, connection=connection)
  112. except SchedulingError, exc:
  113. self.logger.error("Scheduler: %s" % exc)
  114. else:
  115. self.logger.debug("%s sent. id->%s" % (entry.name,
  116. result.task_id))
  117. return next_time_to_run
  118. def tick(self):
  119. """Run a tick, that is one iteration of the scheduler.
  120. Executes all due tasks.
  121. """
  122. remaining_times = []
  123. connection = establish_connection()
  124. try:
  125. try:
  126. for entry in self.schedule.itervalues():
  127. next_time_to_run = self.maybe_due(entry, connection)
  128. if next_time_to_run:
  129. remaining_times.append(next_time_to_run)
  130. except RuntimeError:
  131. pass
  132. finally:
  133. connection.close()
  134. return min(remaining_times + [self.max_interval])
  135. def reserve(self, entry):
  136. new_entry = self[entry.name] = entry.next()
  137. return new_entry
  138. def apply_async(self, entry, **kwargs):
  139. # Update timestamps and run counts before we actually execute,
  140. # so we have that done if an exception is raised (doesn't schedule
  141. # forever.)
  142. entry = self.reserve(entry)
  143. try:
  144. result = send_task(entry.name, entry.args, entry.kwargs,
  145. **entry.options)
  146. except Exception, exc:
  147. raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
  148. entry.name, exc))
  149. return result
  150. def setup_schedule(self):
  151. pass
  152. def sync(self):
  153. pass
  154. def close(self):
  155. self.sync()
  156. def dict_to_entries(self, dict_):
  157. return dict((name, self.Entry(**entry))
  158. for name, entry in dict_.items())
  159. def get_schedule(self):
  160. return self.data
  161. def _set_schedule(self, schedule):
  162. self.data = schedule
  163. def _get_schedule(self):
  164. return self.get_schedule()
  165. schedule = property(_get_schedule, _set_schedule)
  166. class PersistentScheduler(Scheduler):
  167. persistence = shelve
  168. _store = None
  169. def __init__(self, *args, **kwargs):
  170. self.schedule_filename = kwargs.get("schedule_filename")
  171. Scheduler.__init__(self, *args, **kwargs)
  172. def setup_schedule(self):
  173. self._store = self.persistence.open(self.schedule_filename)
  174. self._diskmerge(self._store, conf.CELERYBEAT_SCHEDULE)
  175. self.sync()
  176. self.schedule = self._store
  177. def _diskmerge(self, a, b):
  178. A, B = set(a), set(b)
  179. # Remove items from disk not in the schedule anymore.
  180. for key in A ^ B:
  181. a.pop(key, None)
  182. # Update and add new items in the schedule
  183. for key in B:
  184. entry = self.Entry(**b[key])
  185. if a.get(key):
  186. a[key].update(entry)
  187. else:
  188. a[key] = entry
  189. def sync(self):
  190. if self._store is not None:
  191. self.logger.debug("CeleryBeat: Syncing schedule to disk...")
  192. self._store.sync()
  193. def close(self):
  194. self.sync()
  195. self._store.close()
  196. class Service(object):
  197. scheduler_cls = PersistentScheduler
  198. def __init__(self, logger=None,
  199. max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
  200. schedule=conf.CELERYBEAT_SCHEDULE,
  201. schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
  202. scheduler_cls=None):
  203. self.max_interval = max_interval
  204. self.scheduler_cls = scheduler_cls or self.scheduler_cls
  205. self.logger = logger or log.get_default_logger(name="celery.beat")
  206. self.schedule = schedule
  207. self.schedule_filename = schedule_filename
  208. self._scheduler = None
  209. self._shutdown = threading.Event()
  210. self._stopped = threading.Event()
  211. silence = self.max_interval < 60 and 10 or 1
  212. self.debug = log.SilenceRepeated(self.logger.debug,
  213. max_iterations=silence)
  214. def start(self, embedded_process=False):
  215. self.logger.info("Celerybeat: Starting...")
  216. self.logger.debug("Celerybeat: "
  217. "Ticking with max interval->%s" % (
  218. humanize_seconds(self.scheduler.max_interval)))
  219. if embedded_process:
  220. platform.set_process_title("celerybeat")
  221. try:
  222. try:
  223. while not self._shutdown.isSet():
  224. interval = self.scheduler.tick()
  225. self.debug("Celerybeat: Waking up %s." % (
  226. humanize_seconds(interval, prefix="in ")))
  227. time.sleep(interval)
  228. except (KeyboardInterrupt, SystemExit):
  229. self._shutdown.set()
  230. finally:
  231. self.sync()
  232. def sync(self):
  233. self.scheduler.close()
  234. self._stopped.set()
  235. def stop(self, wait=False):
  236. self.logger.info("Celerybeat: Shutting down...")
  237. self._shutdown.set()
  238. wait and self._stopped.wait() # block until shutdown done.
  239. @property
  240. def scheduler(self):
  241. if self._scheduler is None:
  242. filename = self.schedule_filename
  243. self._scheduler = instantiate(self.scheduler_cls,
  244. schedule=self.schedule,
  245. schedule_filename=filename,
  246. logger=self.logger,
  247. max_interval=self.max_interval)
  248. return self._scheduler
  249. class _Threaded(threading.Thread):
  250. """Embedded task scheduler using threading."""
  251. def __init__(self, *args, **kwargs):
  252. super(_Threaded, self).__init__()
  253. self.service = Service(*args, **kwargs)
  254. self.setDaemon(True)
  255. def run(self):
  256. self.service.start()
  257. def stop(self):
  258. self.service.stop(wait=True)
  259. class _Process(multiprocessing.Process):
  260. """Embedded task scheduler using multiprocessing."""
  261. def __init__(self, *args, **kwargs):
  262. super(_Process, self).__init__()
  263. self.service = Service(*args, **kwargs)
  264. def run(self):
  265. platform.reset_signal("SIGTERM")
  266. self.service.start(embedded_process=True)
  267. def stop(self):
  268. self.service.stop()
  269. self.terminate()
  270. def EmbeddedService(*args, **kwargs):
  271. """Return embedded clock service.
  272. :keyword thread: Run threaded instead of as a separate process.
  273. Default is ``False``.
  274. """
  275. if kwargs.pop("thread", False):
  276. # Need short max interval to be able to stop thread
  277. # in reasonable time.
  278. kwargs.setdefault("max_interval", 1)
  279. return _Threaded(*args, **kwargs)
  280. return _Process(*args, **kwargs)