Sfoglia il codice sorgente

Improvements for #2134 (which fixes Issue #2076)

Ask Solem 10 anni fa
parent
commit
c4dc73dbb8
1 ha cambiato i file con 11 aggiunte e 19 eliminazioni
  1. 11 19
      celery/canvas.py

+ 11 - 19
celery/canvas.py

@@ -397,7 +397,8 @@ class chain(Signature):
         return result
 
     def prepare_steps(self, args, tasks,
-                      root_id=None, link_error=None, app=None):
+                      root_id=None, link_error=None, app=None,
+                      from_dict=Signature.from_dict):
         app = app or self.app
         steps = deque(tasks)
         next_step = prev_task = prev_res = None
@@ -405,6 +406,8 @@ class chain(Signature):
         i = 0
         while steps:
             task = steps.popleft()
+            if not isinstance(task, Signature):
+                task = from_dict(task, app=app)
             if not i:  # first task
                 # first task gets partial args from chain
                 task = task.clone(args)
@@ -466,9 +469,6 @@ class chain(Signature):
     def from_dict(self, d, app=None):
         tasks = d['kwargs']['tasks']
         if d['args'] and tasks:
-            # make sure that tasks are made into signatures (Issue #2076)
-            if not isinstance(tasks[0], Signature):
-                tasks[0] = signature(tasks[0])
             # partial args passed on to first task in chain (Issue #1057).
             tasks[0]['args'] = tasks[0]._merge(d['args'])[0]
         return chain(*d['kwargs']['tasks'], app=app, **d['options'])
@@ -590,17 +590,9 @@ class group(Signature):
 
     @classmethod
     def from_dict(self, d, app=None):
-        tasks = d['kwargs']['tasks']
-        if d['args'] and tasks:
-            # partial args passed on to all tasks in the group (Issue #1057).
-            for task in tasks:
-                # make sure that tasks are made into signatures (Issue #2076)
-                if not isinstance(tasks[0], Signature):
-                    task = signature(task)
-                task['args'] = task._merge(d['args'])[0]
-        return group(tasks, app=app, **d['options'])
-
-    def _prepared(self, tasks, partial_args, group_id, root_id, dict=dict,
+        return group(d['kwargs']['tasks'], app=app, **d['options'])
+
+    def _prepared(self, tasks, partial_args, group_id, root_id, app, dict=dict,
                   Signature=Signature, from_dict=Signature.from_dict):
         for task in tasks:
             if isinstance(task, dict):
@@ -610,11 +602,11 @@ class group(Signature):
                     task = task.clone()
                 else:
                     # serialized sigs must be converted to Signature.
-                    task = from_dict(task)
+                    task = from_dict(task, app=app)
                 if isinstance(task, group):
                     # needs yield_from :(
                     unroll = task._prepared(
-                        task.tasks, partial_args, group_id, root_id,
+                        task.tasks, partial_args, group_id, root_id, app,
                     )
                     for taskN, resN in unroll:
                         yield taskN, resN
@@ -648,7 +640,7 @@ class group(Signature):
             return self.freeze()
 
         options, group_id, root_id = self._freeze_gid(options)
-        tasks = self._prepared(self.tasks, args, group_id, root_id)
+        tasks = self._prepared(self.tasks, args, group_id, root_id, app)
         result = self.app.GroupResult(
             group_id, list(self._apply_tasks(tasks, producer, app, **options)),
         )
@@ -662,7 +654,7 @@ class group(Signature):
         if not self.tasks:
             return self.freeze()  # empty group returns GroupResult
         options, group_id, root_id = self._freeze_gid(options)
-        tasks = self._prepared(self.tasks, args, group_id, root_id)
+        tasks = self._prepared(self.tasks, args, group_id, root_id, app)
         return app.GroupResult(group_id, [
             sig.apply(**options) for sig, _ in tasks
         ])