_state.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- coding: utf-8 -*-
  2. """Internal state.
  3. This is an internal module containing thread state
  4. like the ``current_app``, and ``current_task``.
  5. This module shouldn't be used directly.
  6. """
  7. import os
  8. import sys
  9. import threading
  10. import weakref
  11. from celery.local import Proxy
  12. from celery.utils.threads import LocalStack
  13. __all__ = [
  14. 'set_default_app', 'get_current_app', 'get_current_task',
  15. 'get_current_worker_task', 'current_app', 'current_task',
  16. 'connect_on_app_finalize',
  17. ]
  18. #: Global default app used when no current app.
  19. default_app = None
  20. #: List of all app instances (weakrefs), mustn't be used directly.
  21. _apps = weakref.WeakSet()
  22. #: Global set of functions to call whenever a new app is finalized.
  23. #: Shared tasks, and built-in tasks are created by adding callbacks here.
  24. _on_app_finalizers = set()
  25. _task_join_will_block = False
  26. def connect_on_app_finalize(callback):
  27. """Connect callback to be called when any app is finalized."""
  28. _on_app_finalizers.add(callback)
  29. return callback
  30. def _announce_app_finalized(app):
  31. callbacks = set(_on_app_finalizers)
  32. for callback in callbacks:
  33. callback(app)
  34. def _set_task_join_will_block(blocks):
  35. global _task_join_will_block
  36. _task_join_will_block = blocks
  37. def task_join_will_block():
  38. return _task_join_will_block
  39. class _TLS(threading.local):
  40. #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  41. #: sets this, so it will always contain the last instantiated app,
  42. #: and is the default app returned by :func:`app_or_default`.
  43. current_app = None
  44. _tls = _TLS()
  45. _task_stack = LocalStack()
  46. def set_default_app(app):
  47. """Set default app."""
  48. global default_app
  49. default_app = app
  50. def _get_current_app():
  51. if default_app is None:
  52. #: creates the global fallback app instance.
  53. from celery.app import Celery
  54. set_default_app(Celery(
  55. 'default', fixups=[], set_as_current=False,
  56. loader=os.environ.get('CELERY_LOADER') or 'default',
  57. ))
  58. return _tls.current_app or default_app
  59. def _set_current_app(app):
  60. _tls.current_app = app
  61. if os.environ.get('C_STRICT_APP'): # pragma: no cover
  62. def get_current_app():
  63. """Return the current app."""
  64. raise RuntimeError('USES CURRENT APP')
  65. elif os.environ.get('C_WARN_APP'): # pragma: no cover
  66. def get_current_app(): # noqa
  67. import traceback
  68. print('-- USES CURRENT_APP', file=sys.stderr) # noqa+
  69. traceback.print_stack(file=sys.stderr)
  70. return _get_current_app()
  71. else:
  72. get_current_app = _get_current_app
  73. def get_current_task():
  74. """Currently executing task."""
  75. return _task_stack.top
  76. def get_current_worker_task():
  77. """Currently executing task, that was applied by the worker.
  78. This is used to differentiate between the actual task
  79. executed by the worker and any task that was called within
  80. a task (using ``task.__call__`` or ``task.apply``)
  81. """
  82. for task in reversed(_task_stack.stack):
  83. if not task.request.called_directly:
  84. return task
  85. #: Proxy to current app.
  86. current_app = Proxy(get_current_app)
  87. #: Proxy to current task.
  88. current_task = Proxy(get_current_task)
  89. def _register_app(app):
  90. _apps.add(app)
  91. def _deregister_app(app):
  92. _apps.discard(app)
  93. def _get_active_apps():
  94. return _apps