|
@@ -747,6 +747,7 @@ class chain(Signature):
|
|
|
)
|
|
|
prev_res = prev_prev_res
|
|
|
|
|
|
+
|
|
|
if is_last_task:
|
|
|
# chain(task_id=id) means task id is set for the last task
|
|
|
# in the chain. If the chord is part of a chord/group
|
|
@@ -770,13 +771,14 @@ class chain(Signature):
|
|
|
task.link(prev_task)
|
|
|
|
|
|
if prev_res:
|
|
|
- if prev_res.parent:
|
|
|
+ if isinstance(prev_task, chord):
|
|
|
# If previous task was a chord,
|
|
|
# the freeze above would have set a parent for
|
|
|
# us, but we'd be overwriting it here.
|
|
|
|
|
|
# so fix this relationship so it's:
|
|
|
# chord body -> group -> THIS RES
|
|
|
+ assert isinstance(prev_res.parent, GroupResult)
|
|
|
prev_res.parent.parent = res
|
|
|
else:
|
|
|
prev_res.parent = res
|
|
@@ -1235,9 +1237,11 @@ class chord(Signature):
|
|
|
# XXX chord is also a class in outer scope.
|
|
|
if not isinstance(self.tasks, group):
|
|
|
self.tasks = group(self.tasks, app=self.app)
|
|
|
- bodyres = self.body.freeze(_id, parent_id=self.id, root_id=root_id)
|
|
|
- bodyres.parent = self.tasks.freeze(
|
|
|
+ header_result = self.tasks.freeze(
|
|
|
parent_id=parent_id, root_id=root_id, chord=self.body)
|
|
|
+ bodyres = self.body.freeze(
|
|
|
+ _id, parent_id=header_result.id, root_id=root_id)
|
|
|
+ bodyres.parent = header_result
|
|
|
self.id = self.tasks.id
|
|
|
self.body.set_parent_id(self.id)
|
|
|
return bodyres
|
|
@@ -1291,7 +1295,7 @@ class chord(Signature):
|
|
|
countdown=1, max_retries=None, eager=False,
|
|
|
task_id=None, **options):
|
|
|
app = app or self._get_app(body)
|
|
|
- group_id = uuid()
|
|
|
+ group_id = header.options.get('task_id') or uuid()
|
|
|
root_id = body.options.get('root_id')
|
|
|
body.chord_size = self.__length_hint__()
|
|
|
options = dict(self.options, **options) if options else self.options
|
|
@@ -1301,6 +1305,7 @@ class chord(Signature):
|
|
|
|
|
|
results = header.freeze(
|
|
|
group_id=group_id, chord=body, root_id=root_id).results
|
|
|
+ body.set_parent_id(group_id)
|
|
|
bodyres = body.freeze(task_id, root_id=root_id)
|
|
|
|
|
|
parent = app.backend.apply_chord(
|