|
@@ -746,6 +746,7 @@ class chain(Signature):
|
|
|
task_id=prev_res.task_id, root_id=root_id, app=app,
|
|
|
)
|
|
|
prev_res = prev_prev_res
|
|
|
+
|
|
|
if is_last_task:
|
|
|
# chain(task_id=id) means task id is set for the last task
|
|
|
# in the chain. If the chord is part of a chord/group
|
|
@@ -769,7 +770,16 @@ class chain(Signature):
|
|
|
task.link(prev_task)
|
|
|
|
|
|
if prev_res:
|
|
|
- prev_res.parent = res
|
|
|
+ if prev_res.parent:
|
|
|
+ # If previous task was a chord,
|
|
|
+ # the freeze above would have set a parent for
|
|
|
+ # us, but we'd be overwriting it here.
|
|
|
+
|
|
|
+ # so fix this relationship so it's:
|
|
|
+ # chord body -> group -> THIS RES
|
|
|
+ prev_res.parent.parent = res
|
|
|
+ else:
|
|
|
+ prev_res.parent = res
|
|
|
|
|
|
if is_first_task and parent_id is not None:
|
|
|
task.set_parent_id(parent_id)
|
|
@@ -1226,7 +1236,7 @@ class chord(Signature):
|
|
|
if not isinstance(self.tasks, group):
|
|
|
self.tasks = group(self.tasks, app=self.app)
|
|
|
bodyres = self.body.freeze(_id, parent_id=self.id, root_id=root_id)
|
|
|
- self.tasks.freeze(
|
|
|
+ bodyres.parent = self.tasks.freeze(
|
|
|
parent_id=parent_id, root_id=root_id, chord=self.body)
|
|
|
self.id = self.tasks.id
|
|
|
self.body.set_parent_id(self.id)
|