|
@@ -581,6 +581,15 @@ class chord(Signature):
|
|
|
)
|
|
|
self.subtask_type = 'chord'
|
|
|
|
|
|
+ def apply(self, partial_args=(), args=(), kwargs={}, **options):
|
|
|
+ """Apply the chord task with partial_args."""
|
|
|
+ # partial_args which will passed on to the header tasks are put in
|
|
|
+ # kwargs to be passed on.
|
|
|
+ kwargs['partial_args'] = partial_args
|
|
|
+ # For callbacks: extra args are prepended to the stored args.
|
|
|
+ args, kwargs, options = self._merge(args, kwargs, options)
|
|
|
+ return self.type.apply(args, kwargs, **options)
|
|
|
+
|
|
|
def freeze(self, _id=None, group_id=None, chord=None):
|
|
|
return self.body.freeze(_id, group_id=group_id, chord=chord)
|
|
|
|
|
@@ -615,9 +624,12 @@ class chord(Signature):
|
|
|
return self._type
|
|
|
return self.app.tasks['celery.chord']
|
|
|
|
|
|
- def apply_async(self, args=(), kwargs={}, task_id=None,
|
|
|
- producer=None, publisher=None, connection=None,
|
|
|
- router=None, result_cls=None, **options):
|
|
|
+ def delay(self, *partial_args, **partial_kwargs):
|
|
|
+ # There's no partial_kwargs for chord.
|
|
|
+ return self.apply_async(partial_args)
|
|
|
+
|
|
|
+ def apply_async(self, partial_args=(), args=(), kwargs={}, task_id=None,
|
|
|
+ **options):
|
|
|
args = (tuple(args) + tuple(self.args)
|
|
|
if args and not self.immutable else self.args)
|
|
|
body = kwargs.get('body') or self.kwargs['body']
|
|
@@ -626,14 +638,14 @@ class chord(Signature):
|
|
|
|
|
|
_chord = self.type
|
|
|
if _chord.app.conf.CELERY_ALWAYS_EAGER:
|
|
|
- return self.apply((), kwargs, task_id=task_id, **options)
|
|
|
+ return self.apply(partial_args, args, kwargs, task_id=task_id, **options)
|
|
|
res = body.freeze(task_id)
|
|
|
- parent = _chord(self.tasks, body, args, **options)
|
|
|
+ parent = _chord(self.tasks, body, partial_args, **options)
|
|
|
res.parent = parent
|
|
|
return res
|
|
|
|
|
|
def __call__(self, body=None, **options):
|
|
|
- return self.apply_async((), {'body': body} if body else {}, **options)
|
|
|
+ return self.apply_async((), (), {'body': body} if body else {}, **options)
|
|
|
|
|
|
def clone(self, *args, **kwargs):
|
|
|
s = Signature.clone(self, *args, **kwargs)
|