Browse Source

Renames celery.app.builtins.shared_task -> celery._state.connect_on_app_finalize. Closes #1937

Ask Solem 11 years ago
parent
commit
d028eed511
6 changed files with 41 additions and 48 deletions
  1. 18 1
      celery/_state.py
  2. 4 5
      celery/app/__init__.py
  3. 6 6
      celery/app/base.py
  4. 10 33
      celery/app/builtins.py
  5. 2 2
      celery/app/task.py
  6. 1 1
      celery/tests/app/test_app.py

+ 18 - 1
celery/_state.py

@@ -20,7 +20,8 @@ from celery.local import Proxy
 from celery.utils.threads import LocalStack
 
 __all__ = ['set_default_app', 'get_current_app', 'get_current_task',
-           'get_current_worker_task', 'current_app', 'current_task']
+           'get_current_worker_task', 'current_app', 'current_task',
+           'connect_on_app_finalize']
 
 #: Global default app used when no current app.
 default_app = None
@@ -28,9 +29,25 @@ default_app = None
 #: List of all app instances (weakrefs), must not be used directly.
 _apps = weakref.WeakSet()
 
+#: global set of functions to call whenever a new app is finalized
+#: E.g. Shared tasks, and builtin tasks are created
+#: by adding callbacks here.
+_on_app_finalizers = set()
+
 _task_join_will_block = False
 
 
+def connect_on_app_finalize(callback):
+    _on_app_finalizers.add(callback)
+    return callback
+
+
+def _announce_app_finalized(app):
+    callbacks = set(_on_app_finalizers)
+    for callback in callbacks:
+        callback(app)
+
+
 def _set_task_join_will_block(blocks):
     global _task_join_will_block
     _task_join_will_block = blocks

+ 4 - 5
celery/app/__init__.py

@@ -13,15 +13,12 @@ import os
 from celery.local import Proxy
 from celery import _state
 from celery._state import (
-    set_default_app,
     get_current_app as current_app,
     get_current_task as current_task,
-    _get_active_apps,
-    _task_stack,
+    connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack,
 )
 from celery.utils import gen_task_name
 
-from .builtins import shared_task as _shared_task
 from .base import Celery, AppPickler
 
 __all__ = ['Celery', 'AppPickler', 'default_app', 'app_or_default',
@@ -128,7 +125,9 @@ def shared_task(*args, **kwargs):
             name = options.get('name')
             # Set as shared task so that unfinalized apps,
             # and future apps will load the task.
-            _shared_task(lambda app: app._task_from_fun(fun, **options))
+            connect_on_app_finalize(
+                lambda app: app._task_from_fun(fun, **options)
+            )
 
             # Force all finalized apps to take this task as well.
             for app in _get_active_apps():

+ 6 - 6
celery/app/base.py

@@ -27,7 +27,8 @@ from celery import platforms
 from celery import signals
 from celery._state import (
     _task_stack, get_current_app, _set_current_app, set_default_app,
-    _register_app, get_current_worker_task,
+    _register_app, get_current_worker_task, connect_on_app_finalize,
+    _announce_app_finalized,
 )
 from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
 from celery.five import items, values
@@ -38,7 +39,6 @@ from celery.utils.imports import instantiate, symbol_by_name
 from celery.utils.objects import mro_lookup
 
 from .annotations import prepare as prepare_annotations
-from .builtins import shared_task, load_shared_tasks
 from .defaults import DEFAULTS, find_deprecated_settings
 from .registry import TaskRegistry
 from .utils import (
@@ -208,8 +208,8 @@ class Celery(object):
             # a differnt task instance.  This makes sure it will always use
             # the task instance from the current app.
             # Really need a better solution for this :(
-            from . import shared_task as proxies_to_curapp
-            return proxies_to_curapp(*args, _force_evaluate=True, **opts)
+            from . import shared_task
+            return shared_task(*args, _force_evaluate=True, **opts)
 
         def inner_create_task_cls(shared=True, filter=None, **opts):
             _filt = filter  # stupid 2to3
@@ -218,7 +218,7 @@ class Celery(object):
                 if shared:
                     cons = lambda app: app._task_from_fun(fun, **opts)
                     cons.__name__ = fun.__name__
-                    shared_task(cons)
+                    connect_on_app_finalize(cons)
                 if self.accept_magic_kwargs:  # compat mode
                     task = self._task_from_fun(fun, **opts)
                     if filter:
@@ -271,7 +271,7 @@ class Celery(object):
                 if auto and not self.autofinalize:
                     raise RuntimeError('Contract breach: app not finalized')
                 self.finalized = True
-                load_shared_tasks(self)
+                _announce_app_finalized(self)
 
                 pending = self._pending
                 while pending:

+ 10 - 33
celery/app/builtins.py

@@ -11,39 +11,16 @@ from __future__ import absolute_import
 
 from collections import deque
 
-from celery._state import get_current_worker_task
+from celery._state import get_current_worker_task, connect_on_app_finalize
 from celery.utils import uuid
 from celery.utils.log import get_logger
 
-__all__ = ['shared_task', 'load_shared_tasks']
+__all__ = []
 
 logger = get_logger(__name__)
 
-#: global list of functions defining tasks that should be
-#: added to all apps.
-_shared_tasks = set()
 
-
-def shared_task(constructor):
-    """Decorator that specifies a function that generates a built-in task.
-
-    The function will then be called for every new app instance created
-    (lazily, so more exactly when the task registry for that app is needed).
-
-    The function must take a single ``app`` argument.
-    """
-    _shared_tasks.add(constructor)
-    return constructor
-
-
-def load_shared_tasks(app):
-    """Create built-in tasks for an app instance."""
-    constructors = set(_shared_tasks)
-    for constructor in constructors:
-        constructor(app)
-
-
-@shared_task
+@connect_on_app_finalize
 def add_backend_cleanup_task(app):
     """The backend cleanup task can be used to clean up the default result
     backend.
@@ -60,7 +37,7 @@ def add_backend_cleanup_task(app):
     return backend_cleanup
 
 
-@shared_task
+@connect_on_app_finalize
 def add_unlock_chord_task(app):
     """This task is used by result backends without native chord support.
 
@@ -127,7 +104,7 @@ def add_unlock_chord_task(app):
     return unlock_chord
 
 
-@shared_task
+@connect_on_app_finalize
 def add_map_task(app):
     from celery.canvas import signature
 
@@ -138,7 +115,7 @@ def add_map_task(app):
     return xmap
 
 
-@shared_task
+@connect_on_app_finalize
 def add_starmap_task(app):
     from celery.canvas import signature
 
@@ -149,7 +126,7 @@ def add_starmap_task(app):
     return xstarmap
 
 
-@shared_task
+@connect_on_app_finalize
 def add_chunk_task(app):
     from celery.canvas import chunks as _chunks
 
@@ -159,7 +136,7 @@ def add_chunk_task(app):
     return chunks
 
 
-@shared_task
+@connect_on_app_finalize
 def add_group_task(app):
     _app = app
     from celery.canvas import maybe_signature, signature
@@ -226,7 +203,7 @@ def add_group_task(app):
     return Group
 
 
-@shared_task
+@connect_on_app_finalize
 def add_chain_task(app):
     from celery.canvas import (
         Signature, chain, chord, group, maybe_signature, maybe_unroll_group,
@@ -322,7 +299,7 @@ def add_chain_task(app):
     return Chain
 
 
-@shared_task
+@connect_on_app_finalize
 def add_chord_task(app):
     """Every chord is executed in a dedicated task, so that the chord
     can be used as a signature, and this generates the task

+ 2 - 2
celery/app/task.py

@@ -176,14 +176,14 @@ class TaskType(type):
             # Hairy stuff,  here to be compatible with 2.x.
             # People should not use non-abstract task classes anymore,
             # use the task decorator.
-            from celery.app.builtins import shared_task
+            from celery._state import connect_on_app_finalize
             unique_name = '.'.join([task_module, name])
             if unique_name not in cls._creation_count:
                 # the creation count is used as a safety
                 # so that the same task is not added recursively
                 # to the set of constructors.
                 cls._creation_count[unique_name] = 1
-                shared_task(_CompatShared(
+                connect_on_app_finalize(_CompatShared(
                     unique_name,
                     lambda app: TaskType.__new__(cls, name, bases,
                                                  dict(attrs, _app=app)),

+ 1 - 1
celery/tests/app/test_app.py

@@ -251,7 +251,7 @@ class test_App(AppCase):
             _state._task_stack.pop()
 
     def test_task_not_shared(self):
-        with patch('celery.app.base.shared_task') as sh:
+        with patch('celery.app.base.connect_on_app_finalize') as sh:
             @self.app.task(shared=False)
             def foo():
                 pass