|
@@ -245,13 +245,12 @@ def add_chain_task(app):
|
|
|
tasks[0].apply_async()
|
|
|
return result
|
|
|
|
|
|
- def apply(self, args=(), kwargs={}, **options):
|
|
|
- tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']]
|
|
|
- res = prev = None
|
|
|
- for task in tasks:
|
|
|
- res = task.apply((prev.get(), ) if prev else ())
|
|
|
- res.parent, prev = prev, res
|
|
|
- return res
|
|
|
+ def apply(self, args=(), kwargs={}, subtask=maybe_subtask, **options):
|
|
|
+ last, fargs = None, args # fargs passed to first task only
|
|
|
+ for task in kwargs['tasks']:
|
|
|
+ res = subtask(task).clone(fargs).apply(last and (last.get(), ))
|
|
|
+ res.parent, last, fargs = last, res, None
|
|
|
+ return last
|
|
|
return Chain
|
|
|
|
|
|
|