Przeglądaj źródła

Task sent event now adds exchange+routing_key

Ask Solem 12 lat temu
rodzic
commit
c4eaa766bf
3 zmienionych plików z 15 dodań i 8 usunięć
  1. 9 3
      celery/app/amqp.py
  2. 2 2
      celery/app/control.py
  3. 4 3
      celery/worker/control.py

+ 9 - 3
celery/app/amqp.py

@@ -14,6 +14,7 @@ from weakref import WeakValueDictionary
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import entry_to_queue
 from kombu.common import entry_to_queue
 from kombu.pools import ProducerPool
 from kombu.pools import ProducerPool
+from kombu.utils.encoding import safe_repr
 
 
 from celery import signals
 from celery import signals
 from celery.utils import cached_property, uuid
 from celery.utils import cached_property, uuid
@@ -204,14 +205,19 @@ class TaskProducer(Producer):
 
 
         signals.task_sent.send(sender=task_name, **body)
         signals.task_sent.send(sender=task_name, **body)
         if event_dispatcher:
         if event_dispatcher:
+            exname = exchange or self.exchange
+            if isinstance(exname, Exchange):
+                exname = exname.name
             event_dispatcher.send('task-sent', uuid=task_id,
             event_dispatcher.send('task-sent', uuid=task_id,
                                                name=task_name,
                                                name=task_name,
-                                               args=repr(task_args),
-                                               kwargs=repr(task_kwargs),
+                                               args=safe_repr(task_args),
+                                               kwargs=safe_repr(task_kwargs),
                                                retries=retries,
                                                retries=retries,
                                                eta=eta,
                                                eta=eta,
                                                expires=expires,
                                                expires=expires,
-                                               queue=queue)
+                                               queue=queue,
+                                               exchange=exname,
+                                               routing_key=routing_key)
         return task_id
         return task_id
     delay_task = publish_task   # XXX Compat
     delay_task = publish_task   # XXX Compat
 
 

+ 2 - 2
celery/app/control.py

@@ -71,8 +71,8 @@ class Inspect(object):
     def revoked(self):
     def revoked(self):
         return self._request('dump_revoked')
         return self._request('dump_revoked')
 
 
-    def registered(self):
-        return self._request('dump_tasks')
+    def registered(self, *taskinfoitems):
+        return self._request('dump_tasks', taskinfoitems=taskinfoitems)
     registered_tasks = registered
     registered_tasks = registered
 
 
     def ping(self):
     def ping(self):

+ 4 - 3
celery/worker/control.py

@@ -20,7 +20,7 @@ from celery.utils.log import get_logger
 from . import state
 from . import state
 from .state import revoked
 from .state import revoked
 
 
-TASK_INFO_FIELDS = ('exchange', 'routing_key', 'rate_limit')
+DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
 logger = get_logger(__name__)
 logger = get_logger(__name__)
 
 
 
 
@@ -200,12 +200,13 @@ def dump_revoked(panel, **kwargs):
 
 
 
 
 @Panel.register
 @Panel.register
-def dump_tasks(panel, **kwargs):
+def dump_tasks(panel, taskinfoitems=None, **kwargs):
     tasks = panel.app.tasks
     tasks = panel.app.tasks
+    taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
 
 
     def _extract_info(task):
     def _extract_info(task):
         fields = dict((field, str(getattr(task, field, None)))
         fields = dict((field, str(getattr(task, field, None)))
-                        for field in TASK_INFO_FIELDS
+                        for field in taskinfoitems
                             if getattr(task, field, None) is not None)
                             if getattr(task, field, None) is not None)
         info = map('='.join, fields.items())
         info = map('='.join, fields.items())
         if not info:
         if not info: