Просмотр исходного кода

Group/Chord.apply_async no longer starts separate task, also fixes many canvas bugs. Closes #1120

Ask Solem 11 лет назад
Родитель
Сommit
ff72f55c4c
5 измененных файлов с 75 добавлено и 37 удалено
  1. 11 12
      celery/app/builtins.py
  2. 1 0
      celery/app/trace.py
  3. 52 22
      celery/canvas.py
  4. 10 2
      celery/tests/app/test_builtins.py
  5. 1 1
      celery/tests/tasks/test_canvas.py

+ 11 - 12
celery/app/builtins.py

@@ -189,13 +189,8 @@ def add_group_task(app):
 
             def prepare_member(task):
                 task = maybe_signature(task)
-                opts = task.options
-                opts['group_id'] = group_id
-                try:
-                    tid = opts['task_id']
-                except KeyError:
-                    tid = opts['task_id'] = uuid()
-                return task, AsyncResult(tid)
+                task.options['group_id'] = group_id
+                return task, task.freeze()
 
             try:
                 tasks, res = list(zip(
@@ -245,13 +240,14 @@ def add_chain_task(app):
                 res = task.freeze()
                 i += 1
 
-                if isinstance(task, group):
+                if isinstance(task, group) and steps and \
+                        not isinstance(steps[0], group):
                     # automatically upgrade group(..) | s to chord(group, s)
                     try:
                         next_step = steps.popleft()
                         # for chords we freeze by pretending it's a normal
                         # task instead of a group.
-                        res = Signature.freeze(task)
+                        res = Signature.freeze(next_step)
                         task = chord(task, body=next_step, task_id=res.task_id)
                     except IndexError:
                         pass  # no callback, so keep as group
@@ -259,7 +255,8 @@ def add_chain_task(app):
                     # link previous task to this task.
                     prev_task.link(task)
                     # set the results parent attribute.
-                    res.parent = prev_res
+                    if not res.parent:
+                        res.parent = prev_res
 
                 if not isinstance(prev_task, chord):
                     results.append(res)
@@ -338,6 +335,9 @@ def add_chord_task(app):
             results = [AsyncResult(prepare_member(task, body, group_id))
                        for task in header.tasks]
 
+            # - call the header group, returning the GroupResult.
+            final_res = header(*partial_args, task_id=group_id)
+
             # - fallback implementations schedules the chord_unlock task here
             app.backend.on_chord_apply(group_id, body,
                                        interval=interval,
@@ -345,8 +345,7 @@ def add_chord_task(app):
                                        max_retries=max_retries,
                                        propagate=propagate,
                                        result=results)
-            # - call the header group, returning the GroupResult.
-            return header(*partial_args, task_id=group_id)
+            return final_res
 
         def _prepare_member(self, task, body, group_id):
             opts = task.options

+ 1 - 0
celery/app/trace.py

@@ -240,6 +240,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 else:
                     # callback tasks must be applied before the result is
                     # stored, so that result.children is populated.
+                    print('CALLBACKS: %r' % (task_request.callbacks, ))
                     [signature(callback).apply_async((retval, ))
                         for callback in task_request.callbacks or []]
                     if publish_result:

+ 52 - 22
celery/canvas.py

@@ -201,10 +201,13 @@ class Signature(dict):
 
     def set(self, immutable=None, **options):
         if immutable is not None:
-            self.immutable = immutable
+            self.set_immutable(immutable)
         self.options.update(options)
         return self
 
+    def set_immutable(self, immutable):
+        self.immutable = immutable
+
     def apply_async(self, args=(), kwargs={}, **options):
         # For callbacks: extra args are prepended to the stored args.
         if args or kwargs or options:
@@ -419,6 +422,11 @@ def _maybe_group(tasks):
     return tasks
 
 
+def _maybe_clone(tasks):
+    return [s.clone() if isinstance(s, Signature) else signature(s)
+            for s in tasks]
+
+
 @Signature.register_type
 class group(Signature):
 
@@ -439,27 +447,38 @@ class group(Signature):
                 task['args'] = task._merge(d['args'])[0]
         return group(tasks, app=app, **kwdict(d['options']))
 
-    def apply_async(self, *args, **kwargs):
-        if not self.tasks:
-            return self.freeze()  # empty group returns GroupResult
-        return Signature.apply_async(self, *args, **kwargs)
-
-    def apply(self, *args, **kwargs):
-        if not self.tasks:
-            return self.freeze()  # empty group returns GroupResult
-        return Signature.apply(self, *args, **kwargs)
-
-    def __call__(self, *partial_args, **opts):
-        tasks = [task.clone() for task in self.tasks]
+    def apply_async(self, args=(), kwargs=None, **options):
+        tasks = _maybe_clone(self.tasks)
         if not tasks:
-            return
+            return self.freeze()
         # taking the app from the first task in the list,
         # there may be a better solution to this, e.g.
         # consolidate tasks with the same app and apply them in
         # batches.
         type = tasks[0].type.app.tasks[self['task']]
-        return type(*type.prepare(dict(self.options, **opts),
-                                  tasks, partial_args))
+        return type(*type.prepare(dict(self.options, **options),
+                                  tasks, args))
+
+    def set_immutable(self, immutable):
+        for task in self.tasks:
+            task.set_immutable(immutable)
+
+    def link(self, sig):
+        # Simply link to first task
+        sig = sig.clone().set(immutable=True)
+        return self.tasks[0].link(sig)
+
+    def link_error(self, sig):
+        sig = sig.clone().set(immutable=True)
+        return self.tasks[0].link_error(sig)
+
+    def apply(self, *args, **kwargs):
+        if not self.tasks:
+            return self.freeze()  # empty group returns GroupResult
+        return Signature.apply(self, *args, **kwargs)
+
+    def __call__(self, *partial_args, **options):
+        return self.apply_async(partial_args, **options)
 
     def freeze(self, _id=None):
         opts = self.options
@@ -520,17 +539,23 @@ class chord(Signature):
     def type(self):
         return self._type or self.tasks[0].type.app.tasks['celery.chord']
 
-    def __call__(self, body=None, task_id=None, **kwargs):
-        _chord = self.type
-        body = (body or self.kwargs['body']).clone()
-        kwargs = dict(self.kwargs, body=body, **kwargs)
+    def apply_async(self, args=(), kwargs={}, task_id=None, **options):
+        body = kwargs.get('body') or self.kwargs['body']
+        kwargs = dict(self.kwargs, **kwargs)
+        body = body.clone(**options)
+
+        _chord = self._type or body.type.app.tasks['celery.chord']
+
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
-            return self.apply((), kwargs)
+            return self.apply((), kwargs, task_id=task_id, **options)
         res = body.freeze(task_id)
-        parent = _chord(**kwargs)
+        parent = _chord(self.tasks, body, args, **options)
         res.parent = parent
         return res
 
+    def __call__(self, body=None, **options):
+        return self.apply_async((), {'body': body} if body else {}, **options)
+
     def clone(self, *args, **kwargs):
         s = Signature.clone(self, *args, **kwargs)
         # need to make copy of body
@@ -548,6 +573,11 @@ class chord(Signature):
         self.body.link_error(errback)
         return errback
 
+    def set_immutable(self, immutable):
+        # changes mutability of header only, not callback.
+        for task in self.tasks:
+            task.set_immutable(immutable)
+
     def __repr__(self):
         if self.body:
             return self.body.reprcall(self.tasks)

+ 10 - 2
celery/tests/app/test_builtins.py

@@ -198,11 +198,19 @@ class test_chord(BuiltinsCase):
     def test_forward_options(self):
         body = self.xsum.s()
         x = chord([self.add.s(i, i) for i in range(10)], body=body)
+        x._type = Mock()
+        x._type.app.conf.CELERY_ALWAYS_EAGER = False
         x.apply_async(group_id='some_group_id')
-        self.assertEqual(body.options['group_id'], 'some_group_id')
+        self.assertTrue(x._type.called)
+        resbody = x._type.call_args[0][1]
+        self.assertEqual(resbody.options['group_id'], 'some_group_id')
         x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
+        x2._type = Mock()
+        x2._type.app.conf.CELERY_ALWAYS_EAGER = False
         x2.apply_async(chord='some_chord_id')
-        self.assertEqual(body.options['chord'], 'some_chord_id')
+        self.assertTrue(x2._type.called)
+        resbody = x2._type.call_args[0][1]
+        self.assertEqual(resbody.options['chord'], 'some_chord_id')
 
     def test_apply_eager(self):
         self.app.conf.CELERY_ALWAYS_EAGER = True

+ 1 - 1
celery/tests/tasks/test_canvas.py

@@ -270,7 +270,7 @@ class test_group(CanvasCase):
 
     def test_call_empty_group(self):
         x = group()
-        self.assertIsNone(x())
+        self.assertFalse(len(x()))
 
     def test_skew(self):
         g = group([self.add.s(i, i) for i in range(10)])