Kaynağa Gözat

Canvas: Fixes chord with a group of chords. Closes #1921

Ask Solem 11 yıl önce
ebeveyn
işleme
c82a1be845
2 değiştirilmiş dosya ile 20 ekleme ve 21 silme
  1. 3 15
      celery/app/builtins.py
  2. 17 6
      celery/canvas.py

+ 3 - 15
celery/app/builtins.py

@@ -183,7 +183,7 @@ def add_group_task(app):
                     [stask.apply(group_id=group_id) for stask in taskit],
                 )
             with app.producer_or_acquire() as pub:
-                [stask.apply_async(group_id=group_id, publisher=pub,
+                [stask.apply_async(group_id=group_id, producer=pub,
                                    add_to_parent=False) for stask in taskit]
             parent = get_current_worker_task()
             if parent:
@@ -345,20 +345,18 @@ def add_chord_task(app):
             propagate = default_propagate if propagate is None else propagate
             group_id = uuid()
             AsyncResult = app.AsyncResult
-            prepare_member = self._prepare_member
 
             # - convert back to group if serialized
             tasks = header.tasks if isinstance(header, group) else header
             header = group([
                 maybe_signature(s, app=app).clone() for s in tasks
-            ])
+            ], app=self.app)
             # - eager applies the group inline
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)
 
             body.setdefault('chord_size', len(header.tasks))
-            results = [AsyncResult(prepare_member(task, body, group_id))
-                       for task in header.tasks]
+            results = header.freeze(group_id=group_id, chord=body).results
 
             return self.backend.apply_chord(
                 header, partial_args, group_id,
@@ -366,16 +364,6 @@ def add_chord_task(app):
                 max_retries=max_retries, propagate=propagate, result=results,
             )
 
-        def _prepare_member(self, task, body, group_id):
-            opts = task.options
-            # d.setdefault would work but generating uuid's are expensive
-            try:
-                task_id = opts['task_id']
-            except KeyError:
-                task_id = opts['task_id'] = uuid()
-            opts.update(chord=body, group_id=group_id)
-            return task_id
-
         def apply_async(self, args=(), kwargs={}, task_id=None,
                         group_id=None, chord=None, **options):
             app = self.app

+ 17 - 6
celery/canvas.py

@@ -194,7 +194,7 @@ class Signature(dict):
         return s
     partial = clone
 
-    def freeze(self, _id=None):
+    def freeze(self, _id=None, group_id=None, chord=None):
         opts = self.options
         try:
             tid = opts['task_id']
@@ -202,6 +202,10 @@ class Signature(dict):
             tid = opts['task_id'] = _id or uuid()
         if 'reply_to' not in opts:
             opts['reply_to'] = self.app.oid
+        if group_id:
+            opts['group_id'] = group_id
+        if chord:
+            opts['chord'] = chord
         return self.AsyncResult(tid)
     _freeze = freeze
 
@@ -502,16 +506,21 @@ class group(Signature):
     def __call__(self, *partial_args, **options):
         return self.apply_async(partial_args, **options)
 
-    def freeze(self, _id=None):
+    def freeze(self, _id=None, group_id=None, chord=None):
         opts = self.options
         try:
             gid = opts['task_id']
         except KeyError:
             gid = opts['task_id'] = uuid()
+        if group_id:
+            opts['group_id'] = group_id
+        if chord:
+            opts['chord'] = group_id
         new_tasks, results = [], []
         for task in self.tasks:
             task = maybe_signature(task, app=self._app).clone()
-            results.append(task._freeze())
+            topts = task.options
+            results.append(task.freeze(group_id=group_id, chord=chord))
             new_tasks.append(task)
         self.tasks = self.kwargs['tasks'] = new_tasks
         return self.app.GroupResult(gid, results)
@@ -552,8 +561,8 @@ class chord(Signature):
         )
         self.subtask_type = 'chord'
 
-    def freeze(self, _id=None):
-        return self.body.freeze(_id)
+    def freeze(self, _id=None, group_id=None, chord=None):
+        return self.body.freeze(_id, group_id=group_id, chord=chord)
 
     @classmethod
     def from_dict(self, d, app=None):
@@ -581,7 +590,9 @@ class chord(Signature):
                 app = self.body.type.app
         return app.tasks['celery.chord']
 
-    def apply_async(self, args=(), kwargs={}, task_id=None, **options):
+    def apply_async(self, args=(), kwargs={}, task_id=None,
+            producer=None, publisher=None, connection=None,
+            router=None, result_cls=None, **options):
         body = kwargs.get('body') or self.kwargs['body']
         kwargs = dict(self.kwargs, **kwargs)
         body = body.clone(**options)