Procházet zdrojové kódy

Don't instantiate default app

Ask Solem před 11 roky
rodič
revize
5e184e92e6

+ 15 - 10
celery/app/builtins.py

@@ -50,7 +50,8 @@ def add_backend_cleanup_task(app):
     :program:`celery beat` to be running).
 
     """
-    @app.task(name='celery.backend_cleanup', shared=False, _force_evaluate=True)
+    @app.task(name='celery.backend_cleanup',
+              shared=False, _force_evaluate=True)
     def backend_cleanup():
         app.backend.cleanup()
     return backend_cleanup
@@ -92,7 +93,7 @@ def add_unlock_chord_task(app):
         j = deps.join_native if deps.supports_native_join else deps.join
 
         if deps.ready():
-            callback = signature(callback)
+            callback = signature(callback, app=app)
             try:
                 ret = j(propagate=propagate)
             except Exception as exc:
@@ -127,7 +128,7 @@ def add_map_task(app):
 
     @app.task(name='celery.map', shared=False, _force_evaluate=True)
     def xmap(task, it):
-        task = signature(task).type
+        task = signature(task, app=app).type
         return [task(item) for item in it]
     return xmap
 
@@ -138,7 +139,7 @@ def add_starmap_task(app):
 
     @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
     def xstarmap(task, it):
-        task = signature(task).type
+        task = signature(task, app=app).type
         return [task(*item) for item in it]
     return xstarmap
 
@@ -169,7 +170,7 @@ def add_group_task(app):
             app = self.app
             result = from_serializable(result, app)
             # any partial args are added to all tasks in the group
-            taskit = (signature(task).clone(partial_args)
+            taskit = (signature(task, app=app).clone(partial_args)
                       for i, task in enumerate(tasks))
             if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
                 return app.GroupResult(
@@ -189,7 +190,7 @@ def add_group_task(app):
                 options.setdefault('task_id', uuid()))
 
             def prepare_member(task):
-                task = maybe_signature(task, app=app)
+                task = maybe_signature(task, app=self.app)
                 task.options['group_id'] = group_id
                 return task, task.freeze()
 
@@ -231,6 +232,7 @@ def add_chain_task(app):
         _decorated = True
 
         def prepare_steps(self, args, tasks):
+            app = self.app
             steps = deque(tasks)
             next_step = prev_task = prev_res = None
             tasks, results = [], []
@@ -293,9 +295,10 @@ def add_chain_task(app):
 
         def apply(self, args=(), kwargs={}, signature=maybe_signature,
                   **options):
+            app = self.app
             last, fargs = None, args  # fargs passed to first task only
             for task in kwargs['tasks']:
-                res = signature(task).clone(fargs).apply(
+                res = signature(task, app=app).clone(fargs).apply(
                     last and (last.get(), ),
                 )
                 res.parent, last, fargs = last, res, None
@@ -323,9 +326,10 @@ def add_chord_task(app):
         def run(self, header, body, partial_args=(), interval=None,
                 countdown=1, max_retries=None, propagate=None,
                 eager=False, **kwargs):
+            app = self.app
             propagate = default_propagate if propagate is None else propagate
             group_id = uuid()
-            AsyncResult = self.app.AsyncResult
+            AsyncResult = app.AsyncResult
             prepare_member = self._prepare_member
 
             # - convert back to group if serialized
@@ -364,7 +368,8 @@ def add_chord_task(app):
 
         def apply_async(self, args=(), kwargs={}, task_id=None,
                         group_id=None, chord=None, **options):
-            if self.app.conf.CELERY_ALWAYS_EAGER:
+            app = self.app
+            if app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
             header = kwargs.pop('header')
             body = kwargs.pop('body')
@@ -387,6 +392,6 @@ def add_chord_task(app):
             body = kwargs['body']
             res = super(Chord, self).apply(args, dict(kwargs, eager=True),
                                            **options)
-            return maybe_signature(body, app=app).apply(
+            return maybe_signature(body, app=self.app).apply(
                 args=(res.get(propagate=propagate).get(), ))
     return Chord

+ 15 - 6
celery/app/task.py

@@ -18,6 +18,7 @@ from celery._state import _task_stack
 from celery.canvas import signature
 from celery.exceptions import MaxRetriesExceededError, Reject, Retry
 from celery.five import class_property, items, with_metaclass
+from celery.local import Proxy
 from celery.result import EagerResult
 from celery.utils import gen_task_name, fun_takes_kwargs, uuid, maybe_reraise
 from celery.utils.functional import mattrgetter, maybe_list
@@ -60,7 +61,6 @@ class _CompatShared(object):
         return self.cons(app)
 
 
-
 def _strflags(flags, default=''):
     if flags:
         return ' ({0})'.format(', '.join(flags))
@@ -152,8 +152,15 @@ class TaskType(type):
         # The 'app' attribute is now a property, with the real app located
         # in the '_app' attribute.  Previously this was a regular attribute,
         # so we should support classes defining it.
-        _app1, _app2 = attrs.pop('_app', None), attrs.pop('app', None)
-        app = attrs['_app'] = _app1 or _app2 or current_app
+        app = attrs.pop('_app', None) or attrs.pop('app', None)
+        if not isinstance(app, Proxy) and app is None:
+            for base in bases:
+                if base._app:
+                    app = base._app
+                    break
+            else:
+                app = current_app._get_current_object()
+        attrs['_app'] = app
 
         # - Automatically generate missing/empty name.
         task_name = attrs.get('name')
@@ -167,7 +174,7 @@ class TaskType(type):
             # Hairy stuff,  here to be compatible with 2.x.
             # People should not use non-abstract task classes anymore,
             # use the task decorator.
-            from celery.app.builtins import shared_task, _shared_tasks
+            from celery.app.builtins import shared_task
             unique_name = '.'.join([task_module, name])
             if unique_name not in cls._creation_count:
                 # the creation count is used as a safety
@@ -365,10 +372,12 @@ class Task(object):
 
     @classmethod
     def _get_app(self):
-        if not self.__bound__ or self._app is None:
+        if self._app is None:
+            self._app = current_app
+        if not self.__bound__:
             # The app property's __set__  method is not called
             # if Task.app is set (on the class), so must bind on use.
-            self.bind(current_app)
+            self.bind(self._app)
         return self._app
     app = class_property(_get_app, bind)
 

+ 4 - 4
celery/backends/base.py

@@ -454,7 +454,7 @@ class KeyValueStoreBackend(BaseBackend):
     def on_chord_part_return(self, task, propagate=None):
         if not self.implements_incr:
             return
-        from celery import signature
+        from celery import maybe_signature
         from celery.result import GroupResult
         app = self.app
         if propagate is None:
@@ -466,20 +466,20 @@ class KeyValueStoreBackend(BaseBackend):
         try:
             deps = GroupResult.restore(gid, backend=task.backend)
         except Exception as exc:
-            callback = signature(task.request.chord)
+            callback = maybe_signature(task.request.chord, app=self.app)
             return app._tasks[callback.task].backend.fail_from_current_stack(
                 callback.id,
                 exc=ChordError('Cannot restore group: {0!r}'.format(exc)),
             )
         if deps is None:
-            callback = signature(task.request.chord)
+            callback = maybe_signature(task.request.chord, app=self.app)
             return app._tasks[callback.task].backend.fail_from_current_stack(
                 callback.id,
                 exc=ChordError('GroupResult {0} no longer exists'.format(gid))
             )
         val = self.incr(key)
         if val >= len(deps):
-            callback = signature(task.request.chord)
+            callback = maybe_signature(task.request.chord, app=self.app)
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
                 ret = j(propagate=propagate)

+ 9 - 6
celery/canvas.py

@@ -422,8 +422,8 @@ def _maybe_group(tasks):
     return tasks
 
 
-def _maybe_clone(tasks):
-    return [s.clone() if isinstance(s, Signature) else signature(s)
+def _maybe_clone(tasks, app):
+    return [s.clone() if isinstance(s, Signature) else signature(s, app=app)
             for s in tasks]
 
 
@@ -448,7 +448,7 @@ class group(Signature):
         return group(tasks, app=app, **kwdict(d['options']))
 
     def apply_async(self, args=(), kwargs=None, **options):
-        tasks = _maybe_clone(self.tasks)
+        tasks = _maybe_clone(self.tasks, app=self._app)
         if not tasks:
             return self.freeze()
         # taking the app from the first task in the list,
@@ -597,7 +597,10 @@ subtask = signature   # XXX compat
 
 
 def maybe_signature(d, app=None):
-    if d is not None and isinstance(d, dict) and not isinstance(d, Signature):
-        return signature(d, app=app)
-    return d
+    if d is not None and isinstance(d, dict):
+        if not isinstance(d, Signature):
+            return signature(d, app=app)
+        if app is not None:
+            d._app = app
+        return d
 maybe_subtask = maybe_signature  # XXX compat

+ 1 - 1
celery/tests/worker/test_consumer.py

@@ -278,7 +278,7 @@ class test_Gossip(AppCase):
             sig = signature.return_value = Mock()
             task = Mock()
             g.call_task(task)
-            signature.assert_called_with(task)
+            signature.assert_called_with(task, app=c.app)
             sig.apply_async.assert_called_with()
 
             sig.apply_async.side_effect = MemoryError()

+ 1 - 1
celery/worker/consumer.py

@@ -625,7 +625,7 @@ class Gossip(bootsteps.ConsumerStep):
 
     def call_task(self, task):
         try:
-            signature(task).apply_async()
+            signature(task, app=self.app).apply_async()
         except Exception as exc:
             error('Could not call task: %r', exc, exc_info=1)