瀏覽代碼

Configurable components: CELERYD_POOL, CELERYD_LISTENER, CELERYD_MEDIATOR, CELERYD_ETA_SCHEDULER

Default values:

    "CELERYD_POOL": "celery.worker.pool.TaskPool",
    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
    "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
Ask Solem 15 年之前
父節點
當前提交
c7d29a3c8f
共有 3 個文件被更改,包括 35 次插入22 次删除
  1. 9 0
      celery/conf.py
  2. 6 4
      celery/utils/__init__.py
  3. 20 18
      celery/worker/__init__.py

+ 9 - 0
celery/conf.py

@@ -36,6 +36,10 @@ _DEFAULTS = {
     "CELERY_BROKER_CONNECTION_TIMEOUT": 4,
     "CELERY_BROKER_CONNECTION_RETRY": True,
     "CELERY_BROKER_CONNECTION_MAX_RETRIES": 100,
+    "CELERYD_POOL": "celery.worker.pool.TaskPool",
+    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
+    "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
+    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
     "CELERYD_CONCURRENCY": 0, # defaults to cpu count
     "CELERYD_PREFETCH_MULTIPLIER": 4,
     "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
@@ -114,6 +118,11 @@ CELERYD_LOG_LEVEL = LOG_LEVELS[CELERYD_LOG_LEVEL.upper()]
 CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
 CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
 
+CELERYD_POOL = _get("CELERYD_POOL")
+CELERYD_LISTENER = _get("CELERYD_LISTENER")
+CELERYD_MEDIATOR = _get("CELERYD_MEDIATOR")
+CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
+
 # <--- Message routing                             <-   --   --- - ----- -- #
 QUEUES = _get("CELERY_QUEUES")
 DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")

+ 6 - 4
celery/utils/__init__.py

@@ -193,13 +193,15 @@ def timedelta_seconds(delta):
 
 
 def get_cls_by_name(name, aliases={}):
+
+    if not isinstance(name, basestring):
+        return name # already a class
+
     name = aliases.get(name) or name
     module_name, _, cls_name = rpartition(name, ".")
     module = importlib.import_module(module_name)
     return getattr(module, cls_name)
 
 
-def instantiate(name, aliases={}, *args, **kwargs):
-    return _get_cls_by_name(name, aliases)(*args, **kwargs)
-
-
+def instantiate(name, *args, **kwargs):
+    return get_cls_by_name(name)(*args, **kwargs)

+ 20 - 18
celery/worker/__init__.py

@@ -15,13 +15,11 @@ from celery import platform
 from celery import signals
 from celery.log import setup_logger, _hijack_multiprocessing_logger
 from celery.beat import EmbeddedClockService
-from celery.utils import noop
+from celery.utils import noop, instantiate
 
 from celery.worker.pool import TaskPool
 from celery.worker.buckets import TaskBucket
-from celery.worker.listener import CarrotListener
 from celery.worker.scheduler import Scheduler
-from celery.worker.controllers import Mediator, ScheduleController
 
 
 def process_initializer():
@@ -114,6 +112,9 @@ class WorkController(object):
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
             send_events=conf.SEND_EVENTS, hostname=None,
             ready_callback=noop, embed_clockservice=False,
+            pool_cls=conf.CELERYD_POOL, listener_cls=conf.CELERYD_LISTENER,
+            mediator_cls=conf.CELERYD_MEDIATOR,
+            eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
             schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
 
         # Options
@@ -137,14 +138,14 @@ class WorkController(object):
         self.logger.debug("Instantiating thread components...")
 
         # Threads + Pool + Consumer
-        self.pool = TaskPool(self.concurrency,
-                             logger=self.logger,
-                             initializer=process_initializer)
-        self.mediator = Mediator(self.ready_queue,
-                                 callback=self.process_task,
-                                 logger=self.logger)
-        self.scheduler = ScheduleController(self.eta_schedule,
-                                            logger=self.logger)
+        self.pool = instantiate(pool_cls, self.concurrency,
+                                logger=self.logger,
+                                initializer=process_initializer)
+        self.mediator = instantiate(mediator_cls, self.ready_queue,
+                                    callback=self.process_task,
+                                    logger=self.logger)
+        self.scheduler = instantiate(eta_scheduler_cls,
+                                     self.eta_schedule, logger=self.logger)
 
         self.clockservice = None
         if self.embed_clockservice:
@@ -152,13 +153,14 @@ class WorkController(object):
                                     schedule_filename=schedule_filename)
 
         prefetch_count = self.concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
-        self.listener = CarrotListener(self.ready_queue,
-                                       self.eta_schedule,
-                                       logger=self.logger,
-                                       hostname=self.hostname,
-                                       send_events=self.send_events,
-                                       init_callback=self.ready_callback,
-                                       initial_prefetch_count=prefetch_count)
+        self.listener = instantiate(listener_cls,
+                                    self.ready_queue,
+                                    self.eta_schedule,
+                                    logger=self.logger,
+                                    hostname=self.hostname,
+                                    send_events=self.send_events,
+                                    init_callback=self.ready_callback,
+                                    initial_prefetch_count=prefetch_count)
 
         # The order is important here;
         #   the first in the list is the first to start,