|
@@ -1,6 +1,6 @@
|
|
|
# Example::
|
|
|
# >>> R = A.apply_async()
|
|
|
-# >>> list(join_tree(R))
|
|
|
+# >>> list(joinall(R))
|
|
|
# [['A 0', 'A 1', 'A 2', 'A 3', 'A 4', 'A 5', 'A 6', 'A 7', 'A 8', 'A 9'],
|
|
|
# ['B 0', 'B 1', 'B 2', 'B 3', 'B 4', 'B 5', 'B 6', 'B 7', 'B 8', 'B 9'],
|
|
|
# ['C 0', 'C 1', 'C 2', 'C 3', 'C 4', 'C 5', 'C 6', 'C 7', 'C 8', 'C 9'],
|
|
@@ -11,12 +11,12 @@
|
|
|
# ['H 0', 'H 1', 'H 2', 'H 3', 'H 4', 'H 5', 'H 6', 'H 7', 'H 8', 'H 9']]
|
|
|
#
|
|
|
#
|
|
|
-# Joining the tree asynchronously with a callback
|
|
|
+# Joining the graph asynchronously with a callback
|
|
|
# (Note: only two levels, the deps are considered final
|
|
|
# when the second task is ready.)
|
|
|
#
|
|
|
-# >>> unlock_tree.apply_async((A.apply_async(),
|
|
|
-# ... A_callback.subtask()), countdown=1)
|
|
|
+# >>> unlock_graph.apply_async((A.apply_async(),
|
|
|
+# ... A_callback.subtask()), countdown=1)
|
|
|
|
|
|
|
|
|
from celery.task import chord, subtask, task, TaskSet
|
|
@@ -51,11 +51,11 @@ def A():
|
|
|
return TaskSet(B.subtask((c, )) for c in "ABCDEFGH").apply_async()
|
|
|
|
|
|
|
|
|
-def join_tree(R, timeout=None, propagate=True):
|
|
|
+def joinall(R, timeout=None, propagate=True):
|
|
|
stack = deque([R])
|
|
|
|
|
|
try:
|
|
|
- use_native = join_tree.app.backend.supports_native_join
|
|
|
+ use_native = joinall.backend.supports_native_join
|
|
|
except AttributeError:
|
|
|
use_native = False
|
|
|
|
|
@@ -71,15 +71,15 @@ def join_tree(R, timeout=None, propagate=True):
|
|
|
|
|
|
|
|
|
@task
|
|
|
-def unlock_tree(result, callback, interval=1, propagate=False,
|
|
|
+def unlock_graph(result, callback, interval=1, propagate=False,
|
|
|
max_retries=None):
|
|
|
if result.ready():
|
|
|
second_level_res = result.get()
|
|
|
if second_level_res.ready():
|
|
|
- subtask(callback).delay(list(join_tree(
|
|
|
+ subtask(callback).delay(list(joinall(
|
|
|
second_level_res, propagate=propagate)))
|
|
|
else:
|
|
|
- unlock_tree.retry(countdown=interval, max_retries=max_retries)
|
|
|
+ unlock_graph.retry(countdown=interval, max_retries=max_retries)
|
|
|
|
|
|
|
|
|
@task
|
|
@@ -96,6 +96,6 @@ class chord2(object):
|
|
|
|
|
|
def __call__(self, body, **options):
|
|
|
body.options.setdefault("task_id", uuid())
|
|
|
- unlock_tree.apply_async()
|
|
|
+ unlock_graph.apply_async()
|
|
|
|
|
|
|