tasks.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # Example::
  2. # >>> R = A.apply_async()
  3. # >>> list(joinall(R))
  4. # [['A 0', 'A 1', 'A 2', 'A 3', 'A 4', 'A 5', 'A 6', 'A 7', 'A 8', 'A 9'],
  5. # ['B 0', 'B 1', 'B 2', 'B 3', 'B 4', 'B 5', 'B 6', 'B 7', 'B 8', 'B 9'],
  6. # ['C 0', 'C 1', 'C 2', 'C 3', 'C 4', 'C 5', 'C 6', 'C 7', 'C 8', 'C 9'],
  7. # ['D 0', 'D 1', 'D 2', 'D 3', 'D 4', 'D 5', 'D 6', 'D 7', 'D 8', 'D 9'],
  8. # ['E 0', 'E 1', 'E 2', 'E 3', 'E 4', 'E 5', 'E 6', 'E 7', 'E 8', 'E 9'],
  9. # ['F 0', 'F 1', 'F 2', 'F 3', 'F 4', 'F 5', 'F 6', 'F 7', 'F 8', 'F 9'],
  10. # ['G 0', 'G 1', 'G 2', 'G 3', 'G 4', 'G 5', 'G 6', 'G 7', 'G 8', 'G 9'],
  11. # ['H 0', 'H 1', 'H 2', 'H 3', 'H 4', 'H 5', 'H 6', 'H 7', 'H 8', 'H 9']]
  12. #
  13. #
  14. # Joining the graph asynchronously with a callback
  15. # (Note: only two levels, the deps are considered final
  16. # when the second task is ready.)
  17. #
  18. # >>> unlock_graph.apply_async((A.apply_async(),
  19. # ... A_callback.subtask()), countdown=1)
  20. from celery import chord, group, task, signature, uuid
  21. from celery.result import AsyncResult, ResultSet, allow_join_result
  22. from collections import deque
  23. @task()
  24. def add(x, y):
  25. return x + y
  26. @task()
  27. def make_request(id, url):
  28. print('GET {0!r}'.format(url))
  29. return url
  30. @task()
  31. def B_callback(urls, id):
  32. print('batch {0} done'.format(id))
  33. return urls
  34. @task()
  35. def B(id):
  36. return chord(
  37. make_request.s(id, '{0} {1!r}'.format(id, i))
  38. for i in range(10)
  39. )(B_callback.s(id))
  40. @task()
  41. def A():
  42. return group(B.s(c) for c in 'ABCDEFGH').apply_async()
  43. def joinall(R, timeout=None, propagate=True):
  44. stack = deque([R])
  45. try:
  46. use_native = joinall.backend.supports_native_join
  47. except AttributeError:
  48. use_native = False
  49. while stack:
  50. res = stack.popleft()
  51. if isinstance(res, ResultSet):
  52. j = res.join_native if use_native else res.join
  53. stack.extend(j(timeout=timeout, propagate=propagate))
  54. elif isinstance(res, AsyncResult):
  55. stack.append(res.get(timeout=timeout, propagate=propagate))
  56. else:
  57. yield res
  58. @task()
  59. def unlock_graph(result, callback,
  60. interval=1, propagate=False, max_retries=None):
  61. if result.ready():
  62. second_level_res = result.get()
  63. if second_level_res.ready():
  64. with allow_join_result():
  65. signature(callback).delay(list(joinall(
  66. second_level_res, propagate=propagate)))
  67. else:
  68. unlock_graph.retry(countdown=interval, max_retries=max_retries)
  69. @task()
  70. def A_callback(res):
  71. print('Everything is done: {0!r}'.format(res))
  72. return res
  73. class chord2(object):
  74. def __init__(self, tasks, **options):
  75. self.tasks = tasks
  76. self.options = options
  77. def __call__(self, body, **options):
  78. body.options.setdefault('task_id', uuid())
  79. unlock_graph.apply_async()