Browse Source

refactors some hairy canvas code

Ask Solem 12 years ago
parent
commit
cea57efa02
2 changed files with 25 additions and 18 deletions
  1. 2 18
      celery/app/builtins.py
  2. 23 0
      celery/canvas.py

+ 2 - 18
celery/app/builtins.py

@@ -201,33 +201,17 @@ def add_chain_task(app):
                 # First task get partial args from chain.
                 task = maybe_subtask(steps.popleft())
                 task = task.clone() if i else task.clone(args)
+                res = task._freeze()
                 AsyncResult = task.type.AsyncResult
                 i += 1
-                tid = task.options.get('task_id')
-                if tid is None:
-                    tid = task.options['task_id'] = uuid()
-                res = AsyncResult(tid)
 
-                # groups must be turned into GroupResults
                 if isinstance(task, group):
-                    #
-                    gid = task.options.get('group')
-                    if gid is None:
-                        gid = task.options['group'] = uuid()
-                    group_results = []
-                    for sub in task.tasks:
-                        tid = sub.options.get('task_id')
-                        if tid is None:
-                            tid = sub.options['task_id'] = uuid()
-                        group_results.append(AsyncResult(tid))
-                    res = GroupResult(gid, group_results)
-
                     # automatically upgrade group(..) | s to chord(group, s)
                     try:
                         next_step = steps.popleft()
                         task = chord(task, body=next_step, task_id=tid)
                     except IndexError:
-                        res = GroupResult(gid, group_results)
+                        pass
                 if prev_task:
                     # link previous task to this task.
                     prev_task.link(task)

+ 23 - 0
celery/canvas.py

@@ -19,6 +19,7 @@ from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery import current_app
 from celery.local import Proxy
+from celery.result import GroupResult, from_serializable as serialized_result
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -127,6 +128,14 @@ class Signature(dict):
         return s
     partial = clone
 
+    def _freeze(self, _id=None):
+        opts = self.options
+        try:
+            tid = opts['task_id']
+        except KeyError:
+            tid = opts['task_id'] = _id or uuid()
+        return self.type.AsyncResult(tid)
+
     def replace(self, args=None, kwargs=None, options=None):
         s = self.clone()
         if args is not None:
@@ -332,6 +341,20 @@ class group(Signature):
                     [Signature.clone(t) for t in self.tasks], partial_args)
         return self.type(tasks, result, gid, args)
 
+    def _freeze(self, _id=None):
+        opts = self.options
+        try:
+            gid = opts['group']
+        except KeyError:
+            gid = opts['group'] = uuid()
+        new_tasks, results = [], []
+        for task in self.tasks:
+            task = maybe_subtask(task).clone()
+            results.append(task._freeze())
+            new_tasks.append(task)
+        self.tasks = self.kwargs['tasks'] = new_tasks
+        return GroupResult(gid, results)
+
     def skew(self, start=1.0, stop=None, step=1.0):
         it = fxrange(start, stop, step, repeatlast=True)
         for task in self.tasks: