__init__.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # -*- coding: utf-8 -*-
  2. """Celery Application."""
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import os
  5. from celery.local import Proxy
  6. from celery import _state
  7. from celery._state import (
  8. get_current_app as current_app,
  9. get_current_task as current_task,
  10. connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack,
  11. )
  12. from .base import Celery, AppPickler
  13. __all__ = [
  14. 'Celery', 'AppPickler', 'default_app', 'app_or_default',
  15. 'bugreport', 'enable_trace', 'disable_trace', 'shared_task',
  16. 'set_default_app', 'current_app', 'current_task',
  17. 'push_current_task', 'pop_current_task',
  18. ]
  19. #: Proxy always returning the app set as default.
  20. default_app = Proxy(lambda: _state.default_app)
  21. #: Function returning the app provided or the default app if none.
  22. #:
  23. #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
  24. #: trace app leaks. When enabled an exception is raised if there
  25. #: is no active app.
  26. app_or_default = None
  27. #: Function used to push a task to the thread local stack
  28. #: keeping track of the currently executing task.
  29. #: You must remember to pop the task after.
  30. push_current_task = _task_stack.push
  31. #: Function used to pop a task from the thread local stack
  32. #: keeping track of the currently executing task.
  33. pop_current_task = _task_stack.pop
  34. def bugreport(app=None):
  35. return (app or current_app()).bugreport()
  36. def _app_or_default(app=None):
  37. if app is None:
  38. return _state.get_current_app()
  39. return app
  40. def _app_or_default_trace(app=None): # pragma: no cover
  41. from traceback import print_stack
  42. try:
  43. from billiard.process import current_process
  44. except ImportError:
  45. current_process = None
  46. if app is None:
  47. if getattr(_state._tls, 'current_app', None):
  48. print('-- RETURNING TO CURRENT APP --') # noqa+
  49. print_stack()
  50. return _state._tls.current_app
  51. if not current_process or current_process()._name == 'MainProcess':
  52. raise Exception('DEFAULT APP')
  53. print('-- RETURNING TO DEFAULT APP --') # noqa+
  54. print_stack()
  55. return _state.default_app
  56. return app
  57. def enable_trace():
  58. global app_or_default
  59. app_or_default = _app_or_default_trace
  60. def disable_trace():
  61. global app_or_default
  62. app_or_default = _app_or_default
  63. if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
  64. enable_trace()
  65. else:
  66. disable_trace()
  67. def shared_task(*args, **kwargs):
  68. """Create shared tasks (decorator).
  69. This can be used by library authors to create tasks that will work
  70. for any app environment.
  71. Returns:
  72. ~celery.local.Proxy: A proxy that always takes the task from the
  73. current apps task registry.
  74. Example:
  75. >>> from celery import Celery, shared_task
  76. >>> @shared_task
  77. ... def add(x, y):
  78. ... return x + y
  79. >>> app1 = Celery(broker='amqp://')
  80. >>> add.app is app1
  81. True
  82. >>> app2 = Celery(broker='redis://')
  83. >>> add.app is app2
  84. """
  85. def create_shared_task(**options):
  86. def __inner(fun):
  87. name = options.get('name')
  88. # Set as shared task so that unfinalized apps,
  89. # and future apps will load the task.
  90. connect_on_app_finalize(
  91. lambda app: app._task_from_fun(fun, **options)
  92. )
  93. # Force all finalized apps to take this task as well.
  94. for app in _get_active_apps():
  95. if app.finalized:
  96. with app._finalize_mutex:
  97. app._task_from_fun(fun, **options)
  98. # Return a proxy that always gets the task from the current
  99. # apps task registry.
  100. def task_by_cons():
  101. app = current_app()
  102. return app.tasks[
  103. name or app.gen_task_name(fun.__name__, fun.__module__)
  104. ]
  105. return Proxy(task_by_cons)
  106. return __inner
  107. if len(args) == 1 and callable(args[0]):
  108. return create_shared_task(**kwargs)(args[0])
  109. return create_shared_task(*args, **kwargs)