|
@@ -185,7 +185,7 @@ def add_group_task(app):
|
|
|
|
|
|
@shared_task
|
|
@shared_task
|
|
def add_chain_task(app):
|
|
def add_chain_task(app):
|
|
- from celery.canvas import chord, group, maybe_subtask
|
|
|
|
|
|
+ from celery.canvas import Signature, chord, group, maybe_subtask
|
|
_app = app
|
|
_app = app
|
|
|
|
|
|
class Chain(app.Task):
|
|
class Chain(app.Task):
|
|
@@ -209,6 +209,9 @@ def add_chain_task(app):
|
|
# automatically upgrade group(..) | s to chord(group, s)
|
|
# automatically upgrade group(..) | s to chord(group, s)
|
|
try:
|
|
try:
|
|
next_step = steps.popleft()
|
|
next_step = steps.popleft()
|
|
|
|
+ # for chords we freeze by pretending it's a normal
|
|
|
|
+ # task instead of a group.
|
|
|
|
+ res = Signature._freeze(task)
|
|
task = chord(task, body=next_step, task_id=res.task_id)
|
|
task = chord(task, body=next_step, task_id=res.task_id)
|
|
except IndexError:
|
|
except IndexError:
|
|
pass
|
|
pass
|