浏览代码

New signals: before_task_publish / after_task_publish

- The new task_send in 3.1 signal was renamed to before_task_publish

- The task_sent signal is deprecated and after_task_publish must be used
  instead.  The new signal gives more flexibility as it contains more
  information.

Closes #1606
Ask Solem 11 年之前
父节点
当前提交
3c537aa99f

+ 24 - 13
celery/app/amqp.py

@@ -211,9 +211,11 @@ class TaskProducer(Producer):
                      serializer=None, delivery_mode=None, compression=None,
                      serializer=None, delivery_mode=None, compression=None,
                      reply_to=None, time_limit=None, soft_time_limit=None,
                      reply_to=None, time_limit=None, soft_time_limit=None,
                      declare=None, headers=None,
                      declare=None, headers=None,
-                     send_task_send=signals.task_send.send,
-                     send_task_sent=signals.task_sent.send,
-                     send_receivers=signals.task_send.receivers,
+                     send_before_publish=signals.before_task_publish.send,
+                     before_receivers=signals.before_task_publish.receivers,
+                     send_after_publish=signals.after_task_publish.send,
+                     after_receivers=signals.after_task_publish.receivers,
+                     send_task_sent=signals.task_sent.send,  # XXX deprecated
                      sent_receivers=signals.task_sent.receivers,
                      sent_receivers=signals.task_sent.receivers,
                      **kwargs):
                      **kwargs):
         """Send task message."""
         """Send task message."""
@@ -272,14 +274,16 @@ class TaskProducer(Producer):
             'chord': chord,
             'chord': chord,
         }
         }
 
 
-        if send_receivers:
-            send_task_send(sender=task_name, body=body,
-                           exchange=exchange,
-                           routing_key=routing_key,
-                           declare=declare,
-                           headers=headers,
-                           properties=kwargs,
-                           retry_policy=retry_policy)
+        if before_receivers:
+            send_before_publish(
+                sender=task_name, body=body,
+                exchange=exchange,
+                routing_key=routing_key,
+                declare=declare,
+                headers=headers,
+                properties=kwargs,
+                retry_policy=retry_policy,
+            )
 
 
         self.publish(
         self.publish(
             body,
             body,
@@ -294,8 +298,15 @@ class TaskProducer(Producer):
             **kwargs
             **kwargs
         )
         )
 
 
-        if sent_receivers:
-            send_task_sent(sender=task_name, **body)
+        if after_receivers:
+            send_after_publish(sender=task_name, body=body,
+                               exchange=exchange, routing_key=routing_key)
+
+        if sent_receivers:  # XXX deprecated
+            send_task_sent(sender=task_name, task_id=task_id,
+                           task=task_name, args=task_args,
+                           kwargs=task_kwargs, eta=eta,
+                           taskset=group_id or taskset_id)
         if self.send_sent_event:
         if self.send_sent_event:
             evd = event_dispatcher or self.event_dispatcher
             evd = event_dispatcher or self.event_dispatcher
             exname = exchange or self.exchange
             exname = exchange or self.exchange

+ 1 - 1
celery/backends/database/session.py

@@ -49,7 +49,7 @@ def get_engine(dburi, **kwargs):
 def create_session(dburi, short_lived_sessions=False, **kwargs):
 def create_session(dburi, short_lived_sessions=False, **kwargs):
     engine = get_engine(dburi, **kwargs)
     engine = get_engine(dburi, **kwargs)
     if short_lived_sessions or dburi not in _SESSIONS:
     if short_lived_sessions or dburi not in _SESSIONS:
-        session = _SESSIONS[dburi] = sessionmaker(bind=engine)
+        _SESSIONS[dburi] = sessionmaker(bind=engine)
     return engine, _SESSIONS[dburi]
     return engine, _SESSIONS[dburi]
 
 
 
 

+ 1 - 2
celery/concurrency/prefork.py

@@ -389,7 +389,7 @@ class AsynPool(_pool.Pool):
         # only used by async pool.
         # only used by async pool.
         if hard:
         if hard:
             self._tref_for_id[job] = hub.call_at(
             self._tref_for_id[job] = hub.call_at(
-                time() + (hard - soft), self._on_hard_timeout, job,
+                now() + (hard - soft), self._on_hard_timeout, job,
             )
             )
         try:
         try:
             result = self._cache[job]
             result = self._cache[job]
@@ -482,7 +482,6 @@ class AsynPool(_pool.Pool):
         mark_write_fd_as_active = active_writes.add
         mark_write_fd_as_active = active_writes.add
         mark_write_gen_as_active = self._active_writers.add
         mark_write_gen_as_active = self._active_writers.add
         mark_worker_as_busy = busy_workers.add
         mark_worker_as_busy = busy_workers.add
-        mark_worker_as_available = busy_workers.discard
         write_generator_done = self._active_writers.discard
         write_generator_done = self._active_writers.discard
         get_job = self._cache.__getitem__
         get_job = self._cache.__getitem__
         # puts back at the end of the queue
         # puts back at the end of the queue

+ 7 - 2
celery/signals.py

@@ -15,7 +15,8 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 from .utils.dispatch import Signal
 from .utils.dispatch import Signal
 
 
-__all__ = ['task_sent', 'task_prerun', 'task_postrun', 'task_success',
+__all__ = ['before_task_publish', 'after_task_publish',
+           'task_prerun', 'task_postrun', 'task_success',
            'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
            'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
            'celeryd_after_setup', 'worker_init', 'worker_process_init',
            'celeryd_after_setup', 'worker_init', 'worker_process_init',
            'worker_ready', 'worker_shutdown', 'setup_logging',
            'worker_ready', 'worker_shutdown', 'setup_logging',
@@ -24,10 +25,14 @@ __all__ = ['task_sent', 'task_prerun', 'task_postrun', 'task_success',
            'eventlet_pool_preshutdown', 'eventlet_pool_postshutdown',
            'eventlet_pool_preshutdown', 'eventlet_pool_postshutdown',
            'eventlet_pool_apply']
            'eventlet_pool_apply']
 
 
-task_send = Signal(providing_args=[
+before_task_publish = Signal(providing_args=[
     'body', 'exchange', 'routing_key', 'headers', 'properties',
     'body', 'exchange', 'routing_key', 'headers', 'properties',
     'declare', 'retry_policy',
     'declare', 'retry_policy',
 ])
 ])
+after_task_publish = Signal(providing_args=[
+    'body', 'exchange', 'routing_key',
+])
+#: Deprecated, use after_task_publish instead.
 task_sent = Signal(providing_args=[
 task_sent = Signal(providing_args=[
     'task_id', 'task', 'args', 'kwargs', 'eta', 'taskset',
     'task_id', 'task', 'args', 'kwargs', 'eta', 'taskset',
 ])
 ])

+ 1 - 0
celery/tests/concurrency/test_prefork.py

@@ -228,6 +228,7 @@ class test_ResultHandler(PoolCase):
             Mock(), Mock(), Mock(), Mock(),
             Mock(), Mock(), Mock(), Mock(),
             fileno_to_outq={},
             fileno_to_outq={},
             on_process_alive=Mock(),
             on_process_alive=Mock(),
+            on_job_ready=Mock(),
         )
         )
         self.assertTrue(x)
         self.assertTrue(x)
         x.on_state_change = Mock()
         x.on_state_change = Mock()

+ 2 - 0
docs/internals/protocol.rst

@@ -1,5 +1,7 @@
 .. _internals-task-message-protocol:
 .. _internals-task-message-protocol:
 
 
+.. _task-message-protocol-v1:
+
 =======================
 =======================
  Task Messages
  Task Messages
 =======================
 =======================

+ 42 - 35
docs/userguide/signals.rst

@@ -21,30 +21,33 @@ Basics
 Several kinds of events trigger signals, you can connect to these signals
 Several kinds of events trigger signals, you can connect to these signals
 to perform actions as they trigger.
 to perform actions as they trigger.
 
 
-Example connecting to the :signal:`task_sent` signal:
+Example connecting to the :signal:`after_task_publish` signal:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    from celery.signals import task_sent
+    from celery.signals import after_task_publish
 
 
-    @task_sent.connect
-    def task_sent_handler(sender=None, task_id=None, task=None, args=None,
-                          kwargs=None, **kwds):
-        print('Got signal task_sent for task id {0}'.format(task_id))
+    @after_task_publish.connect
+    def task_sent_handler(sender=None, body=None, **kwargs):
+        print('after_task_publish for task id {body[id]}'.format(
+            body=body,
+        ))
 
 
 
 
 Some signals also have a sender which you can filter by. For example the
 Some signals also have a sender which you can filter by. For example the
-:signal:`task_sent` signal uses the task name as a sender, so you can
-connect your handler to be called only when tasks with name `"tasks.add"`
-has been sent by providing the `sender` argument to
-:class:`~celery.utils.dispatch.signal.Signal.connect`:
+:signal:`after_task_publish` signal uses the task name as a sender, so by
+providing the ``sender`` argument to
+:class:`~celery.utils.dispatch.signal.Signal.connect` you can
+connect your handler to be called every time a task with name `"proj.tasks.add"`
+is published:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    @task_sent.connect(sender='tasks.add')
-    def task_sent_handler(sender=None, task_id=None, task=None, args=None,
-                          kwargs=None, **kwds):
-        print('Got signal task_sent for task id {0}'.format(task_id)
+    @after_task_publish.connect(sender='proj.tasks.add')
+    def task_sent_handler(sender=None, body=None, **kwargs):
+        print('after_task_publish for task id {body[id]}'.format(
+            body=body,
+        ))
 
 
 Signals use the same implementation as django.core.dispatch. As a result other
 Signals use the same implementation as django.core.dispatch. As a result other
 keyword parameters (e.g. signal) are passed to all signal handlers by default.
 keyword parameters (e.g. signal) are passed to all signal handlers by default.
@@ -61,10 +64,10 @@ Signals
 Task Signals
 Task Signals
 ------------
 ------------
 
 
-.. signal:: task_send
+.. signal:: before_task_publish
 
 
-task_send
-~~~~~~~~~
+before_task_publish
+~~~~~~~~~~~~~~~~~~~
 .. versionadded:: 3.1
 .. versionadded:: 3.1
 
 
 Dispatched before a task is published.
 Dispatched before a task is published.
@@ -79,7 +82,7 @@ Provides arguements:
     Task message body.
     Task message body.
 
 
     This is a mapping containing the task message fields
     This is a mapping containing the task message fields
-    (see :ref:`internals-task-message-protocol`).
+    (see :ref:`task-message-protocol-v1`).
 
 
 * exchange
 * exchange
 
 
@@ -87,7 +90,7 @@ Provides arguements:
 
 
 * routing_key
 * routing_key
 
 
-    Routing used when sending the message.
+    Routing key to use when sending the message.
 
 
 * headers
 * headers
 
 
@@ -108,10 +111,10 @@ Provides arguements:
     Mapping of retry options.  Can be any argument to
     Mapping of retry options.  Can be any argument to
     :meth:`kombu.Connection.ensure` and can be modified.
     :meth:`kombu.Connection.ensure` and can be modified.
 
 
-.. signal:: task_sent
+.. signal:: after_task_publish
 
 
-task_sent
-~~~~~~~~~
+after_task_publish
+~~~~~~~~~~~~~~~~~~
 
 
 Dispatched when a task has been sent to the broker.
 Dispatched when a task has been sent to the broker.
 Note that this is executed in the process that sent the task.
 Note that this is executed in the process that sent the task.
@@ -120,24 +123,18 @@ Sender is the name of the task being sent.
 
 
 Provides arguments:
 Provides arguments:
 
 
-* task_id
-    Id of the task to be executed.
+* body
 
 
-* task
-    The task being executed.
+    The task message body, see :ref:`task-message-protocol-v1`
+    for a reference of possible fields that can be defined.
 
 
-* args
-    the tasks positional arguments.
+* exchange
 
 
-* kwargs
-    The tasks keyword arguments.
+    Name of the exchange or :class:`~kombu.Exchange` object used.
 
 
-* eta
-    The time to execute the task.
+* routing_key
 
 
-* taskset
-    Id of the group this task is part of (if any).
-    (named taskset for historial reasons)
+    Routing key used.
 
 
 .. signal:: task_prerun
 .. signal:: task_prerun
 
 
@@ -580,3 +577,13 @@ Provides arguments:
 * options
 * options
 
 
     Mapping of the parsed user preload options (with default values).
     Mapping of the parsed user preload options (with default values).
+
+Deprecated Signals
+------------------
+
+.. signal:: task_sent
+
+task_sent
+~~~~~~~~~
+
+This signal is deprecated, please use :signal:`after_task_publish` instead.

+ 6 - 1
docs/whatsnew-3.1.rst

@@ -847,9 +847,14 @@ In Other News
 
 
         >>> t.apply_async(headers={'sender': 'George Costanza'})
         >>> t.apply_async(headers={'sender': 'George Costanza'})
 
 
-- New :signal:`task_send`` signal dispatched before a task message
+- New :signal:`task_before_publish`` signal dispatched before a task message
   is sent and can be used to modify the final message fields (Issue #1281).
   is sent and can be used to modify the final message fields (Issue #1281).
 
 
+- New :signal:`task_after_publish` signal replaces the old :signal:`task_sent`
+  signal.
+
+    The :signal:`task_sent` signal is now deprecated and should not be used.
+
 - ``celery.platforms.PIDFile`` renamed to :class:`celery.platforms.Pidfile`.
 - ``celery.platforms.PIDFile`` renamed to :class:`celery.platforms.Pidfile`.
 
 
 - MongoDB Backend: Can now be configured using an URL
 - MongoDB Backend: Can now be configured using an URL