| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 | # -*- coding: utf-8 -*-"""    celery.beat    ~~~~~~~~~~~    The Celery periodic task scheduler.    :copyright: (c) 2009 - 2011 by Ask Solem.    :license: BSD, see LICENSE for more details."""from __future__ import absolute_importimport errnoimport osimport timeimport shelveimport sysimport threadingimport tracebacktry:    import multiprocessingexcept ImportError:    multiprocessing = None  # noqafrom datetime import datetimefrom . import __version__from . import platformsfrom . import registryfrom . import signalsfrom .app import app_or_defaultfrom .log import SilenceRepeatedfrom .schedules import maybe_schedule, crontabfrom .utils import cached_property, instantiate, maybe_promisefrom .utils.timeutils import humanize_secondsclass SchedulingError(Exception):    """An error occured while scheduling a task."""class ScheduleEntry(object):    """An entry in the scheduler.    :keyword name: see :attr:`name`.    :keyword schedule: see :attr:`schedule`.    :keyword args: see :attr:`args`.    :keyword kwargs: see :attr:`kwargs`.    :keyword options: see :attr:`options`.    :keyword last_run_at: see :attr:`last_run_at`.    :keyword total_run_count: see :attr:`total_run_count`.    :keyword relative: Is the time relative to when the server starts?    """    #: The task name    name = None    #: The schedule (run_every/crontab)    schedule = None    #: Positional arguments to apply.    args = None    #: Keyword arguments to apply.    kwargs = None    #: Task execution options.    options = None    #: The time and date of when this task was last scheduled.    last_run_at = None    #: Total number of times this task has been scheduled.    total_run_count = 0    def __init__(self, name=None, task=None, last_run_at=None,            total_run_count=None, schedule=None, args=(), kwargs={},            options={}, relative=False):        self.name = name        self.task = task        self.args = args        self.kwargs = kwargs        self.options = options        self.schedule = maybe_schedule(schedule, relative)        self.last_run_at = last_run_at or self._default_now()        self.total_run_count = total_run_count or 0    def _default_now(self):        return datetime.utcnow()    def _next_instance(self, last_run_at=None):        """Returns a new instance of the same class, but with        its date and count fields updated."""        return self.__class__(**dict(self,                                last_run_at=last_run_at or datetime.utcnow(),                                total_run_count=self.total_run_count + 1))    __next__ = next = _next_instance  # for 2to3    def update(self, other):        """Update values from another entry.        Does only update "editable" fields (task, schedule, args, kwargs,        options).        """        self.__dict__.update({"task": other.task, "schedule": other.schedule,                              "args": other.args, "kwargs": other.kwargs,                              "options": other.options})    def is_due(self):        """See :meth:`celery.task.base.PeriodicTask.is_due`."""        return self.schedule.is_due(self.last_run_at)    def __iter__(self):        return vars(self).iteritems()    def __repr__(self):        return ("<Entry: %(name)s %(task)s(*%(args)s, **%(kwargs)s) "                "{%(schedule)s}>" % vars(self))class Scheduler(object):    """Scheduler for periodic tasks.    :keyword schedule: see :attr:`schedule`.    :keyword logger: see :attr:`logger`.    :keyword max_interval: see :attr:`max_interval`.    """    Entry = ScheduleEntry    #: The schedule dict/shelve.    schedule = None    #: Current logger.    logger = None    #: Maximum time to sleep between re-checking the schedule.    max_interval = 1    #: How often to sync the schedule (3 minutes by default)    sync_every = 3 * 60    _last_sync = None    def __init__(self, schedule=None, logger=None, max_interval=None,            app=None, Publisher=None, lazy=False, **kwargs):        app = self.app = app_or_default(app)        self.data = maybe_promise({} if schedule is None else schedule)        self.logger = logger or app.log.get_default_logger(name="celery.beat")        self.max_interval = max_interval or \                                app.conf.CELERYBEAT_MAX_LOOP_INTERVAL        self.Publisher = Publisher or app.amqp.TaskPublisher        if not lazy:            self.setup_schedule()    def install_default_entries(self, data):        entries = {}        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:            if "celery.backend_cleanup" not in data:                entries["celery.backend_cleanup"] = {                        "task": "celery.backend_cleanup",                        "schedule": crontab("0", "4", "*"),                        "options": {"expires": 12 * 3600}}        self.update_from_dict(entries)    def maybe_due(self, entry, publisher=None):        is_due, next_time_to_run = entry.is_due()        if is_due:            self.logger.debug("Scheduler: Sending due task %s", entry.task)            try:                result = self.apply_async(entry, publisher=publisher)            except Exception, exc:                self.logger.error("Message Error: %s\n%s", exc,                                  traceback.format_stack(),                                  exc_info=sys.exc_info())            else:                self.logger.debug("%s sent. id->%s", entry.task,                                                     result.task_id)        return next_time_to_run    def tick(self):        """Run a tick, that is one iteration of the scheduler.        Executes all due tasks.        """        remaining_times = []        try:            for entry in self.schedule.itervalues():                next_time_to_run = self.maybe_due(entry, self.publisher)                if next_time_to_run:                    remaining_times.append(next_time_to_run)        except RuntimeError:            pass        return min(remaining_times + [self.max_interval])    def should_sync(self):        return (not self._last_sync or                (time.time() - self._last_sync) > self.sync_every)    def reserve(self, entry):        new_entry = self.schedule[entry.name] = entry.next()        return new_entry    def apply_async(self, entry, publisher=None, **kwargs):        # Update timestamps and run counts before we actually execute,        # so we have that done if an exception is raised (doesn't schedule        # forever.)        entry = self.reserve(entry)        task = registry.tasks.get(entry.task)        try:            if task:                result = task.apply_async(entry.args, entry.kwargs,                                          publisher=publisher,                                          **entry.options)            else:                result = self.send_task(entry.task, entry.args, entry.kwargs,                                        publisher=publisher,                                        **entry.options)        except Exception, exc:            raise SchedulingError("Couldn't apply scheduled task %s: %s" % (                    entry.name, exc))        if self.should_sync():            self._do_sync()        return result    def send_task(self, *args, **kwargs):               # pragma: no cover        return self.app.send_task(*args, **kwargs)    def setup_schedule(self):        self.install_default_entries(self.data)    def _do_sync(self):        try:            self.logger.debug("Celerybeat: Synchronizing schedule...")            self.sync()        finally:            self._last_sync = time.time()    def sync(self):        pass    def close(self):        self.sync()    def add(self, **kwargs):        entry = self.Entry(**kwargs)        self.schedule[entry.name] = entry        return entry    def _maybe_entry(self, name, entry):        if isinstance(entry, self.Entry):            return entry        return self.Entry(**dict(entry, name=name))    def update_from_dict(self, dict_):        self.schedule.update(dict((name, self._maybe_entry(name, entry))                                for name, entry in dict_.items()))    def merge_inplace(self, b):        schedule = self.schedule        A, B = set(schedule.keys()), set(b.keys())        # Remove items from disk not in the schedule anymore.        for key in A ^ B:            schedule.pop(key, None)        # Update and add new items in the schedule        for key in B:            entry = self.Entry(**dict(b[key], name=key))            if schedule.get(key):                schedule[key].update(entry)            else:                schedule[key] = entry    def get_schedule(self):        return self.data    def set_schedule(self, schedule):        self.data = schedule    def _ensure_connected(self):        # callback called for each retry while the connection        # can't be established.        def _error_handler(exc, interval):            self.logger.error("Celerybeat: Connection error: %s. "                              "Trying again in %s seconds...", exc, interval)        return self.connection.ensure_connection(_error_handler,                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES)    @cached_property    def connection(self):        return self.app.broker_connection()    @cached_property    def publisher(self):        return self.Publisher(connection=self._ensure_connected())    @property    def schedule(self):        return self.get_schedule()    @property    def info(self):        return ""class PersistentScheduler(Scheduler):    persistence = shelve    _store = None    def __init__(self, *args, **kwargs):        self.schedule_filename = kwargs.get("schedule_filename")        Scheduler.__init__(self, *args, **kwargs)    def _remove_db(self):        for suffix in "", ".db", ".dat", ".bak", ".dir":            try:                os.remove(self.schedule_filename + suffix)            except OSError, exc:                if exc.errno != errno.ENOENT:                    raise    def setup_schedule(self):        try:            self._store = self.persistence.open(self.schedule_filename,                                                writeback=True)            entries = self._store.setdefault("entries", {})        except Exception, exc:            self.logger.error("Removing corrupted schedule file %r: %r",                              self.schedule_filename, exc, exc_info=True)            self._remove_db()            self._store = self.persistence.open(self.schedule_filename,                                                writeback=True)        else:            if "__version__" not in self._store:                self._store.clear()   # remove schedule at 2.2.2 upgrade.        entries = self._store.setdefault("entries", {})        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)        self.install_default_entries(self.schedule)        self._store["__version__"] = __version__        self.sync()        self.logger.debug("Current schedule:\n" +                          "\n".join(repr(entry)                                    for entry in entries.itervalues()))    def get_schedule(self):        return self._store["entries"]    def sync(self):        if self._store is not None:            self._store.sync()    def close(self):        self.sync()        self._store.close()    @property    def info(self):        return "    . db -> %s" % (self.schedule_filename, )class Service(object):    scheduler_cls = PersistentScheduler    def __init__(self, logger=None, max_interval=None, schedule_filename=None,            scheduler_cls=None, app=None):        app = self.app = app_or_default(app)        self.max_interval = max_interval or \                                app.conf.CELERYBEAT_MAX_LOOP_INTERVAL        self.scheduler_cls = scheduler_cls or self.scheduler_cls        self.logger = logger or app.log.get_default_logger(name="celery.beat")        self.schedule_filename = schedule_filename or \                                    app.conf.CELERYBEAT_SCHEDULE_FILENAME        self._is_shutdown = threading.Event()        self._is_stopped = threading.Event()        self.debug = SilenceRepeated(self.logger.debug,                        10 if self.max_interval < 60 else 1)    def start(self, embedded_process=False):        self.logger.info("Celerybeat: Starting...")        self.logger.debug("Celerybeat: Ticking with max interval->%s",                          humanize_seconds(self.scheduler.max_interval))        signals.beat_init.send(sender=self)        if embedded_process:            signals.beat_embedded_init.send(sender=self)            platforms.set_process_title("celerybeat")        try:            while not self._is_shutdown.isSet():                interval = self.scheduler.tick()                self.debug("Celerybeat: Waking up %s." % (                        humanize_seconds(interval, prefix="in ")))                time.sleep(interval)        except (KeyboardInterrupt, SystemExit):            self._is_shutdown.set()        finally:            self.sync()    def sync(self):        self.scheduler.close()        self._is_stopped.set()    def stop(self, wait=False):        self.logger.info("Celerybeat: Shutting down...")        self._is_shutdown.set()        wait and self._is_stopped.wait()  # block until shutdown done.    def get_scheduler(self, lazy=False):        filename = self.schedule_filename        scheduler = instantiate(self.scheduler_cls,                                app=self.app,                                schedule_filename=filename,                                logger=self.logger,                                max_interval=self.max_interval,                                lazy=lazy)        return scheduler    @cached_property    def scheduler(self):        return self.get_scheduler()class _Threaded(threading.Thread):    """Embedded task scheduler using threading."""    def __init__(self, *args, **kwargs):        super(_Threaded, self).__init__()        self.service = Service(*args, **kwargs)        self.setDaemon(True)        self.setName("Beat")    def run(self):        self.service.start()    def stop(self):        self.service.stop(wait=True)if multiprocessing is not None:    class _Process(multiprocessing.Process):        """Embedded task scheduler using multiprocessing."""        def __init__(self, *args, **kwargs):            super(_Process, self).__init__()            self.service = Service(*args, **kwargs)            self.name = "Beat"        def run(self):            platforms.signals.reset("SIGTERM")            self.service.start(embedded_process=True)        def stop(self):            self.service.stop()            self.terminate()else:    _Process = Nonedef EmbeddedService(*args, **kwargs):    """Return embedded clock service.    :keyword thread: Run threaded instead of as a separate process.        Default is :const:`False`.    """    if kwargs.pop("thread", False) or _Process is None:        # Need short max interval to be able to stop thread        # in reasonable time.        kwargs.setdefault("max_interval", 1)        return _Threaded(*args, **kwargs)    return _Process(*args, **kwargs)
 |