| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 | # -*- coding: utf-8 -*-"""    celery.beat    ~~~~~~~~~~~    The periodic task scheduler."""from __future__ import absolute_importimport errnoimport osimport timeimport shelveimport sysimport tracebackfrom threading import Event, Threadfrom billiard import Process, ensure_multiprocessingfrom kombu.utils import reprcallfrom kombu.utils.functional import maybe_promisefrom . import __version__from . import platformsfrom . import signalsfrom . import current_appfrom .app import app_or_defaultfrom .schedules import maybe_schedule, crontabfrom .utils import cached_propertyfrom .utils.imports import instantiatefrom .utils.timeutils import humanize_secondsfrom .utils.log import get_loggerlogger = get_logger(__name__)debug, info, error = logger.debug, logger.info, logger.errorDEFAULT_MAX_INTERVAL = 300  # 5 minutesclass SchedulingError(Exception):    """An error occured while scheduling a task."""class ScheduleEntry(object):    """An entry in the scheduler.    :keyword name: see :attr:`name`.    :keyword schedule: see :attr:`schedule`.    :keyword args: see :attr:`args`.    :keyword kwargs: see :attr:`kwargs`.    :keyword options: see :attr:`options`.    :keyword last_run_at: see :attr:`last_run_at`.    :keyword total_run_count: see :attr:`total_run_count`.    :keyword relative: Is the time relative to when the server starts?    """    #: The task name    name = None    #: The schedule (run_every/crontab)    schedule = None    #: Positional arguments to apply.    args = None    #: Keyword arguments to apply.    kwargs = None    #: Task execution options.    options = None    #: The time and date of when this task was last scheduled.    last_run_at = None    #: Total number of times this task has been scheduled.    total_run_count = 0    def __init__(self, name=None, task=None, last_run_at=None,            total_run_count=None, schedule=None, args=(), kwargs={},            options={}, relative=False):        self.name = name        self.task = task        self.args = args        self.kwargs = kwargs        self.options = options        self.schedule = maybe_schedule(schedule, relative)        self.last_run_at = last_run_at or self._default_now()        self.total_run_count = total_run_count or 0    def _default_now(self):        return current_app.now()    def _next_instance(self, last_run_at=None):        """Returns a new instance of the same class, but with        its date and count fields updated."""        return self.__class__(**dict(self,                                last_run_at=last_run_at or self._default_now(),                                total_run_count=self.total_run_count + 1))    __next__ = next = _next_instance  # for 2to3    def update(self, other):        """Update values from another entry.        Does only update "editable" fields (task, schedule, args, kwargs,        options).        """        self.__dict__.update({'task': other.task, 'schedule': other.schedule,                              'args': other.args, 'kwargs': other.kwargs,                              'options': other.options})    def is_due(self):        """See :meth:`~celery.schedule.schedule.is_due`."""        return self.schedule.is_due(self.last_run_at)    def __iter__(self):        return vars(self).iteritems()    def __repr__(self):        return '<Entry: {0.name} {call} {0.schedule}'.format(self,            call=reprcall(self.task, self.args or (), self.kwargs or {}))class Scheduler(object):    """Scheduler for periodic tasks.    :keyword schedule: see :attr:`schedule`.    :keyword max_interval: see :attr:`max_interval`.    """    Entry = ScheduleEntry    #: The schedule dict/shelve.    schedule = None    #: Maximum time to sleep between re-checking the schedule.    max_interval = DEFAULT_MAX_INTERVAL    #: How often to sync the schedule (3 minutes by default)    sync_every = 3 * 60    _last_sync = None    logger = logger  # compat    def __init__(self, schedule=None, max_interval=None,            app=None, Publisher=None, lazy=False, **kwargs):        app = self.app = app_or_default(app)        self.data = maybe_promise({} if schedule is None else schedule)        self.max_interval = (max_interval                                or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL                                or self.max_interval)        self.Publisher = Publisher or app.amqp.TaskProducer        if not lazy:            self.setup_schedule()    def install_default_entries(self, data):        entries = {}        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:            if 'celery.backend_cleanup' not in data:                entries['celery.backend_cleanup'] = {                        'task': 'celery.backend_cleanup',                        'schedule': crontab('0', '4', '*'),                        'options': {'expires': 12 * 3600}}        self.update_from_dict(entries)    def maybe_due(self, entry, publisher=None):        is_due, next_time_to_run = entry.is_due()        if is_due:            info('Scheduler: Sending due task %s', entry.task)            try:                result = self.apply_async(entry, publisher=publisher)            except Exception as exc:                error('Message Error: %s\n%s',                      exc, traceback.format_stack(), exc_info=True)            else:                debug('%s sent. id->%s', entry.task, result.id)        return next_time_to_run    def tick(self):        """Run a tick, that is one iteration of the scheduler.        Executes all due tasks.        """        remaining_times = []        try:            for entry in self.schedule.itervalues():                next_time_to_run = self.maybe_due(entry, self.publisher)                if next_time_to_run:                    remaining_times.append(next_time_to_run)        except RuntimeError:            pass        return min(remaining_times + [self.max_interval])    def should_sync(self):        return (not self._last_sync or                (time.time() - self._last_sync) > self.sync_every)    def reserve(self, entry):        new_entry = self.schedule[entry.name] = next(entry)        return new_entry    def apply_async(self, entry, publisher=None, **kwargs):        # Update timestamps and run counts before we actually execute,        # so we have that done if an exception is raised (doesn't schedule        # forever.)        entry = self.reserve(entry)        task = self.app.tasks.get(entry.task)        try:            if task:                result = task.apply_async(entry.args, entry.kwargs,                                          publisher=publisher,                                          **entry.options)            else:                result = self.send_task(entry.task, entry.args, entry.kwargs,                                        publisher=publisher,                                        **entry.options)        except Exception as exc:            raise SchedulingError, SchedulingError(                "Couldn't apply scheduled task {0.name}: {exc}".format(                    entry, exc)), sys.exc_info()[2]        finally:            if self.should_sync():                self._do_sync()        return result    def send_task(self, *args, **kwargs):        return self.app.send_task(*args, **kwargs)    def setup_schedule(self):        self.install_default_entries(self.data)    def _do_sync(self):        try:            debug('Celerybeat: Synchronizing schedule...')            self.sync()        finally:            self._last_sync = time.time()    def sync(self):        pass    def close(self):        self.sync()    def add(self, **kwargs):        entry = self.Entry(**kwargs)        self.schedule[entry.name] = entry        return entry    def _maybe_entry(self, name, entry):        if isinstance(entry, self.Entry):            return entry        return self.Entry(**dict(entry, name=name))    def update_from_dict(self, dict_):        self.schedule.update(dict((name, self._maybe_entry(name, entry))                                for name, entry in dict_.items()))    def merge_inplace(self, b):        schedule = self.schedule        A, B = set(schedule), set(b)        # Remove items from disk not in the schedule anymore.        for key in A ^ B:            schedule.pop(key, None)        # Update and add new items in the schedule        for key in B:            entry = self.Entry(**dict(b[key], name=key))            if schedule.get(key):                schedule[key].update(entry)            else:                schedule[key] = entry    def _ensure_connected(self):        # callback called for each retry while the connection        # can't be established.        def _error_handler(exc, interval):            error('Celerybeat: Connection error: %s. '                  'Trying again in %s seconds...', exc, interval)        return self.connection.ensure_connection(_error_handler,                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES)    def get_schedule(self):        return self.data    def set_schedule(self, schedule):        self.data = schedule    schedule = property(get_schedule, set_schedule)    @cached_property    def connection(self):        return self.app.connection()    @cached_property    def publisher(self):        return self.Publisher(self._ensure_connected())    @property    def info(self):        return ''class PersistentScheduler(Scheduler):    persistence = shelve    known_suffixes = ('', '.db', '.dat', '.bak', '.dir')    _store = None    def __init__(self, *args, **kwargs):        self.schedule_filename = kwargs.get('schedule_filename')        Scheduler.__init__(self, *args, **kwargs)    def _remove_db(self):        for suffix in self.known_suffixes:            try:                os.remove(self.schedule_filename + suffix)            except OSError as exc:                if exc.errno != errno.ENOENT:                    raise    def setup_schedule(self):        try:            self._store = self.persistence.open(self.schedule_filename,                                                writeback=True)            entries = self._store.setdefault('entries', {})        except Exception as exc:            error('Removing corrupted schedule file %r: %r',                  self.schedule_filename, exc, exc_info=True)            self._remove_db()            self._store = self.persistence.open(self.schedule_filename,                                                writeback=True)        else:            if '__version__' not in self._store:                self._store.clear()   # remove schedule at 2.2.2 upgrade.            if 'utc' not in self._store:                self._store.clear()   # remove schedule at 3.0.1 upgrade.        entries = self._store.setdefault('entries', {})        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)        self.install_default_entries(self.schedule)        self._store.update(__version__=__version__, utc=True)        self.sync()        debug('Current schedule:\n' + '\n'.join(repr(entry)                                    for entry in entries.itervalues()))    def get_schedule(self):        return self._store['entries']    def set_schedule(self, schedule):        self._store['entries'] = schedule    schedule = property(get_schedule, set_schedule)    def sync(self):        if self._store is not None:            self._store.sync()    def close(self):        self.sync()        self._store.close()    @property    def info(self):        return '    . db -> {self.schedule_filename}'.format(self=self)class Service(object):    scheduler_cls = PersistentScheduler    def __init__(self, max_interval=None, schedule_filename=None,            scheduler_cls=None, app=None):        app = self.app = app_or_default(app)        self.max_interval = (max_interval                             or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)        self.scheduler_cls = scheduler_cls or self.scheduler_cls        self.schedule_filename = schedule_filename or \                                    app.conf.CELERYBEAT_SCHEDULE_FILENAME        self._is_shutdown = Event()        self._is_stopped = Event()    def start(self, embedded_process=False):        info('Celerybeat: Starting...')        debug('Celerybeat: Ticking with max interval->%s',              humanize_seconds(self.scheduler.max_interval))        signals.beat_init.send(sender=self)        if embedded_process:            signals.beat_embedded_init.send(sender=self)            platforms.set_process_title('celerybeat')        try:            while not self._is_shutdown.is_set():                interval = self.scheduler.tick()                debug('Celerybeat: Waking up %s.',                      humanize_seconds(interval, prefix='in '))                time.sleep(interval)        except (KeyboardInterrupt, SystemExit):            self._is_shutdown.set()        finally:            self.sync()    def sync(self):        self.scheduler.close()        self._is_stopped.set()    def stop(self, wait=False):        info('Celerybeat: Shutting down...')        self._is_shutdown.set()        wait and self._is_stopped.wait()  # block until shutdown done.    def get_scheduler(self, lazy=False):        filename = self.schedule_filename        scheduler = instantiate(self.scheduler_cls,                                app=self.app,                                schedule_filename=filename,                                max_interval=self.max_interval,                                lazy=lazy)        return scheduler    @cached_property    def scheduler(self):        return self.get_scheduler()class _Threaded(Thread):    """Embedded task scheduler using threading."""    def __init__(self, *args, **kwargs):        super(_Threaded, self).__init__()        self.service = Service(*args, **kwargs)        self.daemon = True        self.name = 'Beat'    def run(self):        self.service.start()    def stop(self):        self.service.stop(wait=True)try:    ensure_multiprocessing()except NotImplementedError:     # pragma: no cover    _Process = Noneelse:    class _Process(Process):    # noqa        def __init__(self, *args, **kwargs):            super(_Process, self).__init__()            self.service = Service(*args, **kwargs)            self.name = 'Beat'        def run(self):            platforms.signals.reset('SIGTERM')            self.service.start(embedded_process=True)        def stop(self):            self.service.stop()            self.terminate()def EmbeddedService(*args, **kwargs):    """Return embedded clock service.    :keyword thread: Run threaded instead of as a separate process.        Uses :mod:`multiprocessing` by default, if available.    """    if kwargs.pop('thread', False) or _Process is None:        # Need short max interval to be able to stop thread        # in reasonable time.        kwargs.setdefault('max_interval', 1)        return _Threaded(*args, **kwargs)    return _Process(*args, **kwargs)
 |