123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- # -*- coding: utf-8 -*-
- """
- celery.app
- ~~~~~~~~~~
- Celery Application.
- """
- from __future__ import absolute_import
- import os
- from celery.local import Proxy
- from celery import _state
- from celery._state import ( # noqa
- set_default_app,
- get_current_app as current_app,
- get_current_task as current_task,
- _get_active_apps,
- )
- from celery.utils import gen_task_name
- from .builtins import shared_task as _shared_task
- from .base import Celery, AppPickler # noqa
- #: Proxy always returning the app set as default.
- default_app = Proxy(lambda: _state.default_app)
- #: Function returning the app provided or the default app if none.
- #:
- #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
- #: trace app leaks. When enabled an exception is raised if there
- #: is no active app.
- app_or_default = None
- #: The 'default' loader is the default loader used by old applications.
- default_loader = os.environ.get('CELERY_LOADER') or 'default'
- #: Global fallback app instance.
- set_default_app(Celery('default', loader=default_loader,
- set_as_current=False,
- accept_magic_kwargs=True))
- def bugreport():
- return current_app().bugreport()
- def _app_or_default(app=None):
- if app is None:
- return _state.get_current_app()
- return app
- def _app_or_default_trace(app=None): # pragma: no cover
- from traceback import print_stack
- from billiard import current_process
- if app is None:
- if getattr(_state._tls, 'current_app', None):
- print('-- RETURNING TO CURRENT APP --') # noqa+
- print_stack()
- return _state._tls.current_app
- if current_process()._name == 'MainProcess':
- raise Exception('DEFAULT APP')
- print('-- RETURNING TO DEFAULT APP --') # noqa+
- print_stack()
- return _state.default_app
- return app
- def enable_trace():
- global app_or_default
- app_or_default = _app_or_default_trace
- def disable_trace():
- global app_or_default
- app_or_default = _app_or_default
- if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
- enable_trace()
- else:
- disable_trace()
- App = Celery # XXX Compat
- def shared_task(*args, **kwargs):
- """Task decorator that creates shared tasks,
- and returns a proxy that always returns the task from the current apps
- task registry.
- This can be used by library authors to create tasks that will work
- for any app environment.
- Example:
- >>> from celery import Celery, shared_task
- >>> @shared_task
- ... def add(x, y):
- ... return x + y
- >>> app1 = Celery(broker='amqp://')
- >>> add.app is app1
- True
- >>> app2 = Celery(broker='redis://')
- >>> add.app is app2
- """
- def create_shared_task(**options):
- def __inner(fun):
- name = options.get('name')
- # Set as shared task so that unfinalized apps,
- # and future apps will load the task.
- _shared_task(lambda app: app._task_from_fun(fun, **options))
- # Force all finalized apps to take this task as well.
- for app in _get_active_apps():
- if app.finalized:
- with app._finalize_mutex:
- app._task_from_fun(fun, **options)
- # Returns a proxy that always gets the task from the current
- # apps task registry.
- def task_by_cons():
- app = current_app()
- return app.tasks[name or gen_task_name(app,
- fun.__name__, fun.__module__)]
- return Proxy(task_by_cons)
- return __inner
- if len(args) == 1 and callable(args[0]):
- return create_shared_task(**kwargs)(args[0])
- return create_shared_task(**kwargs)
|