|
@@ -428,6 +428,7 @@ class KeyValueStoreBackend(BaseBackend):
|
|
return
|
|
return
|
|
from celery import subtask
|
|
from celery import subtask
|
|
from celery.result import GroupResult
|
|
from celery.result import GroupResult
|
|
|
|
+ app = self.app
|
|
if propagate is None:
|
|
if propagate is None:
|
|
propagate = self.app.conf.CELERY_CHORD_PROPAGATES
|
|
propagate = self.app.conf.CELERY_CHORD_PROPAGATES
|
|
gid = task.request.group
|
|
gid = task.request.group
|
|
@@ -442,13 +443,25 @@ class KeyValueStoreBackend(BaseBackend):
|
|
try:
|
|
try:
|
|
ret = j(propagate=propagate)
|
|
ret = j(propagate=propagate)
|
|
except Exception as exc:
|
|
except Exception as exc:
|
|
- culprit = next(deps._failed_join_report())
|
|
|
|
- self.app._tasks[callback.task].backend.fail_from_current_stack(
|
|
|
|
- callback.id, exc=ChordError('Dependency %s raised %r' % (
|
|
|
|
- culprit.id, exc))
|
|
|
|
|
|
+ try:
|
|
|
|
+ culprit = next(deps._failed_join_report())
|
|
|
|
+ reason = 'Dependency {0.id} raised {1!r}'.format(
|
|
|
|
+ culprit, exc,
|
|
|
|
+ )
|
|
|
|
+ except StopIteration:
|
|
|
|
+ reason = repr(exc)
|
|
|
|
+
|
|
|
|
+ app._tasks[callback.task].backend.fail_from_current_stack(
|
|
|
|
+ callback.id, exc=ChordError(reason),
|
|
)
|
|
)
|
|
else:
|
|
else:
|
|
- callback.delay(ret)
|
|
|
|
|
|
+ try:
|
|
|
|
+ callback.delay(ret)
|
|
|
|
+ except Exception as exc:
|
|
|
|
+ app._tasks[callback.task].backend.fail_from_current_stack(
|
|
|
|
+ callback.id,
|
|
|
|
+ exc=ChordError('Callback error: {0!r}'.format(exc)),
|
|
|
|
+ )
|
|
finally:
|
|
finally:
|
|
deps.delete()
|
|
deps.delete()
|
|
self.client.delete(key)
|
|
self.client.delete(key)
|