|
@@ -3,28 +3,13 @@ import math
|
|
|
import shelve
|
|
|
import atexit
|
|
|
import threading
|
|
|
-from UserDict import UserDict
|
|
|
from datetime import datetime
|
|
|
+from UserDict import UserDict
|
|
|
|
|
|
from celery import log
|
|
|
from celery import conf
|
|
|
from celery import registry
|
|
|
-
|
|
|
-TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
|
|
|
- ("hour", 60 * 60, lambda n: int(math.ceil(n))),
|
|
|
- ("minute", 60, lambda n: int(math.ceil(n))),
|
|
|
- ("second", 1, lambda n: "%.2d" % n))
|
|
|
-
|
|
|
-
|
|
|
-def humanize_seconds(secs, prefix=""):
|
|
|
- """Show seconds in human form, e.g. 60 is "1 minute", 7200 is "2
|
|
|
- hours"."""
|
|
|
- for unit, divider, formatter in TIME_UNITS:
|
|
|
- if secs >= divider:
|
|
|
- w = secs / divider
|
|
|
- punit = w > 1 and unit+"s" or unit
|
|
|
- return "%s%s %s" % (prefix, formatter(w), punit)
|
|
|
- return "now"
|
|
|
+from celery.utils.info import humanize_seconds
|
|
|
|
|
|
|
|
|
class SchedulingError(Exception):
|
|
@@ -34,10 +19,21 @@ class SchedulingError(Exception):
|
|
|
class ScheduleEntry(object):
|
|
|
"""An entry in the scheduler.
|
|
|
|
|
|
- :param task: The task class.
|
|
|
- :keyword last_run_at: The time and date when this task was last run.
|
|
|
- :keyword total_run_count: Total number of times this periodic task has
|
|
|
- been executed.
|
|
|
+ :param task: see :attr:`task`.
|
|
|
+ :keyword last_run_at: see :attr:`last_run_at`.
|
|
|
+ :keyword total_run_count: see :attr:`total_run_count`.
|
|
|
+
|
|
|
+ .. attribute:: task
|
|
|
+
|
|
|
+ The task class.
|
|
|
+
|
|
|
+ .. attribute:: last_run_at
|
|
|
+
|
|
|
+ The time and date of when this task was last run.
|
|
|
+
|
|
|
+ .. attribute:: total_run_count
|
|
|
+
|
|
|
+ Total number of times this periodic task has been executed.
|
|
|
|
|
|
"""
|
|
|
|
|
@@ -49,9 +45,9 @@ class ScheduleEntry(object):
|
|
|
def next(self):
|
|
|
"""Returns a new instance of the same class, but with
|
|
|
its date and count fields updated."""
|
|
|
- return self.__class__(name=self.name,
|
|
|
- last_run_at=datetime.now(),
|
|
|
- total_run_count=self.total_run_count + 1)
|
|
|
+ return self.__class__(self.name,
|
|
|
+ datetime.now(),
|
|
|
+ self.total_run_count + 1)
|
|
|
|
|
|
def is_due(self, task):
|
|
|
"""See :meth:`celery.task.base.PeriodicTask.is_due`."""
|
|
@@ -61,28 +57,35 @@ class ScheduleEntry(object):
|
|
|
class Scheduler(UserDict):
|
|
|
"""Scheduler for periodic tasks.
|
|
|
|
|
|
- :keyword registry: The task registry to use.
|
|
|
- :keyword schedule: The schedule dictionary. Default is the global
|
|
|
- persistent schedule ``celery.beat.schedule``.
|
|
|
- :keyword logger: The logger to use.
|
|
|
- :keyword max_interval: Maximum time to sleep between re-checking the
|
|
|
- schedule.
|
|
|
+ :keyword registry: see :attr:`registry`.
|
|
|
+ :keyword schedule: see :attr:`schedule`.
|
|
|
+ :keyword logger: see :attr:`logger`.
|
|
|
+ :keyword max_interval: see :attr:`max_interval`.
|
|
|
|
|
|
- """
|
|
|
+ .. attribute:: registry
|
|
|
+
|
|
|
+ The task registry to use.
|
|
|
+
|
|
|
+ .. attribute:: schedule
|
|
|
+
|
|
|
+ The schedule dict/shelve.
|
|
|
|
|
|
- def __init__(self, **kwargs):
|
|
|
+ .. attribute:: logger
|
|
|
|
|
|
- attr_defaults = {"registry": lambda: {},
|
|
|
- "schedule": lambda: {},
|
|
|
- "logger": log.get_default_logger,
|
|
|
- "max_interval": conf.CELERYBEAT_MAX_LOOP_INTERVAL}
|
|
|
+ The logger to use.
|
|
|
|
|
|
- for attr_name, attr_default_gen in attr_defaults.items():
|
|
|
- if attr_name in kwargs:
|
|
|
- attr_value = kwargs[attr_name]
|
|
|
- else:
|
|
|
- attr_value = attr_default_gen()
|
|
|
- setattr(self, attr_name, attr_value)
|
|
|
+ .. attribute:: max_interval
|
|
|
+
|
|
|
+ Maximum time to sleep between re-checking the schedule.
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, registry=None, schedule=None, logger=None,
|
|
|
+ max_interval=None):
|
|
|
+ self.registry = registry or {}
|
|
|
+ self.schedule = schedule or {}
|
|
|
+ self.logger = logger or log.get_default_logger()
|
|
|
+ self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
|
|
|
|
|
|
self.cleanup()
|
|
|
self.schedule_registry()
|
|
@@ -90,15 +93,20 @@ class Scheduler(UserDict):
|
|
|
def tick(self):
|
|
|
"""Run a tick, that is one iteration of the scheduler.
|
|
|
Executes all due tasks."""
|
|
|
+ debug = self.logger.debug
|
|
|
+ error = self.logger.error
|
|
|
+
|
|
|
remaining_times = []
|
|
|
for entry in self.schedule.values():
|
|
|
is_due, next_time_to_run = self.is_due(entry)
|
|
|
if is_due:
|
|
|
- self.logger.debug("Scheduler: Sending due task %s" % (
|
|
|
- entry.name))
|
|
|
- result = self.apply_async(entry)
|
|
|
- self.logger.debug("Scheduler: %s sent. id->%s" % (
|
|
|
- entry.name, result.task_id))
|
|
|
+ debug("Scheduler: Sending due task %s" % entry.name)
|
|
|
+ try:
|
|
|
+ result = self.apply_async(entry)
|
|
|
+ except SchedulingError, exc:
|
|
|
+ error("Scheduler: %s" % exc)
|
|
|
+ else:
|
|
|
+ debug("%s sent. id->%s" % (entry.name, result.task_id))
|
|
|
if next_time_to_run:
|
|
|
remaining_times.append(next_time_to_run)
|
|
|
|
|
@@ -121,18 +129,16 @@ class Scheduler(UserDict):
|
|
|
try:
|
|
|
result = task.apply_async()
|
|
|
except Exception, exc:
|
|
|
- raise SchedulingError(
|
|
|
- "Couldn't apply scheduled task %s: %s" % (
|
|
|
- task.name, exc))
|
|
|
+ raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
|
|
|
+ task.name, exc))
|
|
|
return result
|
|
|
|
|
|
def schedule_registry(self):
|
|
|
"""Add the current contents of the registry to the schedule."""
|
|
|
for name, task in self.registry.periodic().items():
|
|
|
if name not in self.schedule:
|
|
|
- self.logger.debug(
|
|
|
- "Scheduler: Adding periodic task %s to schedule" % (
|
|
|
- task.name))
|
|
|
+ self.logger.debug("Scheduler: "
|
|
|
+ "Added periodic task %s to schedule" % name)
|
|
|
self.schedule.setdefault(name, ScheduleEntry(task.name))
|
|
|
|
|
|
def cleanup(self):
|