Browse Source

Tests passing, perfecting

Ask Solem 12 years ago
parent
commit
ce8041ccad

+ 67 - 54
celery/bootsteps.py

@@ -8,7 +8,7 @@
 """
 from __future__ import absolute_import
 
-from collections import defaultdict
+from collections import deque
 from importlib import import_module
 from threading import Event
 
@@ -16,7 +16,7 @@ from kombu.common import ignore_errors
 from kombu.utils import symbol_by_name
 
 from .datastructures import DependencyGraph
-from .utils.imports import instantiate
+from .utils.imports import instantiate, qualname, symbol_by_name
 from .utils.log import get_logger
 from .utils.threads import default_socket_timeout
 
@@ -39,20 +39,24 @@ debug = logger.debug
 
 
 def _pre(ns, fmt):
-    return '| {0}: {1}'.format(ns.name, fmt)
+    return '| {0}: {1}'.format(ns.alias, fmt)
 
 
-class Namespace(object):
-    """A namespace containing bootsteps.
+def _maybe_name(s):
+    if not isinstance(s, basestring):
+        return s.name
+    return s
 
-    Every step must belong to a namespace.
 
-    When step classes are created they are added to the
-    mapping of unclaimed steps.  The steps will be
-    claimed when the namespace they belong to is created.
+class Namespace(object):
+    """A namespace containing bootsteps.
 
-    :keyword name: Set the name of this namespace.
+    :keyword steps: List of steps.
+    :keyword name: Set explicit name for this namespace.
     :keyword app: Set the Celery app for this namespace.
+    :keyword on_start: Optional callback applied after namespace start.
+    :keyword on_close: Optional callback applied before namespace close.
+    :keyword on_stopped: Optional callback applied after namespace stopped.
 
     """
     name = None
@@ -60,22 +64,23 @@ class Namespace(object):
     started = 0
     default_steps = set()
 
-    def __init__(self, name=None, app=None, on_start=None,
+    def __init__(self, steps=None, name=None, app=None, on_start=None,
             on_close=None, on_stopped=None):
         self.app = app
-        self.name = name or self.name
+        self.name = name or self.name or qualname(type(self))
+        self.types = set(steps or []) | set(self.default_steps)
         self.on_start = on_start
         self.on_close = on_close
         self.on_stopped = on_stopped
-        self.services = []
         self.shutdown_complete = Event()
+        self.steps = {}
 
     def start(self, parent):
         self.state = RUN
         if self.on_start:
             self.on_start()
         for i, step in enumerate(filter(None, parent.steps)):
-            self._debug('Starting %s', step.name)
+            self._debug('Starting %s', step.alias)
             self.started = i + 1
             step.start(parent)
             debug('^-- substep ok')
@@ -92,7 +97,7 @@ class Namespace(object):
         with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
             for step in reversed(parent.steps):
                 if step:
-                    self._debug('%s %s...', description, step.name)
+                    self._debug('%s %s...', description, step.alias)
                     fun = getattr(step, attr, None)
                     if fun:
                         fun(parent)
@@ -136,23 +141,18 @@ class Namespace(object):
 
         """
         self._debug('Loading boot-steps.')
-        self.steps = self.claim_steps()
-        self._debug('Building graph.')
-        self.boot_steps = [self.bind_step(name, parent, **kwargs)
-                                for name in self._finalize_boot_steps()]
-        self._debug('New boot order: {%s}',
-                ', '.join(c.name for c in self.boot_steps))
+        order = self.order = []
+        steps = self.steps = self.claim_steps()
 
-        for step in self.boot_steps:
+        self._debug('Building graph...')
+        for name in self._finalize_boot_steps(steps):
+            step = steps[name] = steps[name](parent, **kwargs)
+            order.append(step)
             step.include(parent)
+        self._debug('New boot order: {%s}',
+                    ', '.join(s.alias for s in self.order))
         return self
 
-    def bind_step(self, name, parent, **kwargs):
-        """Bind step to parent object and this namespace."""
-        comp = self[name](parent, **kwargs)
-        comp.namespace = self
-        return comp
-
     def import_module(self, module):
         return import_module(module)
 
@@ -164,9 +164,23 @@ class Namespace(object):
             if C.last:
                 return C
 
-    def _finalize_boot_steps(self):
+    def _firstpass(self, steps):
+        stream = deque(step.requires for step in steps.itervalues())
+        while stream:
+            for node in stream.popleft():
+                node = symbol_by_name(node)
+                if node.name not in self.steps:
+                    steps[node.name] = node
+                stream.append(node.requires)
+        for node in steps.itervalues():
+            node.requires = [_maybe_name(n) for n in node.requires]
+        for step in steps.values():
+            [steps[n] for n in step.requires]
+
+    def _finalize_boot_steps(self, steps):
+        self._firstpass(steps)
         G = self.graph = DependencyGraph((C.name, C.requires)
-                            for C in self.steps.itervalues())
+                            for C in steps.itervalues())
         last = self._find_last()
         if last:
             for obj in G:
@@ -178,10 +192,10 @@ class Namespace(object):
             raise KeyError('unknown boot-step: %s' % exc)
 
     def claim_steps(self):
-        return dict(self.load_step(step) for step in self._unclaimed_steps())
+        return dict(self.load_step(step) for step in self._all_steps())
 
-    def _unclaimed_steps(self):
-        return set(self.default_steps) | self.app.steps[self.name]
+    def _all_steps(self):
+        return self.types | self.app.steps[self.name]
 
     def load_step(self, step):
         step = symbol_by_name(step)
@@ -190,11 +204,9 @@ class Namespace(object):
     def _debug(self, msg, *args):
         return debug(_pre(self, msg), *args)
 
-
-def _prepare_requires(req):
-    if not isinstance(req, basestring):
-        req = req.name
-    return req
+    @property
+    def alias(self):
+        return self.name.rsplit('.', 1)[-1]
 
 
 class StepType(type):
@@ -202,12 +214,17 @@ class StepType(type):
 
     def __new__(cls, name, bases, attrs):
         module = attrs.get('__module__')
-        qname = '.'.join([module, name]) if module else name
-        attrs['name'] = attrs.get('name') or qname
-        attrs['requires'] = tuple(_prepare_requires(req)
-                                    for req in attrs.get('requires', ()))
+        qname = '{0}.{1}'.format(module, name) if module else name
+        attrs.update(
+            __qualname__=qname,
+            name=attrs.get('name') or qname,
+            requires=attrs.get('requires', ()),
+        )
         return super(StepType, cls).__new__(cls, name, bases, attrs)
 
+    def __repr__(self):
+        return 'step:{0.name}{{{0.requires!r}}}'.format(self)
+
 
 class Step(object):
     """A Bootstep.
@@ -220,22 +237,13 @@ class Step(object):
     """
     __metaclass__ = StepType
 
-    #: The name of the step, or the namespace
-    #: and the name of the step separated by dot.
+    #: Optional step name, will use qualname if not specified.
     name = None
 
     #: List of other steps that that must be started before this step.
     #: Note that all dependencies must be in the same namespace.
     requires = ()
 
-    #: can be used to specify the namespace,
-    #: if the name does not include it.
-    namespace = None
-
-    #: if set the step will not be registered,
-    #: but can be used as a base class.
-    abstract = True
-
     #: Optional obj created by the :meth:`create` method.
     #: This is used by :class:`StartStopStep` to keep the
     #: original service object.
@@ -270,9 +278,15 @@ class Step(object):
             self.obj = self.create(parent)
             return True
 
+    def __repr__(self):
+        return '<step: {0.alias}>'.format(self)
+
+    @property
+    def alias(self):
+        return self.name.rsplit('.', 1)[-1]
+
 
 class StartStopStep(Step):
-    abstract = True
 
     def start(self, parent):
         if self.obj:
@@ -294,7 +308,6 @@ class StartStopStep(Step):
 
 
 class ConsumerStep(StartStopStep):
-    abstract = True
     requires = ('Connection', )
     consumers = None
 

+ 21 - 62
celery/tests/worker/test_bootsteps.py

@@ -12,27 +12,16 @@ class test_Step(Case):
     class Def(bootsteps.Step):
         name = 'test_Step.Def'
 
-    def test_steps_must_be_named(self):
-        with self.assertRaises(NotImplementedError):
-
-            class X(bootsteps.Step):
-                pass
-
-        class Y(bootsteps.Step):
-            abstract = True
-
     def test_namespace_name(self, ns='test_namespace_name'):
 
         class X(bootsteps.Step):
             namespace = ns
             name = 'X'
-        self.assertEqual(X.namespace, ns)
         self.assertEqual(X.name, 'X')
 
         class Y(bootsteps.Step):
-            name = '%s.Y' % (ns, )
-        self.assertEqual(Y.namespace, ns)
-        self.assertEqual(Y.name, 'Y')
+            name = '%s.Y' % ns
+        self.assertEqual(Y.name, '%s.Y' % ns)
 
     def test_init(self):
         self.assertTrue(self.Def(self))
@@ -116,18 +105,6 @@ class test_Namespace(AppCase):
     class NS(bootsteps.Namespace):
         name = 'test_Namespace'
 
-    class ImportingNS(bootsteps.Namespace):
-
-        def __init__(self, *args, **kwargs):
-            bootsteps.Namespace.__init__(self, *args, **kwargs)
-            self.imported = []
-
-        def modules(self):
-            return ['A', 'B', 'C']
-
-        def import_module(self, module):
-            self.imported.append(module)
-
     def test_steps_added_to_unclaimed(self):
 
         class tnA(bootsteps.Step):
@@ -139,24 +116,18 @@ class test_Namespace(AppCase):
         class xxA(bootsteps.Step):
             name = 'xx.A'
 
-        self.assertIn('A', self.NS._unclaimed['test_Namespace'])
-        self.assertIn('B', self.NS._unclaimed['test_Namespace'])
-        self.assertIn('A', self.NS._unclaimed['xx'])
-        self.assertNotIn('B', self.NS._unclaimed['xx'])
+        class NS(self.NS):
+            default_steps = [tnA, tnB]
+        ns = NS(app=self.app)
+
+        self.assertIn(tnA, ns._all_steps())
+        self.assertIn(tnB, ns._all_steps())
+        self.assertNotIn(xxA, ns._all_steps())
 
     def test_init(self):
         ns = self.NS(app=self.app)
         self.assertIs(ns.app, self.app)
         self.assertEqual(ns.name, 'test_Namespace')
-        self.assertFalse(ns.services)
-
-    def test_interface_modules(self):
-        self.NS(app=self.app).modules()
-
-    def test_load_modules(self):
-        x = self.ImportingNS(app=self.app)
-        x.load_modules()
-        self.assertListEqual(x.imported, ['A', 'B', 'C'])
 
     def test_apply(self):
 
@@ -166,42 +137,30 @@ class test_Namespace(AppCase):
             def modules(self):
                 return ['A', 'B']
 
-        class A(bootsteps.Step):
-            name = 'test_apply.A'
-            requires = ['C']
-
         class B(bootsteps.Step):
             name = 'test_apply.B'
 
         class C(bootsteps.Step):
             name = 'test_apply.C'
-            requires = ['B']
+            requires = [B]
+
+        class A(bootsteps.Step):
+            name = 'test_apply.A'
+            requires = [C]
 
         class D(bootsteps.Step):
             name = 'test_apply.D'
             last = True
 
-        x = MyNS(app=self.app)
-        x.import_module = Mock()
+        x = MyNS([A, D], app=self.app)
         x.apply(self)
 
-        self.assertItemsEqual(x.steps.values(), [A, B, C, D])
-        self.assertTrue(x.import_module.call_count)
-
-        for boot_step in x.boot_steps:
-            self.assertEqual(boot_step.namespace, x)
-
-        self.assertIsInstance(x.boot_steps[0], B)
-        self.assertIsInstance(x.boot_steps[1], C)
-        self.assertIsInstance(x.boot_steps[2], A)
-        self.assertIsInstance(x.boot_steps[3], D)
-
-        self.assertIs(x['A'], A)
-
-    def test_import_module(self):
-        x = self.NS(app=self.app)
-        import os
-        self.assertIs(x.import_module('os'), os)
+        self.assertIsInstance(x.order[0], B)
+        self.assertIsInstance(x.order[1], C)
+        self.assertIsInstance(x.order[2], A)
+        self.assertIsInstance(x.order[3], D)
+        self.assertIn(A, x.types)
+        self.assertIs(x[A.name], x.order[2])
 
     def test_find_last_but_no_steps(self):
 

+ 1 - 4
celery/tests/worker/test_worker.py

@@ -49,10 +49,7 @@ class PlaceHolder(object):
 
 
 def find_step(obj, typ):
-    for c in obj.namespace.boot_steps:
-        if isinstance(c, typ):
-            return c
-    raise Exception('Instance %s has no step %s' % (obj, typ))
+    return obj.namespace.steps[typ.name]
 
 
 class _MyKombuConsumer(Consumer):

+ 4 - 12
celery/utils/imports.py

@@ -24,18 +24,10 @@ class NotAPackage(Exception):
     pass
 
 
-if sys.version_info >= (3, 3):  # pragma: no cover
-
-    def qualname(obj):
-        return obj.__qualname__
-
-else:
-
-    def qualname(obj):  # noqa
-        if not hasattr(obj, '__name__') and hasattr(obj, '__class__'):
-            return qualname(obj.__class__)
-
-        return '%s.%s' % (obj.__module__, obj.__name__)
+def qualname(obj):  # noqa
+    if not hasattr(obj, '__name__') and hasattr(obj, '__class__'):
+        obj = obj.__class__
+    return '%s.%s' % (obj.__module__, obj.__name__)
 
 
 def instantiate(name, *args, **kwargs):

+ 0 - 1
celery/worker/autoreload.py

@@ -37,7 +37,6 @@ logger = get_logger(__name__)
 
 
 class WorkerComponent(bootsteps.StartStopStep):
-    name = 'Autoreloader'
     requires = (Pool, )
 
     def __init__(self, w, autoreload=None, **kwargs):

+ 0 - 1
celery/worker/autoscale.py

@@ -31,7 +31,6 @@ debug, info, error = logger.debug, logger.info, logger.error
 
 
 class WorkerComponent(bootsteps.StartStopStep):
-    name = 'Autoscaler'
     requires = (Pool, )
 
     def __init__(self, w, **kwargs):

+ 0 - 7
celery/worker/components.py

@@ -24,7 +24,6 @@ from .buckets import TaskBucket, FastQueue
 
 
 class Hub(bootsteps.StartStopStep):
-    name = 'Hub'
 
     def __init__(self, w, **kwargs):
         w.hub = None
@@ -41,7 +40,6 @@ class Hub(bootsteps.StartStopStep):
 class Queues(bootsteps.Step):
     """This step initializes the internal queues
     used by the worker."""
-    name = 'Queues'
     requires = (Hub, )
 
     def create(self, w):
@@ -78,7 +76,6 @@ class Pool(bootsteps.StartStopStep):
         * min_concurrency
 
     """
-    name = 'Pool'
     requires = (Queues, )
 
     def __init__(self, w, autoscale=None, autoreload=None,
@@ -185,7 +182,6 @@ class Beat(bootsteps.StartStopStep):
     argument is set.
 
     """
-    name = 'Beat'
 
     def __init__(self, w, beat=False, **kwargs):
         self.enabled = w.beat = beat
@@ -201,7 +197,6 @@ class Beat(bootsteps.StartStopStep):
 
 class Timers(bootsteps.Step):
     """This step initializes the internal timers used by the worker."""
-    name = 'Timers'
     requires = (Pool, )
 
     def include_if(self, w):
@@ -226,7 +221,6 @@ class Timers(bootsteps.Step):
 
 class StateDB(bootsteps.Step):
     """This step sets up the workers state db if enabled."""
-    name = 'StateDB'
 
     def __init__(self, w, **kwargs):
         self.enabled = w.state_db
@@ -238,7 +232,6 @@ class StateDB(bootsteps.Step):
 
 
 class Consumer(bootsteps.StartStopStep):
-    name = 'Consumer'
     last = True
 
     def create(self, w):

+ 0 - 6
celery/worker/consumer.py

@@ -363,7 +363,6 @@ class Consumer(object):
 
 
 class Connection(bootsteps.StartStopStep):
-    name = 'Connection'
 
     def __init__(self, c, **kwargs):
         c.connection = None
@@ -381,7 +380,6 @@ class Connection(bootsteps.StartStopStep):
 
 
 class Events(bootsteps.StartStopStep):
-    name = 'Events'
     requires = (Connection, )
 
     def __init__(self, c, send_events=None, **kwargs):
@@ -406,7 +404,6 @@ class Events(bootsteps.StartStopStep):
 
 
 class Heart(bootsteps.StartStopStep):
-    name = 'Heart'
     requires = (Events, )
 
     def __init__(self, c, **kwargs):
@@ -422,7 +419,6 @@ class Heart(bootsteps.StartStopStep):
 
 
 class Control(bootsteps.StartStopStep):
-    name = 'Control'
     requires = (Events, )
 
     def __init__(self, c, **kwargs):
@@ -434,7 +430,6 @@ class Control(bootsteps.StartStopStep):
 
 
 class Tasks(bootsteps.StartStopStep):
-    name = 'Tasks'
     requires = (Control, )
 
     def __init__(self, c, initial_prefetch_count=2, **kwargs):
@@ -462,7 +457,6 @@ class Tasks(bootsteps.StartStopStep):
 
 
 class Evloop(bootsteps.StartStopStep):
-    name = 'Evloop'
     last = True
 
     def start(self, c):

+ 0 - 1
celery/worker/mediator.py

@@ -30,7 +30,6 @@ logger = get_logger(__name__)
 
 
 class WorkerComponent(StartStopStep):
-    name = 'Mediator'
     requires = (components.Pool, components.Queues, )
 
     def __init__(self, w, **kwargs):