Browse Source

Don't use the registry for schedules. Use setup_schedule() to set up the schedule.

This is part of the on-going work to move the schedule away from the registry
to a database instead.

The PeriodicTask classes *will* be replaced with something much more flexible.
Ask Solem 15 years ago
parent
commit
0ced98a1d6
1 changed files with 51 additions and 39 deletions
  1. 51 39
      celery/beat.py

+ 51 - 39
celery/beat.py

@@ -12,8 +12,8 @@ from UserDict import UserDict
 
 from celery import log
 from celery import conf
-from celery import registry as _registry
 from celery import platform
+from celery.execute import send_task
 from celery.messaging import establish_connection
 from celery.utils.info import humanize_seconds
 
@@ -25,13 +25,28 @@ class SchedulingError(Exception):
 class ScheduleEntry(object):
     """An entry in the scheduler.
 
-    :param task: see :attr:`task`.
+    :param name: see :attr:`name`.
+    :param schedule: see :attr:`schedule`.
+    :param args: see :attr:`args`.
+    :param kwargs: see :attr:`kwargs`.
     :keyword last_run_at: see :attr:`last_run_at`.
     :keyword total_run_count: see :attr:`total_run_count`.
 
-    .. attribute:: task
+    .. attribute:: name
 
-        The task class.
+        The task name.
+
+    .. attribute:: schedule
+
+        The schedule (run_every/crontab)
+
+    .. attribute:: args
+
+        Args to apply.
+
+    .. attribute:: kwargs
+
+        Keyword arguments to apply.
 
     .. attribute:: last_run_at
 
@@ -43,8 +58,13 @@ class ScheduleEntry(object):
 
     """
 
-    def __init__(self, name, last_run_at=None, total_run_count=None):
+    def __init__(self, name, schedule, args=(), kwargs={},
+            options={}, last_run_at=None, total_run_count=None):
         self.name = name
+        self.schedule = schedule
+        self.args = args
+        self.kwargs = kwargs
+        self.options = options
         self.last_run_at = last_run_at or datetime.now()
         self.total_run_count = total_run_count or 0
 
@@ -52,26 +72,25 @@ class ScheduleEntry(object):
         """Returns a new instance of the same class, but with
         its date and count fields updated."""
         return self.__class__(self.name,
+                              self.schedule,
+                              self.args,
+                              self.kwargs,
+                              self.options,
                               datetime.now(),
                               self.total_run_count + 1)
 
-    def is_due(self, task):
+    def is_due(self):
         """See :meth:`celery.task.base.PeriodicTask.is_due`."""
-        return task.is_due(self.last_run_at)
+        return self.schedule.is_due(self.last_run_at)
 
 
 class Scheduler(UserDict):
     """Scheduler for periodic tasks.
 
-    :keyword registry: see :attr:`registry`.
     :keyword schedule: see :attr:`schedule`.
     :keyword logger:  see :attr:`logger`.
     :keyword max_interval: see :attr:`max_interval`.
 
-    .. attribute:: registry
-
-        The task registry to use.
-
     .. attribute:: schedule
 
         The schedule dict/shelve.
@@ -86,9 +105,8 @@ class Scheduler(UserDict):
 
     """
 
-    def __init__(self, registry=None, schedule=None, logger=None,
+    def __init__(self, schedule=None, logger=None,
             max_interval=None):
-        self.registry = registry or _registry.TaskRegistry()
         self.data = schedule
         if self.data is None:
             self.data = {}
@@ -96,7 +114,7 @@ class Scheduler(UserDict):
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
 
         self.cleanup()
-        self.schedule_registry()
+        self.setup_schedule()
 
     def tick(self):
         """Run a tick, that is one iteration of the scheduler.
@@ -108,15 +126,17 @@ class Scheduler(UserDict):
         connection = establish_connection()
         try:
             for entry in self.schedule.values():
-                is_due, next_time_to_run = self.is_due(entry)
+                is_due, next_time_to_run = entry.is_due()
                 if is_due:
                     debug("Scheduler: Sending due task %s" % entry.name)
                     try:
-                        result = self.apply_async(entry, connection=connection)
+                        result = self.apply_async(entry,
+                                      connection=connection)
                     except SchedulingError, exc:
                         error("Scheduler: %s" % exc)
                     else:
-                        debug("%s sent. id->%s" % (entry.name, result.task_id))
+                        debug("%s sent. id->%s" % (entry.name,
+                                                   result.task_id))
                 if next_time_to_run:
                     remaining_times.append(next_time_to_run)
         finally:
@@ -124,39 +144,33 @@ class Scheduler(UserDict):
 
         return min(remaining_times + [self.max_interval])
 
-    def get_task(self, name):
-        return self.registry[name]
-
-    def is_due(self, entry):
-        return entry.is_due(self.get_task(entry.name))
-
     def apply_async(self, entry, **kwargs):
 
         # Update timestamps and run counts before we actually execute,
         # so we have that done if an exception is raised (doesn't schedule
         # forever.)
         entry = self.schedule[entry.name] = entry.next()
-        task = self.get_task(entry.name)
 
         try:
-            result = task.apply_async(**kwargs)
+            result = send_task(entry.name, entry.args, entry.kwargs,
+                               **entry.options)
         except Exception, exc:
             raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
-                    task.name, exc))
+                    entry.name, exc))
         return result
 
-    def schedule_registry(self):
-        """Add the current contents of the registry to the schedule."""
-        for name, task in self.registry.periodic().items():
-            if name not in self.schedule:
-                self.logger.debug("Scheduler: "
-                    "Added periodic task %s to schedule" % name)
-            self.schedule.setdefault(name, ScheduleEntry(task.name))
+    def setup_schedule(self):
+        from datetime import timedelta
+        from celery.task.schedules import schedule
+        self.schedule["add.often"] = ScheduleEntry("tasks.add",
+                                      schedule(timedelta(seconds=5)),
+                                      args=(4, 4))
+        self.schedule["sleep.often"] = ScheduleEntry("tasks.sleeptask",
+                                      schedule(timedelta(minutes=1)),
+                                      args=(2, ))
 
     def cleanup(self):
-        for task_name, entry in self.schedule.items():
-            if task_name not in self.registry:
-                self.schedule.pop(task_name, None)
+        pass
 
     @property
     def schedule(self):
@@ -165,7 +179,6 @@ class Scheduler(UserDict):
 
 class ClockService(object):
     scheduler_cls = Scheduler
-    registry = _registry.tasks
     open_schedule = lambda self, filename: shelve.open(filename)
 
     def __init__(self, logger=None,
@@ -231,7 +244,6 @@ class ClockService(object):
     def scheduler(self):
         if self._scheduler is None:
             self._scheduler = self.scheduler_cls(schedule=self.schedule,
-                                            registry=self.registry,
                                             logger=self.logger,
                                             max_interval=self.max_interval)
         return self._scheduler