Sfoglia il codice sorgente

Must now enable CELERY_CHORD_PROPAGATES to get the new chord error behavior

Ask Solem 12 anni fa
parent
commit
fe39c9eba0

+ 20 - 6
celery/app/builtins.py

@@ -71,14 +71,22 @@ def add_unlock_chord_task(app):
     from celery.exceptions import ChordError
     from celery.exceptions import ChordError
     from celery.result import from_serializable
     from celery.result import from_serializable
 
 
+    default_propagate = app.conf.CELERY_CHORD_PROPAGATES
+
     @app.task(name='celery.chord_unlock', max_retries=None,
     @app.task(name='celery.chord_unlock', max_retries=None,
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
-    def unlock_chord(group_id, callback, interval=None, propagate=True,
+    def unlock_chord(group_id, callback, interval=None, propagate=None,
                      max_retries=None, result=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
                      from_serializable=from_serializable):
                      from_serializable=from_serializable):
-        if interval is None:
-            interval = unlock_chord.default_retry_delay
+        # if propagate is disabled exceptions raised by chord tasks
+        # will be sent as part of the result list to the chord callback.
+        # Since 3.1 propagate will be enabled by default, and instead
+        # the chord callback changes state to FAILURE with the
+        # exception set to ChordError.
+        propagate = default_propagate if propagate is None else propagate
+
+        # check if the task group is ready, and if so apply the callback.
         deps = GroupResult(
         deps = GroupResult(
             group_id,
             group_id,
             [from_serializable(r, Result=Result) for r in result],
             [from_serializable(r, Result=Result) for r in result],
@@ -91,13 +99,17 @@ def add_unlock_chord_task(app):
                 ret = j(propagate=propagate)
                 ret = j(propagate=propagate)
             except Exception, exc:
             except Exception, exc:
                 culprit = deps._failed_join_report().next()
                 culprit = deps._failed_join_report().next()
-
                 app._tasks[callback.task].backend.fail_from_current_stack(
                 app._tasks[callback.task].backend.fail_from_current_stack(
                     callback.id, exc=ChordError('Dependency %s raised %r' % (
                     callback.id, exc=ChordError('Dependency %s raised %r' % (
                         culprit.id, exc)),
                         culprit.id, exc)),
                 )
                 )
             else:
             else:
-                callback.delay(ret)
+                try:
+                    callback.delay(ret)
+                except Exception, exc:
+                    app._tasks[callback.task].backend.fail_from_current_stack(
+                        callback.id,
+                        exc=ChordError('Call callback error: %r' % (exc, )))
         else:
         else:
             return unlock_chord.retry(countdown=interval,
             return unlock_chord.retry(countdown=interval,
                                       max_retries=max_retries)
                                       max_retries=max_retries)
@@ -284,6 +296,7 @@ def add_chord_task(app):
     from celery import group
     from celery import group
     from celery.canvas import maybe_subtask
     from celery.canvas import maybe_subtask
     _app = app
     _app = app
+    default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
 
     class Chord(app.Task):
     class Chord(app.Task):
         app = _app
         app = _app
@@ -292,7 +305,8 @@ def add_chord_task(app):
         ignore_result = False
         ignore_result = False
 
 
         def run(self, header, body, partial_args=(), interval=1, countdown=1,
         def run(self, header, body, partial_args=(), interval=1, countdown=1,
-                max_retries=None, propagate=True, eager=False, **kwargs):
+                max_retries=None, propagate=None, eager=False, **kwargs):
+            propagate = default_propagate if propagate is None else propagate
             group_id = uuid()
             group_id = uuid()
             AsyncResult = self.app.AsyncResult
             AsyncResult = self.app.AsyncResult
             prepare_member = self._prepare_member
             prepare_member = self._prepare_member

+ 2 - 0
celery/app/defaults.py

@@ -105,6 +105,8 @@ NAMESPACES = {
         'BROADCAST_EXCHANGE_TYPE': Option('fanout'),
         'BROADCAST_EXCHANGE_TYPE': Option('fanout'),
         'CACHE_BACKEND': Option(),
         'CACHE_BACKEND': Option(),
         'CACHE_BACKEND_OPTIONS': Option({}, type='dict'),
         'CACHE_BACKEND_OPTIONS': Option({}, type='dict'),
+        # chord propagate will be True from v3.1
+        'CHORD_PROPAGATES': Option(False, type='bool'),
         'CREATE_MISSING_QUEUES': Option(True, type='bool'),
         'CREATE_MISSING_QUEUES': Option(True, type='bool'),
         'DEFAULT_RATE_LIMIT': Option(type='string'),
         'DEFAULT_RATE_LIMIT': Option(type='string'),
         'DISABLE_RATE_LIMITS': Option(False, type='bool'),
         'DISABLE_RATE_LIMITS': Option(False, type='bool'),

+ 3 - 1
celery/backends/base.py

@@ -480,11 +480,13 @@ class KeyValueStoreBackend(BaseDictBackend):
         else:
         else:
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
 
 
-    def on_chord_part_return(self, task, propagate=True):
+    def on_chord_part_return(self, task, propagate=None):
         if not self.implements_incr:
         if not self.implements_incr:
             return
             return
         from celery import subtask
         from celery import subtask
         from celery.result import GroupResult
         from celery.result import GroupResult
+        if propagate is None:
+            propagate = self.app.conf.CELERY_CHORD_PROPAGATES
         gid = task.request.group
         gid = task.request.group
         if not gid:
         if not gid:
             return
             return

+ 8 - 4
celery/result.py

@@ -711,9 +711,13 @@ class EagerResult(AsyncResult):
 def from_serializable(r, Result=AsyncResult):
 def from_serializable(r, Result=AsyncResult):
     # earlier backends may just pickle, so check if
     # earlier backends may just pickle, so check if
     # result is already prepared.
     # result is already prepared.
+    print('R IS: %r' % (r, ))
     if not isinstance(r, ResultBase):
     if not isinstance(r, ResultBase):
-        id, nodes = r
-        if nodes:
-            return GroupResult(id, [Result(id) for id, _ in nodes])
-        return AsyncResult(id)
+        if isinstance(r, (list, tuple)):
+            id, nodes = r
+            if nodes:
+                return GroupResult(id, [Result(id) for id, _ in nodes])
+            return AsyncResult(id)
+        else:
+            return AsyncResult(r)
     return r
     return r

+ 21 - 0
docs/configuration.rst

@@ -976,6 +976,27 @@ Result backends caches ready results used by the client.
 This is the total number of results to cache before older results are evicted.
 This is the total number of results to cache before older results are evicted.
 The default is 5000.
 The default is 5000.
 
 
+.. setting:: CELERY_CHORD_PROPAGATES
+
+CELERY_CHORD_PROPAGATES
+~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.0.14
+
+This setting defines what happens when a task part of a chord raises an
+exception:
+
+- If propagate is True the chord callback will change state to FAILURE
+  with the exception value set to a :exc:`~celery.exceptions.ChordError`
+  instance containing information about the error and the task that failed.
+
+    This is the default behavior in Celery 3.1+
+
+- If propagate is False the exception value will instead be forwarded
+  to the chord callback.
+
+    This was the default behavior before version 3.1.
+
 .. setting:: CELERY_TRACK_STARTED
 .. setting:: CELERY_TRACK_STARTED
 
 
 CELERY_TRACK_STARTED
 CELERY_TRACK_STARTED

+ 9 - 3
docs/userguide/canvas.rst

@@ -729,11 +729,13 @@ for other tasks <task-synchronous-subtasks>`)
 Error handling
 Error handling
 ~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~
 
 
-.. versionadded:: 3.0.14
-
 So what happens if one of the tasks raises an exception?
 So what happens if one of the tasks raises an exception?
 
 
-Errors will propagate to the callback, so the callback will not be executed
+This was not documented for some time and before version 3.1
+the exception value will be forwarded to the chord callback.
+
+
+From 3.1 errors will propagate to the callback, so the callback will not be executed
 instead the callback changes to failure state, and the error is set
 instead the callback changes to failure state, and the error is set
 to the :exc:`~celery.exceptions.ChordError` exception:
 to the :exc:`~celery.exceptions.ChordError` exception:
 
 
@@ -751,6 +753,10 @@ to the :exc:`~celery.exceptions.ChordError` exception:
     celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
     celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
         raised ValueError('something something',)
         raised ValueError('something something',)
 
 
+If you're running 3.0.14 or later you can enable the new behavior via
+the :setting:`CELERY_CHORD_PROPAGATES` setting::
+
+    CELERY_CHORD_PROPAGATES = True
 
 
 While the traceback may be different depending on which result backend is
 While the traceback may be different depending on which result backend is
 being used, you can see the error description includes the id of the task that failed
 being used, you can see the error description includes the id of the task that failed