Jelajahi Sumber

celery.beat now uses configurated

Ask Solem 13 tahun lalu
induk
melakukan
eda757dd44
2 mengubah file dengan 48 tambahan dan 15 penghapusan
  1. 14 13
      celery/apps/beat.py
  2. 34 2
      celery/worker/__init__.py

+ 14 - 13
celery/apps/beat.py

@@ -9,6 +9,7 @@ import traceback
 from .. import __version__, platforms
 from .. import beat
 from ..app import app_or_default
+from ..app.abstract import configurated, from_config
 from ..utils import LOG_LEVELS, qualname
 from ..utils.timeutils import humanize_seconds
 
@@ -23,27 +24,27 @@ Configuration ->
 """.strip()
 
 
-class Beat(object):
+class Beat(configurated):
     Service = beat.Service
 
-    def __init__(self, loglevel=None, logfile=None, schedule=None,
-            max_interval=None, scheduler_cls=None, app=None,
-            socket_timeout=30, redirect_stdouts=None,
-            redirect_stdouts_level=None, pidfile=None, **kwargs):
+    loglevel = from_config("log_level")
+    logfile = from_config("log_file")
+    schedule = from_config("schedule_filename")
+    scheduler_cls = from_config("scheduler")
+    redirect_stdouts = from_config()
+    redirect_stdouts_level = from_config()
+
+    def __init__(self, max_interval=None, app=None,
+            socket_timeout=30, pidfile=None, **kwargs):
         """Starts the celerybeat task scheduler."""
         self.app = app = app_or_default(app)
+        self.setup_defaults(kwargs, namespace="celerybeat")
+
+        print("SCHEDULE: %r" % (self.schedule, ))
 
-        self.loglevel = loglevel or app.conf.CELERYBEAT_LOG_LEVEL
-        self.logfile = logfile or app.conf.CELERYBEAT_LOG_FILE
-        self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
-        self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
         self.max_interval = max_interval
         self.socket_timeout = socket_timeout
         self.colored = app.log.colored(self.logfile)
-        self.redirect_stdouts = (redirect_stdouts or
-                                 app.conf.CELERY_REDIRECT_STDOUTS)
-        self.redirect_stdouts_level = (redirect_stdouts_level or
-                                       app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
         self.pidfile = pidfile
 
         if not isinstance(self.loglevel, int):

+ 34 - 2
celery/worker/__init__.py

@@ -3,7 +3,10 @@
     celery.worker
     ~~~~~~~~~~~~~
 
-    The worker.
+    :class:`WorkController` can be used to instantiate in-process workers.
+
+    The worker consists of several components, all managed by boot-steps
+    (mod:`celery.abstract`).
 
     :copyright: (c) 2009 - 2012 by Ask Solem.
     :license: BSD, see LICENSE for more details.
@@ -38,6 +41,12 @@ TERMINATE = 0x3
 
 
 class Namespace(abstract.Namespace):
+    """This is the boot-step namespace of the :class:`WorkController`.
+
+    It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
+    own set of built-in boot-step modules.
+
+    """
     name = "worker"
     builtin_boot_steps = ("celery.worker.autoscale",
                           "celery.worker.autoreload",
@@ -50,6 +59,19 @@ class Namespace(abstract.Namespace):
 
 
 class Pool(abstract.StartStopComponent):
+    """The pool component.
+
+    Describes how to initialize the worker pool, and starts and stops
+    the pool during worker startup/shutdown.
+
+    Adds attributes:
+
+        * autoscale
+        * pool
+        * max_concurrency
+        * min_concurrency
+
+    """
     name = "worker.pool"
     requires = ("queues", )
 
@@ -73,6 +95,12 @@ class Pool(abstract.StartStopComponent):
 
 
 class Beat(abstract.StartStopComponent):
+    """Component used to embed a celerybeat process.
+
+    This will only be enabled if the ``embed_clockservice``
+    argument is set.
+
+    """
     name = "worker.beat"
 
     def __init__(self, w, embed_clockservice=False, **kwargs):
@@ -80,7 +108,7 @@ class Beat(abstract.StartStopComponent):
         w.beat = None
 
     def create(self, w):
-        from celery.beat import EmbeddedService
+        from ..beat import EmbeddedService
         b = w.beat = EmbeddedService(app=w.app,
                                      logger=w.logger,
                                      schedule_filename=w.schedule_filename,
@@ -89,6 +117,8 @@ class Beat(abstract.StartStopComponent):
 
 
 class Queues(abstract.Component):
+    """This component initializes the internal queues
+    used by the worker."""
     name = "worker.queues"
 
     def create(self, w):
@@ -102,6 +132,7 @@ class Queues(abstract.Component):
 
 
 class Timers(abstract.Component):
+    """This component initializes the internal timers used by the worker."""
     name = "worker.timers"
     requires = ("pool", )
 
@@ -118,6 +149,7 @@ class Timers(abstract.Component):
 
 
 class StateDB(abstract.Component):
+    """This component sets up the workers state db if enabled."""
     name = "worker.state-db"
 
     def __init__(self, w, **kwargs):