Przeglądaj źródła

Cleanup celery/app/__init__.py module

Ask Solem 8 lat temu
rodzic
commit
d854e071bf

+ 60 - 1
celery/_state.py

@@ -23,6 +23,13 @@ __all__ = [
 #: Global default app used when no current app.
 default_app = None
 
+#: Function returning the app provided or the default app if none.
+#:
+#: The environment variable :envvar:`CELERY_TRACE_APP` is used to
+#: trace app leaks.  When enabled an exception is raised if there
+#: is no active app.
+app_or_default = None
+
 #: List of all app instances (weakrefs), mustn't be used directly.
 _apps = weakref.WeakSet()
 
@@ -64,6 +71,16 @@ _tls = _TLS()
 _task_stack = LocalStack()
 
 
+#: Function used to push a task to the thread local stack
+#: keeping track of the currently executing task.
+#: You must remember to pop the task after.
+push_current_task = _task_stack.push
+
+#: Function used to pop a task from the thread local stack
+#: keeping track of the currently executing task.
+pop_current_task = _task_stack.pop
+
+
 def set_default_app(app):
     """Set default app."""
     global default_app
@@ -73,7 +90,7 @@ def set_default_app(app):
 def _get_current_app():
     if default_app is None:
         #: creates the global fallback app instance.
-        from celery.app import Celery
+        from celery.app.base import Celery
         set_default_app(Celery(
             'default', fixups=[], set_as_current=False,
             loader=os.environ.get('CELERY_LOADER') or 'default',
@@ -133,3 +150,45 @@ def _deregister_app(app):
 
 def _get_active_apps():
     return _apps
+
+
+def _app_or_default(app=None):
+    if app is None:
+        return get_current_app()
+    return app
+
+
+def _app_or_default_trace(app=None):  # pragma: no cover
+    from traceback import print_stack
+    try:
+        from billiard.process import current_process
+    except ImportError:
+        current_process = None
+    if app is None:
+        if getattr(_tls, 'current_app', None):
+            print('-- RETURNING TO CURRENT APP --')  # noqa+
+            print_stack()
+            return _tls.current_app
+        if not current_process or current_process()._name == 'MainProcess':
+            raise Exception('DEFAULT APP')
+        print('-- RETURNING TO DEFAULT APP --')      # noqa+
+        print_stack()
+        return default_app
+    return app
+
+
+def enable_trace():
+    """Enable tracing of app instances."""
+    global app_or_default
+    app_or_default = _app_or_default_trace
+
+
+def disable_trace():
+    """Disable tracing of app instances."""
+    global app_or_default
+    app_or_default = _app_or_default
+
+if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
+    enable_trace()
+else:
+    disable_trace()

+ 10 - 70
celery/app/__init__.py

@@ -1,88 +1,28 @@
 # -*- coding: utf-8 -*-
 """Celery Application."""
 from __future__ import absolute_import, print_function, unicode_literals
-import os
 from celery.local import Proxy
 from celery import _state
 from celery._state import (
-    get_current_app as current_app,
-    get_current_task as current_task,
-    connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack,
+    app_or_default, enable_trace, disable_trace,
+    push_current_task, pop_current_task,
 )
-from .base import Celery, AppPickler
+from .base import Celery
+from .utils import AppPickler
 
 __all__ = [
-    'Celery', 'AppPickler', 'default_app', 'app_or_default',
+    'Celery', 'AppPickler', 'app_or_default', 'default_app',
     'bugreport', 'enable_trace', 'disable_trace', 'shared_task',
-    'set_default_app', 'current_app', 'current_task',
     'push_current_task', 'pop_current_task',
 ]
 
 #: Proxy always returning the app set as default.
 default_app = Proxy(lambda: _state.default_app)
 
-#: Function returning the app provided or the default app if none.
-#:
-#: The environment variable :envvar:`CELERY_TRACE_APP` is used to
-#: trace app leaks.  When enabled an exception is raised if there
-#: is no active app.
-app_or_default = None
-
-#: Function used to push a task to the thread local stack
-#: keeping track of the currently executing task.
-#: You must remember to pop the task after.
-push_current_task = _task_stack.push
-
-#: Function used to pop a task from the thread local stack
-#: keeping track of the currently executing task.
-pop_current_task = _task_stack.pop
-
 
 def bugreport(app=None):
     """Return information useful in bug reports."""
-    return (app or current_app()).bugreport()
-
-
-def _app_or_default(app=None):
-    if app is None:
-        return _state.get_current_app()
-    return app
-
-
-def _app_or_default_trace(app=None):  # pragma: no cover
-    from traceback import print_stack
-    try:
-        from billiard.process import current_process
-    except ImportError:
-        current_process = None
-    if app is None:
-        if getattr(_state._tls, 'current_app', None):
-            print('-- RETURNING TO CURRENT APP --')  # noqa+
-            print_stack()
-            return _state._tls.current_app
-        if not current_process or current_process()._name == 'MainProcess':
-            raise Exception('DEFAULT APP')
-        print('-- RETURNING TO DEFAULT APP --')      # noqa+
-        print_stack()
-        return _state.default_app
-    return app
-
-
-def enable_trace():
-    """Enable tracing of app instances."""
-    global app_or_default
-    app_or_default = _app_or_default_trace
-
-
-def disable_trace():
-    """Disable tracing of app instances."""
-    global app_or_default
-    app_or_default = _app_or_default
-
-if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
-    enable_trace()
-else:
-    disable_trace()
+    return (app or _state.get_current_app()).bugreport()
 
 
 def shared_task(*args, **kwargs):
@@ -114,13 +54,13 @@ def shared_task(*args, **kwargs):
         def __inner(fun):
             name = options.get('name')
             # Set as shared task so that unfinalized apps,
-            # and future apps will load the task.
-            connect_on_app_finalize(
+            # and future apps will register a copy of this task.
+            _state.connect_on_app_finalize(
                 lambda app: app._task_from_fun(fun, **options)
             )
 
             # Force all finalized apps to take this task as well.
-            for app in _get_active_apps():
+            for app in _state._get_active_apps():
                 if app.finalized:
                     with app._finalize_mutex:
                         app._task_from_fun(fun, **options)
@@ -128,7 +68,7 @@ def shared_task(*args, **kwargs):
             # Return a proxy that always gets the task from the current
             # apps task registry.
             def task_by_cons():
-                app = current_app()
+                app = _state.get_current_app()
                 return app.tasks[
                     name or app.gen_task_name(fun.__name__, fun.__module__)
                 ]

+ 1 - 2
celery/app/trace.py

@@ -32,7 +32,6 @@ from kombu.utils.encoding import safe_repr, safe_str
 from celery import current_app, group
 from celery import states, signals
 from celery._state import _task_stack
-from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError
 from celery.five import monotonic, text_t
@@ -563,7 +562,7 @@ def setup_worker_optimizations(app, hostname=None):
     # and means that only a single app can be used for workers
     # running in the same process.
     app.set_current()
-    set_default_app(app)
+    app.set_default()
 
     # evaluate all task classes by finalizing the app.
     app.finalize()

+ 2 - 2
celery/backends/base.py

@@ -25,7 +25,7 @@ from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery import current_app, group, maybe_signature
-from celery.app import current_task
+from celery._state import get_current_task
 from celery.exceptions import (
     ChordError, TimeoutError, TaskRevokedError, ImproperlyConfigured,
 )
@@ -425,7 +425,7 @@ class Backend(object):
         return result
 
     def current_task_children(self, request=None):
-        request = request or getattr(current_task(), 'request', None)
+        request = request or getattr(get_current_task(), 'request', None)
         if request:
             return [r.as_tuple() for r in getattr(request, 'children', [])]
 

+ 1 - 2
celery/backends/rpc.py

@@ -12,9 +12,8 @@ from kombu.common import maybe_declare
 from kombu.utils.compat import register_after_fork
 from kombu.utils.objects import cached_property
 
-from celery import current_task
 from celery import states
-from celery._state import task_join_will_block
+from celery._state import current_task, task_join_will_block
 from celery.five import items, range
 
 from . import base

+ 2 - 2
t/unit/app/test_app.py

@@ -916,9 +916,9 @@ class test_debugging_utils:
     def test_enable_disable_trace(self):
         try:
             _app.enable_trace()
-            assert _app.app_or_default == _app._app_or_default_trace
+            assert _state.app_or_default == _state._app_or_default_trace
             _app.disable_trace()
-            assert _app.app_or_default == _app._app_or_default
+            assert _state.app_or_default == _state._app_or_default
         finally:
             _app.disable_trace()