chord.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. from kombu.utils import gen_unique_id
  2. from celery import current_app
  3. from celery.result import TaskSetResult
  4. from celery.task.sets import TaskSet, subtask
  5. @current_app.task(name="celery.chord_unlock", max_retries=None)
  6. def _unlock_chord(setid, callback, interval=1, max_retries=None):
  7. result = TaskSetResult.restore(setid)
  8. if result.ready():
  9. return subtask(callback).delay(result.join())
  10. _unlock_chord.retry(countdown=interval, max_retries=max_retries)
  11. class Chord(current_app.Task):
  12. accept_magic_kwargs = False
  13. name = "celery.chord"
  14. def run(self, set, body, interval=1, max_retries=None, **kwargs):
  15. if not isinstance(set, TaskSet):
  16. set = TaskSet(set)
  17. r = []
  18. setid = gen_unique_id()
  19. for task in set.tasks:
  20. uuid = gen_unique_id()
  21. task.options.update(task_id=uuid, chord=body)
  22. r.append(current_app.AsyncResult(uuid))
  23. current_app.TaskSetResult(setid, r).save()
  24. self.backend.on_chord_apply(setid, body, interval, max_retries)
  25. return set.apply_async(taskset_id=setid)
  26. class chord(object):
  27. Chord = Chord
  28. def __init__(self, tasks, **options):
  29. self.tasks = tasks
  30. self.options = options
  31. def __call__(self, body, **options):
  32. uuid = body.options.setdefault("task_id", gen_unique_id())
  33. self.Chord.apply_async((list(self.tasks), body), self.options,
  34. **options)
  35. return body.type.app.AsyncResult(uuid)