| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 | # -*- coding: utf-8 -*-"""    celery.task.base    ~~~~~~~~~~~~~~~~    The task implementation has been moved to :mod:`celery.app.task`.    :copyright: (c) 2009 - 2011 by Ask Solem.    :license: BSD, see LICENSE for more details."""from __future__ import absolute_importfrom .. import current_appfrom ..app.task import Context, TaskType, BaseTask  # noqafrom ..schedules import maybe_schedulefrom ..utils import deprecated, timeutilsTask = current_app.Task@deprecated("Importing TaskSet from celery.task.base",            alternative="Use celery.task.TaskSet instead.",            removal="2.4")def TaskSet(*args, **kwargs):    from celery.task.sets import TaskSet    return TaskSet(*args, **kwargs)@deprecated("Importing subtask from celery.task.base",            alternative="Use celery.task.subtask instead.",            removal="2.4")def subtask(*args, **kwargs):    from celery.task.sets import subtask    return subtask(*args, **kwargs)class PeriodicTask(Task):    """A periodic task is a task that behaves like a :manpage:`cron` job.    Results of periodic tasks are not stored by default.    .. attribute:: run_every        *REQUIRED* Defines how often the task is run (its interval),        it can be a :class:`~datetime.timedelta` object, a        :class:`~celery.schedules.crontab` object or an integer        specifying the time in seconds.    .. attribute:: relative        If set to :const:`True`, run times are relative to the time when the        server was started. This was the previous behaviour, periodic tasks        are now scheduled by the clock.    :raises NotImplementedError: if the :attr:`run_every` attribute is        not defined.    Example        >>> from celery.task import tasks, PeriodicTask        >>> from datetime import timedelta        >>> class EveryThirtySecondsTask(PeriodicTask):        ...     run_every = timedelta(seconds=30)        ...        ...     def run(self, **kwargs):        ...         logger = self.get_logger(**kwargs)        ...         logger.info("Execute every 30 seconds")        >>> from celery.task import PeriodicTask        >>> from celery.schedules import crontab        >>> class EveryMondayMorningTask(PeriodicTask):        ...     run_every = crontab(hour=7, minute=30, day_of_week=1)        ...        ...     def run(self, **kwargs):        ...         logger = self.get_logger(**kwargs)        ...         logger.info("Execute every Monday at 7:30AM.")        >>> class EveryMorningTask(PeriodicTask):        ...     run_every = crontab(hours=7, minute=30)        ...        ...     def run(self, **kwargs):        ...         logger = self.get_logger(**kwargs)        ...         logger.info("Execute every day at 7:30AM.")        >>> class EveryQuarterPastTheHourTask(PeriodicTask):        ...     run_every = crontab(minute=15)        ...        ...     def run(self, **kwargs):        ...         logger = self.get_logger(**kwargs)        ...         logger.info("Execute every 0:15 past the hour every day.")    """    abstract = True    ignore_result = True    type = "periodic"    relative = False    options = None    def __init__(self):        app = current_app        if not hasattr(self, "run_every"):            raise NotImplementedError(                    "Periodic tasks must have a run_every attribute")        self.run_every = maybe_schedule(self.run_every, self.relative)        # For backward compatibility, add the periodic task to the        # configuration schedule instead.        app.conf.CELERYBEAT_SCHEDULE[self.name] = {                "task": self.name,                "schedule": self.run_every,                "args": (),                "kwargs": {},                "options": self.options or {},                "relative": self.relative,        }        super(PeriodicTask, self).__init__()    def timedelta_seconds(self, delta):        """Convert :class:`~datetime.timedelta` to seconds.        Doesn't account for negative timedeltas.        """        return timeutils.timedelta_seconds(delta)    def is_due(self, last_run_at):        """Returns tuple of two items `(is_due, next_time_to_run)`,        where next time to run is in seconds.        See :meth:`celery.schedules.schedule.is_due` for more information.        """        return self.run_every.is_due(last_run_at)    def remaining_estimate(self, last_run_at):        """Returns when the periodic task should run next as a timedelta."""        return self.run_every.remaining_estimate(last_run_at)
 |