Browse Source

Bootsteps no longer using a class registry

Bootsteps are now identified by name, and the namespace
must list them explicitly.

Also adds:

    - Celery.user_options

        Additional options to command line programs.

    - Celery.steps

        Additional bootsteps

and adds:

    - celery.bootsteps.ConsumerStep

    Can be used to create consumer bootsteps, where you only
    have to provide a list of kombu.Consumer instances in the steps
    get_consumers method.

Example:

.. code-block:: python

    from celery import Celery
    from celery.bootsteps import ConsumerStep
    from celery.bin import Option
    from kombu import Consumer, Queue, Exchange

    celery = Celery()

    class CustomConsumer(ConsumerStep):
        queue = Queue('custom', Exchange('custom'), routing_key='custom')

        def __init__(self, c, custom_consumer=False, **kwargs):
            self.enabled = custom_consumer

        def get_consumers(self, connection):
            return [
                Consumer(connection.channel(),
                    queues=[self.queue],
                    callbacks=[self.on_message]),
            ]

        def on_message(self, body, message):
            print('Got message: %r' % (body, ))
            message.ack()

    celery.user_options['worker'].add(
        Option('--custom-consumer', action='store_true',
               help='Start custom consumer'),
    )
    celery.steps['consumer'].add(CustomConsumer)
Ask Solem 12 years ago
parent
commit
97d3d1fe47

+ 3 - 1
celery/app/base.py

@@ -11,7 +11,7 @@ from __future__ import absolute_import
 import threading
 import warnings
 
-from collections import deque
+from collections import defaultdict, deque
 from contextlib import contextmanager
 from copy import deepcopy
 from functools import wraps
@@ -72,6 +72,8 @@ class Celery(object):
         self.set_as_current = set_as_current
         self.registry_cls = symbol_by_name(self.registry_cls)
         self.accept_magic_kwargs = accept_magic_kwargs
+        self.user_options = defaultdict(set)
+        self.steps = defaultdict(set)
 
         self.configured = False
         self._pending_defaults = deque()

+ 2 - 0
celery/bin/__init__.py

@@ -1,3 +1,5 @@
 from __future__ import absolute_import
 
+from collections import defaultdict
+
 from .base import Option  # noqa

+ 1 - 1
celery/bin/celeryd.py

@@ -197,7 +197,7 @@ class WorkerCommand(Command):
             Option('--autoreload', action='store_true'),
             Option('--no-execv', action='store_true', default=False),
             Option('-D', '--detach', action='store_true'),
-        ) + daemon_options()
+        ) + daemon_options() + tuple(self.app.user_options['worker'])
 
 
 def main():

+ 64 - 51
celery/bootsteps.py

@@ -12,10 +12,13 @@ from collections import defaultdict
 from importlib import import_module
 from threading import Event
 
-from celery.datastructures import DependencyGraph
-from celery.utils.imports import instantiate
-from celery.utils.log import get_logger
-from celery.utils.threads import default_socket_timeout
+from kombu.common import ignore_errors
+from kombu.utils import symbol_by_name
+
+from .datastructures import DependencyGraph
+from .utils.imports import instantiate
+from .utils.log import get_logger
+from .utils.threads import default_socket_timeout
 
 try:
     from greenlet import GreenletExit
@@ -32,10 +35,11 @@ CLOSE = 0x2
 TERMINATE = 0x3
 
 logger = get_logger(__name__)
+debug = logger.debug
 
 
-def qualname(c):
-    return '.'.join([c.namespace.name, c.name.capitalize()])
+def _pre(ns, fmt):
+    return '| {0}: {1}'.format(ns.name, fmt)
 
 
 class Namespace(object):
@@ -54,8 +58,7 @@ class Namespace(object):
     name = None
     state = None
     started = 0
-
-    _unclaimed = defaultdict(dict)
+    default_steps = set()
 
     def __init__(self, name=None, app=None, on_start=None,
             on_close=None, on_stopped=None):
@@ -71,13 +74,11 @@ class Namespace(object):
         self.state = RUN
         if self.on_start:
             self.on_start()
-        for i, step in enumerate(parent.steps):
-            if step:
-                logger.debug('Starting %s...', qualname(step))
-                self.started = i + 1
-                print('STARTING: %r' % (step.start, ))
-                step.start(parent)
-                logger.debug('%s OK!', qualname(step))
+        for i, step in enumerate(filter(None, parent.steps)):
+            self._debug('Starting %s', step.name)
+            self.started = i + 1
+            step.start(parent)
+            debug('^-- substep ok')
 
     def close(self, parent):
         if self.on_close:
@@ -91,7 +92,7 @@ class Namespace(object):
         with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
             for step in reversed(parent.steps):
                 if step:
-                    logger.debug('%s %s...', description, qualname(step))
+                    self._debug('%s %s...', description, step.name)
                     fun = getattr(step, attr, None)
                     if fun:
                         fun(parent)
@@ -124,16 +125,6 @@ class Namespace(object):
         except IGNORE_ERRORS:
             pass
 
-    def modules(self):
-        """Subclasses can override this to return a
-        list of modules to import before steps are claimed."""
-        return []
-
-    def load_modules(self):
-        """Will load the steps modules this namespace depends on."""
-        for m in self.modules():
-            self.import_module(m)
-
     def apply(self, parent, **kwargs):
         """Apply the steps in this namespace to an object.
 
@@ -144,11 +135,9 @@ class Namespace(object):
         will also be added the the objects ``steps`` attribute.
 
         """
-        self._debug('Loading modules.')
-        self.load_modules()
-        self._debug('Claiming steps.')
-        self.steps = self._claim()
-        self._debug('Building boot step graph.')
+        self._debug('Loading boot-steps.')
+        self.steps = self.claim_steps()
+        self._debug('Building graph.')
         self.boot_steps = [self.bind_step(name, parent, **kwargs)
                                 for name in self._finalize_boot_steps()]
         self._debug('New boot order: {%s}',
@@ -183,14 +172,23 @@ class Namespace(object):
             for obj in G:
                 if obj != last.name:
                     G.add_edge(last.name, obj)
-        return G.topsort()
+        try:
+            return G.topsort()
+        except KeyError as exc:
+            raise KeyError('unknown boot-step: %s' % exc)
 
-    def _claim(self):
-        return self._unclaimed[self.name]
+    def claim_steps(self):
+        return dict(self.load_step(step) for step in self._unclaimed_steps())
+
+    def _unclaimed_steps(self):
+        return set(self.default_steps) | self.app.steps[self.name]
+
+    def load_step(self, step):
+        step = symbol_by_name(step)
+        return step.name, step
 
     def _debug(self, msg, *args):
-        return logger.debug('[%s] ' + msg,
-                            *(self.name.capitalize(), ) + args)
+        return debug(_pre(self, msg), *args)
 
 
 def _prepare_requires(req):
@@ -203,21 +201,12 @@ class StepType(type):
     """Metaclass for steps."""
 
     def __new__(cls, name, bases, attrs):
-        abstract = attrs.pop('abstract', False)
-        if not abstract:
-            try:
-                cname = attrs['name']
-            except KeyError:
-                raise NotImplementedError('Steps must be named')
-            namespace = attrs.get('namespace', None)
-            if not namespace:
-                attrs['namespace'], _, attrs['name'] = cname.partition('.')
+        module = attrs.get('__module__')
+        qname = '.'.join([module, name]) if module else name
+        attrs['name'] = attrs.get('name') or qname
         attrs['requires'] = tuple(_prepare_requires(req)
                                     for req in attrs.get('requires', ()))
-        cls = super(StepType, cls).__new__(cls, name, bases, attrs)
-        if not abstract:
-            Namespace._unclaimed[cls.namespace][cls.name] = cls
-        return cls
+        return super(StepType, cls).__new__(cls, name, bases, attrs)
 
 
 class Step(object):
@@ -273,8 +262,8 @@ class Step(object):
         step should be created."""
         return self.enabled
 
-    def instantiate(self, qualname, *args, **kwargs):
-        return instantiate(qualname, *args, **kwargs)
+    def instantiate(self, name, *args, **kwargs):
+        return instantiate(name, *args, **kwargs)
 
     def include(self, parent):
         if self.include_if(parent):
@@ -302,3 +291,27 @@ class StartStopStep(Step):
     def include(self, parent):
         if super(StartStopStep, self).include(parent):
             parent.steps.append(self)
+
+
+class ConsumerStep(StartStopStep):
+    abstract = True
+    requires = ('Connection', )
+    consumers = None
+
+    def get_consumers(self, channel):
+        raise NotImplementedError('missing get_consumers')
+
+    def start(self, c):
+        self.consumers = self.get_consumers(c.connection)
+        for consumer in self.consumers or []:
+            consumer.consume()
+
+    def stop(self, c):
+        for consumer in self.consumers or []:
+            ignore_errors(c.connection, consumer.cancel)
+
+    def shutdown(self, c):
+        self.stop(c)
+        for consumer in self.consumers or []:
+            if consumer.channel:
+                ignore_errors(c.connection, consumer.channel.close)

+ 15 - 10
celery/worker/__init__.py

@@ -79,16 +79,20 @@ class WorkController(configurated):
         own set of built-in boot-step modules.
 
         """
-        name = 'worker'
-        builtin_boot_steps = (
-            'celery.worker.components',
-            'celery.worker.autoscale',
-            'celery.worker.autoreload',
-            'celery.worker.mediator',
-        )
-
-        def modules(self):
-            return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
+        name = 'Worker'
+        default_steps = set([
+            'celery.worker.components:Hub',
+            'celery.worker.components:Queues',
+            'celery.worker.components:Pool',
+            'celery.worker.components:Beat',
+            'celery.worker.components:Timers',
+            'celery.worker.components:StateDB',
+            'celery.worker.components:Consumer',
+            'celery.worker.autoscale:WorkerComponent',
+            'celery.worker.autoreload:WorkerComponent',
+            'celery.worker.mediator:WorkerComponent',
+
+        ])
 
     def __init__(self, app=None, hostname=None, **kwargs):
         self.app = app_or_default(app or self.app)
@@ -117,6 +121,7 @@ class WorkController(configurated):
         self.loglevel = mlevel(self.loglevel)
         self.ready_callback = ready_callback or self.on_consumer_ready
         self.use_eventloop = self.should_use_eventloop()
+        self.options = kwargs
 
         signals.worker_init.send(sender=self)
 

+ 4 - 2
celery/worker/autoreload.py

@@ -24,6 +24,8 @@ from celery.utils.imports import module_file
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
 
+from .components import Pool
+
 try:                        # pragma: no cover
     import pyinotify
     _ProcessEvent = pyinotify.ProcessEvent
@@ -35,8 +37,8 @@ logger = get_logger(__name__)
 
 
 class WorkerComponent(bootsteps.StartStopStep):
-    name = 'worker.autoreloader'
-    requires = ('pool', )
+    name = 'Autoreloader'
+    requires = (Pool, )
 
     def __init__(self, w, autoreload=None, **kwargs):
         self.enabled = w.autoreload = autoreload

+ 3 - 2
celery/worker/autoscale.py

@@ -23,6 +23,7 @@ from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
 
 from . import state
+from .components import Pool
 from .hub import DummyLock
 
 logger = get_logger(__name__)
@@ -30,8 +31,8 @@ debug, info, error = logger.debug, logger.info, logger.error
 
 
 class WorkerComponent(bootsteps.StartStopStep):
-    name = 'worker.autoscaler'
-    requires = ('pool', )
+    name = 'Autoscaler'
+    requires = (Pool, )
 
     def __init__(self, w, **kwargs):
         self.enabled = w.autoscale

+ 9 - 8
celery/worker/components.py

@@ -24,7 +24,7 @@ from .buckets import TaskBucket, FastQueue
 
 
 class Hub(bootsteps.StartStopStep):
-    name = 'worker.hub'
+    name = 'Hub'
 
     def __init__(self, w, **kwargs):
         w.hub = None
@@ -41,7 +41,7 @@ class Hub(bootsteps.StartStopStep):
 class Queues(bootsteps.Step):
     """This step initializes the internal queues
     used by the worker."""
-    name = 'worker.queues'
+    name = 'Queues'
     requires = (Hub, )
 
     def create(self, w):
@@ -78,7 +78,7 @@ class Pool(bootsteps.StartStopStep):
         * min_concurrency
 
     """
-    name = 'worker.pool'
+    name = 'Pool'
     requires = (Queues, )
 
     def __init__(self, w, autoscale=None, autoreload=None,
@@ -185,7 +185,7 @@ class Beat(bootsteps.StartStopStep):
     argument is set.
 
     """
-    name = 'worker.beat'
+    name = 'Beat'
 
     def __init__(self, w, beat=False, **kwargs):
         self.enabled = w.beat = beat
@@ -201,7 +201,7 @@ class Beat(bootsteps.StartStopStep):
 
 class Timers(bootsteps.Step):
     """This step initializes the internal timers used by the worker."""
-    name = 'worker.timers'
+    name = 'Timers'
     requires = (Pool, )
 
     def include_if(self, w):
@@ -226,7 +226,7 @@ class Timers(bootsteps.Step):
 
 class StateDB(bootsteps.Step):
     """This step sets up the workers state db if enabled."""
-    name = 'worker.state-db'
+    name = 'StateDB'
 
     def __init__(self, w, **kwargs):
         self.enabled = w.state_db
@@ -238,7 +238,7 @@ class StateDB(bootsteps.Step):
 
 
 class Consumer(bootsteps.StartStopStep):
-    name = 'worker.consumer'
+    name = 'Consumer'
     last = True
 
     def create(self, w):
@@ -253,5 +253,6 @@ class Consumer(bootsteps.StartStopStep):
                 timer=w.timer,
                 app=w.app,
                 controller=w,
-                hub=w.hub)
+                hub=w.hub,
+                worker_options=w.options)
         return c

+ 17 - 12
celery/worker/consumer.py

@@ -110,19 +110,24 @@ class Consumer(object):
     timer = None
 
     class Namespace(bootsteps.Namespace):
-        name = 'consumer'
+        name = 'Consumer'
+        default_steps = [
+            'celery.worker.consumer:Connection',
+            'celery.worker.consumer:Events',
+            'celery.worker.consumer:Heart',
+            'celery.worker.consumer:Control',
+            'celery.worker.consumer:Tasks',
+            'celery.worker.consumer:Evloop',
+        ]
 
         def shutdown(self, parent):
             self.restart(parent, 'Shutdown', 'shutdown')
 
-        def modules(self):
-            return self.app.conf.CELERYD_CONSUMER_BOOT_STEPS
-
     def __init__(self, ready_queue,
             init_callback=noop, hostname=None,
             pool=None, app=None,
             timer=None, controller=None, hub=None, amqheartbeat=None,
-            **kwargs):
+            worker_options=None, **kwargs):
         self.app = app_or_default(app)
         self.controller = controller
         self.ready_queue = ready_queue
@@ -161,7 +166,7 @@ class Consumer(object):
         self.namespace = self.Namespace(
             app=self.app, on_start=self.on_start, on_close=self.on_close,
         )
-        self.namespace.apply(self, **kwargs)
+        self.namespace.apply(self, **worker_options or {})
 
     def start(self):
         ns, loop = self.namespace, self.loop
@@ -358,7 +363,7 @@ class Consumer(object):
 
 
 class Connection(bootsteps.StartStopStep):
-    name = 'consumer.connection'
+    name = 'Connection'
 
     def __init__(self, c, **kwargs):
         c.connection = None
@@ -376,7 +381,7 @@ class Connection(bootsteps.StartStopStep):
 
 
 class Events(bootsteps.StartStopStep):
-    name = 'consumer.events'
+    name = 'Events'
     requires = (Connection, )
 
     def __init__(self, c, send_events=None, **kwargs):
@@ -401,7 +406,7 @@ class Events(bootsteps.StartStopStep):
 
 
 class Heart(bootsteps.StartStopStep):
-    name = 'consumer.heart'
+    name = 'Heart'
     requires = (Events, )
 
     def __init__(self, c, **kwargs):
@@ -417,7 +422,7 @@ class Heart(bootsteps.StartStopStep):
 
 
 class Control(bootsteps.StartStopStep):
-    name = 'consumer.control'
+    name = 'Control'
     requires = (Events, )
 
     def __init__(self, c, **kwargs):
@@ -429,7 +434,7 @@ class Control(bootsteps.StartStopStep):
 
 
 class Tasks(bootsteps.StartStopStep):
-    name = 'consumer.tasks'
+    name = 'Tasks'
     requires = (Control, )
 
     def __init__(self, c, initial_prefetch_count=2, **kwargs):
@@ -457,7 +462,7 @@ class Tasks(bootsteps.StartStopStep):
 
 
 class Evloop(bootsteps.StartStopStep):
-    name = 'consumer.evloop'
+    name = 'Evloop'
     last = True
 
     def start(self, c):

+ 3 - 2
celery/worker/mediator.py

@@ -24,13 +24,14 @@ from celery.bootsteps import StartStopStep
 from celery.utils.threads import bgThread
 from celery.utils.log import get_logger
 
+from . import components
 
 logger = get_logger(__name__)
 
 
 class WorkerComponent(StartStopStep):
-    name = 'worker.mediator'
-    requires = ('pool', 'queues', )
+    name = 'Mediator'
+    requires = (components.Pool, components.Queues, )
 
     def __init__(self, w, **kwargs):
         w.mediator = None