Jelajahi Sumber

Chaining a group should give GroupResults, and also pass partial args given to subchains to the first task (terminology is evolving :) Closes #1057

Ask Solem 12 tahun lalu
induk
melakukan
22c1021c0e
2 mengubah file dengan 23 tambahan dan 5 penghapusan
  1. 19 5
      celery/app/builtins.py
  2. 4 0
      celery/canvas.py

+ 19 - 5
celery/app/builtins.py

@@ -186,6 +186,7 @@ def add_group_task(app):
 @shared_task
 @shared_task
 def add_chain_task(app):
 def add_chain_task(app):
     from celery.canvas import chord, group, maybe_subtask
     from celery.canvas import chord, group, maybe_subtask
+    from celery.result import GroupResult
     _app = app
     _app = app
 
 
     class Chain(app.Task):
     class Chain(app.Task):
@@ -202,20 +203,33 @@ def add_chain_task(app):
                 # First task get partial args from chain.
                 # First task get partial args from chain.
                 task = maybe_subtask(steps.popleft())
                 task = maybe_subtask(steps.popleft())
                 task = task.clone() if i else task.clone(args)
                 task = task.clone() if i else task.clone(args)
+                AsyncResult = task.type.AsyncResult
                 i += 1
                 i += 1
                 tid = task.options.get('task_id')
                 tid = task.options.get('task_id')
                 if tid is None:
                 if tid is None:
                     tid = task.options['task_id'] = uuid()
                     tid = task.options['task_id'] = uuid()
-                res = task.type.AsyncResult(tid)
+                res = AsyncResult(tid)
 
 
-                # automatically upgrade group(..) | s to chord(group, s)
+                # groups must be turned into GroupResults
                 if isinstance(task, group):
                 if isinstance(task, group):
+                    #
+                    gid = task.options.get('group')
+                    if gid is None:
+                        gid = task.options['group'] = uuid()
+                    group_results = []
+                    for sub in task.tasks:
+                        tid = sub.options.get('task_id')
+                        if tid is None:
+                            tid = sub.options['task_id'] = uuid()
+                        group_results.append(AsyncResult(tid))
+                    res = GroupResult(gid, group_results)
+
+                    # automatically upgrade group(..) | s to chord(group, s)
                     try:
                     try:
                         next_step = steps.popleft()
                         next_step = steps.popleft()
-                    except IndexError:
-                        next_step = None
-                    if next_step is not None:
                         task = chord(task, body=next_step, task_id=tid)
                         task = chord(task, body=next_step, task_id=tid)
+                    except IndexError:
+                        res = GroupResult(gid, group_results)
                 if prev_task:
                 if prev_task:
                     # link previous task to this task.
                     # link previous task to this task.
                     prev_task.link(task)
                     prev_task.link(task)

+ 4 - 0
celery/canvas.py

@@ -217,6 +217,10 @@ class chain(Signature):
 
 
     @classmethod
     @classmethod
     def from_dict(self, d):
     def from_dict(self, d):
+        tasks = d['kwargs']['tasks']
+        if d['args'] and tasks:
+            # partial args passed on to first task in chain (Issue #1057).
+            tasks[0]['args'] = d['args'] + tasks[0]['args']
         return chain(*d['kwargs']['tasks'], **kwdict(d['options']))
         return chain(*d['kwargs']['tasks'], **kwdict(d['options']))
 
 
     def __repr__(self):
     def __repr__(self):