소스 검색

Merge branch 'cell'

Ask Solem 12 년 전
부모
커밋
5e6b04b095
7개의 변경된 파일45개의 추가작업 그리고 19개의 파일을 삭제
  1. 1 0
      celery/app/defaults.py
  2. 21 14
      celery/bootsteps.py
  3. 4 1
      celery/tests/worker/test_bootsteps.py
  4. 3 3
      celery/tests/worker/test_control.py
  5. 3 0
      celery/worker/components.py
  6. 12 0
      celery/worker/consumer.py
  7. 1 1
      celery/worker/control.py

+ 1 - 0
celery/app/defaults.py

@@ -150,6 +150,7 @@ NAMESPACES = {
         'WORKER_DIRECT': Option(False, type='bool'),
     },
     'CELERYD': {
+        'AGENT': Option(None, type='string'),
         'AUTOSCALER': Option('celery.worker.autoscale:Autoscaler'),
         'AUTORELOADER': Option('celery.worker.autoreload:Autoreloader'),
         'BOOT_STEPS': Option((), type='tuple'),

+ 21 - 14
celery/bootsteps.py

@@ -148,9 +148,10 @@ class Namespace(object):
         for name in self._finalize_boot_steps(steps):
             step = steps[name] = steps[name](parent, **kwargs)
             order.append(step)
-            step.include(parent)
         self._debug('New boot order: {%s}',
                     ', '.join(s.alias for s in self.order))
+        for step in order:
+            step.include(parent)
         return self
 
     def import_module(self, module):
@@ -244,11 +245,6 @@ class Step(object):
     #: Note that all dependencies must be in the same namespace.
     requires = ()
 
-    #: Optional obj created by the :meth:`create` method.
-    #: This is used by :class:`StartStopStep` to keep the
-    #: original service object.
-    obj = None
-
     #: This flag is reserved for the workers Consumer,
     #: since it is required to always be started last.
     #: There can only be one object marked with lsat
@@ -261,10 +257,6 @@ class Step(object):
     def __init__(self, parent, **kwargs):
         pass
 
-    def create(self, parent):
-        """Create the step."""
-        pass
-
     def include_if(self, parent):
         """An optional predicate that decided whether this
         step should be created."""
@@ -273,10 +265,17 @@ class Step(object):
     def instantiate(self, name, *args, **kwargs):
         return instantiate(name, *args, **kwargs)
 
-    def include(self, parent):
+    def _should_include(self, parent):
         if self.include_if(parent):
-            self.obj = self.create(parent)
-            return True
+            return True, self.create(parent)
+        return False, None
+
+    def include(self, parent):
+        return self._should_include(parent)[0]
+
+    def create(self, parent):
+        """Create the step."""
+        pass
 
     def __repr__(self):
         return '<step: {0.alias}>'.format(self)
@@ -288,6 +287,11 @@ class Step(object):
 
 class StartStopStep(Step):
 
+    #: Optional obj created by the :meth:`create` method.
+    #: This is used by :class:`StartStopStep` to keep the
+    #: original service object.
+    obj = None
+
     def start(self, parent):
         if self.obj:
             return self.obj.start()
@@ -303,8 +307,11 @@ class StartStopStep(Step):
         self.stop(parent)
 
     def include(self, parent):
-        if super(StartStopStep, self).include(parent):
+        inc, ret = self._should_include(parent)
+        if inc:
+            self.obj = ret
             parent.steps.append(self)
+        return inc
 
 
 class ConsumerStep(StartStopStep):

+ 4 - 1
celery/tests/worker/test_bootsteps.py

@@ -9,9 +9,12 @@ from celery.tests.utils import AppCase, Case
 
 class test_Step(Case):
 
-    class Def(bootsteps.Step):
+    class Def(bootsteps.StartStopStep):
         name = 'test_Step.Def'
 
+    def setUp(self):
+        self.steps = []
+
     def test_namespace_name(self, ns='test_namespace_name'):
 
         class X(bootsteps.Step):

+ 3 - 3
celery/tests/worker/test_control.py

@@ -382,7 +382,7 @@ class test_ControlPanel(Case):
         m = {'method': 'ping',
              'destination': hostname}
         r = self.panel.handle_message(m, None)
-        self.assertEqual(r, 'pong')
+        self.assertEqual(r, {'ok': 'pong'})
 
     def test_shutdown(self):
         m = {'method': 'shutdown',
@@ -405,8 +405,8 @@ class test_ControlPanel(Case):
                       mailbox=self.app.control.mailbox)
         r = panel.dispatch('ping', reply_to={'exchange': 'x',
                                              'routing_key': 'x'})
-        self.assertEqual(r, 'pong')
-        self.assertDictEqual(replies[0], {panel.hostname: 'pong'})
+        self.assertEqual(r, {'ok': 'pong'})
+        self.assertDictEqual(replies[0], {panel.hostname: {'ok': 'pong'}})
 
     def test_pool_restart(self):
         consumer = Consumer()

+ 3 - 0
celery/worker/components.py

@@ -42,6 +42,9 @@ class Queues(bootsteps.Step):
     used by the worker."""
     requires = (Hub, )
 
+    def __init__(self, w, **kwargs):
+        w.start_mediator = False
+
     def create(self, w):
         w.start_mediator = True
         if not w.pool_cls.rlimit_safe:

+ 12 - 0
celery/worker/consumer.py

@@ -118,6 +118,7 @@ class Consumer(object):
             'celery.worker.consumer:Control',
             'celery.worker.consumer:Tasks',
             'celery.worker.consumer:Evloop',
+            'celery.worker.consumer:Agent',
         ]
 
         def shutdown(self, parent):
@@ -456,6 +457,17 @@ class Tasks(bootsteps.StartStopStep):
             c.task_consumer = None
 
 
+class Agent(bootsteps.StartStopStep):
+    requires = (Connection, )
+
+    def __init__(self, c, **kwargs):
+        self.agent_cls = self.enabled = c.app.conf.CELERYD_AGENT
+
+    def create(self, c):
+        agent = c.agent = self.instantiate(self.agent_cls, c.connection)
+        return agent
+
+
 class Evloop(bootsteps.StartStopStep):
     last = True
 

+ 1 - 1
celery/worker/control.py

@@ -214,7 +214,7 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
 
 @Panel.register
 def ping(panel, **kwargs):
-    return 'pong'
+    return {'ok': 'pong'}
 
 
 @Panel.register