beat.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.beat
  4. ~~~~~~~~~~~
  5. The periodic task scheduler.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import errno
  10. import os
  11. import time
  12. import shelve
  13. import sys
  14. import traceback
  15. from billiard import Process, ensure_multiprocessing
  16. from kombu.utils import cached_property, reprcall
  17. from kombu.utils.functional import maybe_promise
  18. from . import __version__
  19. from . import platforms
  20. from . import signals
  21. from . import current_app
  22. from .app import app_or_default
  23. from .schedules import maybe_schedule, crontab
  24. from .utils.imports import instantiate
  25. from .utils.threads import Event, Thread
  26. from .utils.timeutils import humanize_seconds
  27. from .utils.log import get_logger
  28. logger = get_logger(__name__)
  29. debug, info, error, warning = (logger.debug, logger.info,
  30. logger.error, logger.warning)
  31. DEFAULT_MAX_INTERVAL = 300 # 5 minutes
class SchedulingError(Exception):
    """An error occurred while scheduling a task.

    Raised by :meth:`Scheduler.apply_async` in place of whatever
    exception the underlying send raised.

    """
  34. class ScheduleEntry(object):
  35. """An entry in the scheduler.
  36. :keyword name: see :attr:`name`.
  37. :keyword schedule: see :attr:`schedule`.
  38. :keyword args: see :attr:`args`.
  39. :keyword kwargs: see :attr:`kwargs`.
  40. :keyword options: see :attr:`options`.
  41. :keyword last_run_at: see :attr:`last_run_at`.
  42. :keyword total_run_count: see :attr:`total_run_count`.
  43. :keyword relative: Is the time relative to when the server starts?
  44. """
  45. #: The task name
  46. name = None
  47. #: The schedule (run_every/crontab)
  48. schedule = None
  49. #: Positional arguments to apply.
  50. args = None
  51. #: Keyword arguments to apply.
  52. kwargs = None
  53. #: Task execution options.
  54. options = None
  55. #: The time and date of when this task was last scheduled.
  56. last_run_at = None
  57. #: Total number of times this task has been scheduled.
  58. total_run_count = 0
  59. def __init__(self, name=None, task=None, last_run_at=None,
  60. total_run_count=None, schedule=None, args=(), kwargs={},
  61. options={}, relative=False):
  62. self.name = name
  63. self.task = task
  64. self.args = args
  65. self.kwargs = kwargs
  66. self.options = options
  67. self.schedule = maybe_schedule(schedule, relative)
  68. self.last_run_at = last_run_at or self._default_now()
  69. self.total_run_count = total_run_count or 0
  70. def _default_now(self):
  71. return self.schedule.now() if self.schedule else current_app.now()
  72. def _next_instance(self, last_run_at=None):
  73. """Returns a new instance of the same class, but with
  74. its date and count fields updated."""
  75. return self.__class__(**dict(
  76. self,
  77. last_run_at=last_run_at or self._default_now(),
  78. total_run_count=self.total_run_count + 1,
  79. ))
  80. __next__ = next = _next_instance # for 2to3
  81. def update(self, other):
  82. """Update values from another entry.
  83. Does only update "editable" fields (task, schedule, args, kwargs,
  84. options).
  85. """
  86. self.__dict__.update({'task': other.task, 'schedule': other.schedule,
  87. 'args': other.args, 'kwargs': other.kwargs,
  88. 'options': other.options})
  89. def is_due(self):
  90. """See :meth:`~celery.schedule.schedule.is_due`."""
  91. return self.schedule.is_due(self.last_run_at)
  92. def __iter__(self):
  93. return vars(self).iteritems()
  94. def __repr__(self):
  95. return '<Entry: %s %s {%s}' % (
  96. self.name,
  97. reprcall(self.task, self.args or (), self.kwargs or {}),
  98. self.schedule,
  99. )
  100. class Scheduler(object):
  101. """Scheduler for periodic tasks.
  102. :keyword schedule: see :attr:`schedule`.
  103. :keyword max_interval: see :attr:`max_interval`.
  104. """
  105. Entry = ScheduleEntry
  106. #: The schedule dict/shelve.
  107. schedule = None
  108. #: Maximum time to sleep between re-checking the schedule.
  109. max_interval = DEFAULT_MAX_INTERVAL
  110. #: How often to sync the schedule (3 minutes by default)
  111. sync_every = 3 * 60
  112. _last_sync = None
  113. logger = logger # compat
  114. def __init__(self, schedule=None, max_interval=None,
  115. app=None, Publisher=None, lazy=False, **kwargs):
  116. app = self.app = app_or_default(app)
  117. self.data = maybe_promise({} if schedule is None else schedule)
  118. self.max_interval = (max_interval
  119. or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
  120. or self.max_interval)
  121. self.Publisher = Publisher or app.amqp.TaskProducer
  122. if not lazy:
  123. self.setup_schedule()
  124. def install_default_entries(self, data):
  125. entries = {}
  126. if self.app.conf.CELERY_TASK_RESULT_EXPIRES and \
  127. not self.app.backend.supports_autoexpire:
  128. if 'celery.backend_cleanup' not in data:
  129. entries['celery.backend_cleanup'] = {
  130. 'task': 'celery.backend_cleanup',
  131. 'schedule': crontab('0', '4', '*'),
  132. 'options': {'expires': 12 * 3600}}
  133. self.update_from_dict(entries)
  134. def maybe_due(self, entry, publisher=None):
  135. is_due, next_time_to_run = entry.is_due()
  136. if is_due:
  137. info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
  138. try:
  139. result = self.apply_async(entry, publisher=publisher)
  140. except Exception, exc:
  141. error('Message Error: %s\n%s',
  142. exc, traceback.format_stack(), exc_info=True)
  143. else:
  144. debug('%s sent. id->%s', entry.task, result.id)
  145. return next_time_to_run
  146. def tick(self):
  147. """Run a tick, that is one iteration of the scheduler.
  148. Executes all due tasks.
  149. """
  150. remaining_times = []
  151. try:
  152. for entry in self.schedule.itervalues():
  153. next_time_to_run = self.maybe_due(entry, self.publisher)
  154. if next_time_to_run:
  155. remaining_times.append(next_time_to_run)
  156. except RuntimeError:
  157. pass
  158. return min(remaining_times + [self.max_interval])
  159. def should_sync(self):
  160. return (not self._last_sync or
  161. (time.time() - self._last_sync) > self.sync_every)
  162. def reserve(self, entry):
  163. new_entry = self.schedule[entry.name] = entry.next()
  164. return new_entry
  165. def apply_async(self, entry, publisher=None, **kwargs):
  166. # Update timestamps and run counts before we actually execute,
  167. # so we have that done if an exception is raised (doesn't schedule
  168. # forever.)
  169. entry = self.reserve(entry)
  170. task = self.app.tasks.get(entry.task)
  171. try:
  172. if task:
  173. result = task.apply_async(entry.args, entry.kwargs,
  174. publisher=publisher,
  175. **entry.options)
  176. else:
  177. result = self.send_task(entry.task, entry.args, entry.kwargs,
  178. publisher=publisher,
  179. **entry.options)
  180. except Exception, exc:
  181. raise SchedulingError, SchedulingError(
  182. "Couldn't apply scheduled task %s: %s" % (
  183. entry.name, exc)), sys.exc_info()[2]
  184. finally:
  185. if self.should_sync():
  186. self._do_sync()
  187. return result
  188. def send_task(self, *args, **kwargs):
  189. return self.app.send_task(*args, **kwargs)
  190. def setup_schedule(self):
  191. self.install_default_entries(self.data)
  192. def _do_sync(self):
  193. try:
  194. debug('Celerybeat: Synchronizing schedule...')
  195. self.sync()
  196. finally:
  197. self._last_sync = time.time()
  198. def sync(self):
  199. pass
  200. def close(self):
  201. self.sync()
  202. def add(self, **kwargs):
  203. entry = self.Entry(**kwargs)
  204. self.schedule[entry.name] = entry
  205. return entry
  206. def _maybe_entry(self, name, entry):
  207. if isinstance(entry, self.Entry):
  208. return entry
  209. return self.Entry(**dict(entry, name=name))
  210. def update_from_dict(self, dict_):
  211. self.schedule.update(dict(
  212. (name, self._maybe_entry(name, entry))
  213. for name, entry in dict_.items()))
  214. def merge_inplace(self, b):
  215. schedule = self.schedule
  216. A, B = set(schedule), set(b)
  217. # Remove items from disk not in the schedule anymore.
  218. for key in A ^ B:
  219. schedule.pop(key, None)
  220. # Update and add new items in the schedule
  221. for key in B:
  222. entry = self.Entry(**dict(b[key], name=key))
  223. if schedule.get(key):
  224. schedule[key].update(entry)
  225. else:
  226. schedule[key] = entry
  227. def _ensure_connected(self):
  228. # callback called for each retry while the connection
  229. # can't be established.
  230. def _error_handler(exc, interval):
  231. error('Celerybeat: Connection error: %s. '
  232. 'Trying again in %s seconds...', exc, interval)
  233. return self.connection.ensure_connection(
  234. _error_handler, self.app.conf.BROKER_CONNECTION_MAX_RETRIES
  235. )
  236. def get_schedule(self):
  237. return self.data
  238. def set_schedule(self, schedule):
  239. self.data = schedule
  240. schedule = property(get_schedule, set_schedule)
  241. @cached_property
  242. def connection(self):
  243. return self.app.connection()
  244. @cached_property
  245. def publisher(self):
  246. return self.Publisher(self._ensure_connected())
  247. @property
  248. def info(self):
  249. return ''
  250. class PersistentScheduler(Scheduler):
  251. persistence = shelve
  252. known_suffixes = ('', '.db', '.dat', '.bak', '.dir')
  253. _store = None
  254. def __init__(self, *args, **kwargs):
  255. self.schedule_filename = kwargs.get('schedule_filename')
  256. Scheduler.__init__(self, *args, **kwargs)
  257. def _remove_db(self):
  258. for suffix in self.known_suffixes:
  259. with platforms.ignore_errno(errno.ENOENT):
  260. os.remove(self.schedule_filename + suffix)
  261. def setup_schedule(self):
  262. try:
  263. self._store = self.persistence.open(self.schedule_filename,
  264. writeback=True)
  265. entries = self._store.setdefault('entries', {})
  266. except Exception, exc:
  267. error('Removing corrupted schedule file %r: %r',
  268. self.schedule_filename, exc, exc_info=True)
  269. self._remove_db()
  270. self._store = self.persistence.open(self.schedule_filename,
  271. writeback=True)
  272. else:
  273. if '__version__' not in self._store:
  274. warning('Reset: Account for new __version__ field')
  275. self._store.clear() # remove schedule at 2.2.2 upgrade.
  276. if 'tz' not in self._store:
  277. warning('Reset: Account for new tz field')
  278. self._store.clear() # remove schedule at 3.0.8 upgrade
  279. if 'utc_enabled' not in self._store:
  280. warning('Reset: Account for new utc_enabled field')
  281. self._store.clear() # remove schedule at 3.0.9 upgrade
  282. tz = self.app.conf.CELERY_TIMEZONE
  283. stored_tz = self._store.get('tz')
  284. if stored_tz is not None and stored_tz != tz:
  285. warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
  286. self._store.clear() # Timezone changed, reset db!
  287. utc = self.app.conf.CELERY_ENABLE_UTC
  288. stored_utc = self._store.get('utc_enabled')
  289. if stored_utc is not None and stored_utc != utc:
  290. choices = {True: 'enabled', False: 'disabled'}
  291. warning('Reset: UTC changed from %s to %s',
  292. choices[stored_utc], choices[utc])
  293. self._store.clear() # UTC setting changed, reset db!
  294. entries = self._store.setdefault('entries', {})
  295. self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
  296. self.install_default_entries(self.schedule)
  297. self._store.update(__version__=__version__, tz=tz, utc_enabled=utc)
  298. self.sync()
  299. debug('Current schedule:\n' + '\n'.join(
  300. repr(entry) for entry in entries.itervalues()))
  301. def get_schedule(self):
  302. return self._store['entries']
  303. def set_schedule(self, schedule):
  304. self._store['entries'] = schedule
  305. schedule = property(get_schedule, set_schedule)
  306. def sync(self):
  307. if self._store is not None:
  308. self._store.sync()
  309. def close(self):
  310. self.sync()
  311. self._store.close()
  312. @property
  313. def info(self):
  314. return ' . db -> %s' % (self.schedule_filename, )
  315. class Service(object):
  316. scheduler_cls = PersistentScheduler
  317. def __init__(self, max_interval=None, schedule_filename=None,
  318. scheduler_cls=None, app=None):
  319. app = self.app = app_or_default(app)
  320. self.max_interval = (max_interval
  321. or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
  322. self.scheduler_cls = scheduler_cls or self.scheduler_cls
  323. self.schedule_filename = (
  324. schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)
  325. self._is_shutdown = Event()
  326. self._is_stopped = Event()
  327. def __reduce__(self):
  328. return self.__class__, (self.max_interval, self.schedule_filename,
  329. self.scheduler_cls, self.app)
  330. def start(self, embedded_process=False):
  331. info('Celerybeat: Starting...')
  332. debug('Celerybeat: Ticking with max interval->%s',
  333. humanize_seconds(self.scheduler.max_interval))
  334. signals.beat_init.send(sender=self)
  335. if embedded_process:
  336. signals.beat_embedded_init.send(sender=self)
  337. platforms.set_process_title('celerybeat')
  338. try:
  339. while not self._is_shutdown.is_set():
  340. interval = self.scheduler.tick()
  341. debug('Celerybeat: Waking up %s.',
  342. humanize_seconds(interval, prefix='in '))
  343. time.sleep(interval)
  344. except (KeyboardInterrupt, SystemExit):
  345. self._is_shutdown.set()
  346. finally:
  347. self.sync()
  348. def sync(self):
  349. self.scheduler.close()
  350. self._is_stopped.set()
  351. def stop(self, wait=False):
  352. info('Celerybeat: Shutting down...')
  353. self._is_shutdown.set()
  354. wait and self._is_stopped.wait() # block until shutdown done.
  355. def get_scheduler(self, lazy=False):
  356. filename = self.schedule_filename
  357. scheduler = instantiate(self.scheduler_cls,
  358. app=self.app,
  359. schedule_filename=filename,
  360. max_interval=self.max_interval,
  361. lazy=lazy)
  362. return scheduler
  363. @cached_property
  364. def scheduler(self):
  365. return self.get_scheduler()
  366. class _Threaded(Thread):
  367. """Embedded task scheduler using threading."""
  368. def __init__(self, *args, **kwargs):
  369. super(_Threaded, self).__init__()
  370. self.service = Service(*args, **kwargs)
  371. self.daemon = True
  372. self.name = 'Beat'
  373. def run(self):
  374. self.service.start()
  375. def stop(self):
  376. self.service.stop(wait=True)
  377. try:
  378. ensure_multiprocessing()
  379. except NotImplementedError: # pragma: no cover
  380. _Process = None
  381. else:
  382. class _Process(Process): # noqa
  383. def __init__(self, *args, **kwargs):
  384. super(_Process, self).__init__()
  385. self.service = Service(*args, **kwargs)
  386. self.name = 'Beat'
  387. def run(self):
  388. platforms.signals.reset('SIGTERM')
  389. self.service.start(embedded_process=True)
  390. def stop(self):
  391. self.service.stop()
  392. self.terminate()
  393. def EmbeddedService(*args, **kwargs):
  394. """Return embedded clock service.
  395. :keyword thread: Run threaded instead of as a separate process.
  396. Default is :const:`False`.
  397. """
  398. if kwargs.pop('thread', False) or _Process is None:
  399. # Need short max interval to be able to stop thread
  400. # in reasonable time.
  401. kwargs.setdefault('max_interval', 1)
  402. return _Threaded(*args, **kwargs)
  403. return _Process(*args, **kwargs)