Forráskód Böngészése

worker.consumer is now a package and adds CELERYD_CONSUMER_BOOT_STEPS setting

Ask Solem 12 éve
szülő
commit
ffd8e2613c

+ 1 - 0
celery/app/defaults.py

@@ -153,6 +153,7 @@ NAMESPACES = {
         'AUTOSCALER': Option('celery.worker.autoscale.Autoscaler'),
         'AUTORELOADER': Option('celery.worker.autoreload.Autoreloader'),
         'BOOT_STEPS': Option((), type='tuple'),
+        'CONSUMER_BOOT_STEPS': Option((), type='tuple'),
         'CONCURRENCY': Option(0, type='int'),
         'TIMER': Option(type='string'),
         'TIMER_PRECISION': Option(1.0, type='float'),

+ 5 - 4
celery/worker/consumer.py → celery/worker/consumer/__init__.py

@@ -29,9 +29,8 @@ from celery.utils.imports import qualname
 from celery.utils.log import get_logger
 from celery.utils.text import dump_body
 from celery.utils.timeutils import humanize_seconds
-
-from . import state
-from .bootsteps import Namespace as _NS, StartStopComponent, CLOSE
+from celery.worker import state
+from celery.worker.bootsteps import Namespace as _NS, StartStopComponent, CLOSE
 
 logger = get_logger(__name__)
 info, warn, error, crit = (logger.info, logger.warn,
@@ -125,6 +124,7 @@ class Component(StartStopComponent):
 
 class Namespace(_NS):
     name = 'consumer'
+    builtin_boot_steps = ('celery.worker.consumer.components', )
 
     def shutdown(self, parent):
         delayed = self._shutdown_step(parent, parent.components, force=False)
@@ -142,7 +142,8 @@ class Namespace(_NS):
         return delayed
 
     def modules(self):
-        return ('celery.worker.parts', )
+        return (self.builtin_boot_steps +
+                self.app.conf.CELERYD_CONSUMER_BOOT_STEPS)
 
 
 class Consumer(object):

+ 12 - 12
celery/worker/parts.py → celery/worker/consumer/components.py

@@ -8,9 +8,9 @@ from kombu.common import QoS
 from celery.datastructures import AttributeDict
 from celery.utils.log import get_logger
 
-from .bootsteps import StartStopComponent
-from .control import Panel
-from .heartbeat import Heart
+from celery.worker.bootsteps import StartStopComponent
+from celery.worker.control import Panel
+from celery.worker.heartbeat import Heart
 
 logger = get_logger(__name__)
 info, error, debug = logger.info, logger.error, logger.debug
@@ -24,10 +24,10 @@ class ConsumerConnection(StartStopComponent):
         c.connection = None
 
     def start(self, c):
-        debug('  Re-establishing connection to the broker...')
+        debug('Re-establishing connection to the broker...')
         c.connection = c._open_connection()
         # Re-establish the broker connection and setup the task consumer.
-        info('  consumer: Connected to %s.', c.connection.as_uri())
+        info('consumer: Connected to %s.', c.connection.as_uri())
 
     def stop(self, c):
         pass
@@ -62,7 +62,7 @@ class Events(StartStopComponent):
 
     def stop(self, c):
         if c.event_dispatcher:
-            debug('  Shutting down event dispatcher...')
+            debug('Shutting down event dispatcher...')
             c.event_dispatcher = \
                     c.maybe_conn_error(c.event_dispatcher.close)
 
@@ -84,7 +84,7 @@ class Heartbeat(StartStopComponent):
     def stop(self, c):
         if c.heart:
             # Stop the heartbeat thread if it's running.
-            debug('  Heart: Going into cardiac arrest...')
+            debug('Heart: Going into cardiac arrest...')
             c.heart = c.heart.stop()
 
     def shutdown(self, c):
@@ -119,7 +119,7 @@ class Controller(StartStopComponent):
     def shutdown(self, c):
         self.stop_pidbox_node()
         if self.broadcast_consumer:
-            debug('  Cancelling broadcast consumer...')
+            debug('Cancelling broadcast consumer...')
             c.maybe_conn_error(self.broadcast_consumer.cancel)
             self.broadcast_consumer = None
 
@@ -151,11 +151,11 @@ class Controller(StartStopComponent):
         c = self.consumer
         if self._pidbox_node_stopped:
             self._pidbox_node_shutdown.set()
-            debug('  Waiting for broadcast thread to shutdown...')
+            debug('Waiting for broadcast thread to shutdown...')
             self._pidbox_node_stopped.wait()
             self._pidbox_node_stopped = self._pidbox_node_shutdown = None
         elif self.broadcast_consumer:
-            debug('  Closing broadcast channel...')
+            debug('Closing broadcast channel...')
             self.broadcast_consumer = \
                 c.maybe_conn_error(self.broadcast_consumer.channel.close)
 
@@ -206,8 +206,8 @@ class Tasks(StartStopComponent):
 
     def shutdown(self, c):
         if c.task_consumer:
-            debug('  Cancelling task consumer...')
+            debug('Cancelling task consumer...')
             c.maybe_conn_error(c.task_consumer.cancel)
-            debug('  Closing consumer channel...')
+            debug('Closing consumer channel...')
             c.maybe_conn_error(c.task_consumer.close)
             c.task_consumer = None

+ 12 - 1
docs/configuration.rst

@@ -1408,9 +1408,20 @@ CELERYD_BOOT_STEPS
 ~~~~~~~~~~~~~~~~~~
 
 This setting enables you to add additional components to the worker process.
-It should be a list of module names with :class:`celery.abstract.Component`
+It should be a list of module names with
+:class:`celery.worker.bootsteps.Component`
 classes, that augments functionality in the worker.
 
+.. setting:: CELERYD_CONSUMER_BOOT_STEPS
+
+CELERYD_CONSUMER_BOOT_STEPS
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This setting enables you to add additional components to the workers consumer.
+It should be a list of module names with
+:class:`celery.worker.bootsteps.Component`` classes, that augments
+functionality in the consumer.
+
 .. setting:: CELERYD_POOL
 
 CELERYD_POOL