|
@@ -355,12 +355,18 @@ class chain(Signature):
|
|
|
if self.tasks:
|
|
|
return self.apply_async(args, kwargs)
|
|
|
|
|
|
- def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
|
|
|
- task_id=None, link=None, link_error=None,
|
|
|
- publisher=None, root_id=None, **options):
|
|
|
+ def apply_async(self, args=(), kwargs={}, **options):
|
|
|
+ # python is best at unpacking kwargs, so .run is here to do that.
|
|
|
app = self.app
|
|
|
if app.conf.CELERY_ALWAYS_EAGER:
|
|
|
return self.apply(args, kwargs, **options)
|
|
|
+ return self.run(args, kwargs, app=app, **(
|
|
|
+ dict(self.options, **options) if options else self.options))
|
|
|
+
|
|
|
+ def run(self, args=(), kwargs={}, group_id=None, chord=None,
|
|
|
+ task_id=None, link=None, link_error=None,
|
|
|
+ publisher=None, producer=None, root_id=None, app=None, **options):
|
|
|
+ app = app or self.app
|
|
|
tasks, results = self.prepare_steps(
|
|
|
args, self.tasks, root_id, link_error,
|
|
|
)
|
|
@@ -443,8 +449,7 @@ class chain(Signature):
|
|
|
last, fargs = None, args
|
|
|
for task in self.tasks:
|
|
|
res = task.clone(fargs).apply(
|
|
|
- last and (last.get(), ), **options
|
|
|
- )
|
|
|
+ last and (last.get(), ), **dict(self.options, **options))
|
|
|
res.parent, last, fargs = last, res, None
|
|
|
return last
|
|
|
|