Pārlūkot izejas kodu

Defines what happens when a chord task fails. Closes #1172

Ask Solem 12 gadi atpakaļ
vecāks
revīzija
e8515ed7c5
5 mainītis faili ar 104 papildinājumiem un 16 dzēšanām
  1. 18 7
      celery/app/builtins.py
  2. 35 6
      celery/backends/base.py
  3. 4 0
      celery/exceptions.py
  4. 6 0
      celery/result.py
  5. 41 3
      docs/userguide/canvas.rst

+ 18 - 7
celery/app/builtins.py

@@ -68,18 +68,29 @@ def add_unlock_chord_task(app):
 
     """
     from celery.canvas import subtask
-    from celery import result as _res
+    from celery.exceptions import ChordError
 
     @app.task(name='celery.chord_unlock', max_retries=None,
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
-    def unlock_chord(group_id, callback, interval=None, propagate=False,
+    def unlock_chord(group_id, callback, interval=None, propagate=True,
                      max_retries=None, result=None):
         if interval is None:
             interval = unlock_chord.default_retry_delay
-        result = _res.GroupResult(group_id, map(_res.AsyncResult, result))
-        j = result.join_native if result.supports_native_join else result.join
-        if result.ready():
-            subtask(callback).delay(j(propagate=propagate))
+        deps = app.GroupResult(group_id, map(app.AsyncResult, result))
+        j = deps.join_native if deps.supports_native_join else deps.join
+        if deps.ready():
+            callback = subtask(callback)
+            try:
+                ret = j(propagate=propagate)
+            except Exception, exc:
+                culprit = deps._failed_join_report().next()
+
+                app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError('Dependency %s raised %r' % (
+                        culprit.id, exc)),
+                )
+            else:
+                callback.delay(ret)
         else:
             return unlock_chord.retry(countdown=interval,
                                       max_retries=max_retries)
@@ -274,7 +285,7 @@ def add_chord_task(app):
         ignore_result = False
 
         def run(self, header, body, partial_args=(), interval=1, countdown=1,
-                max_retries=None, propagate=False, eager=False, **kwargs):
+                max_retries=None, propagate=True, eager=False, **kwargs):
             group_id = uuid()
             AsyncResult = self.app.AsyncResult
             prepare_member = self._prepare_member

+ 35 - 6
celery/backends/base.py

@@ -20,13 +20,14 @@ import sys
 
 from datetime import timedelta
 
+from billiard.einfo import ExceptionInfo
 from kombu import serialization
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
 
 from celery import states
 from celery.app import current_task
 from celery.datastructures import LRUCache
-from celery.exceptions import TimeoutError, TaskRevokedError
+from celery.exceptions import ChordError, TaskRevokedError, TimeoutError
 from celery.result import from_serializable, GroupResult
 from celery.utils import timeutils
 from celery.utils.serialization import (
@@ -112,6 +113,16 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback)
 
+    def fail_from_current_stack(self, task_id, exc=None):
+        type_, real_exc, tb = sys.exc_info()
+        try:
+            exc = real_exc if exc is None else exc
+            ei = ExceptionInfo((type_, exc, tb))
+            self.mark_as_failure(task_id, exc, ei.traceback)
+            return ei
+        finally:
+            del(tb)
+
     def mark_as_retry(self, task_id, exc, traceback=None):
         """Mark task as being retries. Stores the current
         exception (if any)."""
@@ -226,7 +237,7 @@ class BaseBackend(object):
         raise NotImplementedError(
             'reload_group_result is not supported by this backend.')
 
-    def on_chord_part_return(self, task, propagate=False):
+    def on_chord_part_return(self, task, propagate=True):
         pass
 
     def fallback_chord_unlock(self, group_id, body, result=None,
@@ -245,6 +256,9 @@ class BaseBackend(object):
     def __reduce__(self, args=(), kwargs={}):
         return (unpickle_backend, (self.__class__, args, kwargs))
 
+    def is_cached(self, task_id):
+        return False
+
 
 class BaseDictBackend(BaseBackend):
 
@@ -253,6 +267,9 @@ class BaseDictBackend(BaseBackend):
         self._cache = LRUCache(limit=kwargs.get('max_cached_results') or
                                self.app.conf.CELERY_MAX_CACHED_RESULTS)
 
+    def is_cached(self, task_id):
+        return task_id in self._cache
+
     def store_result(self, task_id, result, status, traceback=None, **kwargs):
         """Store task result and status."""
         result = self.encode_result(result, status)
@@ -463,7 +480,7 @@ class KeyValueStoreBackend(BaseDictBackend):
         else:
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
 
-    def on_chord_part_return(self, task, propagate=False):
+    def on_chord_part_return(self, task, propagate=True):
         if not self.implements_incr:
             return
         from celery import subtask
@@ -475,9 +492,21 @@ class KeyValueStoreBackend(BaseDictBackend):
         deps = GroupResult.restore(gid, backend=task.backend)
         val = self.incr(key)
         if val >= len(deps):
-            subtask(task.request.chord).delay(deps.join(propagate=propagate))
-            deps.delete()
-            self.client.delete(key)
+            j = deps.join_native if deps.supports_native_join else deps.join
+            callback = subtask(task.request.chord)
+            try:
+                ret = j(propagate=propagate)
+            except Exception, exc:
+                culprit = deps._failed_join_report().next()
+                self.app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError('Dependency %s raised %r' % (
+                        culprit.id, exc))
+                )
+            else:
+                callback.delay(ret)
+            finally:
+                deps.delete()
+                self.client.delete(key)
         else:
             self.expire(key, 86400)
 

+ 4 - 0
celery/exceptions.py

@@ -124,3 +124,7 @@ class CDeprecationWarning(DeprecationWarning):
 
 class IncompleteStream(Exception):
     """Found the end of a stream of data, but the data is not yet complete."""
+
+
+class ChordError(Exception):
+    """A task part of the chord raised an exception."""

+ 6 - 0
celery/result.py

@@ -535,6 +535,12 @@ class ResultSet(ResultBase):
             acc[results.index(task_id)] = meta['result']
         return acc
 
+    def _failed_join_report(self):
+        for res in self.results:
+            if (res.backend.is_cached(res.id) and
+                    res.state in states.PROPAGATE_STATES):
+                yield res
+
     def __len__(self):
         return len(self.results)
 

+ 41 - 3
docs/userguide/canvas.rst

@@ -692,8 +692,8 @@ get the sum of the resulting numbers::
     >>> from celery import chord
     >>> from tasks import add, tsum
 
-    >>> chord(add.subtask((i, i))
-    ...     for i in xrange(100))(tsum.subtask()).get()
+    >>> chord(add.s(i, i)
+    ...       for i in xrange(100))(tsum.s()).get()
     9900
 
 
@@ -706,7 +706,9 @@ The synchronization step is costly, so you should avoid using chords as much
 as possible. Still, the chord is a powerful primitive to have in your toolbox
 as synchronization is a required step for many parallel algorithms.
 
-Let's break the chord expression down::
+Let's break the chord expression down:
+
+.. code-block:: python
 
     >>> callback = tsum.subtask()
     >>> header = [add.subtask((i, i)) for i in xrange(100)]
@@ -722,6 +724,42 @@ the return value of each task in the header.  The task id returned by
 and get the final return value (but remember to :ref:`never have a task wait
 for other tasks <task-synchronous-subtasks>`)
 
+Error handling
+~~~~~~~~~~~~~~
+
+.. versionadded:: 3.0.14
+
+So what happens if one of the tasks raises an exception?
+
+Errors will propagate to the callback, so the callback will not be executed
+instead the callback changes to failure state, and the error is set
+to the :exc:`~celery.exceptions.ChordError` exception:
+
+.. code-block:: python
+
+    >>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
+    >>> result = c()
+    >>> result.get()
+    Traceback (most recent call last):
+      File "<stdin>", line 1, in <module>
+      File "*/celery/result.py", line 120, in get
+        interval=interval)
+      File "*/celery/backends/amqp.py", line 150, in wait_for
+        raise self.exception_to_python(meta['result'])
+    celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
+        raised ValueError('something something',)
+
+
+While the traceback may be different depending on which result backend is
+being used, you can see the error description includes the id of the task that failed
+and a string representation of the original exception.  You can also
+find the original traceback in ``result.traceback``.
+
+Note that the rest of the tasks will still execute, so the third task
+(``add.s(8, 8)``) is still executed even though the middle task failed.
+Also the :exc:`~celery.exceptions.ChordError` only shows the task that failed
+first (in time): it does not respect the ordering of the header group.
+
 .. _chord-important-notes:
 
 Important Notes