Przeglądaj źródła

Fixes anon subtasks and chains. Closes #1607

Ask Solem 11 lat temu
rodzic
commit
ef124e78ae

+ 11 - 9
celery/canvas.py

@@ -20,8 +20,6 @@ from itertools import chain as _chain
 from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery._state import current_app
-from celery.exceptions import NotRegistered
-from celery.result import AsyncResult, GroupResult
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -185,7 +183,7 @@ class Signature(dict):
         except KeyError:
             tid = opts['task_id'] = _id or uuid()
         if 'reply_to' not in opts:
-            opts['reply_to'] = self.type.app.oid
+            opts['reply_to'] = self.app.oid
         return self.AsyncResult(tid)
     _freeze = freeze
 
@@ -278,21 +276,25 @@ class Signature(dict):
 
     @cached_property
     def type(self):
-        return self._type or (self._app or current_app).tasks[self['task']]
+        return self._type or self.app.tasks[self['task']]
+
+    @cached_property
+    def app(self):
+        return self._app or current_app
 
     @cached_property
     def AsyncResult(self):
         try:
             return self.type.AsyncResult
         except KeyError:  # task not registered
-            return AsyncResult
+            return self.app.AsyncResult
 
     @cached_property
     def _apply_async(self):
         try:
             return self.type.apply_async
         except KeyError:
-            return _partial((self._app or current_app).send_task, self['task'])
+            return _partial(self.app.send_task, self['task'])
     id = _getitem_property('options.task_id')
     task = _getitem_property('task')
     args = _getitem_property('args')
@@ -330,8 +332,8 @@ class chain(Signature):
     def type(self):
         try:
             return self._type or self.tasks[0].type.app.tasks['celery.chain']
-        except NotRegistered:
-            return (self._app or current_app).tasks['celery.chain']
+        except KeyError:
+            return self.app.tasks['celery.chain']
 
     def __repr__(self):
         return ' | '.join(repr(t) for t in self.tasks)
@@ -492,7 +494,7 @@ class group(Signature):
             results.append(task._freeze())
             new_tasks.append(task)
         self.tasks = self.kwargs['tasks'] = new_tasks
-        return GroupResult(gid, results)
+        return self.app.GroupResult(gid, results)
     _freeze = freeze
 
     def skew(self, start=1.0, stop=None, step=1.0):

+ 1 - 1
celery/tests/app/test_builtins.py

@@ -100,7 +100,7 @@ class test_group(BuiltinsCase):
         x.apply_async()
 
     def test_apply_empty(self):
-        x = group()
+        x = group(app=self.app)
         x.apply()
         res = x.apply_async()
         self.assertFalse(res)

+ 1 - 1
celery/tests/tasks/test_canvas.py

@@ -269,7 +269,7 @@ class test_group(CanvasCase):
         self.assertTrue(group.from_dict(dict(x)))
 
     def test_call_empty_group(self):
-        x = group()
+        x = group(app=self.app)
         self.assertFalse(len(x()))
 
     def test_skew(self):