Ask Solem 11 lat temu
rodzic
commit
44367d85b0

+ 12 - 2
celery/bootsteps.py

@@ -99,6 +99,12 @@ class Blueprint(object):
     state = None
     state = None
     started = 0
     started = 0
     default_steps = set()
     default_steps = set()
+    state_to_name = {
+        0: 'initializing',
+        RUN: 'running',
+        CLOSE: 'closing',
+        TERMINATE: 'terminating',
+    }
 
 
     def __init__(self, steps=None, name=None, app=None,
     def __init__(self, steps=None, name=None, app=None,
                  on_start=None, on_close=None, on_stopped=None):
                  on_start=None, on_close=None, on_stopped=None):
@@ -121,6 +127,9 @@ class Blueprint(object):
             step.start(parent)
             step.start(parent)
             debug('^-- substep ok')
             debug('^-- substep ok')
 
 
+    def human_state(self):
+        return self.state_to_name[self.state or 0]
+
     def info(self, parent):
     def info(self, parent):
         info = {}
         info = {}
         for step in parent.steps:
         for step in parent.steps:
@@ -135,7 +144,8 @@ class Blueprint(object):
     def restart(self, parent, method='stop', description='Restarting'):
     def restart(self, parent, method='stop', description='Restarting'):
         self.send_all(parent, method, description)
         self.send_all(parent, method, description)
 
 
-    def send_all(self, parent, method, description=None, reverse=True):
+    def send_all(self, parent, method,
+                 description=None, reverse=True, args=()):
         description = description or method.capitalize()
         description = description or method.capitalize()
         steps = reversed(parent.steps) if reverse else parent.steps
         steps = reversed(parent.steps) if reverse else parent.steps
         with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
         with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
@@ -144,7 +154,7 @@ class Blueprint(object):
                     self._debug('%s %s...', description, step.alias)
                     self._debug('%s %s...', description, step.alias)
                     fun = getattr(step, method, None)
                     fun = getattr(step, method, None)
                     if fun:
                     if fun:
-                        fun(parent)
+                        fun(parent, *args)
 
 
     def stop(self, parent, close=True, terminate=False):
     def stop(self, parent, close=True, terminate=False):
         what = 'Terminating' if terminate else 'Stopping'
         what = 'Terminating' if terminate else 'Stopping'

+ 8 - 0
celery/worker/__init__.py

@@ -301,6 +301,14 @@ class WorkController(object):
             info['rusage'] = 'N/A'
             info['rusage'] = 'N/A'
         return info
         return info
 
 
+    def __repr__(self):
+        return '<Worker: {self.hostname} ({state})>'.format(
+            self=self, state=self.blueprint.human_state(),
+        )
+
+    def __str__(self):
+        return self.hostname
+
     @property
     @property
     def state(self):
     def state(self):
         return state
         return state

+ 5 - 0
celery/worker/consumer.py

@@ -417,6 +417,11 @@ class Consumer(object):
 
 
         return on_task_received
         return on_task_received
 
 
+    def __repr__(self):
+        return '<Consumer: {self.hostname} ({state})>'.format(
+            self=self, state=self.blueprint.human_state(),
+        )
+
 
 
 class Connection(bootsteps.StartStopStep):
 class Connection(bootsteps.StartStopStep):
 
 

+ 0 - 11
docs/reference/celery.worker.hub.rst

@@ -1,11 +0,0 @@
-=====================================
- celery.worker.hub
-=====================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.worker.hub
-
-.. automodule:: celery.worker.hub
-    :members:
-    :undoc-members:

+ 11 - 11
docs/userguide/extending.rst

@@ -135,7 +135,7 @@ Attributes
 
 
 .. attribute:: hub
 .. attribute:: hub
 
 
-    Event loop object (:class:`~celery.worker.hub.Hub`).  You can use
+    Event loop object (:class:`~kombu.async.Hub`).  You can use
     this to register callbacks in the event loop.
     this to register callbacks in the event loop.
 
 
     This is only supported by async I/O enabled transports (amqp, redis),
     This is only supported by async I/O enabled transports (amqp, redis),
@@ -279,7 +279,7 @@ Attributes
 
 
 .. attribute:: hub
 .. attribute:: hub
 
 
-    Event loop object (:class:`~celery.worker.hub.Hub`).  You can use
+    Event loop object (:class:`~kombu.async.Hub`).  You can use
     this to register callbacks in the event loop.
     this to register callbacks in the event loop.
 
 
     This is only supported by async I/O enabled transports (amqp, redis),
     This is only supported by async I/O enabled transports (amqp, redis),
@@ -456,15 +456,15 @@ It can be added both as a worker and consumer bootstep:
 Starting the worker with this step installed will give us the following
 Starting the worker with this step installed will give us the following
 logs::
 logs::
 
 
-    <celery.apps.worker.Worker object at 0x101ad8410> is in init
-    <celery.worker.consumer.Consumer object at 0x101c2d790> is in init
+    <Worker: w@example.com (initializing)> is in init
+    <Consumer: w@example.com (initializing)> is in init
     [2013-05-29 16:18:20,544: WARNING/MainProcess]
     [2013-05-29 16:18:20,544: WARNING/MainProcess]
-        <celery.apps.worker.Worker object at 0x101ad8410> is starting
+        <Worker: w@example.com (running)> is starting
     [2013-05-29 16:18:21,577: WARNING/MainProcess]
     [2013-05-29 16:18:21,577: WARNING/MainProcess]
-        <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
-    <celery.worker.consumer.Consumer object at 0x101c2d790> is stopping
-    <celery.apps.worker.Worker object at 0x101ad8410> is stopping
-    <celery.worker.consumer.Consumer object at 0x101c2d790> is shutting down
+        <Consumer: w@example.com (running)> is starting
+    <Consumer: w@example.com (closing)> is stopping
+    <Worker: w@example.com (closing)> is stopping
+    <Consumer: w@example.com (terminating)> is shutting down
 
 
 The ``print`` statements will be redirected to the logging subsystem after
 The ``print`` statements will be redirected to the logging subsystem after
 the worker has been initialized, so the "is starting" lines are timestamped.
 the worker has been initialized, so the "is starting" lines are timestamped.
@@ -629,8 +629,8 @@ Worker API
 ==========
 ==========
 
 
 
 
-:class:`~celery.worker.Hub` - The workers async event loop.
------------------------------------------------------------
+:class:`~kombu.async.Hub` - The workers async event loop.
+---------------------------------------------------------
 :supported transports: amqp, redis
 :supported transports: amqp, redis
 
 
 .. versionadded:: 3.0
 .. versionadded:: 3.0