Browse Source

Support anon chords and groups. Closes #1744

Ask Solem 11 years ago
parent
commit
881be3bdc9
1 changed files with 21 additions and 9 deletions
  1. 21 9
      celery/canvas.py

+ 21 - 9
celery/canvas.py

@@ -477,11 +477,7 @@ class group(Signature):
         tasks = _maybe_clone(self.tasks, app=self._app)
         if not tasks:
             return self.freeze()
-        # taking the app from the first task in the list,
-        # there may be a better solution to this, e.g.
-        # consolidate tasks with the same app and apply them in
-        # batches.
-        type = tasks[0].type.app.tasks[self['task']]
+        type = self.type
         return type(*type.prepare(dict(self.options, **options),
                                   tasks, args))
 
@@ -535,7 +531,13 @@ class group(Signature):
 
     @property
     def type(self):
-        return self._type or self.tasks[0].type.app.tasks[self['task']]
+        if self._type:
+            return self._type
+        # taking the app from the first task in the list, there may be a
+        # better solution for this, e.g. to consolidate tasks with the same
+        # app and apply them in batches.
+        app = self._app if self._app else self.tasks[0].type.app
+        return app.tasks[self['task']]
 
 
 @Signature.register_type
@@ -563,15 +565,25 @@ class chord(Signature):
 
     @property
     def type(self):
-        return self._type or self.tasks[0].type.app.tasks['celery.chord']
+        if self._type:
+            return self._type
+        # we will be able to fix this mess in 3.2 when we no longer
+        # require an actual task implementation for chord/group
+        if self._app:
+            app = self._app
+        else:
+            try:
+                app = self.tasks[0].type.app
+            except IndexError:
+                app = self.body.type.app
+        return app.tasks['celery.chord']
 
     def apply_async(self, args=(), kwargs={}, task_id=None, **options):
         body = kwargs.get('body') or self.kwargs['body']
         kwargs = dict(self.kwargs, **kwargs)
         body = body.clone(**options)
 
-        _chord = self._type or body.type.app.tasks['celery.chord']
-
+        _chord = self.type
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply((), kwargs, task_id=task_id, **options)
         res = body.freeze(task_id)