Explorar o código

Merge branch '3.0'

Ask Solem %!s(int64=12) %!d(string=hai) anos
pai
achega
c5d8965a2e

+ 9 - 3
celery/app/amqp.py

@@ -14,6 +14,7 @@ from weakref import WeakValueDictionary
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import entry_to_queue
 from kombu.pools import ProducerPool
+from kombu.utils.encoding import safe_repr
 
 from celery import signals
 from celery.utils import cached_property, uuid
@@ -204,14 +205,19 @@ class TaskProducer(Producer):
 
         signals.task_sent.send(sender=task_name, **body)
         if event_dispatcher:
+            exname = exchange or self.exchange
+            if isinstance(exname, Exchange):
+                exname = exname.name
             event_dispatcher.send('task-sent', uuid=task_id,
                                                name=task_name,
-                                               args=repr(task_args),
-                                               kwargs=repr(task_kwargs),
+                                               args=safe_repr(task_args),
+                                               kwargs=safe_repr(task_kwargs),
                                                retries=retries,
                                                eta=eta,
                                                expires=expires,
-                                               queue=queue)
+                                               queue=queue,
+                                               exchange=exname,
+                                               routing_key=routing_key)
         return task_id
     delay_task = publish_task   # XXX Compat
 

+ 2 - 2
celery/app/control.py

@@ -71,8 +71,8 @@ class Inspect(object):
     def revoked(self):
         return self._request('dump_revoked')
 
-    def registered(self):
-        return self._request('dump_tasks')
+    def registered(self, *taskinfoitems):
+        return self._request('dump_tasks', taskinfoitems=taskinfoitems)
     registered_tasks = registered
 
     def ping(self):

+ 1 - 2
celery/events/cursesmon.py

@@ -364,8 +364,7 @@ class CursesMonitor(object):  # pragma: no cover
             except KeyError:
                 pass
             else:
-                info = selection.info(['args', 'kwargs',
-                                       'result', 'runtime', 'eta'])
+                info = selection.info()
                 if 'runtime' in info:
                     info['runtime'] = '%.2fs' % info['runtime']
                 if 'result' in info:

+ 12 - 7
celery/events/state.py

@@ -105,7 +105,8 @@ class Task(Element):
 
     #: meth:`info` displays these fields by default.
     _info_fields = ('args', 'kwargs', 'retries', 'result',
-                    'eta', 'runtime', 'expires', 'exception')
+                    'eta', 'runtime', 'expires', 'exception',
+                    'exchange', 'routing_key')
 
     #: Default values.
     _defaults = dict(uuid=None, name=None, state=states.PENDING,
@@ -114,7 +115,7 @@ class Task(Element):
                      revoked=False, args=None, kwargs=None, eta=None,
                      expires=None, retries=None, worker=None, result=None,
                      exception=None, timestamp=None, runtime=None,
-                     traceback=None)
+                     traceback=None, exchange=None, routing_key=None)
 
     def __init__(self, **fields):
         super(Task, self).__init__(**dict(self._defaults, **fields))
@@ -185,11 +186,15 @@ class Task(Element):
 
     def info(self, fields=None, extra=[]):
         """Information about this task suitable for on-screen display."""
-        if fields is None:
-            fields = self._info_fields
-        return dict((key, getattr(self, key, None))
-                        for key in list(fields) + list(extra)
-                            if getattr(self, key, None) is not None)
+        fields = self._info_fields if fields is None else fields
+
+        def _keys():
+            for key in list(fields) + list(extra):
+                value = getattr(self, key, None)
+                if value is not None:
+                    yield key, value
+
+        return dict(_keys())
 
     def __repr__(self):
         return '<Task: %s(%s) %s>' % (self.name, self.uuid, self.state)

+ 19 - 0
celery/worker/consumer.py

@@ -764,6 +764,25 @@ class Consumer(object):
         elif state.should_terminate:
             raise SystemTerminate()
 
+    def add_task_queue(self, queue, exchange=None, exchange_type=None,
+            routing_key=None, **options):
+        cset = self.task_consumer
+        exchange = queue if exchange is None else exchange
+        routing_key = queue if routing_key is None else routing_key
+        exchange_type = 'direct' if exchange_type is None else exchange_type
+        if not cset.consuming_from(queue):
+            q = self.app.amqp.queues.add(queue,
+                    exchange=exchange,
+                    exchange_type=exchange_type,
+                    routing_key=routing_key, **options)
+            cset.add_queue(q)
+            cset.consume()
+            logger.info('Started consuming from %r', queue)
+
+    def cancel_task_queue(self, queue):
+        self.app.amqp.queues.select_remove(queue)
+        self.task_consumer.cancel_by_queue(queue)
+
     @property
     def info(self):
         """Returns information about this consumer instance

+ 8 - 21
celery/worker/control.py

@@ -20,7 +20,7 @@ from celery.utils.log import get_logger
 from . import state
 from .state import revoked
 
-TASK_INFO_FIELDS = ('exchange', 'routing_key', 'rate_limit')
+DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
 logger = get_logger(__name__)
 
 
@@ -200,12 +200,13 @@ def dump_revoked(panel, **kwargs):
 
 
 @Panel.register
-def dump_tasks(panel, **kwargs):
+def dump_tasks(panel, taskinfoitems=None, **kwargs):
     tasks = panel.app.tasks
+    taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
 
     def _extract_info(task):
         fields = dict((field, str(getattr(task, field, None)))
-                        for field in TASK_INFO_FIELDS
+                        for field in taskinfoitems
                             if getattr(task, field, None) is not None)
         info = map('='.join, fields.items())
         if not info:
@@ -266,28 +267,14 @@ def shutdown(panel, msg='Got shutdown from remote', **kwargs):
 @Panel.register
 def add_consumer(panel, queue, exchange=None, exchange_type=None,
         routing_key=None, **options):
-    cset = panel.consumer.task_consumer
-    exchange = queue if exchange is None else exchange
-    routing_key = queue if routing_key is None else routing_key
-    exchange_type = 'direct' if exchange_type is None else exchange_type
-    if not cset.consuming_from(queue):
-        q = panel.app.amqp.queues.add(queue,
-                exchange=exchange,
-                exchange_type=exchange_type,
-                routing_key=routing_key, **options)
-        cset.add_queue(q)
-        cset.consume()
-        logger.info('Started consuming from %r', queue)
-        return {'ok': 'started consuming from %r' % (queue, )}
-    else:
-        return {'ok': 'already consuming from %r' % (queue, )}
+    panel.consumer.add_task_queue(queue, exchange, exchange_type,
+                                  routing_key, **options)
+    return {'ok': 'add consumer %r' % (queue, )}
 
 
 @Panel.register
 def cancel_consumer(panel, queue=None, **_):
-    panel.app.amqp.queues.select_remove(queue)
-    cset = panel.consumer.task_consumer
-    cset.cancel_by_queue(queue)
+    panel.consumer.cancel_task_queue(queue)
     return {'ok': 'no longer consuming from %s' % (queue, )}