__init__.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app
  4. ~~~~~~~~~~
  5. Celery Application.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import os
  10. from celery.local import Proxy
  11. from celery import _state
  12. from celery._state import ( # noqa
  13. set_default_app,
  14. get_current_app as current_app,
  15. get_current_task as current_task,
  16. _get_active_apps,
  17. )
  18. from celery.utils import gen_task_name
  19. from .builtins import shared_task as _shared_task
  20. from .base import Celery, AppPickler # noqa
  21. #: Proxy always returning the app set as default.
  22. default_app = Proxy(lambda: _state.default_app)
  23. #: Function returning the app provided or the default app if none.
  24. #:
  25. #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
  26. #: trace app leaks. When enabled an exception is raised if there
  27. #: is no active app.
  28. app_or_default = None
  29. #: The 'default' loader is the default loader used by old applications.
  30. default_loader = os.environ.get('CELERY_LOADER') or 'default'
  31. def bugreport():
  32. return current_app().bugreport()
  33. def _app_or_default(app=None):
  34. if app is None:
  35. return _state.get_current_app()
  36. return app
  37. def _app_or_default_trace(app=None): # pragma: no cover
  38. from traceback import print_stack
  39. from billiard import current_process
  40. if app is None:
  41. if getattr(_state._tls, 'current_app', None):
  42. print('-- RETURNING TO CURRENT APP --') # noqa+
  43. print_stack()
  44. return _state._tls.current_app
  45. if current_process()._name == 'MainProcess':
  46. raise Exception('DEFAULT APP')
  47. print('-- RETURNING TO DEFAULT APP --') # noqa+
  48. print_stack()
  49. return _state.default_app
  50. return app
  51. def enable_trace():
  52. global app_or_default
  53. app_or_default = _app_or_default_trace
  54. def disable_trace():
  55. global app_or_default
  56. app_or_default = _app_or_default
  57. if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
  58. enable_trace()
  59. else:
  60. disable_trace()
  61. App = Celery # XXX Compat
  62. def shared_task(*args, **kwargs):
  63. """Task decorator that creates shared tasks,
  64. and returns a proxy that always returns the task from the current apps
  65. task registry.
  66. This can be used by library authors to create tasks that will work
  67. for any app environment.
  68. Example:
  69. >>> from celery import Celery, shared_task
  70. >>> @shared_task
  71. ... def add(x, y):
  72. ... return x + y
  73. >>> app1 = Celery(broker='amqp://')
  74. >>> add.app is app1
  75. True
  76. >>> app2 = Celery(broker='redis://')
  77. >>> add.app is app2
  78. """
  79. def create_shared_task(**options):
  80. def __inner(fun):
  81. name = options.get('name')
  82. # Set as shared task so that unfinalized apps,
  83. # and future apps will load the task.
  84. _shared_task(lambda app: app._task_from_fun(fun, **options))
  85. # Force all finalized apps to take this task as well.
  86. for app in _get_active_apps():
  87. if app.finalized:
  88. with app._finalize_mutex:
  89. app._task_from_fun(fun, **options)
  90. # Returns a proxy that always gets the task from the current
  91. # apps task registry.
  92. def task_by_cons():
  93. app = current_app()
  94. return app.tasks[name or gen_task_name(app,
  95. fun.__name__, fun.__module__)]
  96. return Proxy(task_by_cons)
  97. return __inner
  98. if len(args) == 1 and callable(args[0]):
  99. return create_shared_task(**kwargs)(args[0])
  100. return create_shared_task(**kwargs)