state.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from __future__ import absolute_import
  2. import threading
  3. from celery.local import Proxy
  4. from celery.utils.threads import LocalStack
  5. default_app = None
  6. class _TLS(threading.local):
  7. #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  8. #: sets this, so it will always contain the last instantiated app,
  9. #: and is the default app returned by :func:`app_or_default`.
  10. current_app = None
  11. _tls = _TLS()
  12. _task_stack = LocalStack()
  13. def set_default_app(app):
  14. global default_app
  15. if default_app is None:
  16. default_app = app
  17. def get_current_app():
  18. if default_app is None:
  19. # creates the default app, but we want to defer that.
  20. import celery.app # noqa
  21. return _tls.current_app or default_app
  22. def get_current_task():
  23. """Currently executing task."""
  24. return _task_stack.top
  25. def get_current_worker_task():
  26. """Currently executing task, that was applied by the worker.
  27. This is used to differentiate between the actual task
  28. executed by the worker and any task that was called within
  29. a task (using ``task.__call__`` or ``task.apply``)
  30. """
  31. for task in reversed(_task_stack.stack):
  32. if not task.request.called_directly:
  33. return task
  34. current_app = Proxy(get_current_app)
  35. current_task = Proxy(get_current_task)