Explorar el Código

chain.apply_async now returns the last result, and results have a link back to its parent

Ask Solem hace 13 años
padre
commit
b771b3bfe3
Se han modificado 2 ficheros con 30 adiciones y 5 borrados
  1. 10 1
      celery/result.py
  2. 20 4
      celery/task/__init__.py

+ 10 - 1
celery/result.py

@@ -56,11 +56,16 @@ class AsyncResult(object):
     #: The task result backend to use.
     backend = None
 
-    def __init__(self, id, backend=None, task_name=None, app=None):
+    #: Parent result (if part of a chain)
+    parent = None
+
+    def __init__(self, id, backend=None, task_name=None,
+            app=None, parent=None):
         self.app = app_or_default(app)
         self.id = id
         self.backend = backend or self.app.backend
         self.task_name = task_name
+        self.parent = parent
 
     def serializable(self):
         return self.id, None
@@ -209,6 +214,10 @@ class AsyncResult(object):
                 graph.add_edge(parent, node)
         return graph
 
+    def set_parent(self, parent):
+        self.parent = parent
+        return parent
+
     @cached_property
     def graph(self):
         return self.build_graph()

+ 20 - 4
celery/task/__init__.py

@@ -14,6 +14,7 @@ from __future__ import absolute_import
 from .. import current_app
 from ..app import app_or_default, current_task as _current_task
 from ..local import Proxy
+from ..utils import uuid
 
 from .base import BaseTask, Task, PeriodicTask  # noqa
 from .sets import group, TaskSet, subtask       # noqa
@@ -93,7 +94,22 @@ def periodic_task(*args, **options):
 backend_cleanup = Proxy(lambda: current_app.tasks["celery.backend_cleanup"])
 
 
-def chain(*tasks):
-    tasks = [task.clone() for task in tasks]
-    reduce(lambda a, b: a.link(b), tasks)
-    return tasks[0]
+class chain(object):
+
+    def __init__(self, *tasks):
+        self.tasks = tasks
+
+    def apply_async(self, **kwargs):
+        tasks = [task.clone(task_id=uuid(), **kwargs)
+                    for task in self.tasks]
+        reduce(lambda a, b: a.link(b), tasks)
+        tasks[0].apply_async()
+        results = [task.type.AsyncResult(task.options["task_id"])
+                        for task in tasks]
+
+        def update_parent(result, parent):
+            result.parent = parent
+            return parent
+
+        reduce(update_parent, reversed(results))
+        return results[-1]