Browse Source

Task: Errbacks can now have (request, exception, traceback) signature.

Closes #2538
Ask Solem 8 years ago
parent
commit
ae3f36f7da
4 changed files with 71 additions and 18 deletions
  1. 20 7
      celery/backends/base.py
  2. 14 0
      celery/canvas.py
  3. 5 0
      celery/utils/functional.py
  4. 32 11
      docs/userguide/canvas.rst

+ 20 - 7
celery/backends/base.py

@@ -37,7 +37,7 @@ from celery.result import (
     GroupResult, ResultBase, allow_join_result, result_from_tuple,
 )
 from celery.utils.collections import BufferMap
-from celery.utils.functional import LRUCache
+from celery.utils.functional import LRUCache, arity_greater
 from celery.utils.log import get_logger
 from celery.utils.serialization import (
     get_pickled_exception,
@@ -153,12 +153,25 @@ class Backend(object):
         if request:
             if request.chord:
                 self.on_chord_part_return(request, state, exc)
-            if call_errbacks:
-                root_id = request.root_id or task_id
-                group(
-                    [self.app.signature(errback)
-                     for errback in request.errbacks or []], app=self.app,
-                ).apply_async((task_id,), parent_id=task_id, root_id=root_id)
+            if call_errbacks and request.errbacks:
+                self._call_task_errbacks(request, exc, traceback)
+
+    def _call_task_errbacks(self, request, exc, traceback):
+        old_signature = []
+        for errback in request.errbacks:
+            errback = self.app.signature(errback)
+            if arity_greater(errback.type.__header__, 1):
+                errback(request, exc, traceback)
+            else:
+                old_signature.append(errback)
+        if old_signature:
+            # Previously errback was called as a task so we still
+            # need to do so if the errback only takes a single task_id arg.
+            task_id = request.id
+            root_id = request.root_id or task_id
+            group(old_signature, app=self.app).apply_async(
+                (task_id,), parent_id=task_id, root_id=root_id
+            )
 
     def mark_as_revoked(self, task_id, reason='',
                         request=None, store_result=True, state=states.REVOKED):

+ 14 - 0
celery/canvas.py

@@ -299,6 +299,20 @@ class Signature(dict):
     def link_error(self, errback):
         return self.append_to_list_option('link_error', errback)
 
+    def on_error(self, errback):
+        """Version of :meth:`link_error` that supports chaining.
+
+        on_error chains the original signature, not the errback so::
+
+            >>> add.s(2, 2).on_error(errback.s()).delay()
+
+        calls the ``add`` task, not the ``errback`` task, but the
+        reverse is true for :meth:`link_error`.
+
+        """
+        self.link_error(errback)
+        return self
+
     def flatten_links(self):
         return list(_chain.from_iterable(_chain(
             [[self]],

+ 5 - 0
celery/utils/functional.py

@@ -270,6 +270,11 @@ def head_from_fun(fun, bound=False, debug=False):
     return result
 
 
+def arity_greater(fun, n):
+    argspec = getfullargspec(fun)
+    return argspec.varargs or len(argspec.args) > n
+
+
 def fun_takes_argument(name, fun, position=None):
     spec = getfullargspec(fun)
     return (

+ 32 - 11
docs/userguide/canvas.rst

@@ -534,30 +534,38 @@ and signatures can be linked too:
     >>> s.link(mul.s(4))
     >>> s.link(log_result.s())
 
-You can also add *error callbacks* using the ``link_error`` argument:
+You can also add *error callbacks* using the ``on_error`` method:
+
+.. code-block:: pycon
+
+    >>> add.s(2, 2).on_error(log_error.s()).delay()
+
+Which will result in the following ``.apply_async`` call when the signature
+is applied:
 
 .. code-block:: pycon
 
     >>> add.apply_async((2, 2), link_error=log_error.s())
 
-    >>> add.signature((2, 2), link_error=log_error.s())
+The worker will not actually call the errback as a task, but will
+instead call the errback function directly so that the raw request, exception
+and traceback objects can be passed to it.
 
-Since exceptions can only be serialized when pickle is used
-the error callbacks take the id of the parent task as argument instead:
+Here's an example errback:
 
 .. code-block:: python
 
     from __future__ import print_function
+
     import os
+
     from proj.celery import app
 
     @app.task
-    def log_error(task_id):
-        result = app.AsyncResult(task_id)
-        result.get(propagate=False)  # make sure result written.
-        with open(os.path.join('/var/errors', task_id), 'a') as fh:
+    def log_error(request, exc, traceback):
+        with open(os.path.join('/var/errors', request.id), 'a') as fh:
             print('--\n\n{0} {1} {2}'.format(
-                task_id, result.result, result.traceback), file=fh)
+                request.id, exc, traceback), file=fh)
 
 To make it even easier to link tasks together there is
 a special signature called :class:`~celery.chain` that lets
@@ -828,8 +836,7 @@ Error handling
 
 So what happens if one of the tasks raises an exception?
 
-Errors will propagate to the callback, so the callback will not be executed
-instead the callback changes to failure state, and the error is set
+The chord callback result will transition to the failure state, and the error is set
 to the :exc:`~@ChordError` exception:
 
 .. code-block:: pycon
@@ -859,6 +866,20 @@ Note that the rest of the tasks will still execute, so the third task
 Also the :exc:`~@ChordError` only shows the task that failed
 first (in time): it does not respect the ordering of the header group.
 
+To perform an action when a chord fails you can therefore attach
+an errback to the chord callback:
+
+.. code-block:: python
+
+    @app.task
+    def on_chord_error(request, exc, traceback):
+        print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
+
+.. code-block:: pycon
+
+    >>> c = (group(add.s(i, i) for i in range(10)) |
+    ...      xsum.s().on_error(on_chord_error.s())).delay()
+
 .. _chord-important-notes:
 
 Important Notes