# -*- coding: utf-8 -*-
"""Internal state.

This is an internal module containing thread state
like the ``current_app``, and ``current_task``.

This module shouldn't be used directly.
"""
import os
import sys
import threading
import weakref

from celery.local import Proxy
from celery.utils.threads import LocalStack

__all__ = [
    'set_default_app', 'get_current_app', 'get_current_task',
    'get_current_worker_task', 'current_app', 'current_task',
    'connect_on_app_finalize',
]

#: Global default app used when no current app.
default_app = None

#: Set of all app instances (weak references), mustn't be used directly.
_apps = weakref.WeakSet()

#: Global set of functions to call whenever a new app is finalized.
#: Shared tasks, and built-in tasks are created by adding callbacks here.
_on_app_finalizers = set()

#: Module-wide flag, written by :func:`_set_task_join_will_block` and
#: read by :func:`task_join_will_block`.
_task_join_will_block = False


def connect_on_app_finalize(callback):
    """Connect callback to be called when any app is finalized.

    The callback is returned unchanged, so this function can also be
    used as a decorator.
    """
    _on_app_finalizers.add(callback)
    return callback


def _announce_app_finalized(app):
    """Call every registered finalize callback with ``app``."""
    # Iterate over a snapshot so the registry may be modified
    # (e.g., by a callback) while the callbacks are running.
    callbacks = set(_on_app_finalizers)
    for callback in callbacks:
        callback(app)


def _set_task_join_will_block(blocks):
    """Set the module-wide ``_task_join_will_block`` flag to ``blocks``."""
    global _task_join_will_block
    _task_join_will_block = blocks


def task_join_will_block():
    """Return the flag last stored by :func:`_set_task_join_will_block`.

    The flag is :const:`False` until it has been set.
    """
    return _task_join_will_block


class _TLS(threading.local):
    """Thread-local storage holding the current app."""

    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: set this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None


_tls = _TLS()

#: Stack of tasks; :func:`get_current_task` returns its ``top``.
_task_stack = LocalStack()


def set_default_app(app):
    """Set default app."""
    global default_app
    default_app = app


def _get_current_app():
    """Return the thread's current app, or the default app if unset.

    The global default app is created on first use when no default
    app has been set yet.
    """
    if default_app is None:
        #: creates the global fallback app instance.
        # NOTE(review): imported here rather than at module level,
        # presumably to avoid a circular import -- confirm.
        from celery.app import Celery
        set_default_app(Celery(
            'default', fixups=[], set_as_current=False,
            loader=os.environ.get('CELERY_LOADER') or 'default',
        ))
    return _tls.current_app or default_app


def _set_current_app(app):
    """Make ``app`` the current app for the calling thread."""
    _tls.current_app = app


# The ``get_current_app`` implementation is chosen once, at import time,
# from the environment: ``C_STRICT_APP`` makes any use an error,
# ``C_WARN_APP`` reports each use on stderr with a stack trace.
if os.environ.get('C_STRICT_APP'):  # pragma: no cover
    def get_current_app():
        """Return the current app."""
        raise RuntimeError('USES CURRENT APP')
elif os.environ.get('C_WARN_APP'):  # pragma: no cover
    def get_current_app():  # noqa
        """Return the current app, reporting the call site on stderr."""
        import traceback
        print('-- USES CURRENT_APP', file=sys.stderr)  # noqa
        traceback.print_stack(file=sys.stderr)
        return _get_current_app()
else:
    get_current_app = _get_current_app


def get_current_task():
    """Currently executing task."""
    return _task_stack.top


def get_current_worker_task():
    """Currently executing task, that was applied by the worker.

    This is used to differentiate between the actual task
    executed by the worker and any task that was called within
    a task (using ``task.__call__`` or ``task.apply``).

    Returns :const:`None` when no task on the stack qualifies.
    """
    # Walk the stack from the most recently pushed task backwards.
    for task in reversed(_task_stack.stack):
        if not task.request.called_directly:
            return task
    return None


#: Proxy to current app.
current_app = Proxy(get_current_app)

#: Proxy to current task.
current_task = Proxy(get_current_task)


def _register_app(app):
    """Add ``app`` to the weak set of known app instances."""
    _apps.add(app)


def _deregister_app(app):
    """Remove ``app`` from the set of known apps (no error if absent)."""
    _apps.discard(app)


def _get_active_apps():
    """Return the weak set of registered apps (the live set, not a copy)."""
    return _apps