1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- # -*- coding: utf-8 -*-
- """Celery Application."""
- from __future__ import absolute_import, print_function, unicode_literals
- from celery.local import Proxy
- from celery import _state
- from celery._state import (
- app_or_default, enable_trace, disable_trace,
- push_current_task, pop_current_task,
- )
- from .base import Celery
- from .utils import AppPickler
- __all__ = (
- 'Celery', 'AppPickler', 'app_or_default', 'default_app',
- 'bugreport', 'enable_trace', 'disable_trace', 'shared_task',
- 'push_current_task', 'pop_current_task',
- )
- #: Proxy always returning the app set as default.
- default_app = Proxy(lambda: _state.default_app)
- def bugreport(app=None):
- """Return information useful in bug reports."""
- return (app or _state.get_current_app()).bugreport()
- def shared_task(*args, **kwargs):
- """Create shared task (decorator).
- This can be used by library authors to create tasks that'll work
- for any app environment.
- Returns:
- ~celery.local.Proxy: A proxy that always takes the task from the
- current apps task registry.
- Example:
- >>> from celery import Celery, shared_task
- >>> @shared_task
- ... def add(x, y):
- ... return x + y
- ...
- >>> app1 = Celery(broker='amqp://')
- >>> add.app is app1
- True
- >>> app2 = Celery(broker='redis://')
- >>> add.app is app2
- True
- """
- def create_shared_task(**options):
- def __inner(fun):
- name = options.get('name')
- # Set as shared task so that unfinalized apps,
- # and future apps will register a copy of this task.
- _state.connect_on_app_finalize(
- lambda app: app._task_from_fun(fun, **options)
- )
- # Force all finalized apps to take this task as well.
- for app in _state._get_active_apps():
- if app.finalized:
- with app._finalize_mutex:
- app._task_from_fun(fun, **options)
- # Return a proxy that always gets the task from the current
- # apps task registry.
- def task_by_cons():
- app = _state.get_current_app()
- return app.tasks[
- name or app.gen_task_name(fun.__name__, fun.__module__)
- ]
- return Proxy(task_by_cons)
- return __inner
- if len(args) == 1 and callable(args[0]):
- return create_shared_task(**kwargs)(args[0])
- return create_shared_task(*args, **kwargs)
|