|
@@ -184,6 +184,7 @@ def add_group_task(app):
|
|
@shared_task
|
|
@shared_task
|
|
def add_chain_task(app):
|
|
def add_chain_task(app):
|
|
from celery.canvas import chord, group, maybe_subtask
|
|
from celery.canvas import chord, group, maybe_subtask
|
|
|
|
+ from celery.result import GroupResult
|
|
_app = app
|
|
_app = app
|
|
|
|
|
|
class Chain(app.Task):
|
|
class Chain(app.Task):
|
|
@@ -200,20 +201,33 @@ def add_chain_task(app):
|
|
# First task get partial args from chain.
|
|
# First task get partial args from chain.
|
|
task = maybe_subtask(steps.popleft())
|
|
task = maybe_subtask(steps.popleft())
|
|
task = task.clone() if i else task.clone(args)
|
|
task = task.clone() if i else task.clone(args)
|
|
|
|
+ AsyncResult = task.type.AsyncResult
|
|
i += 1
|
|
i += 1
|
|
tid = task.options.get('task_id')
|
|
tid = task.options.get('task_id')
|
|
if tid is None:
|
|
if tid is None:
|
|
tid = task.options['task_id'] = uuid()
|
|
tid = task.options['task_id'] = uuid()
|
|
- res = task.type.AsyncResult(tid)
|
|
|
|
|
|
+ res = AsyncResult(tid)
|
|
|
|
|
|
- # automatically upgrade group(..) | s to chord(group, s)
|
|
|
|
|
|
+ # groups must be turned into GroupResults
|
|
if isinstance(task, group):
|
|
if isinstance(task, group):
|
|
|
|
+ #
|
|
|
|
+ gid = task.options.get('group')
|
|
|
|
+ if gid is None:
|
|
|
|
+ gid = task.options['group'] = uuid()
|
|
|
|
+ group_results = []
|
|
|
|
+ for sub in task.tasks:
|
|
|
|
+ tid = sub.options.get('task_id')
|
|
|
|
+ if tid is None:
|
|
|
|
+ tid = sub.options['task_id'] = uuid()
|
|
|
|
+ group_results.append(AsyncResult(tid))
|
|
|
|
+ res = GroupResult(gid, group_results)
|
|
|
|
+
|
|
|
|
+ # automatically upgrade group(..) | s to chord(group, s)
|
|
try:
|
|
try:
|
|
next_step = steps.popleft()
|
|
next_step = steps.popleft()
|
|
- except IndexError:
|
|
|
|
- next_step = None
|
|
|
|
- if next_step is not None:
|
|
|
|
task = chord(task, body=next_step, task_id=tid)
|
|
task = chord(task, body=next_step, task_id=tid)
|
|
|
|
+ except IndexError:
|
|
|
|
+ res = GroupResult(gid, group_results)
|
|
if prev_task:
|
|
if prev_task:
|
|
# link previous task to this task.
|
|
# link previous task to this task.
|
|
prev_task.link(task)
|
|
prev_task.link(task)
|