|
@@ -581,11 +581,7 @@ class chord(Signature):
|
|
|
)
|
|
|
self.subtask_type = 'chord'
|
|
|
|
|
|
- def apply(self, partial_args=(), args=(), kwargs={}, **options):
|
|
|
- """Apply the chord task with partial_args."""
|
|
|
- # partial_args which will passed on to the header tasks are put in
|
|
|
- # kwargs to be passed on.
|
|
|
- kwargs['partial_args'] = partial_args
|
|
|
+ def apply(self, args=(), kwargs={}, **options):
|
|
|
# For callbacks: extra args are prepended to the stored args.
|
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
|
return self.type.apply(args, kwargs, **options)
|
|
@@ -628,8 +624,9 @@ class chord(Signature):
|
|
|
# There's no partial_kwargs for chord.
|
|
|
return self.apply_async(partial_args)
|
|
|
|
|
|
- def apply_async(self, partial_args=(), args=(), kwargs={}, task_id=None,
|
|
|
- **options):
|
|
|
+ def apply_async(self, args=(), kwargs={}, task_id=None,
|
|
|
+ producer=None, publisher=None, connection=None,
|
|
|
+ router=None, result_cls=None, **options):
|
|
|
args = (tuple(args) + tuple(self.args)
|
|
|
if args and not self.immutable else self.args)
|
|
|
body = kwargs.get('body') or self.kwargs['body']
|
|
@@ -638,16 +635,15 @@ class chord(Signature):
|
|
|
|
|
|
_chord = self.type
|
|
|
if _chord.app.conf.CELERY_ALWAYS_EAGER:
|
|
|
- return self.apply(partial_args, args, kwargs,
|
|
|
- task_id=task_id, **options)
|
|
|
+ return self.apply(args, kwargs, task_id=task_id, **options)
|
|
|
res = body.freeze(task_id)
|
|
|
- parent = _chord(self.tasks, body, partial_args, **options)
|
|
|
+ parent = _chord(self.tasks, body, args, **options)
|
|
|
res.parent = parent
|
|
|
return res
|
|
|
|
|
|
def __call__(self, body=None, **options):
|
|
|
return self.apply_async(
|
|
|
- (), (), {'body': body} if body else {}, **options)
|
|
|
+ (), {'body': body} if body else {}, **options)
|
|
|
|
|
|
def clone(self, *args, **kwargs):
|
|
|
s = Signature.clone(self, *args, **kwargs)
|