Browse Source

Renames bootsteps.Namespace -> bootsteps.Blueprint

Ask Solem 12 years ago
parent
commit
279966f285

+ 1 - 1
celery/apps/worker.py

@@ -113,7 +113,7 @@ class Worker(WorkController):
             enabled=not no_color if no_color is not None else no_color
         )
 
-    def on_init_namespace(self):
+    def on_init_blueprint(self):
         self._custom_logging = self.setup_logging()
         # apply task execution optimizations
         # -- This will finalize the app!

+ 3 - 3
celery/bin/graph.py

@@ -33,11 +33,11 @@ class graph(Command):
         worker = self.app.WorkController()
         include = set(arg.lower() for arg in args or ['worker', 'consumer'])
         if 'worker' in include:
-            graph = worker.namespace.graph
+            graph = worker.blueprint.graph
             if 'consumer' in include:
-                worker.namespace.connect_with(worker.consumer.namespace)
+                worker.blueprint.connect_with(worker.consumer.blueprint)
         else:
-            graph = worker.consumer.namespace.graph
+            graph = worker.consumer.blueprint.graph
         graph.to_dot(self.stdout)
 
     def workers(self, *args, **kwargs):

+ 17 - 16
celery/bootsteps.py

@@ -3,7 +3,7 @@
     celery.bootsteps
     ~~~~~~~~~~~~~~~~
 
-    The bootsteps!
+    A directed acyclic graph of reusable components.
 
 """
 from __future__ import absolute_import, unicode_literals
@@ -47,10 +47,11 @@ def _label(s):
 
 
 class StepFormatter(GraphFormatter):
+    """Graph formatter for :class:`Blueprint`."""
 
-    namespace_prefix = '⧉'
+    blueprint_prefix = '⧉'
     conditional_prefix = '∘'
-    namespace_scheme = {
+    blueprint_scheme = {
         'shape': 'parallelogram',
         'color': 'slategray4',
         'fillcolor': 'slategray3',
@@ -64,13 +65,13 @@ class StepFormatter(GraphFormatter):
 
     def _get_prefix(self, step):
         if step.last:
-            return self.namespace_prefix
+            return self.blueprint_prefix
         if step.conditional:
             return self.conditional_prefix
         return ''
 
     def node(self, obj, **attrs):
-        scheme = self.namespace_scheme if obj.last else self.node_scheme
+        scheme = self.blueprint_scheme if obj.last else self.node_scheme
         return self.draw_node(obj, scheme, attrs)
 
     def edge(self, a, b, **attrs):
@@ -79,15 +80,15 @@ class StepFormatter(GraphFormatter):
         return self.draw_edge(a, b, self.edge_scheme, attrs)
 
 
-class Namespace(object):
-    """A namespace containing bootsteps.
+class Blueprint(object):
+    """Blueprint containing bootsteps that can be applied to objects.
 
     :keyword steps: List of steps.
-    :keyword name: Set explicit name for this namespace.
-    :keyword app: Set the Celery app for this namespace.
-    :keyword on_start: Optional callback applied after namespace start.
-    :keyword on_close: Optional callback applied before namespace close.
-    :keyword on_stopped: Optional callback applied after namespace stopped.
+    :keyword name: Set explicit name for this blueprint.
+    :keyword app: Set the Celery app for this blueprint.
+    :keyword on_start: Optional callback applied after blueprint start.
+    :keyword on_close: Optional callback applied before blueprint close.
+    :keyword on_stopped: Optional callback applied after blueprint stopped.
 
     """
     GraphFormatter = StepFormatter
@@ -171,7 +172,7 @@ class Namespace(object):
             pass
 
     def apply(self, parent, **kwargs):
-        """Apply the steps in this namespace to an object.
+        """Apply the steps in this blueprint to an object.
 
         This will apply the ``__init__`` and ``include`` methods
         of each step, with the object as argument::
@@ -295,13 +296,13 @@ class Step(object):
     conditional = False
 
     #: List of other steps that that must be started before this step.
-    #: Note that all dependencies must be in the same namespace.
+    #: Note that all dependencies must be in the same blueprint.
     requires = ()
 
     #: This flag is reserved for the workers Consumer,
     #: since it is required to always be started last.
-    #: There can only be one object marked with lsat
-    #: in every namespace.
+    #: There can only be one object marked last
+    #: in every blueprint.
     last = False
 
     #: This provides the default for :meth:`include_if`.

+ 21 - 21
celery/tests/worker/test_bootsteps.py

@@ -15,16 +15,16 @@ class test_Step(Case):
     def setUp(self):
         self.steps = []
 
-    def test_namespace_name(self, ns='test_namespace_name'):
+    def test_blueprint_name(self, bp='test_blueprint_name'):
 
         class X(bootsteps.Step):
-            namespace = ns
+            blueprint = bp
             name = 'X'
         self.assertEqual(X.name, 'X')
 
         class Y(bootsteps.Step):
-            name = '%s.Y' % ns
-        self.assertEqual(Y.name, '%s.Y' % ns)
+            name = '%s.Y' % bp
+        self.assertEqual(Y.name, '%s.Y' % bp)
 
     def test_init(self):
         self.assertTrue(self.Def(self))
@@ -103,38 +103,38 @@ class test_StartStopStep(Case):
         x.obj.stop.assert_called_with()
 
 
-class test_Namespace(AppCase):
+class test_Blueprint(AppCase):
 
-    class NS(bootsteps.Namespace):
-        name = 'test_Namespace'
+    class Blueprint(bootsteps.Blueprint):
+        name = 'test_Blueprint'
 
     def test_steps_added_to_unclaimed(self):
 
         class tnA(bootsteps.Step):
-            name = 'test_Namespace.A'
+            name = 'test_Blueprint.A'
 
         class tnB(bootsteps.Step):
-            name = 'test_Namespace.B'
+            name = 'test_Blueprint.B'
 
         class xxA(bootsteps.Step):
             name = 'xx.A'
 
-        class NS(self.NS):
+        class Blueprint(self.Blueprint):
             default_steps = [tnA, tnB]
-        ns = NS(app=self.app)
+        blueprint = Blueprint(app=self.app)
 
-        self.assertIn(tnA, ns._all_steps())
-        self.assertIn(tnB, ns._all_steps())
-        self.assertNotIn(xxA, ns._all_steps())
+        self.assertIn(tnA, blueprint._all_steps())
+        self.assertIn(tnB, blueprint._all_steps())
+        self.assertNotIn(xxA, blueprint._all_steps())
 
     def test_init(self):
-        ns = self.NS(app=self.app)
-        self.assertIs(ns.app, self.app)
-        self.assertEqual(ns.name, 'test_Namespace')
+        blueprint = self.Blueprint(app=self.app)
+        self.assertIs(blueprint.app, self.app)
+        self.assertEqual(blueprint.name, 'test_Blueprint')
 
     def test_apply(self):
 
-        class MyNS(bootsteps.Namespace):
+        class MyBlueprint(bootsteps.Blueprint):
             name = 'test_apply'
 
             def modules(self):
@@ -155,7 +155,7 @@ class test_Namespace(AppCase):
             name = 'test_apply.D'
             last = True
 
-        x = MyNS([A, D], app=self.app)
+        x = MyBlueprint([A, D], app=self.app)
         x.apply(self)
 
         self.assertIsInstance(x.order[0], B)
@@ -167,9 +167,9 @@ class test_Namespace(AppCase):
 
     def test_find_last_but_no_steps(self):
 
-        class MyNS(bootsteps.Namespace):
+        class MyBlueprint(bootsteps.Blueprint):
             name = 'qwejwioqjewoqiej'
 
-        x = MyNS(app=self.app)
+        x = MyBlueprint(app=self.app)
         x.apply(self)
         self.assertIsNone(x._find_last())

+ 36 - 36
celery/tests/worker/test_worker.py

@@ -37,8 +37,8 @@ from celery.tests.utils import AppCase, Case
 
 def MockStep(step=None):
     step = Mock() if step is None else step
-    step.namespace = Mock()
-    step.namespace.name = 'MockNS'
+    step.blueprint = Mock()
+    step.blueprint.name = 'MockNS'
     step.name = 'MockStep(%s)' % (id(step), )
     return step
 
@@ -48,7 +48,7 @@ class PlaceHolder(object):
 
 
 def find_step(obj, typ):
-    return obj.namespace.steps[typ.name]
+    return obj.blueprint.steps[typ.name]
 
 
 class Consumer(__Consumer):
@@ -257,28 +257,28 @@ class test_Consumer(Case):
 
     def test_start_when_closed(self):
         l = MyKombuConsumer(self.buffer.put, timer=self.timer)
-        l.namespace.state = CLOSE
+        l.blueprint.state = CLOSE
         l.start()
 
     def test_connection(self):
         l = MyKombuConsumer(self.buffer.put, timer=self.timer)
 
-        l.namespace.start(l)
+        l.blueprint.start(l)
         self.assertIsInstance(l.connection, Connection)
 
-        l.namespace.state = RUN
+        l.blueprint.state = RUN
         l.event_dispatcher = None
-        l.namespace.restart(l)
+        l.blueprint.restart(l)
         self.assertTrue(l.connection)
 
-        l.namespace.state = RUN
+        l.blueprint.state = RUN
         l.shutdown()
         self.assertIsNone(l.connection)
         self.assertIsNone(l.task_consumer)
 
-        l.namespace.start(l)
+        l.blueprint.start(l)
         self.assertIsInstance(l.connection, Connection)
-        l.namespace.restart(l)
+        l.blueprint.restart(l)
 
         l.stop()
         l.shutdown()
@@ -287,7 +287,7 @@ class test_Consumer(Case):
 
     def test_close_connection(self):
         l = MyKombuConsumer(self.buffer.put, timer=self.timer)
-        l.namespace.state = RUN
+        l.blueprint.state = RUN
         step = find_step(l, consumer.Connection)
         conn = l.connection = Mock()
         step.shutdown(l)
@@ -298,7 +298,7 @@ class test_Consumer(Case):
         eventer = l.event_dispatcher = Mock()
         eventer.enabled = True
         heart = l.heart = MockHeart()
-        l.namespace.state = RUN
+        l.blueprint.state = RUN
         Events = find_step(l, consumer.Events)
         Events.shutdown(l)
         Heart = find_step(l, consumer.Heart)
@@ -455,7 +455,7 @@ class test_Consumer(Case):
                 raise socket.error('foo')
 
         l = Consumer(self.buffer.put, timer=self.timer)
-        l.namespace.state = RUN
+        l.blueprint.state = RUN
         c = l.connection = Connection()
         l.connection.obj = l
         l.task_consumer = Mock()
@@ -463,7 +463,7 @@ class test_Consumer(Case):
         with self.assertRaises(socket.error):
             l.loop(*l.loop_args())
 
-        l.namespace.state = CLOSE
+        l.blueprint.state = CLOSE
         l.connection = c
         l.loop(*l.loop_args())
 
@@ -622,14 +622,14 @@ class test_Consumer(Case):
             eta=(datetime.now() + timedelta(days=1)).isoformat(),
         )
 
-        l.namespace.start(l)
+        l.blueprint.start(l)
         p = l.app.conf.BROKER_CONNECTION_RETRY
         l.app.conf.BROKER_CONNECTION_RETRY = False
         try:
-            l.namespace.start(l)
+            l.blueprint.start(l)
         finally:
             l.app.conf.BROKER_CONNECTION_RETRY = p
-        l.namespace.restart(l)
+        l.blueprint.restart(l)
         l.event_dispatcher = Mock()
         callback = self._get_on_message(l)
         callback(m.decode(), m)
@@ -806,7 +806,7 @@ class test_Consumer(Case):
         l = Consumer(self.buffer.put, timer=self.timer)
         l.steps.pop()
         self.assertEqual(None, l.pool)
-        l.namespace.start(l)
+        l.blueprint.start(l)
 
 
 class test_WorkController(AppCase):
@@ -826,7 +826,7 @@ class test_WorkController(AppCase):
 
     def create_worker(self, **kw):
         worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
-        worker.namespace.shutdown_complete.set()
+        worker.blueprint.shutdown_complete.set()
         return worker
 
     @patch('celery.platforms.create_pidlock')
@@ -892,17 +892,17 @@ class test_WorkController(AppCase):
     def test_dont_stop_or_terminate(self):
         worker = WorkController(concurrency=1, loglevel=0)
         worker.stop()
-        self.assertNotEqual(worker.namespace.state, CLOSE)
+        self.assertNotEqual(worker.blueprint.state, CLOSE)
         worker.terminate()
-        self.assertNotEqual(worker.namespace.state, CLOSE)
+        self.assertNotEqual(worker.blueprint.state, CLOSE)
 
         sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
         try:
-            worker.namespace.state = RUN
+            worker.blueprint.state = RUN
             worker.stop(in_sighandler=True)
-            self.assertNotEqual(worker.namespace.state, CLOSE)
+            self.assertNotEqual(worker.blueprint.state, CLOSE)
             worker.terminate(in_sighandler=True)
-            self.assertNotEqual(worker.namespace.state, CLOSE)
+            self.assertNotEqual(worker.blueprint.state, CLOSE)
         finally:
             worker.pool.signal_safe = sigsafe
 
@@ -945,10 +945,10 @@ class test_WorkController(AppCase):
                            kwargs={})
         task = Request.from_message(m, m.decode())
         worker.steps = []
-        worker.namespace.state = RUN
+        worker.blueprint.state = RUN
         with self.assertRaises(KeyboardInterrupt):
             worker._process_task(task)
-        self.assertEqual(worker.namespace.state, TERMINATE)
+        self.assertEqual(worker.blueprint.state, TERMINATE)
 
     def test_process_task_raise_SystemTerminate(self):
         worker = self.worker
@@ -959,10 +959,10 @@ class test_WorkController(AppCase):
                            kwargs={})
         task = Request.from_message(m, m.decode())
         worker.steps = []
-        worker.namespace.state = RUN
+        worker.blueprint.state = RUN
         with self.assertRaises(SystemExit):
             worker._process_task(task)
-        self.assertEqual(worker.namespace.state, TERMINATE)
+        self.assertEqual(worker.blueprint.state, TERMINATE)
 
     def test_process_task_raise_regular(self):
         worker = self.worker
@@ -1023,10 +1023,10 @@ class test_WorkController(AppCase):
 
     def test_start__stop(self):
         worker = self.worker
-        worker.namespace.shutdown_complete.set()
+        worker.blueprint.shutdown_complete.set()
         worker.steps = [MockStep(StartStopStep(self)) for _ in range(4)]
-        worker.namespace.state = RUN
-        worker.namespace.started = 4
+        worker.blueprint.state = RUN
+        worker.blueprint.started = 4
         for w in worker.steps:
             w.start = Mock()
             w.close = Mock()
@@ -1065,15 +1065,15 @@ class test_WorkController(AppCase):
 
     def test_start__terminate(self):
         worker = self.worker
-        worker.namespace.shutdown_complete.set()
-        worker.namespace.started = 5
-        worker.namespace.state = RUN
+        worker.blueprint.shutdown_complete.set()
+        worker.blueprint.started = 5
+        worker.blueprint.state = RUN
         worker.steps = [MockStep() for _ in range(5)]
         worker.start()
         for w in worker.steps[:3]:
             self.assertTrue(w.start.call_count)
-        self.assertTrue(worker.namespace.started, len(worker.steps))
-        self.assertEqual(worker.namespace.state, RUN)
+        self.assertTrue(worker.blueprint.started, len(worker.steps))
+        self.assertEqual(worker.blueprint.state, RUN)
         worker.terminate()
         for step in worker.steps:
             self.assertTrue(step.terminate.call_count)

+ 15 - 15
celery/worker/__init__.py

@@ -55,12 +55,12 @@ class WorkController(object):
     app = None
 
     pidlock = None
-    namespace = None
+    blueprint = None
     pool = None
     semaphore = None
 
-    class Namespace(bootsteps.Namespace):
-        """Worker bootstep namespace."""
+    class Blueprint(bootsteps.Blueprint):
+        """Worker bootstep blueprint."""
         name = 'Worker'
         default_steps = set([
             'celery.worker.components:Hub',
@@ -119,14 +119,14 @@ class WorkController(object):
         # Initialize bootsteps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
         self.steps = []
-        self.on_init_namespace()
-        self.namespace = self.Namespace(app=self.app,
+        self.on_init_blueprint()
+        self.blueprint = self.Blueprint(app=self.app,
                                         on_start=self.on_start,
                                         on_close=self.on_close,
                                         on_stopped=self.on_stopped)
-        self.namespace.apply(self, **kwargs)
+        self.blueprint.apply(self, **kwargs)
 
-    def on_init_namespace(self):
+    def on_init_blueprint(self):
         pass
 
     def on_before_init(self, **kwargs):
@@ -186,7 +186,7 @@ class WorkController(object):
     def start(self):
         """Starts the workers main loop."""
         try:
-            self.namespace.start(self)
+            self.blueprint.start(self)
         except SystemTerminate:
             self.terminate()
         except Exception as exc:
@@ -240,11 +240,11 @@ class WorkController(object):
             self._shutdown(warm=False)
 
     def _shutdown(self, warm=True):
-        # if namespace does not exist it means that we had an
+        # if blueprint does not exist it means that we had an
         # error before the bootsteps could be initialized.
-        if self.namespace is not None:
-            self.namespace.stop(self, terminate=not warm)
-            self.namespace.join()
+        if self.blueprint is not None:
+            self.blueprint.stop(self, terminate=not warm)
+            self.blueprint.join()
 
     def reload(self, modules=None, reload=False, reloader=None):
         modules = self.app.loader.task_modules if modules is None else modules
@@ -287,14 +287,14 @@ class WorkController(object):
 
     def stats(self):
         info = self.info()
-        info.update(self.namespace.info(self))
-        info.update(self.consumer.namespace.info(self.consumer))
+        info.update(self.blueprint.info(self))
+        info.update(self.consumer.blueprint.info(self.consumer))
         info.update(rusage=self.rusage())
         return info
 
     @property
     def _state(self):
-        return self.namespace.state
+        return self.blueprint.state
 
     @property
     def state(self):

+ 11 - 11
celery/worker/consumer.py

@@ -135,7 +135,7 @@ class Consumer(object):
 
     restart_count = -1  # first start is the same as a restart
 
-    class Namespace(bootsteps.Namespace):
+    class Blueprint(bootsteps.Blueprint):
         name = 'Consumer'
         default_steps = [
             'celery.worker.consumer:Connection',
@@ -199,10 +199,10 @@ class Consumer(object):
             self.app.conf.BROKER_CONNECTION_TIMEOUT = None
 
         self.steps = []
-        self.namespace = self.Namespace(
+        self.blueprint = self.Blueprint(
             app=self.app, on_close=self.on_close,
         )
-        self.namespace.apply(self, **dict(worker_options or {}, **kwargs))
+        self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
 
     def bucket_for_task(self, type):
         limit = rate(getattr(type, 'rate_limit', None))
@@ -224,12 +224,12 @@ class Consumer(object):
             self.handle_task(request)
 
     def start(self):
-        ns, loop = self.namespace, self.loop
-        while ns.state != CLOSE:
+        blueprint, loop = self.blueprint, self.loop
+        while blueprint.state != CLOSE:
             self.restart_count += 1
             maybe_shutdown()
             try:
-                ns.start(self)
+                blueprint.start(self)
             except self.connection_errors as exc:
                 if isinstance(exc, OSError) and get_errno(exc) == errno.EMFILE:
                     raise  # Too many open files
@@ -239,21 +239,21 @@ class Consumer(object):
                 except RestartFreqExceeded as exc:
                     crit('Frequent restarts detected: %r', exc, exc_info=1)
                     sleep(1)
-                if ns.state != CLOSE and self.connection:
+                if blueprint.state != CLOSE and self.connection:
                     warn(CONNECTION_RETRY, exc_info=True)
                     try:
                         self.connection.collect()
                     except Exception:
                         pass
                     self.on_close()
-                    ns.restart(self)
+                    blueprint.restart(self)
 
     def shutdown(self):
         self.in_shutdown = True
-        self.namespace.shutdown(self)
+        self.blueprint.shutdown(self)
 
     def stop(self):
-        self.namespace.stop(self)
+        self.blueprint.stop(self)
 
     def on_ready(self):
         callback, self.init_callback = self.init_callback, None
@@ -262,7 +262,7 @@ class Consumer(object):
 
     def loop_args(self):
         return (self, self.connection, self.task_consumer,
-                self.strategies, self.namespace, self.hub, self.qos,
+                self.strategies, self.blueprint, self.hub, self.qos,
                 self.amqheartbeat, self.handle_unknown_message,
                 self.handle_unknown_task, self.handle_invalid_task,
                 self.app.clock, self.amqheartbeat_rate)

BIN
docs/images/consumer_graph.png


BIN
docs/images/worker_graph.png


+ 7 - 8
docs/userguide/extending.rst

@@ -17,18 +17,17 @@ Bootsteps
 
 .. _extending-worker-bootsteps:
 
-Worker Bootsteps
-================
+Worker bootsteps
+----------------
 
-.. figure:: ../images/worker_graph.png
-   :width: 700px
+Blablah
 
+.. _extending-consumer-bootsteps:
 
-Consumer Bootsteps
-==================
+Consumer bootsteps
+------------------
 
-.. figure:: ../images/consumer_graph.png
-   :width: 700px
+blahblah
 
 
 .. _extending-programs: