Fix for #1150 by sharing non-abstract custom task classes between apps

Ask Solem 11 years ago
parent
commit
310140d1ae
3 changed files with 53 additions and 9 deletions
  1. celery/app/base.py (+1 -0)
  2. celery/app/builtins.py (+12 -8)
  3. celery/app/task.py (+40 -1)

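For context, issue #1150 concerns Celery 2.x-style task classes: a concrete (non-abstract) Task subclass is registered through the TaskType metaclass with whichever app is current when the class body executes, so an app created later (for example when the defining module is imported under several names) ends up without the task. This commit makes the metaclass register such classes as shared constructors so every app gets its own bound copy. A minimal sketch of the two definition styles involved (names are illustrative, not from the commit):

    from celery import Celery, Task

    app = Celery('proj')

    # 2.x style: concrete (non-abstract) Task subclass.  Before this commit it
    # was only registered with the app current at class-creation time; after
    # it, TaskType also registers a shared constructor for other apps.
    class AddTask(Task):
        name = 'proj.add'

        def run(self, x, y):
            return x + y

    # Recommended style: the decorator-built class carries _decorated = True,
    # so the metaclass skips the 2.x compat-sharing path.
    @app.task(name='proj.add2')
    def add2(x, y):
        return x + y
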
+ 1 - 0
celery/app/base.py

@@ -225,6 +225,7 @@ class Celery(object):
             'app': self,
             'accept_magic_kwargs': False,
             'run': fun if bind else staticmethod(fun),
+            '_decorated': True,
             '__doc__': fun.__doc__,
             '__module__': fun.__module__}, **options))()
         task = self._tasks[T.name]  # return global instance.
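
The only change to base.py is the new '_decorated' key in the class dict built by the task decorator; it is what TaskType.__new__ later checks to tell decorator-generated classes apart from hand-written ones. A simplified standalone sketch of that path (an assumption-laden outline; name generation, binding and promise handling in the real Celery._task_from_fun are omitted):

    from celery.app.task import Task

    def task_from_fun(app, fun, base=Task, bind=False, **options):
        attrs = dict({
            'app': app,
            'accept_magic_kwargs': False,
            'run': fun if bind else staticmethod(fun),
            '_decorated': True,      # new in this commit
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
        }, **options)
        # building the class triggers TaskType.__new__, which now sees
        # _decorated and skips the 2.x compat-sharing path added below
        return type(fun.__name__, (base, ), attrs)()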

+ 12 - 8
celery/app/builtins.py

@@ -18,7 +18,7 @@ __all__ = ['shared_task', 'load_shared_tasks']
 
 #: global list of functions defining tasks that should be
 #: added to all apps.
-_shared_tasks = []
+_shared_tasks = set()
 
 
 def shared_task(constructor):
@@ -29,13 +29,14 @@ def shared_task(constructor):
 
     The function must take a single ``app`` argument.
     """
-    _shared_tasks.append(constructor)
+    _shared_tasks.add(constructor)
     return constructor
 
 
 def load_shared_tasks(app):
     """Create built-in tasks for an app instance."""
-    for constructor in _shared_tasks:
+    constructors = set(_shared_tasks)
+    for constructor in constructors:
         constructor(app)
 
 
@@ -49,7 +50,7 @@ def add_backend_cleanup_task(app):
     :program:`celery beat` to be running).
 
     """
-    @app.task(name='celery.backend_cleanup', _force_evaluate=True)
+    @app.task(name='celery.backend_cleanup', shared=False, _force_evaluate=True)
     def backend_cleanup():
         app.backend.cleanup()
     return backend_cleanup
@@ -68,7 +69,7 @@ def add_unlock_chord_task(app):
 
     default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
-    @app.task(name='celery.chord_unlock', max_retries=None,
+    @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
     def unlock_chord(group_id, callback, interval=None, propagate=None,
                      max_retries=None, result=None,
@@ -124,7 +125,7 @@ def add_unlock_chord_task(app):
 def add_map_task(app):
     from celery.canvas import signature
 
-    @app.task(name='celery.map', _force_evaluate=True)
+    @app.task(name='celery.map', shared=False, _force_evaluate=True)
     def xmap(task, it):
         task = signature(task).type
         return [task(item) for item in it]
@@ -135,7 +136,7 @@ def add_map_task(app):
 def add_starmap_task(app):
     from celery.canvas import signature
 
-    @app.task(name='celery.starmap', _force_evaluate=True)
+    @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
     def xstarmap(task, it):
         task = signature(task).type
         return [task(*item) for item in it]
@@ -146,7 +147,7 @@ def add_starmap_task(app):
 def add_chunk_task(app):
     from celery.canvas import chunks as _chunks
 
-    @app.task(name='celery.chunks', _force_evaluate=True)
+    @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
     def chunks(task, it, n):
         return _chunks.apply_chunks(task, it, n)
     return chunks
@@ -162,6 +163,7 @@ def add_group_task(app):
         app = _app
         name = 'celery.group'
         accept_magic_kwargs = False
+        _decorated = True
 
         def run(self, tasks, result, group_id, partial_args):
             app = self.app
@@ -226,6 +228,7 @@ def add_chain_task(app):
         app = _app
         name = 'celery.chain'
         accept_magic_kwargs = False
+        _decorated = True
 
         def prepare_steps(self, args, tasks):
             steps = deque(tasks)
@@ -315,6 +318,7 @@ def add_chord_task(app):
         name = 'celery.chord'
         accept_magic_kwargs = False
         ignore_result = False
+        _decorated = True
 
         def run(self, header, body, partial_args=(), interval=None,
                 countdown=1, max_retries=None, propagate=None,

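The builtins changes turn the shared-task registry into a set, iterate over a copy in load_shared_tasks (running a constructor can itself register new shared constructors through the metaclass, which would otherwise mutate the set mid-iteration), and pass shared=False to every built-in so the task decorator does not feed it back into the shared registry; the built-ins are already created once per app by load_shared_tasks. A factory following the same pattern would look roughly like this (the task name 'proj.ping' is made up; the @shared_task decoration on the real add_*_task factories lies outside the hunks shown above):

    from celery.app.builtins import shared_task, load_shared_tasks

    @shared_task
    def add_ping_task(app):
        @app.task(name='proj.ping', shared=False, _force_evaluate=True)
        def ping():
            return 'pong'
        return ping

    # Each new app instance then runs every registered constructor once,
    # roughly: load_shared_tasks(app)
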
+ 40 - 1
celery/app/task.py

@@ -44,6 +44,23 @@ R_SELF_TASK = '<@task {0.name} bound to other {0.__self__}>'
 R_INSTANCE = '<@task: {0.name} of {app}{flags}>'
 
 
+class _CompatShared(object):
+
+    def __init__(self, name, cons):
+        self.name = name
+        self.cons = cons
+
+    def __hash__(self):
+        return hash(self.name)
+
+    def __repr__(self):
+        return '<OldTask: %r>' % (self.name, )
+
+    def __call__(self, app):
+        return self.cons(app)
+
+
+
 def _strflags(flags, default=''):
     if flags:
         return ' ({0})'.format(', '.join(flags))
@@ -121,13 +138,15 @@ class TaskType(type):
     from the module and class name.
 
     """
+    _creation_count = {}  # used by old non-abstract task classes
 
     def __new__(cls, name, bases, attrs):
         new = super(TaskType, cls).__new__
         task_module = attrs.get('__module__') or '__main__'
 
         # - Abstract class: abstract attribute should not be inherited.
-        if attrs.pop('abstract', None) or not attrs.get('autoregister', True):
+        abstract = attrs.pop('abstract', None)
+        if abstract or not attrs.get('autoregister', True):
             return new(cls, name, bases, attrs)
 
         # The 'app' attribute is now a property, with the real app located
@@ -141,6 +160,26 @@ class TaskType(type):
         if not task_name:
             attrs['name'] = task_name = gen_task_name(app, name, task_module)
 
+        if not attrs.get('_decorated'):
+            # non decorated tasks must also be shared in case
+            # an app is created multiple times due to modules
+            # imported under multiple names.
+            # Hairy stuff,  here to be compatible with 2.x.
+            # People should not use non-abstract task classes anymore,
+            # use the task decorator.
+            from celery.app.builtins import shared_task, _shared_tasks
+            unique_name = '.'.join([task_module, name])
+            if unique_name not in cls._creation_count:
+                # the creation count is used as a safety
+                # so that the same task is not added recursively
+                # to the set of constructors.
+                cls._creation_count[unique_name] = 1
+                shared_task(_CompatShared(
+                    unique_name,
+                    lambda app: TaskType.__new__(cls, name, bases,
+                                                 dict(attrs, _app=app)),
+                ))
+
         # - Create and register class.
         # Because of the way import happens (recursively)
         # we may or may not be the first time the task tries to register
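
Net effect of the task.py changes: creating an old-style concrete Task subclass now also registers a _CompatShared constructor for it, keyed by its fully qualified name and guarded by _creation_count so the same class is not added again when the constructor recursively re-enters TaskType.__new__. An end-to-end sketch (class and app names are made up; it assumes the class is defined before the apps build their task registries, which in this series happens lazily on finalization):

    from celery import Celery, Task

    class HelloTask(Task):        # concrete, not created via @app.task
        name = 'proj.hello'

        def run(self):
            return 'hello'

    app1 = Celery('proj')
    app2 = Celery('proj')

    # Before this commit only the app current at class-creation time knew the
    # task; now each app gets its own bound copy from the shared constructor.
    assert 'proj.hello' in app1.tasks
    assert 'proj.hello' in app2.tasks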