ソースを参照

More useful utilities for callbacks and result graphs

- chain(a, b, c):  Creates a chain of tasks so that: a then b then c,
    and returns the first task in the chain.

- AsyncResult.get_leaf()

    Waits and returns the result of the leaf subtask.
    That is the last node found when traversing the graph,
    but this means that the graph can be 1-dimensional only (in effect
    a list).

    This is useful for task chains like::

        >>> # (2 + 2) * 8 / 2
        >>> chain(add.subtask((2, 2)),
                  mul.subtask((8, ),
                  div.subtask((2, ))).get_leaf()
        16

- subtask.link(subtask) + subtask.link_error(subtask)

    Shortcut to ``s.options.setdefault("link", []).append(subtask)``

- subtask.flatten_links()

    Returns a flattened list of all dependencies (recursively)
Ask Solem 13 年 前
コミット
eb776b5dbe
4 ファイル変更37 行追加2 行削除
  1. 2 1
      celery/bin/celeryctl.py
  2. 6 0
      celery/result.py
  3. 6 0
      celery/task/__init__.py
  4. 23 1
      celery/task/sets.py

+ 2 - 1
celery/bin/celeryctl.py

@@ -387,7 +387,8 @@ class shell(Command):
                        "BaseTask": task.BaseTask,
                        "TaskSet": task.TaskSet,
                        "chord": task.chord,
-                       "group": task.group}
+                       "group": task.group,
+                       "chain": task.chain}
 
         if not without_tasks:
             self.locals.update(dict((task.__name__, task)

+ 6 - 0
celery/result.py

@@ -140,6 +140,12 @@ class AsyncResult(object):
         for _, R in self.iterdeps():
             yield R, R.get(**kwargs)
 
+    def get_leaf(self):
+        value = None
+        for _, R in self.iterdeps():
+            value = R.get()
+        return value
+
     def iterdeps(self, intermediate=False):
         stack = deque([(None, self)])
 

+ 6 - 0
celery/task/__init__.py

@@ -91,3 +91,9 @@ def periodic_task(*args, **options):
     return task(**dict({"base": PeriodicTask}, **options))
 
 backend_cleanup = Proxy(lambda: current_app.tasks["celery.backend_cleanup"])
+
+
+def chain(*tasks):
+    tasks = [task.clone() for task in tasks]
+    reduce(lambda a, b: a.link(b), tasks)
+    return tasks[0]

+ 23 - 1
celery/task/sets.py

@@ -12,11 +12,14 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
+from itertools import chain
+
 from .. import current_app
 from ..app import app_or_default, current_task
 from ..datastructures import AttributeDict
 from ..utils import cached_property, reprcall, uuid
-from ..utils.compat import UserList
+from ..utils.functional import maybe_list
+from ..utils.compat import UserList, chain_from_iterable
 
 
 class subtask(AttributeDict):
@@ -83,6 +86,25 @@ class subtask(AttributeDict):
         options = dict(self.options, **options)
         return self.type.apply_async(args, kwargs, **options)
 
+    def link(self, callback):
+        """Add a callback task to be applied if this task
+        executes successfully."""
+        self.options.setdefault("link", []).append(callback)
+        return callback
+
+    def link_error(self, errback):
+        """Add a callback task to be applied if an error occurs
+        while executing this task."""
+        self.options.setdefault("link_error", []).append(errback)
+        return errback
+
+    def flatten_links(self):
+        """Gives a recursive list of dependencies (unchain if you will,
+        but with links intact)."""
+        return list(chain_from_iterable(chain([[self]],
+                (link.flatten_links()
+                    for link in maybe_list(self.options.get("link")) or []))))
+
     def __reduce__(self):
         # for serialization, the task type is lazily loaded,
         # and not stored in the dict itself.