|
@@ -717,11 +717,12 @@ class group(Signature):
|
|
|
yield task, task.freeze(group_id=group_id, root_id=root_id)
|
|
|
|
|
|
def _apply_tasks(self, tasks, producer=None, app=None,
|
|
|
- add_to_parent=None, **options):
|
|
|
+ add_to_parent=None, chord=None, **options):
|
|
|
app = app or self.app
|
|
|
with app.producer_or_acquire(producer) as producer:
|
|
|
for sig, res in tasks:
|
|
|
sig.apply_async(producer=producer, add_to_parent=False,
|
|
|
+ chord=sig.options.get('chord') or chord,
|
|
|
**options)
|
|
|
yield res # <-- r.parent, etc set in the frozen result.
|
|
|
|
|
@@ -868,9 +869,11 @@ class chord(Signature):
|
|
|
root_id=None, parent_id=None):
|
|
|
if not isinstance(self.tasks, group):
|
|
|
self.tasks = group(self.tasks)
|
|
|
- self.tasks.freeze(parent_id=parent_id, root_id=root_id)
|
|
|
+ bodyres = self.body.freeze(_id, parent_id=self.id, root_id=root_id)
|
|
|
+ self.tasks.freeze(parent_id=parent_id, root_id=root_id, chord=self.body)
|
|
|
self.id = self.tasks.id
|
|
|
- return self.body.freeze(_id, parent_id=self.id, root_id=root_id)
|
|
|
+ self.body.set_parent_id(self.id)
|
|
|
+ return bodyres
|
|
|
|
|
|
def set_parent_id(self, parent_id):
|
|
|
tasks = self.tasks
|