_state.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. # -*- coding: utf-8 -*-
  2. """Internal state.
  3. This is an internal module containing thread state
  4. like the ``current_app``, and ``current_task``.
  5. This module shouldn't be used directly.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import os
  9. import sys
  10. import threading
  11. import weakref
  12. from celery.local import Proxy
  13. from celery.utils.threads import LocalStack
  14. __all__ = (
  15. 'set_default_app', 'get_current_app', 'get_current_task',
  16. 'get_current_worker_task', 'current_app', 'current_task',
  17. 'connect_on_app_finalize',
  18. )
  19. #: Global default app used when no current app.
  20. default_app = None
  21. #: Function returning the app provided or the default app if none.
  22. #:
  23. #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
  24. #: trace app leaks. When enabled an exception is raised if there
  25. #: is no active app.
  26. app_or_default = None
  27. #: List of all app instances (weakrefs), mustn't be used directly.
  28. _apps = weakref.WeakSet()
  29. #: Global set of functions to call whenever a new app is finalized.
  30. #: Shared tasks, and built-in tasks are created by adding callbacks here.
  31. _on_app_finalizers = set()
  32. _task_join_will_block = False
  33. def connect_on_app_finalize(callback):
  34. """Connect callback to be called when any app is finalized."""
  35. _on_app_finalizers.add(callback)
  36. return callback
  37. def _announce_app_finalized(app):
  38. callbacks = set(_on_app_finalizers)
  39. for callback in callbacks:
  40. callback(app)
  41. def _set_task_join_will_block(blocks):
  42. global _task_join_will_block
  43. _task_join_will_block = blocks
  44. def task_join_will_block():
  45. return _task_join_will_block
  46. class _TLS(threading.local):
  47. #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  48. #: sets this, so it will always contain the last instantiated app,
  49. #: and is the default app returned by :func:`app_or_default`.
  50. current_app = None
  51. _tls = _TLS()
  52. _task_stack = LocalStack()
  53. #: Function used to push a task to the thread local stack
  54. #: keeping track of the currently executing task.
  55. #: You must remember to pop the task after.
  56. push_current_task = _task_stack.push
  57. #: Function used to pop a task from the thread local stack
  58. #: keeping track of the currently executing task.
  59. pop_current_task = _task_stack.pop
  60. def set_default_app(app):
  61. """Set default app."""
  62. global default_app
  63. default_app = app
  64. def _get_current_app():
  65. if default_app is None:
  66. #: creates the global fallback app instance.
  67. from celery.app.base import Celery
  68. set_default_app(Celery(
  69. 'default', fixups=[], set_as_current=False,
  70. loader=os.environ.get('CELERY_LOADER') or 'default',
  71. ))
  72. return _tls.current_app or default_app
  73. def _set_current_app(app):
  74. _tls.current_app = app
  75. if os.environ.get('C_STRICT_APP'): # pragma: no cover
  76. def get_current_app():
  77. """Return the current app."""
  78. raise RuntimeError('USES CURRENT APP')
  79. elif os.environ.get('C_WARN_APP'): # pragma: no cover
  80. def get_current_app(): # noqa
  81. import traceback
  82. print('-- USES CURRENT_APP', file=sys.stderr) # noqa+
  83. traceback.print_stack(file=sys.stderr)
  84. return _get_current_app()
  85. else:
  86. get_current_app = _get_current_app
  87. def get_current_task():
  88. """Currently executing task."""
  89. return _task_stack.top
  90. def get_current_worker_task():
  91. """Currently executing task, that was applied by the worker.
  92. This is used to differentiate between the actual task
  93. executed by the worker and any task that was called within
  94. a task (using ``task.__call__`` or ``task.apply``)
  95. """
  96. for task in reversed(_task_stack.stack):
  97. if not task.request.called_directly:
  98. return task
  99. #: Proxy to current app.
  100. current_app = Proxy(get_current_app)
  101. #: Proxy to current task.
  102. current_task = Proxy(get_current_task)
  103. def _register_app(app):
  104. _apps.add(app)
  105. def _deregister_app(app):
  106. _apps.discard(app)
  107. def _get_active_apps():
  108. return _apps
  109. def _app_or_default(app=None):
  110. if app is None:
  111. return get_current_app()
  112. return app
  113. def _app_or_default_trace(app=None): # pragma: no cover
  114. from traceback import print_stack
  115. try:
  116. from billiard.process import current_process
  117. except ImportError:
  118. current_process = None
  119. if app is None:
  120. if getattr(_tls, 'current_app', None):
  121. print('-- RETURNING TO CURRENT APP --') # noqa+
  122. print_stack()
  123. return _tls.current_app
  124. if not current_process or current_process()._name == 'MainProcess':
  125. raise Exception('DEFAULT APP')
  126. print('-- RETURNING TO DEFAULT APP --') # noqa+
  127. print_stack()
  128. return default_app
  129. return app
  130. def enable_trace():
  131. """Enable tracing of app instances."""
  132. global app_or_default
  133. app_or_default = _app_or_default_trace
  134. def disable_trace():
  135. """Disable tracing of app instances."""
  136. global app_or_default
  137. app_or_default = _app_or_default
  138. if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
  139. enable_trace()
  140. else:
  141. disable_trace()