123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652 |
- """The periodic task scheduler."""
- from __future__ import absolute_import, unicode_literals
- import copy
- import errno
- import heapq
- import os
- import time
- import shelve
- import sys
- import traceback
- from collections import namedtuple
- from functools import total_ordering
- from threading import Event, Thread
- from billiard import ensure_multiprocessing
- from billiard.context import Process
- from billiard.common import reset_signals
- from kombu.utils.functional import maybe_evaluate, reprcall
- from kombu.utils.objects import cached_property
- from . import __version__
- from . import platforms
- from . import signals
- from .five import (
- items, monotonic, python_2_unicode_compatible, reraise, values,
- )
- from .schedules import maybe_schedule, crontab
- from .utils.imports import load_extension_class_names, symbol_by_name
- from .utils.time import humanize_seconds
- from .utils.log import get_logger, iter_open_logger_fds
__all__ = [
    'SchedulingError',
    'ScheduleEntry',
    'Scheduler',
    'PersistentScheduler',
    'Service',
    'EmbeddedService',
]

#: Heap item used by the scheduler: ``(time, priority, entry)``.
event_t = namedtuple('event_t', 'time priority entry')

logger = get_logger(__name__)
# Short module-level aliases for the logger methods used throughout.
debug = logger.debug
info = logger.info
error = logger.error
warning = logger.warning

#: Default upper bound, in seconds, on the delay returned by
#: :meth:`Scheduler.tick` (i.e. the longest the beat loop sleeps).
DEFAULT_MAX_INTERVAL = 300
class SchedulingError(Exception):
    """Raised when a scheduled task could not be applied.

    :meth:`Scheduler.apply_async` re-raises any error from sending the
    task as this exception type.
    """
@total_ordering
@python_2_unicode_compatible
class ScheduleEntry(object):
    """An entry in the scheduler.

    Arguments:
        name (str): see :attr:`name`.
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        args (Tuple): see :attr:`args`.
        kwargs (Dict): see :attr:`kwargs`.
        options (Dict): see :attr:`options`.
        last_run_at (~datetime.datetime): see :attr:`last_run_at`.
        total_run_count (int): see :attr:`total_run_count`.
        relative (bool): Is the time relative to when the server starts?
    """

    #: Name of this entry (its key in the scheduler's schedule mapping).
    name = None

    #: The schedule object, as returned by ``maybe_schedule``.
    schedule = None

    #: Positional arguments passed to the task when it is applied.
    args = None

    #: Keyword arguments passed to the task when it is applied.
    kwargs = None

    #: Extra options forwarded as keyword arguments to
    #: ``apply_async``/``send_task``.
    options = None

    #: When this entry last ran (defaults to "now" at construction).
    last_run_at = None

    #: How many times this entry has been advanced with :meth:`next`.
    total_run_count = 0

    def __init__(self, name=None, task=None, last_run_at=None,
                 total_run_count=None, schedule=None, args=(), kwargs=None,
                 options=None, relative=False, app=None):
        self.app = app
        self.name = name
        self.task = task
        self.args = args
        # ``None`` defaults (instead of ``{}``) so that entries never share
        # one mutable dict created at function-definition time.
        self.kwargs = {} if kwargs is None else kwargs
        self.options = {} if options is None else options
        self.schedule = maybe_schedule(schedule, relative, app=self.app)
        self.last_run_at = last_run_at or self._default_now()
        self.total_run_count = total_run_count or 0

    def _default_now(self):
        """Return "now" according to the schedule, or the app if unset."""
        return self.schedule.now() if self.schedule else self.app.now()

    def _next_instance(self, last_run_at=None):
        """Return new instance, with date and count fields updated."""
        return self.__class__(**dict(
            self,
            last_run_at=last_run_at or self._default_now(),
            total_run_count=self.total_run_count + 1,
        ))
    __next__ = next = _next_instance  # for 2to3

    def __reduce__(self):
        # Pickle support (entries are stored in the shelve db by
        # PersistentScheduler). ``app`` is not pickled; the scheduler
        # reattaches it when it loads entries.
        return self.__class__, (
            self.name, self.task, self.last_run_at, self.total_run_count,
            self.schedule, self.args, self.kwargs, self.options,
        )

    def update(self, other):
        """Update values from another entry.

        Will only update "editable" fields:
        ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        """
        self.__dict__.update({
            'task': other.task, 'schedule': other.schedule,
            'args': other.args, 'kwargs': other.kwargs,
            'options': other.options,
        })

    def is_due(self):
        """See :meth:`~celery.schedules.schedule.is_due`."""
        return self.schedule.is_due(self.last_run_at)

    def __iter__(self):
        # Iterating yields ``(attribute, value)`` pairs, so ``dict(entry)``
        # gives the keyword arguments needed to rebuild the entry.
        return iter(items(vars(self)))

    def __repr__(self):
        return '<{name}: {0.name} {call} {0.schedule}>'.format(
            self,
            call=reprcall(self.task, self.args or (), self.kwargs or {}),
            name=type(self).__name__,
        )

    def __lt__(self, other):
        if isinstance(other, ScheduleEntry):
            # Entries sit last in ``event_t(time, priority, entry)`` heap
            # items, so they are compared only when time and priority tie.
            # The order between such entries doesn't matter; comparing ids
            # just gives a consistent, total order.
            return id(self) < id(other)
        return NotImplemented
class Scheduler(object):
    """Scheduler for periodic tasks.

    The :program:`celery beat` program may instantiate this class
    multiple times for introspection purposes, but then with the
    ``lazy`` argument set.  It's important for subclasses to
    be idempotent when this argument is set.

    Arguments:
        schedule (Dict[str, ScheduleEntry]): initial mapping of entry
            name to entry (may be a lazy object; it is passed through
            ``maybe_evaluate``).  See :attr:`schedule`.
        max_interval (int): see :attr:`max_interval`.
        lazy (bool): Don't set up the schedule.
    """

    #: Class used to build schedule entries.
    Entry = ScheduleEntry

    #: Upper bound, in seconds, on the delay returned by :meth:`tick`.
    max_interval = DEFAULT_MAX_INTERVAL

    #: Sync the schedule when this many seconds passed since the last sync.
    sync_every = 3 * 60

    #: Also sync after this many applied tasks (falsy disables this).
    sync_every_tasks = None

    _last_sync = None
    _tasks_since_sync = 0

    logger = logger  # compat

    def __init__(self, app, schedule=None, max_interval=None,
                 Producer=None, lazy=False, sync_every_tasks=None, **kwargs):
        self.app = app
        self.data = maybe_evaluate({} if schedule is None else schedule)
        self.max_interval = (max_interval or
                             app.conf.beat_max_loop_interval or
                             self.max_interval)
        self.Producer = Producer or app.amqp.Producer
        self._heap = None
        self.old_schedulers = None
        self.sync_every_tasks = (
            app.conf.beat_sync_every if sync_every_tasks is None
            else sync_every_tasks)
        if not lazy:
            self.setup_schedule()

    def install_default_entries(self, data):
        """Add the built-in ``celery.backend_cleanup`` entry if needed.

        It is added only when results expire, the result backend does not
        support auto-expiry, and ``data`` doesn't already define it.
        """
        entries = {}
        if self.app.conf.result_expires and \
                not self.app.backend.supports_autoexpire:
            if 'celery.backend_cleanup' not in data:
                entries['celery.backend_cleanup'] = {
                    'task': 'celery.backend_cleanup',
                    'schedule': crontab('0', '4', '*'),
                    'options': {'expires': 12 * 3600}}
        self.update_from_dict(entries)

    def apply_entry(self, entry, producer=None):
        """Send ``entry``'s task now, logging (not raising) any failure."""
        info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
        try:
            result = self.apply_async(entry, producer=producer, advance=False)
        except Exception as exc:
            # Errors are logged instead of propagated so tick() carries on.
            # Join the stack frames so the log shows text, not a list repr.
            error('Message Error: %s\n%s',
                  exc, ''.join(traceback.format_stack()), exc_info=True)
        else:
            debug('%s sent. id->%s', entry.task, result.id)

    def adjust(self, n, drift=-0.010):
        """Return ``n + drift`` for positive ``n``; otherwise ``n``."""
        if n and n > 0:
            return n + drift
        return n

    def is_due(self, entry):
        """Return ``entry.is_due()`` as ``(is_due, next_time_to_run)``."""
        return entry.is_due()

    def _when(self, entry, next_time_to_run, mktime=time.mktime):
        """Return the absolute time (a ``mktime`` value) of the next run.

        This is the entry's "now" converted with ``mktime`` plus the
        adjusted delay ``next_time_to_run``.
        """
        adjust = self.adjust
        return (mktime(entry._default_now().timetuple()) +
                (adjust(next_time_to_run) or 0))

    def populate_heap(self, event_t=event_t, heapify=heapq.heapify):
        """Populate the heap with the data contained in the schedule."""
        self._heap = [event_t(self._when(e, e.is_due()[1]) or 0, 5, e)
                      for e in values(self.schedule)]
        heapify(self._heap)

    def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
             heappush=heapq.heappush):
        """Run a tick - one iteration of the scheduler.

        Executes one due task per call.

        Returns:
            float: preferred delay in seconds for next call.
        """
        adjust = self.adjust
        max_interval = self.max_interval

        # Rebuild the heap on first use, or when the entry names or any
        # entry's schedule changed since the heap was last built.
        if (self._heap is None or
                not self.schedules_equal(self.old_schedulers, self.schedule)):
            self.old_schedulers = copy.copy(self.schedule)
            self.populate_heap()

        H = self._heap

        if not H:
            return max_interval

        event = H[0]
        entry = event[2]
        is_due, next_time_to_run = self.is_due(entry)
        if is_due:
            verify = heappop(H)
            if verify is event:
                next_entry = self.reserve(entry)
                self.apply_entry(entry, producer=self.producer)
                heappush(H, event_t(self._when(next_entry, next_time_to_run),
                                    event[1], next_entry))
                return 0
            else:
                # The heap head changed under us: put it back and ask to be
                # called again immediately so the new head is evaluated.
                # (``verify[0]`` is an absolute timestamp from _when, not a
                # delay, so it must not be returned as one.)
                heappush(H, verify)
                return 0
        return min(adjust(next_time_to_run) or max_interval, max_interval)

    def schedules_equal(self, old_schedules, new_schedules):
        """Return True if both mappings have the same entry names and each
        pair of same-named entries has an equal ``schedule``.

        ``None`` is only equal to ``None``.
        """
        if old_schedules is None or new_schedules is None:
            return old_schedules is new_schedules
        if set(old_schedules.keys()) != set(new_schedules.keys()):
            return False
        for name, old_entry in old_schedules.items():
            new_entry = new_schedules.get(name)
            if not new_entry or old_entry.schedule != new_entry.schedule:
                return False
        return True

    def should_sync(self):
        """Return True if never synced, :attr:`sync_every` seconds passed,
        or :attr:`sync_every_tasks` tasks were applied since the last sync.
        """
        return (
            (not self._last_sync or
             (monotonic() - self._last_sync) > self.sync_every) or
            (self.sync_every_tasks and
             self._tasks_since_sync >= self.sync_every_tasks)
        )

    def reserve(self, entry):
        """Replace ``entry`` in the schedule with its next instance."""
        new_entry = self.schedule[entry.name] = next(entry)
        return new_entry

    def apply_async(self, entry, producer=None, advance=True, **kwargs):
        """Apply ``entry``'s task, advancing the entry first if ``advance``.

        Raises:
            SchedulingError: if applying/sending the task fails.
        """
        # Advance the entry (timestamp and run count) before sending, so
        # the update is recorded even if sending raises.
        entry = self.reserve(entry) if advance else entry
        task = self.app.tasks.get(entry.task)

        try:
            if task:
                return task.apply_async(entry.args, entry.kwargs,
                                        producer=producer,
                                        **entry.options)
            else:
                return self.send_task(entry.task, entry.args, entry.kwargs,
                                      producer=producer,
                                      **entry.options)
        except Exception as exc:
            reraise(SchedulingError, SchedulingError(
                "Couldn't apply scheduled task {0.name}: {exc}".format(
                    entry, exc=exc)), sys.exc_info()[2])
        finally:
            self._tasks_since_sync += 1
            if self.should_sync():
                self._do_sync()

    def send_task(self, *args, **kwargs):
        """Send a task by name through the app."""
        return self.app.send_task(*args, **kwargs)

    def setup_schedule(self):
        self.install_default_entries(self.data)

    def _do_sync(self):
        """Call :meth:`sync` and reset the sync counters, even on error."""
        try:
            debug('beat: Synchronizing schedule...')
            self.sync()
        finally:
            self._last_sync = monotonic()
            self._tasks_since_sync = 0

    def sync(self):
        """Persist the schedule; a no-op here, overridden by subclasses."""
        pass

    def close(self):
        self.sync()

    def add(self, **kwargs):
        """Create an entry from ``kwargs``, store it and return it."""
        entry = self.Entry(app=self.app, **kwargs)
        self.schedule[entry.name] = entry
        return entry

    def _maybe_entry(self, name, entry):
        """Return ``entry`` as an :attr:`Entry` bound to this app.

        Existing Entry instances get ``app`` reattached; anything else is
        treated as a mapping of Entry keyword arguments.
        """
        if isinstance(entry, self.Entry):
            entry.app = self.app
            return entry
        return self.Entry(**dict(entry, name=name, app=self.app))

    def update_from_dict(self, dict_):
        """Add/replace schedule entries from a ``name -> entry`` mapping."""
        self.schedule.update({
            name: self._maybe_entry(name, entry)
            for name, entry in items(dict_)
        })

    def merge_inplace(self, b):
        """Merge the mapping ``b`` into the current schedule in place.

        Entries missing from ``b`` are removed, entries present in both
        are updated through :meth:`Entry.update`, and new ones are added.
        """
        schedule = self.schedule
        A, B = set(schedule), set(b)

        # Remove entries no longer present in ``b``.
        for key in A - B:
            schedule.pop(key, None)

        # Update existing entries and add new ones.
        for key in B:
            entry = self.Entry(**dict(b[key], name=key, app=self.app))
            if schedule.get(key):
                schedule[key].update(entry)
            else:
                schedule[key] = entry

    def _ensure_connected(self):
        """Ensure the broker connection is usable and return it.

        Retries up to ``broker_connection_max_retries``, logging each
        failed attempt.
        """
        # Called for each failed connection attempt.
        def _error_handler(exc, interval):
            error('beat: Connection error: %s. '
                  'Trying again in %s seconds...', exc, interval)

        return self.connection.ensure_connection(
            _error_handler, self.app.conf.broker_connection_max_retries
        )

    def get_schedule(self):
        return self.data

    def set_schedule(self, schedule):
        self.data = schedule

    #: Mapping of entry name to entry; backed by :attr:`data` here.
    schedule = property(get_schedule, set_schedule)

    @cached_property
    def connection(self):
        return self.app.connection_for_write()

    @cached_property
    def producer(self):
        return self.Producer(self._ensure_connected(), auto_declare=False)

    @property
    def info(self):
        return ''
class PersistentScheduler(Scheduler):
    """Scheduler backed by :mod:`shelve` database."""

    #: Module used to open the on-disk store (must provide ``open``).
    persistence = shelve

    #: File-name suffixes the db backend may create; all are removed when
    #: the schedule file is found to be corrupted.
    known_suffixes = ('', '.db', '.dat', '.bak', '.dir')

    _store = None

    def __init__(self, *args, **kwargs):
        self.schedule_filename = kwargs.get('schedule_filename')
        Scheduler.__init__(self, *args, **kwargs)

    def _remove_db(self):
        """Delete the schedule file and any side files, skipping missing."""
        for suffix in self.known_suffixes:
            with platforms.ignore_errno(errno.ENOENT):
                os.remove(self.schedule_filename + suffix)

    def _open_schedule(self):
        return self.persistence.open(self.schedule_filename, writeback=True)

    def _destroy_open_corrupted_schedule(self, exc):
        """Log ``exc``, delete the schedule files and reopen a fresh db."""
        error('Removing corrupted schedule file %r: %r',
              self.schedule_filename, exc, exc_info=True)
        self._remove_db()
        return self._open_schedule()

    def setup_schedule(self):
        try:
            self._store = self._open_schedule()
            # Touch the store while still inside the ``try`` so that an
            # unreadable or incompatible db file fails here and is
            # recreated, instead of failing later.
            self._store.keys()
        except Exception as exc:
            self._store = self._destroy_open_corrupted_schedule(exc)

        self._create_schedule()

        # Stored run times are only meaningful under the same timezone and
        # UTC settings, so the db is reset if either one changed.
        tz = self.app.conf.timezone
        stored_tz = self._store.get(str('tz'))
        if stored_tz is not None and stored_tz != tz:
            warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
            self._store.clear()
        utc = self.app.conf.enable_utc
        stored_utc = self._store.get(str('utc_enabled'))
        if stored_utc is not None and stored_utc != utc:
            choices = {True: 'enabled', False: 'disabled'}
            warning('Reset: UTC changed from %s to %s',
                    choices[stored_utc], choices[utc])
            self._store.clear()

        entries = self._store.setdefault(str('entries'), {})
        self.merge_inplace(self.app.conf.beat_schedule)
        self.install_default_entries(self.schedule)
        self._store.update({
            str('__version__'): __version__,
            str('tz'): tz,
            str('utc_enabled'): utc,
        })
        self.sync()
        debug('Current schedule:\n' + '\n'.join(
            repr(entry) for entry in values(entries)))

    def _create_schedule(self):
        """Make sure the store has an ``entries`` key.

        An existing db missing the ``__version__``, ``tz`` or
        ``utc_enabled`` keys is cleared.  If creating ``entries`` raises
        KeyError the db is recreated and the attempt repeated once.
        """
        for _ in (1, 2):
            try:
                self._store[str('entries')]
            except KeyError:
                # New schedule db.
                try:
                    self._store[str('entries')] = {}
                except KeyError as exc:
                    self._store = self._destroy_open_corrupted_schedule(exc)
                    continue
            else:
                if str('__version__') not in self._store:
                    warning('DB Reset: Account for new __version__ field')
                    self._store.clear()
                elif str('tz') not in self._store:
                    warning('DB Reset: Account for new tz field')
                    self._store.clear()
                elif str('utc_enabled') not in self._store:
                    warning('DB Reset: Account for new utc_enabled field')
                    self._store.clear()
            break

    def get_schedule(self):
        return self._store[str('entries')]

    def set_schedule(self, schedule):
        self._store[str('entries')] = schedule

    #: Mapping of entry name to entry, stored under ``'entries'`` in the db.
    schedule = property(get_schedule, set_schedule)

    def sync(self):
        if self._store is not None:
            self._store.sync()

    def close(self):
        """Sync and close the db; safe when it was never opened.

        A scheduler built with ``lazy=True`` never runs setup_schedule(),
        so ``_store`` may still be None here (sync() guards for it too).
        """
        self.sync()
        if self._store is not None:
            self._store.close()

    @property
    def info(self):
        return '    . db -> {self.schedule_filename}'.format(self=self)
class Service(object):
    """Celery periodic task service."""

    #: Scheduler class (or name resolvable by ``symbol_by_name``) to use.
    scheduler_cls = PersistentScheduler

    def __init__(self, app, max_interval=None, schedule_filename=None,
                 scheduler_cls=None):
        self.app = app
        self.max_interval = (max_interval or
                             app.conf.beat_max_loop_interval)
        self.scheduler_cls = scheduler_cls or self.scheduler_cls
        self.schedule_filename = (
            schedule_filename or app.conf.beat_schedule_filename)

        self._is_shutdown = Event()
        self._is_stopped = Event()

    def __reduce__(self):
        # The argument order must match
        # __init__(app, max_interval, schedule_filename, scheduler_cls).
        return self.__class__, (self.app, self.max_interval,
                                self.schedule_filename, self.scheduler_cls)

    def start(self, embedded_process=False):
        """Run the beat loop until :meth:`stop` is called.

        Each iteration calls ``scheduler.tick()``, sleeps for the delay it
        returns and syncs when the scheduler asks to.  KeyboardInterrupt
        and SystemExit end the loop; the scheduler is always closed on
        exit through :meth:`sync`.
        """
        info('beat: Starting...')
        debug('beat: Ticking with max interval->%s',
              humanize_seconds(self.scheduler.max_interval))

        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():
                interval = self.scheduler.tick()
                if interval and interval > 0.0:
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    time.sleep(interval)
                    if self.scheduler.should_sync():
                        self.scheduler._do_sync()
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()

    def sync(self):
        """Close the scheduler and mark the service as stopped."""
        self.scheduler.close()
        self._is_stopped.set()

    def stop(self, wait=False):
        """Request shutdown; if ``wait``, block until the loop has exited."""
        info('beat: Shutting down...')
        self._is_shutdown.set()
        if wait:
            self._is_stopped.wait()  # block until cleanup completed

    def get_scheduler(self, lazy=False,
                      extension_namespace='celery.beat_schedulers'):
        """Instantiate :attr:`scheduler_cls` (aliases from extensions)."""
        filename = self.schedule_filename
        aliases = dict(
            load_extension_class_names(extension_namespace) or {})
        return symbol_by_name(self.scheduler_cls, aliases=aliases)(
            app=self.app,
            schedule_filename=filename,
            max_interval=self.max_interval,
            lazy=lazy,
        )

    @cached_property
    def scheduler(self):
        return self.get_scheduler()
class _Threaded(Thread):
    """Embedded task scheduler using threading.

    Runs a :class:`Service` inside a daemon thread named ``Beat``.
    """

    def __init__(self, app, **kwargs):
        super(_Threaded, self).__init__()
        self.app = app
        # Remaining keyword arguments configure the wrapped Service.
        self.service = Service(app, **kwargs)
        self.daemon = True
        self.name = 'Beat'

    def run(self):
        """Make ``app`` current in this thread, then run the service."""
        app = self.app
        app.set_current()
        self.service.start()

    def stop(self):
        """Stop the service and wait for its loop to finish."""
        service = self.service
        service.stop(wait=True)
try:
    ensure_multiprocessing()
except NotImplementedError:
    # No multiprocessing support: EmbeddedService falls back to a thread.
    _Process = None
else:
    class _Process(Process):
        """Embedded task scheduler running a Service in a child process."""

        def __init__(self, app, **kwargs):
            super(_Process, self).__init__()
            self.app = app
            # Remaining keyword arguments configure the wrapped Service.
            self.service = Service(app, **kwargs)
            self.name = 'Beat'

        def run(self):
            """Prepare the child process, then run the service in it."""
            reset_signals(full=False)
            # The std streams and logger fds are passed as the ones to
            # keep; presumably every other inherited fd is closed.
            keep = [sys.__stdin__, sys.__stdout__, sys.__stderr__]
            keep += list(iter_open_logger_fds())
            platforms.close_open_fds(keep)
            app = self.app
            app.set_default()
            app.set_current()
            self.service.start(embedded_process=True)

        def stop(self):
            """Ask the service to stop, then terminate the process."""
            service = self.service
            service.stop()
            self.terminate()
def EmbeddedService(app, max_interval=None, **kwargs):
    """Return embedded clock service.

    Arguments:
        thread (bool): Run threaded instead of as a separate process.
            Uses :mod:`multiprocessing` by default, if available.
    """
    # Always remove ``thread`` so it is never forwarded to the service.
    use_thread = kwargs.pop('thread', False)
    if not use_thread and _Process is not None:
        return _Process(app, max_interval=max_interval, **kwargs)
    # The threaded service is forced to a 1 second max interval: the beat
    # loop only checks for shutdown between ticks, so this bounds how long
    # stop() has to wait.
    return _Threaded(app, max_interval=1, **kwargs)
|