Browse Source

Adds task.send signal. Closes #1281

Ask Solem 11 years ago
parent
commit
cf70cbfbe5
3 changed files with 70 additions and 4 deletions
  1. 18 2
      celery/app/amqp.py
  2. 4 0
      celery/signals.py
  3. 48 2
      docs/userguide/signals.rst

+ 18 - 2
celery/app/amqp.py

@@ -195,7 +195,12 @@ class TaskProducer(Producer):
                      callbacks=None, errbacks=None, routing_key=None,
                      serializer=None, delivery_mode=None, compression=None,
                      reply_to=None, time_limit=None, soft_time_limit=None,
-                     declare=None, **kwargs):
+                     declare=None, headers=None,
+                     send_task_send=signals.task_send.send,
+                     send_task_sent=signals.task_sent.send,
+                     send_receivers=signals.task_send.receivers,
+                     sent_receivers=signals.task_sent.receivers,
+                     **kwargs):
         """Send task message."""
         retry = self.retry if retry is None else retry
 
@@ -249,17 +254,28 @@ class TaskProducer(Producer):
             'chord': chord,
         }
 
+        if send_receivers:
+            send_task_send(sender=task_name, body=body,
+                           exchange=exchange,
+                           routing_key=routing_key,
+                           declare=declare,
+                           headers=headers,
+                           properties=kwargs,
+                           retry_policy=retry_policy)
+
         self.publish(
             body,
             exchange=exchange, routing_key=routing_key,
             serializer=serializer or self.serializer,
             compression=compression or self.compression,
+            headers=headers,
             retry=retry, retry_policy=_rp,
             delivery_mode=delivery_mode, declare=declare,
             **kwargs
         )
 
-        signals.task_sent.send(sender=task_name, **body)
+        if sent_receivers:
+            send_task_sent(sender=task_name, **body)
         if self.send_sent_event:
             evd = event_dispatcher or self.event_dispatcher
             exname = exchange or self.exchange

+ 4 - 0
celery/signals.py

@@ -24,6 +24,10 @@ __all__ = ['task_sent', 'task_prerun', 'task_postrun', 'task_success',
            'eventlet_pool_preshutdown', 'eventlet_pool_postshutdown',
            'eventlet_pool_apply']
 
+task_send = Signal(providing_args=[
+    'body', 'exchange', 'routing_key', 'headers', 'properties',
+    'declare', 'retry_policy',
+])
 task_sent = Signal(providing_args=[
     'task_id', 'task', 'args', 'kwargs', 'eta', 'taskset',
 ])

+ 48 - 2
docs/userguide/signals.rst

@@ -61,14 +61,60 @@ Signals
 Task Signals
 ------------
 
+.. signal:: task_send
+
+task_send
+~~~~~~~~~
+.. versionadded:: 3.1
+
+Dispatched before a task is published.
+Note that this is executed in the process sending the task.
+
+Sender is the name of the task being sent.
+
+Provides arguements:
+
+* body
+
+    Task message body.
+
+    This is a mapping containing the task message fields
+    (see :ref:`internals-task-message-protocol`).
+
+* exchange
+
+    Name of the exchange to send to or a :class:`~kombu.Exchange` object.
+
+* routing_key
+
+    Routing used when sending the message.
+
+* headers
+
+    Application headers mapping (can be modified).
+
+* properties
+
+    Message properties (can be modified)
+
+* declare
+
+    List of entities (:class:`~kombu.Exchange`,
+    :class:`~kombu.Queue` or :class:~`kombu.binding` to declare before
+    publishing the message.  Can be modified.
+
+* retry_policy
+
+    Mapping of retry options.  Can be any argument to
+    :meth:`kombu.Connection.ensure` and can be modified.
+
 .. signal:: task_sent
 
 task_sent
 ~~~~~~~~~
 
 Dispatched when a task has been sent to the broker.
-Note that this is executed in the client process, the one sending
-the task, not in the worker.
+Note that this is executed in the process that sent the task.
 
 Sender is the name of the task being sent.