瀏覽代碼

[4.0][canvas] Fixes regressions with chain. Closes #3066

- chain did not implement .clone properly, so reusing the same chain
  instance did not work.
- chain.freeze() returned the first task in the chain rather than the last.
- async backend.get() did not properly account for cached results.
Ask Solem 9 年之前
父節點
當前提交
11c2a4324f
共有 2 個文件被更改,包括 10 次插入2 次删除
  1. 4 1
      celery/backends/async.py
  2. 6 1
      celery/canvas.py

+ 4 - 1
celery/backends/async.py

@@ -98,7 +98,10 @@ class AsyncBackendMixin(object):
 
         bucket = deque()
         for result in results:
-            self._collect_into(result, bucket)
+            if result._cache:
+                bucket.append(result)
+            else:
+                self._collect_into(result, bucket)
 
         for _ in self._wait_for_pending(
                 result,

+ 6 - 1
celery/canvas.py

@@ -414,6 +414,11 @@ class chain(Signature):
         if self.tasks:
             return self.apply_async(args, kwargs)
 
+    def clone(self, *args, **kwargs):
+        s = Signature.clone(self, *args, **kwargs)
+        s.kwargs['tasks'] = [sig.clone() for sig in s.kwargs['tasks']]
+        return s
+
     def apply_async(self, args=(), kwargs={}, **options):
         # python is best at unpacking kwargs, so .run is here to do that.
         app = self.app
@@ -454,7 +459,7 @@ class chain(Signature):
             self.args, self.tasks, root_id, parent_id, None,
             self.app, _id, group_id, chord, clone=False,
         )
-        return results[-1]
+        return results[0]
 
     def prepare_steps(self, args, tasks,
                       root_id=None, parent_id=None, link_error=None, app=None,