|
@@ -1,14 +1,13 @@
|
|
|
import sys
|
|
|
import warnings
|
|
|
-from datetime import datetime, timedelta
|
|
|
+from datetime import timedelta
|
|
|
|
|
|
from billiard.serialization import pickle
|
|
|
|
|
|
from celery import conf
|
|
|
from celery.log import setup_task_logger
|
|
|
from celery.utils import gen_unique_id, padlist
|
|
|
-from celery.utils.timeutils import timedelta_seconds, weekday
|
|
|
-from celery.utils.timeutils import delta_resolution
|
|
|
+from celery.utils.timeutils import timedelta_seconds
|
|
|
from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
|
|
|
from celery.execute import apply_async, apply
|
|
|
from celery.registry import tasks
|
|
@@ -17,6 +16,8 @@ from celery.messaging import TaskPublisher, TaskConsumer
|
|
|
from celery.messaging import establish_connection as _establish_connection
|
|
|
from celery.exceptions import MaxRetriesExceededError, RetryTaskError
|
|
|
|
|
|
+from celery.task.schedules import schedule
|
|
|
+
|
|
|
|
|
|
class TaskType(type):
|
|
|
"""Metaclass for tasks.
|
|
@@ -654,100 +655,6 @@ class TaskSet(object):
|
|
|
return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
|
|
|
|
|
|
|
|
|
-class schedule(object):
|
|
|
- relative = False
|
|
|
-
|
|
|
- def __init__(self, run_every=None, relative=False):
|
|
|
- self.run_every = run_every
|
|
|
- self.relative = relative
|
|
|
-
|
|
|
- def remaining_estimate(self, last_run_at):
|
|
|
- """Returns when the periodic task should run next as a timedelta."""
|
|
|
- next_run_at = last_run_at + self.run_every
|
|
|
- if not self.relative:
|
|
|
- next_run_at = delta_resolution(next_run_at, self.run_every)
|
|
|
- return next_run_at - datetime.now()
|
|
|
-
|
|
|
- def is_due(self, last_run_at):
|
|
|
- """Returns tuple of two items ``(is_due, next_time_to_run)``,
|
|
|
- where next time to run is in seconds.
|
|
|
-
|
|
|
- See :meth:`PeriodicTask.is_due` for more information.
|
|
|
-
|
|
|
- """
|
|
|
- rem_delta = self.remaining_estimate(last_run_at)
|
|
|
- rem = timedelta_seconds(rem_delta)
|
|
|
- if rem == 0:
|
|
|
- return True, timedelta_seconds(self.run_every)
|
|
|
- return False, rem
|
|
|
-
|
|
|
-
|
|
|
-class crontab(schedule):
|
|
|
- """A crontab can be used as the ``run_every`` value of a
|
|
|
- :class:`PeriodicTask` to add cron-like scheduling.
|
|
|
-
|
|
|
- Like a :manpage:`cron` job, you can specify units of time of when
|
|
|
- you would like the task to execute. While not a full implementation
|
|
|
- of cron's features, it should provide a fair degree of common scheduling
|
|
|
- needs.
|
|
|
-
|
|
|
- You can specify a minute, an hour, and/or a day of the week.
|
|
|
-
|
|
|
- .. attribute:: minute
|
|
|
-
|
|
|
- An integer from 0-59 that represents the minute of an hour of when
|
|
|
- execution should occur.
|
|
|
-
|
|
|
- .. attribute:: hour
|
|
|
-
|
|
|
- An integer from 0-23 that represents the hour of a day of when
|
|
|
- execution should occur.
|
|
|
-
|
|
|
- .. attribute:: day_of_week
|
|
|
-
|
|
|
- An integer from 0-6, where Sunday = 0 and Saturday = 6, that
|
|
|
- represents the day of week that execution should occur.
|
|
|
-
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, minute=None, hour=None, day_of_week=None,
|
|
|
- nowfun=datetime.now):
|
|
|
- self.hour = hour # (0 - 23)
|
|
|
- self.minute = minute # (0 - 59)
|
|
|
- self.day_of_week = day_of_week # (0 - 6) (Sunday=0)
|
|
|
- self.nowfun = nowfun
|
|
|
-
|
|
|
- if isinstance(self.day_of_week, basestring):
|
|
|
- self.day_of_week = weekday(self.day_of_week)
|
|
|
-
|
|
|
-
|
|
|
- def remaining_estimate(self, last_run_at):
|
|
|
- # remaining_estimate controls the frequency of scheduler
|
|
|
- # ticks. The scheduler needs to wake up every second in this case.
|
|
|
- return 1
|
|
|
-
|
|
|
- def is_due(self, last_run_at):
|
|
|
- now = self.nowfun()
|
|
|
- last = now - last_run_at
|
|
|
- due, when = False, 1
|
|
|
- if last.days > 0 or last.seconds > 60:
|
|
|
- if self.day_of_week in (None, now.isoweekday()):
|
|
|
- due, when = self._check_hour_minute(now)
|
|
|
- return due, when
|
|
|
-
|
|
|
- def _check_hour_minute(self, now):
|
|
|
- due, when = False, 1
|
|
|
- if self.hour is None and self.minute is None:
|
|
|
- due, when = True, 1
|
|
|
- if self.hour is None and self.minute == now.minute:
|
|
|
- due, when = True, 1
|
|
|
- if self.hour == now.hour and self.minute is None:
|
|
|
- due, when = True, 1
|
|
|
- if self.hour == now.hour and self.minute == now.minute:
|
|
|
- due, when = True, 1
|
|
|
- return due, when
|
|
|
-
|
|
|
-
|
|
|
class PeriodicTask(Task):
|
|
|
"""A periodic task is a task that behaves like a :manpage:`cron` job.
|
|
|
|
|
@@ -779,7 +686,9 @@ class PeriodicTask(Task):
|
|
|
... logger = self.get_logger(**kwargs)
|
|
|
... logger.info("Execute every 30 seconds")
|
|
|
|
|
|
- >>> from celery.task import PeriodicTask, crontab
|
|
|
+ >>> from celery.task import PeriodicTask
|
|
|
+ >>> from celery.task.schedules import crontab
|
|
|
+
|
|
|
>>> class EveryMondayMorningTask(PeriodicTask):
|
|
|
... run_every = crontab(hour=7, minute=30, day_of_week=1)
|
|
|
...
|
|
@@ -796,7 +705,7 @@ class PeriodicTask(Task):
|
|
|
|
|
|
>>> class EveryQuarterPastTheHourTask(PeriodicTask):
|
|
|
... run_every = crontab(minute=15)
|
|
|
-
|
|
|
+ ...
|
|
|
... def run(self, **kwargs):
|
|
|
... logger = self.get_logger(**kwargs)
|
|
|
... logger.info("Execute every 0:15 past the hour every day.")
|