Browse Source

Celerybeat tested and works, added -B option to celeryd to embed it.

Ask Solem 15 years ago
parent
commit
d8c625884e
4 changed files with 73 additions and 31 deletions
  1. 45 22
      celery/beat.py
  2. 2 3
      celery/bin/celerybeat.py
  3. 13 1
      celery/bin/celeryd.py
  4. 13 5
      celery/worker/__init__.py

+ 45 - 22
celery/beat.py

@@ -7,6 +7,7 @@ from datetime import datetime
 from celery import conf
 from celery import registry
 from celery.log import setup_logger
+from celery.exceptions import NotRegistered
 
 
 
@@ -24,27 +25,18 @@ class ScheduleEntry(object):
 
     """
 
-    def __init__(self, task, last_run_at=None, total_run_count=None):
-        self.task = task
+    def __init__(self, name, last_run_at=None,
+            total_run_count=None):
+        self.name = name
         self.last_run_at = last_run_at or datetime.now()
         self.total_run_count = total_run_count or 0
 
-    def execute(self):
-        # Increment timestamps and counts before executing,
-        # in case of exception.
-        self.last_run_at = datetime.now()
-        self.total_run_count += 1
+    def next(self):
+        return self.__class__(self.name, datetime.now(),
+                              self.total_run_count + 1)
 
-        try:
-            result = self.task.apply_async()
-        except Exception, exc:
-            raise SchedulingError(
-                    "Couldn't apply scheduled task %s: %s" % (
-                        self.task.name, exc))
-        return result
-
-    def is_due(self):
-        return datetime.now() > (self.last_run_at + self.task.run_every)
+    def is_due(self, run_every):
+        return datetime.now() > (self.last_run_at + run_every)
 
 
 class Scheduler(UserDict):
@@ -75,6 +67,7 @@ class Scheduler(UserDict):
                 attr_value = attr_default_gen()
             setattr(self, attr_name, attr_value)
 
+        self.cleanup()
         self.schedule_registry()
 
     def tick(self):
@@ -82,14 +75,39 @@ class Scheduler(UserDict):
         Executes all due tasks."""
         for entry in self.get_due_tasks():
             self.logger.debug("Scheduler: Sending due task %s" % (
-                    entry.task.name))
-            result = entry.execute()
+                    entry.name))
+            result = self.apply_async(entry)
             self.logger.debug("Scheduler: %s sent. id->%s" % (
-                    entry.task_name, result.task_id))
+                    entry.name, result.task_id))
 
     def get_due_tasks(self):
         """Get all the schedule entries that are due to execution."""
-        return filter(lambda entry: entry.is_due(), self.schedule.values())
+        return filter(self.is_due, self.schedule.values())
+
+    def get_task(self, name):
+        try:
+            return self.registry[name]
+        except KeyError:
+            raise NotRegistered(name)
+
+    def is_due(self, entry):
+        return entry.is_due(self.get_task(entry.name).run_every)
+
+    def apply_async(self, entry):
+
+        # Update timestamps and run counts before we actually execute,
+        # so we have that done if an exception is raised (doesn't schedule
+        # forever.)
+        entry = self.schedule[entry.name] = entry.next()
+        task = self.get_task(entry.name)
+
+        try:
+            result = task.apply_async()
+        except Exception, exc:
+            raise SchedulingError(
+                    "Couldn't apply scheduled task %s: %s" % (
+                        task.name, exc))
+        return result
 
     def schedule_registry(self):
         """Add the current contents of the registry to the schedule."""
@@ -99,7 +117,12 @@ class Scheduler(UserDict):
                 self.logger.debug(
                         "Scheduler: Adding periodic task %s to schedule" % (
                             task.name))
-            self.schedule.setdefault(name, ScheduleEntry(task))
+            self.schedule.setdefault(name, ScheduleEntry(task.name))
+
+    def cleanup(self):
+        for task_name, entry in self.schedule.items():
+            if task_name not in self.registry:
+                self.schedule.pop(task_name, None)
 
     @property
     def schedule(self):

+ 2 - 3
celery/bin/celerybeat.py

@@ -139,9 +139,8 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
     current_loader.on_worker_init()
 
     def _run_clock():
-        clockservice = ClockService(loglevel=loglevel,
-                                    logfile=logfile,
-                                    is_detached=detach)
+        logger = setup_logger(loglevel, logfile)
+        clockservice = ClockService(logger=logger, is_detached=detach)
 
         try:
             clockservice.start()

+ 13 - 1
celery/bin/celeryd.py

@@ -21,6 +21,11 @@
 
     Path to pidfile.
 
+.. cmdoption:: -B, --beat
+
+    Also run the ``celerybeat`` periodic task scheduler. Please note that
+    there must only be one instance of this service.
+
 .. cmdoption:: -s, --statistics
 
     Turn on reporting of statistics (remember to flush the statistics message
@@ -86,6 +91,7 @@ Configuration ->
     * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
     * Concurrency -> %(concurrency)s
     * Statistics -> %(statistics)s
+    * Celerybeat -> %(celerybeat)s
 """.strip()
 
 OPTION_LIST = (
@@ -110,6 +116,10 @@ OPTION_LIST = (
     optparse.make_option('-p', '--pidfile', default=conf.DAEMON_PID_FILE,
             action="store", dest="pidfile",
             help="Path to pidfile."),
+    optparse.make_option('-B', '--beat', default=False,
+            action="store_true", dest="run_clockservice",
+            help="Also run the celerybeat periodic task scheduler. \
+                  Please note that only one instance must be running."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="detach",
             help="Run in the background as a daemon."),
@@ -138,7 +148,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
         loglevel=conf.DAEMON_LOG_LEVEL, logfile=conf.DAEMON_LOG_FILE,
         discard=False, pidfile=conf.DAEMON_PID_FILE, umask=0,
         uid=None, gid=None, supervised=False, working_directory=None,
-        chroot=None, statistics=None, **kwargs):
+        chroot=None, statistics=None, run_clockservice=False, **kwargs):
     """Starts the celery worker server."""
 
     print("Celery %s is starting." % __version__)
@@ -185,6 +195,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
             "loglevel": loglevel,
             "pidfile": pidfile,
             "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
+            "celerybeat": run_clockservice and "ON" or "OFF",
     })
 
     print("Celery has started.")
@@ -208,6 +219,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
         worker = WorkController(concurrency=concurrency,
                                 loglevel=loglevel,
                                 logfile=logfile,
+                                embed_clockservice=run_clockservice,
                                 is_detached=detach)
 
         # Install signal handler that restarts celeryd on SIGHUP,

+ 13 - 5
celery/worker/__init__.py

@@ -5,6 +5,7 @@ The Multiprocessing Worker Server
 """
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 from celery.worker.controllers import Mediator, PeriodicWorkController
+from celery.beat import ClockServiceThread
 from celery.worker.job import TaskWrapper
 from celery.exceptions import NotRegistered
 from celery.messaging import get_consumer_set
@@ -227,7 +228,7 @@ class WorkController(object):
     _state = None
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
-            is_detached=False):
+            is_detached=False, embed_clockservice=False):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -235,6 +236,7 @@ class WorkController(object):
         self.logfile = logfile or self.logfile
         self.is_detached = is_detached
         self.logger = setup_logger(loglevel, logfile)
+        self.embed_clockservice = embed_clockservice
 
         # Queues
         self.bucket_queue = Queue()
@@ -252,13 +254,19 @@ class WorkController(object):
                                           initial_prefetch_count=concurrency)
         self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
 
+        self.clockservice = None
+        if self.embed_clockservice:
+            self.clockservice = ClockServiceThread(logger=self.logger,
+                                                is_detached=self.is_detached)
+
         # The order is important here;
         #   the first in the list is the first to start,
         # and they must be stopped in reverse order.
-        self.components = [self.pool,
-                           self.mediator,
-                           self.periodic_work_controller,
-                           self.amqp_listener]
+        self.components = filter(None, (self.pool,
+                                        self.mediator,
+                                        self.periodic_work_controller,
+                                        self.clockservice,
+                                        self.amqp_listener))
 
     def start(self):
         """Starts the workers main loop."""