__init__.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app
  4. ~~~~~~~~~~
  5. Celery Application.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. from celery.local import Proxy
  10. from celery import _state
  11. from celery._state import ( # noqa
  12. set_default_app,
  13. get_current_app as current_app,
  14. get_current_task as current_task,
  15. _get_active_apps,
  16. )
  17. from celery.utils import gen_task_name
  18. from .builtins import shared_task as _shared_task
  19. from .base import Celery, AppPickler # noqa
  20. #: Proxy always returning the app set as default.
  21. default_app = Proxy(lambda: _state.default_app)
  22. #: Function returning the app provided or the default app if none.
  23. #:
  24. #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
  25. #: trace app leaks. When enabled an exception is raised if there
  26. #: is no active app.
  27. app_or_default = None
  28. #: The 'default' loader is the default loader used by old applications.
  29. default_loader = os.environ.get('CELERY_LOADER') or 'default'
  30. #: Global fallback app instance.
  31. set_default_app(Celery('default', loader=default_loader,
  32. set_as_current=False,
  33. accept_magic_kwargs=True))
  34. def bugreport():
  35. return current_app().bugreport()
  36. def _app_or_default(app=None):
  37. if app is None:
  38. return _state.get_current_app()
  39. return app
  40. def _app_or_default_trace(app=None): # pragma: no cover
  41. from traceback import print_stack
  42. from billiard import current_process
  43. if app is None:
  44. if getattr(_state._tls, 'current_app', None):
  45. print('-- RETURNING TO CURRENT APP --') # noqa+
  46. print_stack()
  47. return _state._tls.current_app
  48. if current_process()._name == 'MainProcess':
  49. raise Exception('DEFAULT APP')
  50. print('-- RETURNING TO DEFAULT APP --') # noqa+
  51. print_stack()
  52. return _state.default_app
  53. return app
  54. def enable_trace():
  55. global app_or_default
  56. app_or_default = _app_or_default_trace
  57. def disable_trace():
  58. global app_or_default
  59. app_or_default = _app_or_default
  60. if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
  61. enable_trace()
  62. else:
  63. disable_trace()
  64. App = Celery # XXX Compat
  65. def shared_task(*args, **kwargs):
  66. """Task decorator that creates shared tasks,
  67. and returns a proxy that always returns the task from the current apps
  68. task registry.
  69. This can be used by library authors to create tasks that will work
  70. for any app environment.
  71. Example:
  72. >>> from celery import Celery, shared_task
  73. >>> @shared_task
  74. ... def add(x, y):
  75. ... return x + y
  76. >>> app1 = Celery(broker='amqp://')
  77. >>> add.app is app1
  78. True
  79. >>> app2 = Celery(broker='redis://')
  80. >>> add.app is app2
  81. """
  82. def create_shared_task(**options):
  83. def __inner(fun):
  84. name = options.get('name')
  85. # Set as shared task so that unfinalized apps,
  86. # and future apps will load the task.
  87. _shared_task(lambda app: app._task_from_fun(fun, **options))
  88. # Force all finalized apps to take this task as well.
  89. for app in _get_active_apps():
  90. if app.finalized:
  91. with app._finalize_mutex:
  92. app._task_from_fun(fun, **options)
  93. # Returns a proxy that always gets the task from the current
  94. # apps task registry.
  95. def task_by_cons():
  96. app = current_app()
  97. return app.tasks[name or gen_task_name(app,
  98. fun.__name__, fun.__module__)]
  99. return Proxy(task_by_cons)
  100. return __inner
  101. if len(args) == 1 and callable(args[0]):
  102. return create_shared_task(**kwargs)(args[0])
  103. return create_shared_task(**kwargs)