__init__.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # -*- coding: utf-8 -*-
  2. """Celery Application."""
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. from celery.local import Proxy
  5. from celery import _state
  6. from celery._state import (
  7. app_or_default, enable_trace, disable_trace,
  8. push_current_task, pop_current_task,
  9. )
  10. from .base import Celery
  11. from .utils import AppPickler
  12. __all__ = (
  13. 'Celery', 'AppPickler', 'app_or_default', 'default_app',
  14. 'bugreport', 'enable_trace', 'disable_trace', 'shared_task',
  15. 'push_current_task', 'pop_current_task',
  16. )
  17. #: Proxy always returning the app set as default.
  18. default_app = Proxy(lambda: _state.default_app)
  19. def bugreport(app=None):
  20. """Return information useful in bug reports."""
  21. return (app or _state.get_current_app()).bugreport()
  22. def shared_task(*args, **kwargs):
  23. """Create shared task (decorator).
  24. This can be used by library authors to create tasks that'll work
  25. for any app environment.
  26. Returns:
  27. ~celery.local.Proxy: A proxy that always takes the task from the
  28. current apps task registry.
  29. Example:
  30. >>> from celery import Celery, shared_task
  31. >>> @shared_task
  32. ... def add(x, y):
  33. ... return x + y
  34. ...
  35. >>> app1 = Celery(broker='amqp://')
  36. >>> add.app is app1
  37. True
  38. >>> app2 = Celery(broker='redis://')
  39. >>> add.app is app2
  40. True
  41. """
  42. def create_shared_task(**options):
  43. def __inner(fun):
  44. name = options.get('name')
  45. # Set as shared task so that unfinalized apps,
  46. # and future apps will register a copy of this task.
  47. _state.connect_on_app_finalize(
  48. lambda app: app._task_from_fun(fun, **options)
  49. )
  50. # Force all finalized apps to take this task as well.
  51. for app in _state._get_active_apps():
  52. if app.finalized:
  53. with app._finalize_mutex:
  54. app._task_from_fun(fun, **options)
  55. # Return a proxy that always gets the task from the current
  56. # apps task registry.
  57. def task_by_cons():
  58. app = _state.get_current_app()
  59. return app.tasks[
  60. name or app.gen_task_name(fun.__name__, fun.__module__)
  61. ]
  62. return Proxy(task_by_cons)
  63. return __inner
  64. if len(args) == 1 and callable(args[0]):
  65. return create_shared_task(**kwargs)(args[0])
  66. return create_shared_task(*args, **kwargs)