|
@@ -265,27 +265,39 @@ def add_chord_task(app):
|
|
|
|
|
|
def run(self, header, body, partial_args=(), interval=1,
|
|
|
max_retries=None, propagate=False, eager=False, **kwargs):
|
|
|
+ group_id = uuid()
|
|
|
+ AsyncResult = self.app.AsyncResult
|
|
|
+ prepare_member = self._prepare_member
|
|
|
+
|
|
|
+ # - convert back to group if serialized
|
|
|
if not isinstance(header, group):
|
|
|
header = group(map(maybe_subtask, header))
|
|
|
- r = []
|
|
|
- group_id = uuid()
|
|
|
- for task in header.tasks:
|
|
|
- opts = task.options
|
|
|
- try:
|
|
|
- tid = opts['task_id']
|
|
|
- except KeyError:
|
|
|
- tid = opts['task_id'] = uuid()
|
|
|
- opts['chord'] = body
|
|
|
- opts['group_id'] = group_id
|
|
|
- r.append(app.AsyncResult(tid))
|
|
|
+ # - eager applies the group inline
|
|
|
if eager:
|
|
|
return header.apply(args=partial_args, task_id=group_id)
|
|
|
+
|
|
|
+ results = [AsyncResult(prepare_member(task, body, group_id))
|
|
|
+ for task in header.tasks]
|
|
|
+
|
|
|
+ # - fallback implementations schedules the chord_unlock task here
|
|
|
app.backend.on_chord_apply(group_id, body,
|
|
|
interval=interval,
|
|
|
max_retries=max_retries,
|
|
|
propagate=propagate,
|
|
|
- result=r)
|
|
|
- return header(*partial_args, task_id=group_id)
|
|
|
+ result=results)
|
|
|
+ # - call the header group, returning the GroupResult.
|
|
|
+ # XXX Python 2.5 doesn't allow kwargs after star-args.
|
|
|
+ return header(*partial_args, **{'task_id': group_id})
|
|
|
+
|
|
|
+ def _prepare_member(self, task, body, group_id):
|
|
|
+ opts = task.options
|
|
|
+ # d.setdefault would work but generating uuid's are expensive
|
|
|
+ try:
|
|
|
+ task_id = opts['task_id']
|
|
|
+ except KeyError:
|
|
|
+ task_id = opts['task_id'] = uuid()
|
|
|
+ opts.update(chord=body, group_id=group_id)
|
|
|
+ return task_id
|
|
|
|
|
|
def apply_async(self, args=(), kwargs={}, task_id=None, **options):
|
|
|
if self.app.conf.CELERY_ALWAYS_EAGER:
|