|
@@ -170,9 +170,19 @@ def add_chain_task(app):
|
|
|
def apply_async(self, args=(), kwargs={}, **options):
|
|
|
if self.app.conf.CELERY_ALWAYS_EAGER:
|
|
|
return self.apply(args, kwargs, **options)
|
|
|
- tasks = [maybe_subtask(task).clone(task_id=uuid(), **kwargs)
|
|
|
- for task in kwargs["tasks"]]
|
|
|
+ options.pop("publisher", None)
|
|
|
+ taskset_id = options.pop("taskset_id", None)
|
|
|
+ chord = options.pop("chord", None)
|
|
|
+ tasks = [maybe_subtask(t).clone(
|
|
|
+ task_id=options.pop("task_id", uuid()),
|
|
|
+ **options
|
|
|
+ )
|
|
|
+ for t in kwargs["tasks"]]
|
|
|
reduce(lambda a, b: a.link(b), tasks)
|
|
|
+ if taskset_id:
|
|
|
+ tasks[-1].set(taskset_id=taskset_id)
|
|
|
+ if chord:
|
|
|
+ tasks[-1].set(chord=chord)
|
|
|
tasks[0].apply_async()
|
|
|
results = [task.type.AsyncResult(task.options["task_id"])
|
|
|
for task in tasks]
|