beat.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.beat
  4. ~~~~~~~~~~~
  5. The Celery periodic task scheduler.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. import errno
  11. import os
  12. import time
  13. import shelve
  14. import sys
  15. import threading
  16. import traceback
  17. try:
  18. import multiprocessing
  19. except ImportError:
  20. multiprocessing = None # noqa
  21. from kombu.utils import reprcall
  22. from kombu.utils.functional import maybe_promise
  23. from . import __version__
  24. from . import platforms
  25. from . import signals
  26. from . import current_app
  27. from .app import app_or_default
  28. from .schedules import maybe_schedule, crontab
  29. from .utils import cached_property
  30. from .utils.imports import instantiate
  31. from .utils.timeutils import humanize_seconds
class SchedulingError(Exception):
    """An error occurred while scheduling a task."""
  34. class ScheduleEntry(object):
  35. """An entry in the scheduler.
  36. :keyword name: see :attr:`name`.
  37. :keyword schedule: see :attr:`schedule`.
  38. :keyword args: see :attr:`args`.
  39. :keyword kwargs: see :attr:`kwargs`.
  40. :keyword options: see :attr:`options`.
  41. :keyword last_run_at: see :attr:`last_run_at`.
  42. :keyword total_run_count: see :attr:`total_run_count`.
  43. :keyword relative: Is the time relative to when the server starts?
  44. """
  45. #: The task name
  46. name = None
  47. #: The schedule (run_every/crontab)
  48. schedule = None
  49. #: Positional arguments to apply.
  50. args = None
  51. #: Keyword arguments to apply.
  52. kwargs = None
  53. #: Task execution options.
  54. options = None
  55. #: The time and date of when this task was last scheduled.
  56. last_run_at = None
  57. #: Total number of times this task has been scheduled.
  58. total_run_count = 0
  59. def __init__(self, name=None, task=None, last_run_at=None,
  60. total_run_count=None, schedule=None, args=(), kwargs={},
  61. options={}, relative=False):
  62. self.name = name
  63. self.task = task
  64. self.args = args
  65. self.kwargs = kwargs
  66. self.options = options
  67. self.schedule = maybe_schedule(schedule, relative)
  68. self.last_run_at = last_run_at or self._default_now()
  69. self.total_run_count = total_run_count or 0
  70. def _default_now(self):
  71. return current_app.now()
  72. def _next_instance(self, last_run_at=None):
  73. """Returns a new instance of the same class, but with
  74. its date and count fields updated."""
  75. return self.__class__(**dict(self,
  76. last_run_at=last_run_at or self._default_now(),
  77. total_run_count=self.total_run_count + 1))
  78. __next__ = next = _next_instance # for 2to3
  79. def update(self, other):
  80. """Update values from another entry.
  81. Does only update "editable" fields (task, schedule, args, kwargs,
  82. options).
  83. """
  84. self.__dict__.update({"task": other.task, "schedule": other.schedule,
  85. "args": other.args, "kwargs": other.kwargs,
  86. "options": other.options})
  87. def is_due(self):
  88. """See :meth:`~celery.schedule.schedule.is_due`."""
  89. return self.schedule.is_due(self.last_run_at)
  90. def __iter__(self):
  91. return vars(self).iteritems()
  92. def __repr__(self):
  93. return ("<Entry: %s %s {%s}" % (self.name,
  94. reprcall(self.task, self.args or (), self.kwargs or {}),
  95. self.schedule))
  96. class Scheduler(object):
  97. """Scheduler for periodic tasks.
  98. :keyword schedule: see :attr:`schedule`.
  99. :keyword logger: see :attr:`logger`.
  100. :keyword max_interval: see :attr:`max_interval`.
  101. """
  102. Entry = ScheduleEntry
  103. #: The schedule dict/shelve.
  104. schedule = None
  105. #: Current logger.
  106. logger = None
  107. #: Maximum time to sleep between re-checking the schedule.
  108. max_interval = 1
  109. #: How often to sync the schedule (3 minutes by default)
  110. sync_every = 3 * 60
  111. _last_sync = None
  112. def __init__(self, schedule=None, logger=None, max_interval=None,
  113. app=None, Publisher=None, lazy=False, **kwargs):
  114. app = self.app = app_or_default(app)
  115. self.data = maybe_promise({} if schedule is None else schedule)
  116. self.logger = logger or app.log.get_default_logger(name="celery.beat")
  117. self.max_interval = max_interval or \
  118. app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
  119. self.Publisher = Publisher or app.amqp.TaskPublisher
  120. if not lazy:
  121. self.setup_schedule()
  122. def install_default_entries(self, data):
  123. entries = {}
  124. if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
  125. if "celery.backend_cleanup" not in data:
  126. entries["celery.backend_cleanup"] = {
  127. "task": "celery.backend_cleanup",
  128. "schedule": crontab("0", "4", "*"),
  129. "options": {"expires": 12 * 3600}}
  130. self.update_from_dict(entries)
  131. def maybe_due(self, entry, publisher=None):
  132. is_due, next_time_to_run = entry.is_due()
  133. if is_due:
  134. self.logger.info("Scheduler: Sending due task %s", entry.task)
  135. try:
  136. result = self.apply_async(entry, publisher=publisher)
  137. except Exception, exc:
  138. self.logger.error("Message Error: %s\n%s", exc,
  139. traceback.format_stack(),
  140. exc_info=True)
  141. else:
  142. self.logger.debug("%s sent. id->%s", entry.task, result.id)
  143. return next_time_to_run
  144. def tick(self):
  145. """Run a tick, that is one iteration of the scheduler.
  146. Executes all due tasks.
  147. """
  148. remaining_times = []
  149. try:
  150. for entry in self.schedule.itervalues():
  151. next_time_to_run = self.maybe_due(entry, self.publisher)
  152. if next_time_to_run:
  153. remaining_times.append(next_time_to_run)
  154. except RuntimeError:
  155. pass
  156. return min(remaining_times + [self.max_interval])
  157. def should_sync(self):
  158. return (not self._last_sync or
  159. (time.time() - self._last_sync) > self.sync_every)
  160. def reserve(self, entry):
  161. new_entry = self.schedule[entry.name] = entry.next()
  162. return new_entry
  163. def apply_async(self, entry, publisher=None, **kwargs):
  164. # Update timestamps and run counts before we actually execute,
  165. # so we have that done if an exception is raised (doesn't schedule
  166. # forever.)
  167. entry = self.reserve(entry)
  168. task = self.app.tasks.get(entry.task)
  169. try:
  170. if task:
  171. result = task.apply_async(entry.args, entry.kwargs,
  172. publisher=publisher,
  173. **entry.options)
  174. else:
  175. result = self.send_task(entry.task, entry.args, entry.kwargs,
  176. publisher=publisher,
  177. **entry.options)
  178. except Exception, exc:
  179. raise SchedulingError, SchedulingError(
  180. "Couldn't apply scheduled task %s: %s" % (
  181. entry.name, exc)), sys.exc_info()[2]
  182. if self.should_sync():
  183. self._do_sync()
  184. return result
  185. def send_task(self, *args, **kwargs): # pragma: no cover
  186. return self.app.send_task(*args, **kwargs)
  187. def setup_schedule(self):
  188. self.install_default_entries(self.data)
  189. def _do_sync(self):
  190. try:
  191. self.logger.debug("Celerybeat: Synchronizing schedule...")
  192. self.sync()
  193. finally:
  194. self._last_sync = time.time()
  195. def sync(self):
  196. pass
  197. def close(self):
  198. self.sync()
  199. def add(self, **kwargs):
  200. entry = self.Entry(**kwargs)
  201. self.schedule[entry.name] = entry
  202. return entry
  203. def _maybe_entry(self, name, entry):
  204. if isinstance(entry, self.Entry):
  205. return entry
  206. return self.Entry(**dict(entry, name=name))
  207. def update_from_dict(self, dict_):
  208. self.schedule.update(dict((name, self._maybe_entry(name, entry))
  209. for name, entry in dict_.items()))
  210. def merge_inplace(self, b):
  211. schedule = self.schedule
  212. A, B = set(schedule), set(b)
  213. # Remove items from disk not in the schedule anymore.
  214. for key in A ^ B:
  215. schedule.pop(key, None)
  216. # Update and add new items in the schedule
  217. for key in B:
  218. entry = self.Entry(**dict(b[key], name=key))
  219. if schedule.get(key):
  220. schedule[key].update(entry)
  221. else:
  222. schedule[key] = entry
  223. def get_schedule(self):
  224. return self.data
  225. def set_schedule(self, schedule):
  226. self.data = schedule
  227. def _ensure_connected(self):
  228. # callback called for each retry while the connection
  229. # can't be established.
  230. def _error_handler(exc, interval):
  231. self.logger.error("Celerybeat: Connection error: %s. "
  232. "Trying again in %s seconds...", exc, interval)
  233. return self.connection.ensure_connection(_error_handler,
  234. self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
  235. @cached_property
  236. def connection(self):
  237. return self.app.broker_connection()
  238. @cached_property
  239. def publisher(self):
  240. return self.Publisher(connection=self._ensure_connected())
  241. @property
  242. def schedule(self):
  243. return self.get_schedule()
  244. @property
  245. def info(self):
  246. return ""
  247. class PersistentScheduler(Scheduler):
  248. persistence = shelve
  249. _store = None
  250. def __init__(self, *args, **kwargs):
  251. self.schedule_filename = kwargs.get("schedule_filename")
  252. Scheduler.__init__(self, *args, **kwargs)
  253. def _remove_db(self):
  254. for suffix in "", ".db", ".dat", ".bak", ".dir":
  255. try:
  256. os.remove(self.schedule_filename + suffix)
  257. except OSError, exc:
  258. if exc.errno != errno.ENOENT:
  259. raise
  260. def setup_schedule(self):
  261. try:
  262. self._store = self.persistence.open(self.schedule_filename,
  263. writeback=True)
  264. entries = self._store.setdefault("entries", {})
  265. except Exception, exc:
  266. self.logger.error("Removing corrupted schedule file %r: %r",
  267. self.schedule_filename, exc, exc_info=True)
  268. self._remove_db()
  269. self._store = self.persistence.open(self.schedule_filename,
  270. writeback=True)
  271. else:
  272. if "__version__" not in self._store:
  273. self._store.clear() # remove schedule at 2.2.2 upgrade.
  274. entries = self._store.setdefault("entries", {})
  275. self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
  276. self.install_default_entries(self.schedule)
  277. self._store["__version__"] = __version__
  278. self.sync()
  279. self.logger.debug("Current schedule:\n" +
  280. "\n".join(repr(entry)
  281. for entry in entries.itervalues()))
  282. def get_schedule(self):
  283. return self._store["entries"]
  284. def sync(self):
  285. if self._store is not None:
  286. self._store.sync()
  287. def close(self):
  288. self.sync()
  289. self._store.close()
  290. @property
  291. def info(self):
  292. return " . db -> %s" % (self.schedule_filename, )
  293. class Service(object):
  294. scheduler_cls = PersistentScheduler
  295. def __init__(self, logger=None, max_interval=None, schedule_filename=None,
  296. scheduler_cls=None, app=None):
  297. app = self.app = app_or_default(app)
  298. self.max_interval = max_interval or \
  299. app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
  300. self.scheduler_cls = scheduler_cls or self.scheduler_cls
  301. self.logger = logger or app.log.get_default_logger(name="celery.beat")
  302. self.schedule_filename = schedule_filename or \
  303. app.conf.CELERYBEAT_SCHEDULE_FILENAME
  304. self._is_shutdown = threading.Event()
  305. self._is_stopped = threading.Event()
  306. def start(self, embedded_process=False):
  307. self.logger.info("Celerybeat: Starting...")
  308. self.logger.debug("Celerybeat: Ticking with max interval->%s",
  309. humanize_seconds(self.scheduler.max_interval))
  310. signals.beat_init.send(sender=self)
  311. if embedded_process:
  312. signals.beat_embedded_init.send(sender=self)
  313. platforms.set_process_title("celerybeat")
  314. try:
  315. while not self._is_shutdown.isSet():
  316. interval = self.scheduler.tick()
  317. self.logger.debug("Celerybeat: Waking up %s.",
  318. humanize_seconds(interval, prefix="in "))
  319. time.sleep(interval)
  320. except (KeyboardInterrupt, SystemExit):
  321. self._is_shutdown.set()
  322. finally:
  323. self.sync()
  324. def sync(self):
  325. self.scheduler.close()
  326. self._is_stopped.set()
  327. def stop(self, wait=False):
  328. self.logger.info("Celerybeat: Shutting down...")
  329. self._is_shutdown.set()
  330. wait and self._is_stopped.wait() # block until shutdown done.
  331. def get_scheduler(self, lazy=False):
  332. filename = self.schedule_filename
  333. scheduler = instantiate(self.scheduler_cls,
  334. app=self.app,
  335. schedule_filename=filename,
  336. logger=self.logger,
  337. max_interval=self.max_interval,
  338. lazy=lazy)
  339. return scheduler
  340. @cached_property
  341. def scheduler(self):
  342. return self.get_scheduler()
  343. class _Threaded(threading.Thread):
  344. """Embedded task scheduler using threading."""
  345. def __init__(self, *args, **kwargs):
  346. super(_Threaded, self).__init__()
  347. self.service = Service(*args, **kwargs)
  348. self.setDaemon(True)
  349. self.setName("Beat")
  350. def run(self):
  351. self.service.start()
  352. def stop(self):
  353. self.service.stop(wait=True)
  354. if multiprocessing is not None:
  355. class _Process(multiprocessing.Process):
  356. """Embedded task scheduler using multiprocessing."""
  357. def __init__(self, *args, **kwargs):
  358. super(_Process, self).__init__()
  359. self.service = Service(*args, **kwargs)
  360. self.name = "Beat"
  361. def run(self):
  362. platforms.signals.reset("SIGTERM")
  363. self.service.start(embedded_process=True)
  364. def stop(self):
  365. self.service.stop()
  366. self.terminate()
  367. else:
  368. _Process = None
  369. def EmbeddedService(*args, **kwargs):
  370. """Return embedded clock service.
  371. :keyword thread: Run threaded instead of as a separate process.
  372. Default is :const:`False`.
  373. """
  374. if kwargs.pop("thread", False) or _Process is None:
  375. # Need short max interval to be able to stop thread
  376. # in reasonable time.
  377. kwargs.setdefault("max_interval", 1)
  378. return _Threaded(*args, **kwargs)
  379. return _Process(*args, **kwargs)