123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- from celery.task import chord, subtask, task, TaskSet
- from celery.result import AsyncResult, ResultSet
- from collections import deque
@task
def add(x, y):
    """Celery task: return the result of ``x + y``."""
    total = x + y
    return total
@task
def make_request(id, url):
    """Celery task standing in for a request: log the URL and echo it back.

    No network access happens here; the body only prints a ``GET`` line.

    :param id: batch identifier; accepted but not used by the body.
    :param url: value that is printed and returned unchanged.
    :returns: ``url``.
    """
    message = "GET %r" % (url, )
    print(message)
    return url
@task
def B_callback(urls, id):
    """Celery task used as the chord body in ``B``.

    :param urls: value handed to the callback; returned unchanged.
    :param id: batch identifier, used only in the printed message.
    :returns: ``urls``.
    """
    message = "batch %s done" % (id, )
    print(message)
    return urls
@task
def B(id):
    """Celery task: launch one batch as a chord and return the chord's result.

    The chord header is ten ``make_request`` subtasks, each called with
    ``(id, "<id> <i>")`` for ``i`` in ``0..9``; the chord body is
    ``B_callback`` with ``id`` as its extra argument.

    :param id: batch identifier passed to every subtask.
    :returns: whatever invoking the chord with its body returns.
    """
    # ``range`` instead of ``xrange``: identical here, and it does not raise
    # NameError on Python 3.
    header = (make_request.subtask((id, "%s %r" % (id, i, )))
              for i in range(10))
    return chord(header)(B_callback.subtask((id, )))
@task
def A():
    """Celery task: start one ``B`` subtask per letter in ``"ABCDEFGH"``.

    :returns: the value returned by ``TaskSet.apply_async()``.
    """
    batches = TaskSet(B.subtask((c, )) for c in "ABCDEFGH")
    return batches.apply_async()
def joinall(R, timeout=None, propagate=True):
    """Yield the plain values reachable from ``R``, resolving nested results.

    Items are processed from a FIFO queue seeded with ``R``:

    * a ``ResultSet`` is joined and each joined value is queued;
    * an ``AsyncResult`` is fetched with ``get()`` and its value is queued;
    * anything else is yielded to the caller.

    :param R: starting object (result set, async result, or plain value).
    :param timeout: forwarded to every ``join``/``join_native``/``get`` call.
    :param propagate: forwarded to every ``join``/``join_native``/``get`` call.
    """
    # The native join is only chosen when an attribute ``backend`` has been
    # attached to this function and exposes ``supports_native_join``;
    # otherwise the plain ``join`` is used.
    backend = getattr(joinall, "backend", None)
    native = getattr(backend, "supports_native_join", False)

    pending = deque((R, ))
    while pending:
        item = pending.popleft()
        if isinstance(item, ResultSet):
            if native:
                joiner = item.join_native
            else:
                joiner = item.join
            pending.extend(joiner(timeout=timeout, propagate=propagate))
            continue
        if isinstance(item, AsyncResult):
            pending.append(item.get(timeout=timeout, propagate=propagate))
            continue
        yield item
@task
def unlock_graph(result, callback, interval=1, propagate=False,
                 max_retries=None):
    """Celery task: fire ``callback`` once a two-level result is complete.

    ``result`` must be ready, and the value it resolves to (itself a result
    object) must be ready as well. When both are, the second-level result is
    flattened with ``joinall`` and the list is sent to ``callback``.
    Otherwise the task retries itself.

    :param result: first-level result exposing ``ready()`` and ``get()``.
    :param callback: passed to ``subtask()`` and invoked with the list of
        flattened values.
    :param interval: ``countdown`` used when retrying.
    :param propagate: forwarded to ``joinall``.
    :param max_retries: forwarded to ``retry``.
    """
    if result.ready():
        second_level_res = result.get()
        if second_level_res.ready():
            subtask(callback).delay(list(joinall(
                second_level_res, propagate=propagate)))
            return
    # Reached when EITHER level is still pending. Previously only one of the
    # two "not ready" cases retried, so the callback could silently never run.
    unlock_graph.retry(countdown=interval, max_retries=max_retries)
@task
def A_callback(res):
    """Celery task: report completion by printing ``res``, then return it.

    :param res: value to print and return unchanged.
    :returns: ``res``.
    """
    message = "Everything is done: %r" % (res, )
    print(message)
    return res
class chord2(object):
    """Callable wrapper that stores header tasks and starts ``unlock_graph``.

    :param tasks: header tasks; stored as ``self.tasks``.
    :param options: extra keyword options; stored as ``self.options``.
    """

    def __init__(self, tasks, **options):
        self.tasks = tasks
        self.options = options

    def __call__(self, body, **options):
        """Give ``body`` a task id if it has none, then start ``unlock_graph``.

        :param body: object with an ``options`` dict; ``task_id`` is filled
            in with a fresh UUID string when absent.
        :param options: accepted but currently unused.
        """
        # ``uuid`` was never imported in this module, so the original call
        # raised NameError. A stdlib UUID4 string is used instead.
        from uuid import uuid4

        body.options.setdefault("task_id", str(uuid4()))
        # NOTE(review): ``unlock_graph`` declares required ``result`` and
        # ``callback`` parameters, but no arguments are passed here -- this
        # looks unfinished; confirm the intended arguments.
        unlock_graph.apply_async()
|