|
@@ -219,6 +219,8 @@ def add_chord_task(app):
|
|
|
opts["chord"] = body
|
|
|
opts["taskset_id"] = setid
|
|
|
r.append(app.AsyncResult(tid))
|
|
|
+ if eager:
|
|
|
+ return header.apply(task_id=setid)
|
|
|
app.backend.on_chord_apply(setid, body,
|
|
|
interval=interval,
|
|
|
max_retries=max_retries,
|
|
@@ -238,9 +240,11 @@ def add_chord_task(app):
|
|
|
body_result.parent = parent
|
|
|
return body_result
|
|
|
|
|
|
- def apply(self, args=(), kwargs={}, **options):
|
|
|
+ def apply(self, args=(), kwargs={}, propagate=True, **options):
|
|
|
body = kwargs["body"]
|
|
|
- res = super(Chord, self).apply(args, kwargs, **options)
|
|
|
- return maybe_subtask(body).apply(args=(res.get().join(), ))
|
|
|
+ res = super(Chord, self).apply(args, dict(kwargs, eager=True),
|
|
|
+ **options)
|
|
|
+ return maybe_subtask(body).apply(
|
|
|
+ args=(res.get(propagate=propagate).get().join(), ))
|
|
|
|
|
|
return Chord
|