1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- # -*- coding: utf-8 -*-
- """
- celery._state
- ~~~~~~~~~~~~~~~
- This is an internal module containing thread state
- like the ``current_app``, and ``current_task``.
- This module shouldn't be used directly.
- """
- from __future__ import absolute_import
- import threading
- import weakref
- from celery.local import Proxy
- from celery.utils.threads import LocalStack
#: Fallback app handed out when no app has been made current.
default_app = None

#: Set of weak references to every registered app instance.
#: Internal bookkeeping -- must not be used directly.
_apps = set()
class _TLS(threading.local):
    """Thread-local container for the current app."""

    #: Assigned by apps that have the
    #: :attr:`~celery.app.base.BaseApp.set_as_current` attribute, so it
    #: always holds the most recently instantiated app.  This is also the
    #: default app returned by :func:`app_or_default`.
    current_app = None
#: Thread-local holder for the current app.
_tls = _TLS()

#: Stack of tasks; its top is the currently executing task.
_task_stack = LocalStack()
def set_default_app(app):
    """Install ``app`` as the module-wide fallback app.

    Rebinds the module global ``default_app``; returns nothing.
    """
    global default_app
    default_app = app
def get_current_app():
    """Return the thread's current app, or the default app if there is none.

    When no default app has been installed yet, a fallback ``Celery``
    instance named ``'default'`` is created and registered through
    :func:`set_default_app` first.
    """
    if default_app is None:
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from celery.app import Celery, default_loader
        fallback = Celery(
            'default',
            loader=default_loader,
            set_as_current=False,
            accept_magic_kwargs=True,
        )
        set_default_app(fallback)
    active = _tls.current_app
    # ``default_app`` is read only after the fallback may have been set.
    return active if active else default_app
def get_current_task():
    """Return the task that is executing right now.

    This is whatever sits on top of the module's task stack.
    """
    stack = _task_stack
    return stack.top
def get_current_worker_task():
    """Return the task currently being run on behalf of the worker.

    A task may itself invoke other tasks inline (via ``task.__call__``
    or ``task.apply``); those are flagged ``request.called_directly``.
    The stack is searched from the top down and the first task that
    was *not* called directly is returned, or ``None`` when there is
    no such task.
    """
    worker_tasks = (
        candidate
        for candidate in reversed(_task_stack.stack)
        if not candidate.request.called_directly
    )
    return next(worker_tasks, None)
#: Proxy built around :func:`get_current_app`.
current_app = Proxy(get_current_app)

#: Proxy built around :func:`get_current_task`.
current_task = Proxy(get_current_task)
def _register_app(app):
    """Record ``app`` in the module's app set as a weak reference.

    Only a weak reference is stored, so registration by itself does
    not keep the app alive.
    """
    app_ref = weakref.ref(app)
    _apps.add(app_ref)
def _get_active_apps():
    """Yield every registered app that is still alive.

    Weak references whose app has been garbage collected are skipped
    and then pruned from the ``_apps`` set once the generator finishes
    or is closed early.
    """
    dead_refs = []
    try:
        # Iterate over a snapshot: control returns to the caller at each
        # ``yield``, and an app registered at that point would otherwise
        # resize the set mid-iteration and raise ``RuntimeError``.
        for appref in list(_apps):
            app = appref()
            if app is None:
                dead_refs.append(appref)
            else:
                yield app
    finally:
        # Runs on exhaustion and on early close alike.
        _apps.difference_update(dead_refs)
|