|
@@ -17,37 +17,16 @@ from ..utils import uuid
|
|
|
from .sets import TaskSet
|
|
|
|
|
|
|
|
|
-class Chord(current_app.Task):
|
|
|
- accept_magic_kwargs = False
|
|
|
- name = "celery.chord"
|
|
|
-
|
|
|
- def run(self, set, body, interval=1, max_retries=None,
|
|
|
- propagate=False, **kwargs):
|
|
|
- if not isinstance(set, TaskSet):
|
|
|
- set = TaskSet(set)
|
|
|
- r = []
|
|
|
- setid = uuid()
|
|
|
- for task in set.tasks:
|
|
|
- tid = uuid()
|
|
|
- task.options.update(task_id=tid, chord=body)
|
|
|
- r.append(current_app.AsyncResult(tid))
|
|
|
- self.backend.on_chord_apply(setid, body,
|
|
|
- interval=interval,
|
|
|
- max_retries=max_retries,
|
|
|
- propagate=propagate,
|
|
|
- result=r)
|
|
|
- return set.apply_async(taskset_id=setid)
|
|
|
-
|
|
|
-
|
|
|
class chord(object):
|
|
|
- Chord = Chord
|
|
|
+ Chord = None
|
|
|
|
|
|
def __init__(self, tasks, **options):
|
|
|
self.tasks = tasks
|
|
|
self.options = options
|
|
|
+ self.Chord = self.Chord or current_app.tasks["celery.chord"]
|
|
|
|
|
|
def __call__(self, body, **options):
|
|
|
tid = body.options.setdefault("task_id", uuid())
|
|
|
self.Chord.apply_async((list(self.tasks), body), self.options,
|
|
|
**options)
|
|
|
- return body.type.app.AsyncResult(tid)
|
|
|
+ return body.type.AsyncResult(tid)
|