_state.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery._state
  4. ~~~~~~~~~~~~~~~
  5. This is an internal module containing thread state
  6. like the ``current_app``, and ``current_task``.
  7. This module shouldn't be used directly.
  8. """
  9. from __future__ import absolute_import
  10. import threading
  11. import weakref
  12. from celery.local import Proxy
  13. from celery.utils.threads import LocalStack
  14. #: Global default app used when no current app.
  15. default_app = None
  16. #: List of all app instances (weakrefs), must not be used directly.
  17. _apps = set()
  18. class _TLS(threading.local):
  19. #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  20. #: sets this, so it will always contain the last instantiated app,
  21. #: and is the default app returned by :func:`app_or_default`.
  22. current_app = None
  23. _tls = _TLS()
  24. _task_stack = LocalStack()
  25. def set_default_app(app):
  26. global default_app
  27. if default_app is None:
  28. default_app = app
  29. def get_current_app():
  30. if default_app is None:
  31. # creates the default app, but we want to defer that.
  32. import celery.app # noqa
  33. return _tls.current_app or default_app
  34. def get_current_task():
  35. """Currently executing task."""
  36. return _task_stack.top
  37. def get_current_worker_task():
  38. """Currently executing task, that was applied by the worker.
  39. This is used to differentiate between the actual task
  40. executed by the worker and any task that was called within
  41. a task (using ``task.__call__`` or ``task.apply``)
  42. """
  43. for task in reversed(_task_stack.stack):
  44. if not task.request.called_directly:
  45. return task
  46. #: Proxy to current app.
  47. current_app = Proxy(get_current_app)
  48. #: Proxy to current task.
  49. current_task = Proxy(get_current_task)
  50. def _register_app(app):
  51. _apps.add(weakref.ref(app))
  52. def _get_active_apps():
  53. dirty = []
  54. try:
  55. for appref in _apps:
  56. app = appref()
  57. if app is None:
  58. dirty.append(appref)
  59. else:
  60. yield app
  61. finally:
  62. while dirty:
  63. _apps.discard(dirty.pop())