瀏覽代碼

Canvas: Fixes chain isinstance checks that now must point to _chain. Closes #3536

Ask Solem 8 年之前
父節點
當前提交
a825f0e8ea
共有 2 個文件被更改,包括 13 次插入6 次删除
  1. 2 1
      celery/app/trace.py
  2. 11 5
      celery/canvas.py

+ 2 - 1
celery/app/trace.py

@@ -419,7 +419,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                         # execute first task in chain
                         chain = task_request.chain
                         if chain:
-                            signature(chain.pop(), app=app).apply_async(
+                            _chsig = signature(chain.pop(), app=app)
+                            _chsig.apply_async(
                                 (retval,), chain=chain,
                                 parent_id=uuid, root_id=root_id,
                             )

+ 11 - 5
celery/canvas.py

@@ -378,6 +378,7 @@ class Signature(dict):
         if isinstance(other, chord) and len(other.tasks) == 1:
             # chord with one header -> header[0] | body
             other = other.tasks[0] | other.body
+
         if isinstance(self, group):
             if isinstance(other, group):
                 # group() | group() -> single group
@@ -399,6 +400,7 @@ class Signature(dict):
                 return sig
             # task | group() -> chain
             return _chain(self, other, app=self.app)
+
         if not isinstance(self, _chain) and isinstance(other, _chain):
             # task | chain -> chain
             return _chain(
@@ -531,7 +533,7 @@ class _chain(Signature):
                 tasks = d['kwargs']['tasks'] = list(tasks)
             # First task must be signature object to get app
             tasks[0] = maybe_signature(tasks[0], app=app)
-        return _upgrade(d, chain(tasks, app=app, **d['options']))
+        return _upgrade(d, _chain(tasks, app=app, **d['options']))
 
     def __init__(self, *tasks, **options):
         tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
@@ -627,6 +629,11 @@ class _chain(Signature):
         prev_res = None
         tasks, results = [], []
         i = 0
+        # NOTE: We are doing this in reverse order.
+        # The result is a list of tasks in reverse order, that is
+        # passed as the ``chain`` message field.
+        # As it's reversed the worker can just do ``chain.pop()`` to
+        # get the next task in the chain.
         while steps:
             task = steps_pop()
             is_first_task, is_last_task = not steps, not i
@@ -642,7 +649,7 @@ class _chain(Signature):
             elif is_first_task:
                 task.args = tuple(args) + tuple(task.args)
 
-            if isinstance(task, chain):
+            if isinstance(task, _chain):
                 # splice the chain
                 steps_extend(task.tasks)
                 continue
@@ -778,7 +785,6 @@ class chain(_chain):
     """
 
     # could be function, but must be able to reference as :class:`chain`.
-
     def __new__(cls, *tasks, **kwargs):
         # This forces `chain(X, Y, Z)` to work the same way as `X | Y | Z`
         if not kwargs and tasks:
@@ -890,7 +896,7 @@ def _maybe_group(tasks, app):
     if isinstance(tasks, dict):
         tasks = signature(tasks, app=app)
 
-    if isinstance(tasks, (group, chain)):
+    if isinstance(tasks, (group, _chain)):
         tasks = tasks.tasks
     elif isinstance(tasks, abstract.CallableSignature):
         tasks = [tasks]
@@ -1308,7 +1314,7 @@ class chord(Signature):
 
     def __repr__(self):
         if self.body:
-            if isinstance(self.body, chain):
+            if isinstance(self.body, _chain):
                 return remove_repeating_from_task(
                     self.body.tasks[0]['task'],
                     '%({0} | {1!r})'.format(