# beat.py
  1. """
  2. Periodic Task Scheduler
  3. """
  4. import time
  5. import shelve
  6. import sys
  7. import threading
  8. import traceback
  9. try:
  10. import multiprocessing
  11. except ImportError:
  12. multiprocessing = None
  13. from datetime import datetime
  14. from celery import platforms
  15. from celery import registry
  16. from celery import signals
  17. from celery.app import app_or_default
  18. from celery.log import SilenceRepeated
  19. from celery.schedules import maybe_schedule, crontab
  20. from celery.utils import cached_property, instantiate, maybe_promise
  21. from celery.utils.compat import UserDict
  22. from celery.utils.timeutils import humanize_seconds
class SchedulingError(Exception):
    """An error occurred while scheduling a task."""


class ScheduleEntry(object):
    """An entry in the scheduler.

    :param name: see :attr:`name`.
    :param schedule: see :attr:`schedule`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.
    :keyword last_run_at: see :attr:`last_run_at`.
    :keyword total_run_count: see :attr:`total_run_count`.

    .. attribute:: name

        The task name.

    .. attribute:: schedule

        The schedule (run_every/crontab)

    .. attribute:: args

        Args to apply.

    .. attribute:: kwargs

        Keyword arguments to apply.

    .. attribute:: last_run_at

        The time and date of when this task was last run.

    .. attribute:: total_run_count

        Total number of times this periodic task has been executed.

    """

    def __init__(self, name=None, task=None, last_run_at=None,
            total_run_count=None, schedule=None, args=(), kwargs={},
            options={}, relative=False):
        self.name = name
        self.task = task
        self.schedule = maybe_schedule(schedule, relative)
        self.args = args
        self.kwargs = kwargs
        self.options = options
        self.last_run_at = last_run_at or datetime.now()
        self.total_run_count = total_run_count or 0

    def next(self, last_run_at=None):
        """Returns a new instance of the same class, but with
        its date and count fields updated."""
        last_run_at = last_run_at or datetime.now()
        total_run_count = self.total_run_count + 1
        return self.__class__(**dict(self,
                                     last_run_at=last_run_at,
                                     total_run_count=total_run_count))

    def update(self, other):
        """Update values from another entry.

        Only updates the "editable" fields (task, schedule, args,
        kwargs, options).

        """
        self.task = other.task
        self.schedule = other.schedule
        self.args = other.args
        self.kwargs = other.kwargs
        self.options = other.options

    def is_due(self):
        """See :meth:`celery.task.base.PeriodicTask.is_due`."""
        return self.schedule.is_due(self.last_run_at)

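    # Iterating over an entry yields its attributes as (name, value)
    # pairs (via vars()), which is what lets next() rebuild the entry
    # with self.__class__(**dict(self, ...)).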
    def __iter__(self):
        return vars(self).iteritems()

    def __repr__(self):
        return "<Entry: %s %s(*%s, **%s) {%s}>" % (self.name,
                                                   self.task,
                                                   self.args,
                                                   self.kwargs,
                                                   self.schedule)


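# Usage sketch for ScheduleEntry (illustrative only, not executed): entries
# are normally built from the CELERYBEAT_SCHEDULE setting.  The task name
# "tasks.add" and the timedelta schedule below are hypothetical and would
# need ``from datetime import timedelta``::
#
#     entry = ScheduleEntry(name="add-every-30s", task="tasks.add",
#                           schedule=timedelta(seconds=30), args=(2, 2))
#     entry.is_due()        # -> (is_due, seconds until next check)
#     entry = entry.next()  # copy with updated last_run_at/total_run_count

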
class Scheduler(UserDict):
    """Scheduler for periodic tasks.

    :keyword schedule: see :attr:`schedule`.
    :keyword logger: see :attr:`logger`.
    :keyword max_interval: see :attr:`max_interval`.

    .. attribute:: schedule

        The schedule dict/shelve.

    .. attribute:: logger

        The logger to use.

    .. attribute:: max_interval

        Maximum time to sleep between re-checking the schedule.

    """
    Entry = ScheduleEntry

    def __init__(self, schedule=None, logger=None, max_interval=None,
            app=None, Publisher=None, lazy=False, **kwargs):
        UserDict.__init__(self)
        if schedule is None:
            schedule = {}
        self.app = app_or_default(app)
        conf = self.app.conf
        self.data = self.install_default_entries(schedule)
        self.logger = logger or self.app.log.get_default_logger(
                                                name="celery.beat")
        self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
        self.Publisher = Publisher or self.app.amqp.TaskPublisher
        if not lazy:
            self.setup_schedule()

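    # Make sure the built-in celery.backend_cleanup task is in the
    # schedule (daily at 4 a.m.) when task results are set to expire.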
    def install_default_entries(self, schedule):
        schedule = maybe_promise(schedule)
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            schedule.setdefault("celery.backend_cleanup",
                    self.Entry(task="celery.backend_cleanup",
                               schedule=crontab("0", "4", "*"),
                               options={"expires": 12 * 3600}))
        return schedule

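    # Send the task if the entry is due, and return the number of
    # seconds until the entry should be checked again.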
    def maybe_due(self, entry, publisher=None):
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            self.logger.debug("Scheduler: Sending due task %s" % entry.task)
            try:
                result = self.apply_async(entry, publisher=publisher)
            except Exception, exc:
                self.logger.error("Message Error: %s\n%s" % (exc,
                    traceback.format_stack()), exc_info=sys.exc_info())
            else:
                self.logger.debug("%s sent. id->%s" % (entry.task,
                                                       result.task_id))
        return next_time_to_run

    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks and returns the number of seconds to
        sleep before the next tick (at most :attr:`max_interval`).

        """
        remaining_times = []
        try:
            for entry in self.schedule.itervalues():
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])

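    # Replace the entry with its next() successor so that last_run_at
    # and total_run_count are updated before the task is actually sent.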
    def reserve(self, entry):
        new_entry = self.schedule[entry.name] = entry.next()
        return new_entry

    def apply_async(self, entry, publisher=None, **kwargs):
        # Update timestamps and run counts before we actually execute,
        # so we have that done if an exception is raised (doesn't schedule
        # forever.)
        entry = self.reserve(entry)

        try:
            task = registry.tasks[entry.task]
        except KeyError:
            task = None

        try:
            if task:
                result = task.apply_async(entry.args, entry.kwargs,
                                          publisher=publisher,
                                          **entry.options)
            else:
                result = self.send_task(entry.task, entry.args, entry.kwargs,
                                        publisher=publisher,
                                        **entry.options)
        except Exception, exc:
            raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
                entry.name, exc))
        return result

    def send_task(self, *args, **kwargs):  # pragma: no cover
        return self.app.send_task(*args, **kwargs)

    def setup_schedule(self):
        pass

    def sync(self):
        pass

    def close(self):
        self.sync()

    def add(self, **kwargs):
        entry = self.Entry(**kwargs)
        self.schedule[entry.name] = entry
        return entry

    def _maybe_entry(self, name, entry):
        if isinstance(entry, self.Entry):
            return entry
        return self.Entry(name, **entry)

    def update_from_dict(self, dict_):
        self.update(dict((name, self._maybe_entry(name, entry))
                         for name, entry in dict_.items()))

    def merge_inplace(self, b):
        A, B = set(self.keys()), set(b.keys())

        # Remove items from disk not in the schedule anymore.
        for key in A ^ B:
            self.pop(key, None)

        # Update and add new items in the schedule
        for key in B:
            entry = self.Entry(**dict(b[key]))
            if self.get(key):
                self[key].update(entry)
            else:
                self[key] = entry

    def get_schedule(self):
        return self.data

    @cached_property
    def connection(self):
        return self.app.broker_connection()

    @cached_property
    def publisher(self):
        return self.Publisher(connection=self.connection)

    @property
    def schedule(self):
        return self.get_schedule()

    @property
    def info(self):
        return ""


class PersistentScheduler(Scheduler):
    persistence = shelve

    _store = None

    def __init__(self, *args, **kwargs):
        self.schedule_filename = kwargs.get("schedule_filename")
        Scheduler.__init__(self, *args, **kwargs)

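    # Open (or create) the shelve database and merge in the entries
    # from the CELERYBEAT_SCHEDULE setting.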
    def setup_schedule(self):
        self._store = self.persistence.open(self.schedule_filename)
        self.data = self._store
        self.merge_inplace(self.install_default_entries(
                            self.app.conf.CELERYBEAT_SCHEDULE))
        self.sync()
        self.data = self._store

    def sync(self):
        if self._store is not None:
            self.logger.debug("CeleryBeat: Syncing schedule to disk...")
            self._store.sync()

    def close(self):
        self.sync()
        self._store.close()

    @property
    def info(self):
        return " . db -> %s" % (self.schedule_filename, )


class Service(object):
    scheduler_cls = PersistentScheduler

    def __init__(self, logger=None,
            max_interval=None, schedule=None, schedule_filename=None,
            scheduler_cls=None, app=None):
        self.app = app_or_default(app)
        self.max_interval = max_interval or \
                                self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
        self.scheduler_cls = scheduler_cls or self.scheduler_cls
        self.logger = logger or self.app.log.get_default_logger(
                                                name="celery.beat")
        self.schedule = schedule or self.app.conf.CELERYBEAT_SCHEDULE
        self.schedule_filename = schedule_filename or \
                                self.app.conf.CELERYBEAT_SCHEDULE_FILENAME

        self._shutdown = threading.Event()
        self._stopped = threading.Event()
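        # When ticking frequently (max_interval < 60s), only emit every
        # 10th "Waking up" debug message to avoid flooding the log.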
        silence = self.max_interval < 60 and 10 or 1
        self.debug = SilenceRepeated(self.logger.debug,
                                     max_iterations=silence)

    def start(self, embedded_process=False):
        self.logger.info("Celerybeat: Starting...")
        self.logger.debug("Celerybeat: "
            "Ticking with max interval->%s" % (
                humanize_seconds(self.scheduler.max_interval)))

        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title("celerybeat")

        try:
            try:
                while not self._shutdown.isSet():
                    interval = self.scheduler.tick()
                    self.debug("Celerybeat: Waking up %s." % (
                        humanize_seconds(interval, prefix="in ")))
                    time.sleep(interval)
            except (KeyboardInterrupt, SystemExit):
                self._shutdown.set()
        finally:
            self.sync()

    def sync(self):
        self.scheduler.close()
        self._stopped.set()

    def stop(self, wait=False):
        self.logger.info("Celerybeat: Shutting down...")
        self._shutdown.set()
        wait and self._stopped.wait()  # block until shutdown done.

    def get_scheduler(self, lazy=False):
        filename = self.schedule_filename
        scheduler = instantiate(self.scheduler_cls,
                                app=self.app,
                                schedule_filename=filename,
                                logger=self.logger,
                                max_interval=self.max_interval,
                                lazy=lazy)
        if not lazy:
            scheduler.update_from_dict(self.schedule)
        return scheduler

    @cached_property
    def scheduler(self):
        return self.get_scheduler()


class _Threaded(threading.Thread):
    """Embedded task scheduler using threading."""

    def __init__(self, *args, **kwargs):
        super(_Threaded, self).__init__()
        self.service = Service(*args, **kwargs)
        self.setDaemon(True)
        self.setName("Beat")

    def run(self):
        self.service.start()

    def stop(self):
        self.service.stop(wait=True)


if multiprocessing is not None:

    class _Process(multiprocessing.Process):
        """Embedded task scheduler using multiprocessing."""

        def __init__(self, *args, **kwargs):
            super(_Process, self).__init__()
            self.service = Service(*args, **kwargs)
            self.name = "Beat"

        def run(self):
            platforms.reset_signal("SIGTERM")
            self.service.start(embedded_process=True)

        def stop(self):
            self.service.stop()
            self.terminate()
else:
    _Process = None


def EmbeddedService(*args, **kwargs):
    """Return embedded clock service.

    :keyword thread: Run threaded instead of as a separate process.
        Default is :const:`False`.

    """
    if kwargs.pop("thread", False) or _Process is None:
        # Need short max interval to be able to stop thread
        # in reasonable time.
        kwargs.setdefault("max_interval", 1)
        return _Threaded(*args, **kwargs)

    return _Process(*args, **kwargs)

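# Usage sketch (illustrative only): this is roughly what the worker does
# when started with the -B/--beat option.  The `app` value is assumed to
# be an existing Celery app instance::
#
#     beat = EmbeddedService(app=app, thread=True)
#     beat.start()   # runs Service.start() in the thread/process
#     ...
#     beat.stop()    # signals shutdown and waits for the scheduler to sync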