_state.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # -*- coding: utf-8 -*-
  2. """Internal state.
  3. This is an internal module containing thread state
  4. like the ``current_app``, and ``current_task``.
  5. This module shouldn't be used directly.
  6. """
  7. import os
  8. import sys
  9. import threading
  10. import weakref
  11. from celery.local import Proxy
  12. from celery.utils.threads import LocalStack
  13. __all__ = [
  14. 'set_default_app', 'get_current_app', 'get_current_task',
  15. 'get_current_worker_task', 'current_app', 'current_task',
  16. 'connect_on_app_finalize',
  17. ]
  18. #: Global default app used when no current app.
  19. default_app = None
  20. #: Function returning the app provided or the default app if none.
  21. #:
  22. #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
  23. #: trace app leaks. When enabled an exception is raised if there
  24. #: is no active app.
  25. app_or_default = None
  26. #: List of all app instances (weakrefs), mustn't be used directly.
  27. _apps = weakref.WeakSet()
  28. #: Global set of functions to call whenever a new app is finalized.
  29. #: Shared tasks, and built-in tasks are created by adding callbacks here.
  30. _on_app_finalizers = set()
  31. _task_join_will_block = False
  32. def connect_on_app_finalize(callback):
  33. """Connect callback to be called when any app is finalized."""
  34. _on_app_finalizers.add(callback)
  35. return callback
  36. def _announce_app_finalized(app):
  37. callbacks = set(_on_app_finalizers)
  38. for callback in callbacks:
  39. callback(app)
  40. def _set_task_join_will_block(blocks):
  41. global _task_join_will_block
  42. _task_join_will_block = blocks
  43. def task_join_will_block():
  44. return _task_join_will_block
  45. class _TLS(threading.local):
  46. #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  47. #: sets this, so it will always contain the last instantiated app,
  48. #: and is the default app returned by :func:`app_or_default`.
  49. current_app = None
  50. _tls = _TLS()
  51. _task_stack = LocalStack()
  52. #: Function used to push a task to the thread local stack
  53. #: keeping track of the currently executing task.
  54. #: You must remember to pop the task after.
  55. push_current_task = _task_stack.push
  56. #: Function used to pop a task from the thread local stack
  57. #: keeping track of the currently executing task.
  58. pop_current_task = _task_stack.pop
  59. def set_default_app(app):
  60. """Set default app."""
  61. global default_app
  62. default_app = app
  63. def _get_current_app():
  64. if default_app is None:
  65. #: creates the global fallback app instance.
  66. from celery.app.base import Celery
  67. set_default_app(Celery(
  68. 'default', fixups=[], set_as_current=False,
  69. loader=os.environ.get('CELERY_LOADER') or 'default',
  70. ))
  71. return _tls.current_app or default_app
  72. def _set_current_app(app):
  73. _tls.current_app = app
  74. if os.environ.get('C_STRICT_APP'): # pragma: no cover
  75. def get_current_app():
  76. """Return the current app."""
  77. raise RuntimeError('USES CURRENT APP')
  78. elif os.environ.get('C_WARN_APP'): # pragma: no cover
  79. def get_current_app(): # noqa
  80. import traceback
  81. print('-- USES CURRENT_APP', file=sys.stderr) # noqa+
  82. traceback.print_stack(file=sys.stderr)
  83. return _get_current_app()
  84. else:
  85. get_current_app = _get_current_app
  86. def get_current_task():
  87. """Currently executing task."""
  88. return _task_stack.top
  89. def get_current_worker_task():
  90. """Currently executing task, that was applied by the worker.
  91. This is used to differentiate between the actual task
  92. executed by the worker and any task that was called within
  93. a task (using ``task.__call__`` or ``task.apply``)
  94. """
  95. for task in reversed(_task_stack.stack):
  96. if not task.request.called_directly:
  97. return task
  98. #: Proxy to current app.
  99. current_app = Proxy(get_current_app)
  100. #: Proxy to current task.
  101. current_task = Proxy(get_current_task)
  102. def _register_app(app):
  103. _apps.add(app)
  104. def _deregister_app(app):
  105. _apps.discard(app)
  106. def _get_active_apps():
  107. return _apps
  108. def _app_or_default(app=None):
  109. if app is None:
  110. return get_current_app()
  111. return app
  112. def _app_or_default_trace(app=None): # pragma: no cover
  113. from traceback import print_stack
  114. try:
  115. from billiard.process import current_process
  116. except ImportError:
  117. current_process = None
  118. if app is None:
  119. if getattr(_tls, 'current_app', None):
  120. print('-- RETURNING TO CURRENT APP --') # noqa+
  121. print_stack()
  122. return _tls.current_app
  123. if not current_process or current_process()._name == 'MainProcess':
  124. raise Exception('DEFAULT APP')
  125. print('-- RETURNING TO DEFAULT APP --') # noqa+
  126. print_stack()
  127. return default_app
  128. return app
  129. def enable_trace():
  130. """Enable tracing of app instances."""
  131. global app_or_default
  132. app_or_default = _app_or_default_trace
  133. def disable_trace():
  134. """Disable tracing of app instances."""
  135. global app_or_default
  136. app_or_default = _app_or_default
  137. if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
  138. enable_trace()
  139. else:
  140. disable_trace()