Parcourir la source

celeryd: Added --scheduler option to be used in combination with -B. Closes #229

Ask Solem il y a 14 ans
Parent
commit
f7c0062d2e
3 fichiers modifiés avec 16 ajouts et 4 suppressions
  1. 3 1
      celery/apps/worker.py
  2. 9 1
      celery/bin/celeryd.py
  3. 4 2
      celery/worker/__init__.py

+ 3 - 1
celery/apps/worker.py

@@ -43,7 +43,7 @@ class Worker(object):
             max_tasks_per_child=None, queues=None, events=False, db=None,
             include=None, app=None, pidfile=None,
             redirect_stdouts=None, redirect_stdouts_level=None,
-            autoscale=None, **kwargs):
+            autoscale=None, scheduler_cls=None, **kwargs):
         self.app = app = app_or_default(app)
         self.concurrency = (concurrency or
                             app.conf.CELERYD_CONCURRENCY or
@@ -54,6 +54,7 @@ class Worker(object):
         self.discard = discard
         self.run_clockservice = run_clockservice
         self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
+        self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
         self.events = events
         self.task_time_limit = (task_time_limit or
                                 app.conf.CELERYD_TASK_TIME_LIMIT)
@@ -196,6 +197,7 @@ class Worker(object):
                                 ready_callback=self.on_listener_ready,
                                 embed_clockservice=self.run_clockservice,
                                 schedule_filename=self.schedule,
+                                scheduler_cls=self.scheduler_cls,
                                 send_events=self.events,
                                 db=self.db,
                                 queues=self.queues,

+ 9 - 1
celery/bin/celeryd.py

@@ -43,6 +43,10 @@
     Defaults to ``celerybeat-schedule``. The extension ".db" will be
     appended to the filename.
 
+.. cmdoption:: --scheduler
+
+    Scheduler class to use. Default is celery.beat.PersistentScheduler
+
 .. cmdoption:: -E, --events
 
     Send events that can be captured by monitors like ``celerymon``.
@@ -114,7 +118,11 @@ class WorkerCommand(Command):
                      "option. The extension '.db' will be appended to the "
                     "filename. Default: %s" % (
                         conf.CELERYBEAT_SCHEDULE_FILENAME, )),
-
+            Option('--scheduler',
+                default=None,
+                action="store", dest="scheduler_cls",
+                help="Scheduler class. Default is "
+                     "celery.beat.PersistentScheduler"),
             Option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
                 action="store", dest="db",
                 help="Path to the state database. The extension '.db' will "

+ 4 - 2
celery/worker/__init__.py

@@ -121,7 +121,7 @@ class WorkController(object):
             pool_putlocks=None, db=None, prefetch_multiplier=None,
             eta_scheduler_precision=None, queues=None,
             disable_rate_limits=None, autoscale=None,
-            autoscaler_cls=None, app=None):
+            autoscaler_cls=None, scheduler_cls=None, app=None):
 
         self.app = app_or_default(app)
         conf = self.app.conf
@@ -143,6 +143,7 @@ class WorkController(object):
                                     conf.CELERYD_AUTOSCALER
         self.schedule_filename = schedule_filename or \
                                     conf.CELERYBEAT_SCHEDULE_FILENAME
+        self.scheduler_cls = scheduler_cls or conf.CELERYBEAT_SCHEDULER
         self.hostname = hostname or socket.gethostname()
         self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
@@ -217,7 +218,8 @@ class WorkController(object):
         if self.embed_clockservice:
             self.beat = beat.EmbeddedService(app=self.app,
                                 logger=self.logger,
-                                schedule_filename=self.schedule_filename)
+                                schedule_filename=self.schedule_filename,
+                                scheduler_cls=self.scheduler_cls)
 
         prefetch_count = self.concurrency * self.prefetch_multiplier
         self.listener = instantiate(self.listener_cls,