beat.py

from __future__ import absolute_import

import errno
import os
import time
import shelve
import sys
import threading
import traceback

try:
    import multiprocessing
except ImportError:
    multiprocessing = None  # noqa

from datetime import datetime

from kombu.utils import cached_property

from celery import __version__
from celery import platforms
from celery import registry
from celery import signals
from celery.app import app_or_default
from celery.log import SilenceRepeated
from celery.schedules import maybe_schedule, crontab
from celery.utils import instantiate, maybe_promise
from celery.utils.timeutils import humanize_seconds


class SchedulingError(Exception):
  25. """An error occured while scheduling a task."""


class ScheduleEntry(object):
    """An entry in the scheduler.

    :keyword name: see :attr:`name`.
    :keyword schedule: see :attr:`schedule`.
    :keyword args: see :attr:`args`.
    :keyword kwargs: see :attr:`kwargs`.
    :keyword options: see :attr:`options`.
    :keyword last_run_at: see :attr:`last_run_at`.
    :keyword total_run_count: see :attr:`total_run_count`.
    :keyword relative: Is the time relative to when the server starts?

    """

    #: The task name
    name = None

    #: The schedule (run_every/crontab)
    schedule = None

    #: Positional arguments to apply.
    args = None

    #: Keyword arguments to apply.
    kwargs = None

    #: Task execution options.
    options = None

    #: The time and date of when this task was last scheduled.
    last_run_at = None

    #: Total number of times this task has been scheduled.
    total_run_count = 0

    def __init__(self, name=None, task=None, last_run_at=None,
                 total_run_count=None, schedule=None, args=(), kwargs={},
                 options={}, relative=False):
        self.name = name
        self.task = task
        self.args = args
        self.kwargs = kwargs
        self.options = options
        self.schedule = maybe_schedule(schedule, relative)
        self.last_run_at = last_run_at or self._default_now()
        self.total_run_count = total_run_count or 0

    def _default_now(self):
        return datetime.now()

    def next(self, last_run_at=None):
        """Returns a new instance of the same class, but with
        its date and count fields updated."""
        return self.__class__(**dict(self,
                                     last_run_at=last_run_at or datetime.now(),
                                     total_run_count=self.total_run_count + 1))

    def update(self, other):
        """Update values from another entry.

        Only updates the "editable" fields (task, schedule, args, kwargs,
        options).

        """
        self.__dict__.update({"task": other.task, "schedule": other.schedule,
                              "args": other.args, "kwargs": other.kwargs,
                              "options": other.options})

    def is_due(self):
        """See :meth:`celery.task.base.PeriodicTask.is_due`."""
        return self.schedule.is_due(self.last_run_at)

    def __iter__(self):
        return vars(self).iteritems()

    def __repr__(self):
        return ("<Entry: %(name)s %(task)s(*%(args)s, **%(kwargs)s) "
                "{%(schedule)s}>" % vars(self))


class Scheduler(object):
    """Scheduler for periodic tasks.

    :keyword schedule: see :attr:`schedule`.
    :keyword logger: see :attr:`logger`.
    :keyword max_interval: see :attr:`max_interval`.

    """
    Entry = ScheduleEntry

    #: The schedule dict/shelve.
    schedule = None

    #: Current logger.
    logger = None

    #: Maximum time to sleep between re-checking the schedule.
    max_interval = 1

    #: How often to sync the schedule (3 minutes by default)
    sync_every = 3 * 60

    _last_sync = None

    def __init__(self, schedule=None, logger=None, max_interval=None,
                 app=None, Publisher=None, lazy=False, **kwargs):
        app = self.app = app_or_default(app)
        self.data = maybe_promise({} if schedule is None else schedule)
        self.logger = logger or app.log.get_default_logger(name="celery.beat")
        self.max_interval = max_interval or \
                            app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
        self.Publisher = Publisher or app.amqp.TaskPublisher
        if not lazy:
            self.setup_schedule()

    def install_default_entries(self, data):
        entries = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            if "celery.backend_cleanup" not in data:
                entries["celery.backend_cleanup"] = {
                        "task": "celery.backend_cleanup",
                        "schedule": crontab("0", "4", "*"),
                        "options": {"expires": 12 * 3600}}
        self.update_from_dict(entries)

    def maybe_due(self, entry, publisher=None):
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            self.logger.debug("Scheduler: Sending due task %s" % entry.task)
            try:
                result = self.apply_async(entry, publisher=publisher)
            except Exception, exc:
                self.logger.error("Message Error: %s\n%s" % (exc,
                    traceback.format_stack()), exc_info=sys.exc_info())
            else:
                self.logger.debug("%s sent. id->%s" % (entry.task,
                                                       result.task_id))
        return next_time_to_run

    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks.

        """
        remaining_times = []
        try:
            for entry in self.schedule.itervalues():
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])
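
    # Illustrative note (not in the original source): tick() returns how long
    # beat should sleep before the next check.  For example, with
    # max_interval=300 and two entries reporting 12.3 and 45.0 seconds until
    # they are next due, min([12.3, 45.0] + [300]) == 12.3, so the service
    # sleeps roughly 12 seconds before ticking again.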

    def should_sync(self):
        return (not self._last_sync or
                (time.time() - self._last_sync) > self.sync_every)

    def reserve(self, entry):
        new_entry = self.schedule[entry.name] = entry.next()
        return new_entry

    def apply_async(self, entry, publisher=None, **kwargs):
        # Update timestamps and run counts before we actually execute,
        # so we have that done if an exception is raised (doesn't schedule
        # forever.)
        entry = self.reserve(entry)
        task = registry.tasks.get(entry.task)

        try:
            if task:
                result = task.apply_async(entry.args, entry.kwargs,
                                          publisher=publisher,
                                          **entry.options)
            else:
                result = self.send_task(entry.task, entry.args, entry.kwargs,
                                        publisher=publisher,
                                        **entry.options)
        except Exception, exc:
            raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
                    entry.name, exc))

        if self.should_sync():
            self._do_sync()
        return result

    def send_task(self, *args, **kwargs):               # pragma: no cover
        return self.app.send_task(*args, **kwargs)

    def setup_schedule(self):
        self.install_default_entries(self.data)

    def _do_sync(self):
        try:
            self.logger.debug("Celerybeat: Synchronizing schedule...")
            self.sync()
        finally:
            self._last_sync = time.time()

    def sync(self):
        pass

    def close(self):
        self.sync()

    def add(self, **kwargs):
        entry = self.Entry(**kwargs)
        self.schedule[entry.name] = entry
        return entry

    def _maybe_entry(self, name, entry):
        if isinstance(entry, self.Entry):
            return entry
        return self.Entry(**dict(entry, name=name))

    def update_from_dict(self, dict_):
        self.schedule.update(dict((name, self._maybe_entry(name, entry))
                                  for name, entry in dict_.items()))

    def merge_inplace(self, b):
        schedule = self.schedule
        A, B = set(schedule.keys()), set(b.keys())

        # Remove items from disk not in the schedule anymore.
        for key in A ^ B:
            schedule.pop(key, None)

        # Update and add new items in the schedule
        for key in B:
            entry = self.Entry(**dict(b[key], name=key))
            if schedule.get(key):
                schedule[key].update(entry)
            else:
                schedule[key] = entry

    def get_schedule(self):
        return self.data

    def set_schedule(self, schedule):
        self.data = schedule

    @cached_property
    def connection(self):
        return self.app.broker_connection()

    @cached_property
    def publisher(self):
        return self.Publisher(connection=self.connection)

    @property
    def schedule(self):
        return self.get_schedule()

    @property
    def info(self):
        return ""


class PersistentScheduler(Scheduler):
    persistence = shelve

    _store = None

    def __init__(self, *args, **kwargs):
        self.schedule_filename = kwargs.get("schedule_filename")
        Scheduler.__init__(self, *args, **kwargs)

    def _remove_db(self):
        for suffix in "", ".db", ".dat", ".bak", ".dir":
            try:
                os.remove(self.schedule_filename + suffix)
            except OSError, exc:
                if exc.errno != errno.ENOENT:
                    raise

    def setup_schedule(self):
        try:
            self._store = self.persistence.open(self.schedule_filename,
                                                writeback=True)
            entries = self._store.setdefault("entries", {})
        except Exception, exc:
            self.logger.error("Removing corrupted schedule file %r: %r" % (
                self.schedule_filename, exc))
            self._remove_db()
            self._store = self.persistence.open(self.schedule_filename,
                                                writeback=True)
        else:
            if "__version__" not in self._store:
                self._store.clear()   # remove schedule at 2.2.2 upgrade.

        entries = self._store.setdefault("entries", {})
        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
        self.install_default_entries(self.schedule)
        self._store["__version__"] = __version__
        self.sync()
        self.logger.debug("Current schedule:\n" +
                          "\n".join(repr(entry)
                                    for entry in entries.itervalues()))

    def get_schedule(self):
        return self._store["entries"]

    def sync(self):
        if self._store is not None:
            self._store.sync()

    def close(self):
        self.sync()
        self._store.close()

    @property
    def info(self):
        return " . db -> %s" % (self.schedule_filename, )


class Service(object):
    scheduler_cls = PersistentScheduler

    def __init__(self, logger=None, max_interval=None, schedule_filename=None,
                 scheduler_cls=None, app=None):
        app = self.app = app_or_default(app)
        self.max_interval = max_interval or \
                            app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
        self.scheduler_cls = scheduler_cls or self.scheduler_cls
        self.logger = logger or app.log.get_default_logger(name="celery.beat")
        self.schedule_filename = schedule_filename or \
                                    app.conf.CELERYBEAT_SCHEDULE_FILENAME

        self._shutdown = threading.Event()
        self._stopped = threading.Event()
        self.debug = SilenceRepeated(self.logger.debug,
                                     10 if self.max_interval < 60 else 1)

    def start(self, embedded_process=False):
        self.logger.info("Celerybeat: Starting...")
        self.logger.debug("Celerybeat: Ticking with max interval->%s" % (
                humanize_seconds(self.scheduler.max_interval)))

        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title("celerybeat")

        try:
            while not self._shutdown.isSet():
                interval = self.scheduler.tick()
                self.debug("Celerybeat: Waking up %s." % (
                        humanize_seconds(interval, prefix="in ")))
                time.sleep(interval)
        except (KeyboardInterrupt, SystemExit):
            self._shutdown.set()
        finally:
            self.sync()

    def sync(self):
        self.scheduler.close()
        self._stopped.set()

    def stop(self, wait=False):
        self.logger.info("Celerybeat: Shutting down...")
        self._shutdown.set()
        wait and self._stopped.wait()   # block until shutdown done.

    def get_scheduler(self, lazy=False):
        filename = self.schedule_filename
        scheduler = instantiate(self.scheduler_cls,
                                app=self.app,
                                schedule_filename=filename,
                                logger=self.logger,
                                max_interval=self.max_interval,
                                lazy=lazy)
        return scheduler

    @cached_property
    def scheduler(self):
        return self.get_scheduler()
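

# Illustrative usage sketch (not part of the original module): running the
# beat service in the foreground of the current process.  Ctrl-C raises
# KeyboardInterrupt, which the start() loop turns into a clean shutdown.
#
#   service = Service(max_interval=5)
#   service.start()          # blocks, ticking the scheduler until interrupted
#   service.stop(wait=True)  # from another thread: request and await shutdown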


class _Threaded(threading.Thread):
    """Embedded task scheduler using threading."""

    def __init__(self, *args, **kwargs):
        super(_Threaded, self).__init__()
        self.service = Service(*args, **kwargs)
        self.setDaemon(True)
        self.setName("Beat")

    def run(self):
        self.service.start()

    def stop(self):
        self.service.stop(wait=True)


if multiprocessing is not None:
    class _Process(multiprocessing.Process):
        """Embedded task scheduler using multiprocessing."""

        def __init__(self, *args, **kwargs):
            super(_Process, self).__init__()
            self.service = Service(*args, **kwargs)
            self.name = "Beat"

        def run(self):
            platforms.signals.reset("SIGTERM")
            self.service.start(embedded_process=True)

        def stop(self):
            self.service.stop()
            self.terminate()
else:
    _Process = None


def EmbeddedService(*args, **kwargs):
    """Return embedded clock service.

    :keyword thread: Run threaded instead of as a separate process.
        Default is :const:`False`.

    """
    if kwargs.pop("thread", False) or _Process is None:
        # Need short max interval to be able to stop thread
        # in reasonable time.
        kwargs.setdefault("max_interval", 1)
        return _Threaded(*args, **kwargs)

    return _Process(*args, **kwargs)
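

# Illustrative usage sketch (not part of the original module): embedding the
# beat scheduler inside another program, e.g. alongside a worker.
#
#   beat = EmbeddedService(thread=True)   # or thread=False for a subprocess
#   beat.start()                          # Thread.start()/Process.start()
#   ...                                   # do other work
#   beat.stop()                           # request shutdown of the service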