|
@@ -7,14 +7,16 @@ import time
|
|
|
import shelve
|
|
|
import threading
|
|
|
import multiprocessing
|
|
|
-from datetime import datetime
|
|
|
+from datetime import datetime, timedelta
|
|
|
from UserDict import UserDict
|
|
|
|
|
|
from celery import log
|
|
|
from celery import conf
|
|
|
-from celery import registry as _registry
|
|
|
from celery import platform
|
|
|
+from celery.execute import send_task
|
|
|
+from celery.schedules import schedule
|
|
|
from celery.messaging import establish_connection
|
|
|
+from celery.utils import instantiate
|
|
|
from celery.utils.info import humanize_seconds
|
|
|
|
|
|
|
|
@@ -25,13 +27,28 @@ class SchedulingError(Exception):
|
|
|
class ScheduleEntry(object):
|
|
|
"""An entry in the scheduler.
|
|
|
|
|
|
- :param task: see :attr:`task`.
|
|
|
+ :param name: see :attr:`name`.
|
|
|
+ :param schedule: see :attr:`schedule`.
|
|
|
+ :param args: see :attr:`args`.
|
|
|
+ :param kwargs: see :attr:`kwargs`.
|
|
|
:keyword last_run_at: see :attr:`last_run_at`.
|
|
|
:keyword total_run_count: see :attr:`total_run_count`.
|
|
|
|
|
|
- .. attribute:: task
|
|
|
+ .. attribute:: name
|
|
|
|
|
|
- The task class.
|
|
|
+ The task name.
|
|
|
+
|
|
|
+ .. attribute:: schedule
|
|
|
+
|
|
|
+ The schedule (run_every/crontab)
|
|
|
+
|
|
|
+ .. attribute:: args
|
|
|
+
|
|
|
+ Args to apply.
|
|
|
+
|
|
|
+ .. attribute:: kwargs
|
|
|
+
|
|
|
+ Keyword arguments to apply.
|
|
|
|
|
|
.. attribute:: last_run_at
|
|
|
|
|
@@ -43,8 +60,13 @@ class ScheduleEntry(object):
|
|
|
|
|
|
"""
|
|
|
|
|
|
- def __init__(self, name, last_run_at=None, total_run_count=None):
|
|
|
+ def __init__(self, name, schedule, args=(), kwargs={},
|
|
|
+ options={}, last_run_at=None, total_run_count=None):
|
|
|
self.name = name
|
|
|
+ self.schedule = schedule
|
|
|
+ self.args = args
|
|
|
+ self.kwargs = kwargs
|
|
|
+ self.options = options
|
|
|
self.last_run_at = last_run_at or datetime.now()
|
|
|
self.total_run_count = total_run_count or 0
|
|
|
|
|
@@ -52,26 +74,31 @@ class ScheduleEntry(object):
|
|
|
"""Returns a new instance of the same class, but with
|
|
|
its date and count fields updated."""
|
|
|
return self.__class__(self.name,
|
|
|
+ self.schedule,
|
|
|
+ self.args,
|
|
|
+ self.kwargs,
|
|
|
+ self.options,
|
|
|
datetime.now(),
|
|
|
self.total_run_count + 1)
|
|
|
|
|
|
- def is_due(self, task):
|
|
|
+ def is_due(self):
|
|
|
"""See :meth:`celery.task.base.PeriodicTask.is_due`."""
|
|
|
- return task.is_due(self.last_run_at)
|
|
|
+ return self.schedule.is_due(self.last_run_at)
|
|
|
+
|
|
|
+ def __repr__(self):
|
|
|
+ return "<Entry: %s(*%s, **%s) {%s}>" % (self.name,
|
|
|
+ self.args,
|
|
|
+ self.kwargs,
|
|
|
+ self.schedule)
|
|
|
|
|
|
|
|
|
class Scheduler(UserDict):
|
|
|
"""Scheduler for periodic tasks.
|
|
|
|
|
|
- :keyword registry: see :attr:`registry`.
|
|
|
:keyword schedule: see :attr:`schedule`.
|
|
|
:keyword logger: see :attr:`logger`.
|
|
|
:keyword max_interval: see :attr:`max_interval`.
|
|
|
|
|
|
- .. attribute:: registry
|
|
|
-
|
|
|
- The task registry to use.
|
|
|
-
|
|
|
.. attribute:: schedule
|
|
|
|
|
|
The schedule dict/shelve.
|
|
@@ -85,10 +112,10 @@ class Scheduler(UserDict):
|
|
|
Maximum time to sleep between re-checking the schedule.
|
|
|
|
|
|
"""
|
|
|
+ Entry = ScheduleEntry
|
|
|
|
|
|
- def __init__(self, registry=None, schedule=None, logger=None,
|
|
|
+ def __init__(self, schedule=None, logger=None,
|
|
|
max_interval=None):
|
|
|
- self.registry = registry or _registry.TaskRegistry()
|
|
|
self.data = schedule
|
|
|
if self.data is None:
|
|
|
self.data = {}
|
|
@@ -96,7 +123,10 @@ class Scheduler(UserDict):
|
|
|
self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
|
|
|
|
|
|
self.cleanup()
|
|
|
- self.schedule_registry()
|
|
|
+ self.setup_schedule()
|
|
|
+
|
|
|
+ def iterentries(self):
|
|
|
+ return self.schedule.itervalues()
|
|
|
|
|
|
def tick(self):
|
|
|
"""Run a tick, that is one iteration of the scheduler.
|
|
@@ -107,16 +137,18 @@ class Scheduler(UserDict):
|
|
|
remaining_times = []
|
|
|
connection = establish_connection()
|
|
|
try:
|
|
|
- for entry in self.schedule.values():
|
|
|
- is_due, next_time_to_run = self.is_due(entry)
|
|
|
+ for entry in self.iterentries():
|
|
|
+ is_due, next_time_to_run = entry.is_due()
|
|
|
if is_due:
|
|
|
debug("Scheduler: Sending due task %s" % entry.name)
|
|
|
try:
|
|
|
- result = self.apply_async(entry, connection=connection)
|
|
|
+ result = self.apply_async(entry,
|
|
|
+ connection=connection)
|
|
|
except SchedulingError, exc:
|
|
|
error("Scheduler: %s" % exc)
|
|
|
else:
|
|
|
- debug("%s sent. id->%s" % (entry.name, result.task_id))
|
|
|
+ debug("%s sent. id->%s" % (entry.name,
|
|
|
+ result.task_id))
|
|
|
if next_time_to_run:
|
|
|
remaining_times.append(next_time_to_run)
|
|
|
finally:
|
|
@@ -124,39 +156,47 @@ class Scheduler(UserDict):
|
|
|
|
|
|
return min(remaining_times + [self.max_interval])
|
|
|
|
|
|
- def get_task(self, name):
|
|
|
- return self.registry[name]
|
|
|
-
|
|
|
- def is_due(self, entry):
|
|
|
- return entry.is_due(self.get_task(entry.name))
|
|
|
+ def reserve(self, entry):
|
|
|
+ new_entry = self.schedule[entry.name] = entry.next()
|
|
|
+ return new_entry
|
|
|
|
|
|
def apply_async(self, entry, **kwargs):
|
|
|
-
|
|
|
# Update timestamps and run counts before we actually execute,
|
|
|
# so we have that done if an exception is raised (doesn't schedule
|
|
|
# forever.)
|
|
|
- entry = self.schedule[entry.name] = entry.next()
|
|
|
- task = self.get_task(entry.name)
|
|
|
+ entry = self.reserve(entry)
|
|
|
+
|
|
|
+ print("APPLYING: %s" % (entry, ))
|
|
|
|
|
|
try:
|
|
|
- result = task.apply_async(**kwargs)
|
|
|
+ result = send_task(entry.name, entry.args, entry.kwargs,
|
|
|
+ **entry.options)
|
|
|
except Exception, exc:
|
|
|
raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
|
|
|
- task.name, exc))
|
|
|
+ entry.name, exc))
|
|
|
return result
|
|
|
|
|
|
- def schedule_registry(self):
|
|
|
- """Add the current contents of the registry to the schedule."""
|
|
|
- for name, task in self.registry.periodic().items():
|
|
|
- if name not in self.schedule:
|
|
|
- self.logger.debug("Scheduler: "
|
|
|
- "Added periodic task %s to schedule" % name)
|
|
|
- self.schedule.setdefault(name, ScheduleEntry(task.name))
|
|
|
+ def maybe_schedule(self, s, relative=False):
|
|
|
+ if isinstance(s, int):
|
|
|
+ return timedelta(seconds=s)
|
|
|
+ if isinstance(s, timedelta):
|
|
|
+ return schedule(s, relative)
|
|
|
+ return s
|
|
|
+
|
|
|
+ def setup_schedule(self):
|
|
|
+ self.data = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
|
|
|
+
|
|
|
+ def dict_to_entries(self, dict_):
|
|
|
+ entries = {}
|
|
|
+ for name, entry in dict_.items():
|
|
|
+ relative = entry.pop("relative", None)
|
|
|
+ entry["schedule"] = self.maybe_schedule(entry["schedule"],
|
|
|
+ relative)
|
|
|
+ entries[name] = self.Entry(**entry)
|
|
|
+ return entries
|
|
|
|
|
|
def cleanup(self):
|
|
|
- for task_name, entry in self.schedule.items():
|
|
|
- if task_name not in self.registry:
|
|
|
- self.schedule.pop(task_name, None)
|
|
|
+ pass
|
|
|
|
|
|
@property
|
|
|
def schedule(self):
|
|
@@ -165,30 +205,29 @@ class Scheduler(UserDict):
|
|
|
|
|
|
class ClockService(object):
|
|
|
scheduler_cls = Scheduler
|
|
|
- registry = _registry.tasks
|
|
|
open_schedule = lambda self, filename: shelve.open(filename)
|
|
|
|
|
|
def __init__(self, logger=None,
|
|
|
max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
|
|
|
- schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
|
|
|
+ schedule=conf.CELERYBEAT_SCHEDULE,
|
|
|
+ schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
|
|
|
+ scheduler_cls=None):
|
|
|
self.logger = logger or log.get_default_logger()
|
|
|
self.max_interval = max_interval
|
|
|
- self.schedule_filename = schedule_filename
|
|
|
+ self.scheduler_cls = scheduler_cls or self.scheduler_cls
|
|
|
self._shutdown = threading.Event()
|
|
|
self._stopped = threading.Event()
|
|
|
- self._schedule = None
|
|
|
+ self.schedule = schedule
|
|
|
self._scheduler = None
|
|
|
- self._in_sync = False
|
|
|
silence = self.max_interval < 60 and 10 or 1
|
|
|
self.debug = log.SilenceRepeated(self.logger.debug,
|
|
|
max_iterations=silence)
|
|
|
|
|
|
def start(self, embedded_process=False):
|
|
|
- self.logger.info("ClockService: Starting...")
|
|
|
- self.logger.debug("ClockService: "
|
|
|
- "Ticking with max interval->%s, schedule->%s" % (
|
|
|
- humanize_seconds(self.max_interval),
|
|
|
- self.schedule_filename))
|
|
|
+ self.logger.info("Celerybeat: Starting...")
|
|
|
+ self.logger.debug("Celerybeat: "
|
|
|
+ "Ticking with max interval->%s" % (
|
|
|
+ humanize_seconds(self.max_interval)))
|
|
|
|
|
|
if embedded_process:
|
|
|
platform.set_process_title("celerybeat")
|
|
@@ -199,7 +238,7 @@ class ClockService(object):
|
|
|
if self._shutdown.isSet():
|
|
|
break
|
|
|
interval = self.scheduler.tick()
|
|
|
- self.debug("ClockService: Waking up %s." % (
|
|
|
+ self.debug("Celerybeat: Waking up %s." % (
|
|
|
humanize_seconds(interval, prefix="in ")))
|
|
|
time.sleep(interval)
|
|
|
except (KeyboardInterrupt, SystemExit):
|
|
@@ -208,32 +247,20 @@ class ClockService(object):
|
|
|
self.sync()
|
|
|
|
|
|
def sync(self):
|
|
|
- if self._schedule is not None and not self._in_sync:
|
|
|
- self.logger.debug("ClockService: Syncing schedule to disk...")
|
|
|
- self._schedule.sync()
|
|
|
- self._schedule.close()
|
|
|
- self._in_sync = True
|
|
|
- self._stopped.set()
|
|
|
+ self._stopped.set()
|
|
|
|
|
|
def stop(self, wait=False):
|
|
|
- self.logger.info("ClockService: Shutting down...")
|
|
|
+ self.logger.info("Celerybeat: Shutting down...")
|
|
|
self._shutdown.set()
|
|
|
wait and self._stopped.wait() # block until shutdown done.
|
|
|
|
|
|
- @property
|
|
|
- def schedule(self):
|
|
|
- if self._schedule is None:
|
|
|
- filename = self.schedule_filename
|
|
|
- self._schedule = self.open_schedule(filename=filename)
|
|
|
- return self._schedule
|
|
|
-
|
|
|
@property
|
|
|
def scheduler(self):
|
|
|
if self._scheduler is None:
|
|
|
- self._scheduler = self.scheduler_cls(schedule=self.schedule,
|
|
|
- registry=self.registry,
|
|
|
- logger=self.logger,
|
|
|
- max_interval=self.max_interval)
|
|
|
+ self._scheduler = instantiate(self.scheduler_cls,
|
|
|
+ schedule=self.schedule,
|
|
|
+ logger=self.logger,
|
|
|
+ max_interval=self.max_interval)
|
|
|
return self._scheduler
|
|
|
|
|
|
|