ソースを参照

Canvas: All signatures now supports app argument

Ask Solem 11 年 前
コミット
832a3c59a7
2 ファイル変更34 行追加29 行削除
  1. 4 3
      celery/app/task.py
  2. 30 26
      celery/canvas.py

+ 4 - 3
celery/app/task.py

@@ -672,6 +672,7 @@ class Task(object):
         """Returns :class:`~celery.subtask` object for
         this task, wrapping arguments and execution options
         for a single task invocation."""
+        starkwargs.setdefault('app', self.app)
         return subtask(self, args, *starargs, **starkwargs)
 
     def s(self, *args, **kwargs):
@@ -685,17 +686,17 @@ class Task(object):
     def chunks(self, it, n):
         """Creates a :class:`~celery.canvas.chunks` task for this task."""
         from celery import chunks
-        return chunks(self.s(), it, n)
+        return chunks(self.s(), it, n, app=self.app)
 
     def map(self, it):
         """Creates a :class:`~celery.canvas.xmap` task from ``it``."""
         from celery import xmap
-        return xmap(self.s(), it)
+        return xmap(self.s(), it, app=self.app)
 
     def starmap(self, it):
         """Creates a :class:`~celery.canvas.xstarmap` task from ``it``."""
         from celery import xstarmap
-        return xstarmap(self.s(), it)
+        return xstarmap(self.s(), it, app=self.app)
 
     def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.

+ 30 - 26
celery/canvas.py

@@ -105,7 +105,7 @@ class Signature(dict):
 
     """
     TYPES = {}
-    _type = None
+    _app = _type = None
 
     @classmethod
     def register_type(cls, subclass, name=None):
@@ -113,14 +113,16 @@ class Signature(dict):
         return subclass
 
     @classmethod
-    def from_dict(self, d):
+    def from_dict(self, d, app=None):
         typ = d.get('subtask_type')
         if typ:
-            return self.TYPES[typ].from_dict(kwdict(d))
-        return Signature(d)
+            return self.TYPES[typ].from_dict(kwdict(d), app=app)
+        return Signature(d, app=app)
 
     def __init__(self, task=None, args=None, kwargs=None, options=None,
-                 type=None, subtask_type=None, immutable=False, **ex):
+                 type=None, subtask_type=None, immutable=False,
+                 app=None, **ex):
+        self._app = app
         init = dict.__init__
 
         if isinstance(task, dict):
@@ -171,7 +173,7 @@ class Signature(dict):
         s = Signature.from_dict({'task': self.task, 'args': tuple(args),
                                  'kwargs': kwargs, 'options': deepcopy(opts),
                                  'subtask_type': self.subtask_type,
-                                 'immutable': self.immutable})
+                                 'immutable': self.immutable}, app=self._app)
         s._type = self._type
         return s
     partial = clone
@@ -232,13 +234,13 @@ class Signature(dict):
 
     def __or__(self, other):
         if not isinstance(self, chain) and isinstance(other, chain):
-            return chain((self, ) + other.tasks)
+            return chain((self, ) + other.tasks, app=self._app)
         elif isinstance(other, chain):
-            return chain(*self.tasks + other.tasks)
+            return chain(*self.tasks + other.tasks, app=self._app)
         elif isinstance(other, Signature):
             if isinstance(self, chain):
-                return chain(*self.tasks + (other, ))
-            return chain(self, other)
+                return chain(*self.tasks + (other, ), app=self._app)
+            return chain(self, other, app=self._app)
         return NotImplemented
 
     def __deepcopy__(self, memo):
@@ -273,7 +275,7 @@ class Signature(dict):
 
     @cached_property
     def type(self):
-        return self._type or current_app.tasks[self['task']]
+        return self._type or (self._app or current_app).tasks[self['task']]
 
     @cached_property
     def AsyncResult(self):
@@ -287,7 +289,7 @@ class Signature(dict):
         try:
             return self.type.apply_async
         except KeyError:
-            return _partial(current_app.send_task, self['task'])
+            return _partial((self._app or current_app).send_task, self['task'])
     id = _getitem_property('options.task_id')
     task = _getitem_property('task')
     args = _getitem_property('args')
@@ -314,19 +316,19 @@ class chain(Signature):
             return self.apply_async(args, kwargs)
 
     @classmethod
-    def from_dict(self, d):
+    def from_dict(self, d, app=None):
         tasks = d['kwargs']['tasks']
         if d['args'] and tasks:
             # partial args passed on to first task in chain (Issue #1057).
             tasks[0]['args'] = tasks[0]._merge(d['args'])[0]
-        return chain(*d['kwargs']['tasks'], **kwdict(d['options']))
+        return chain(*d['kwargs']['tasks'], app=app, **kwdict(d['options']))
 
     @property
     def type(self):
         try:
             return self._type or self.tasks[0].type.app.tasks['celery.chain']
         except NotRegistered:
-            return current_app.tasks['celery.chain']
+            return (self._app or current_app).tasks['celery.chain']
 
     def __repr__(self):
         return ' | '.join(repr(t) for t in self.tasks)
@@ -350,8 +352,8 @@ class _basemap(Signature):
         )
 
     @classmethod
-    def from_dict(cls, d):
-        return cls(*cls._unpack_args(d['kwargs']), **d['options'])
+    def from_dict(cls, d, app=None):
+        return cls(*cls._unpack_args(d['kwargs']), app=app, **d['options'])
 
 
 @Signature.register_type
@@ -386,8 +388,8 @@ class chunks(Signature):
         )
 
     @classmethod
-    def from_dict(self, d):
-        return chunks(*self._unpack_args(d['kwargs']), **d['options'])
+    def from_dict(self, d, app=None):
+        return chunks(*self._unpack_args(d['kwargs']), app=app, **d['options'])
 
     def apply_async(self, args=(), kwargs={}, **opts):
         return self.group().apply_async(args, kwargs, **opts)
@@ -398,11 +400,13 @@ class chunks(Signature):
     def group(self):
         # need to evaluate generators
         task, it, n = self._unpack_args(self.kwargs)
-        return group(xstarmap(task, part) for part in _chunks(iter(it), n))
+        return group((xstarmap(task, part, app=self._app)
+                      for part in _chunks(iter(it), n)),
+                     app=self._app)
 
     @classmethod
-    def apply_chunks(cls, task, it, n):
-        return cls(task, it, n)()
+    def apply_chunks(cls, task, it, n, app=None):
+        return cls(task, it, n, app=app)()
 
 
 def _maybe_group(tasks):
@@ -427,13 +431,13 @@ class group(Signature):
         self.tasks, self.subtask_type = tasks, 'group'
 
     @classmethod
-    def from_dict(self, d):
+    def from_dict(self, d, app=None):
         tasks = d['kwargs']['tasks']
         if d['args'] and tasks:
             # partial args passed on to all tasks in the group (Issue #1057).
             for task in tasks:
                 task['args'] = task._merge(d['args'])[0]
-        return group(tasks, **kwdict(d['options']))
+        return group(tasks, app=app, **kwdict(d['options']))
 
     def apply_async(self, *args, **kwargs):
         if not self.tasks:
@@ -502,9 +506,9 @@ class chord(Signature):
         self.subtask_type = 'chord'
 
     @classmethod
-    def from_dict(self, d):
+    def from_dict(self, d, app=None):
         args, d['kwargs'] = self._unpack_args(**kwdict(d['kwargs']))
-        return self(*args, **kwdict(d))
+        return self(*args, app=app, **kwdict(d))
 
     @staticmethod
     def _unpack_args(header=None, body=None, **kwargs):