123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743 |
- # -*- coding: utf-8 -*-
- """
- celery.app.base
- ~~~~~~~~~~~~~~~
- Actual App instance implementation.
- """
- from __future__ import absolute_import
- import os
- import threading
- import warnings
- from collections import defaultdict, deque
- from copy import deepcopy
- from operator import attrgetter
- from amqp import promise
- try:
- from billiard.util import register_after_fork
- except ImportError:
- register_after_fork = None
- from kombu.clocks import LamportClock
- from kombu.common import oid_from
- from kombu.utils import cached_property, uuid
- from celery import platforms
- from celery import signals
- from celery._state import (
- _task_stack, get_current_app, _set_current_app, set_default_app,
- _register_app, get_current_worker_task, connect_on_app_finalize,
- _announce_app_finalized,
- )
- from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
- from celery.five import values
- from celery.loaders import get_loader_cls
- from celery.local import PromiseProxy, maybe_evaluate
- from celery.utils import gen_task_name
- from celery.utils.dispatch import Signal
- from celery.utils.functional import first, maybe_list, head_from_fun
- from celery.utils.imports import instantiate, symbol_by_name
- from celery.utils.objects import FallbackContext, mro_lookup
- from .annotations import prepare as prepare_annotations
- from .defaults import DEFAULTS, find_deprecated_settings
- from .registry import TaskRegistry
- from .utils import (
- AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
- )
- # Load all builtin tasks
- from . import builtins # noqa
- __all__ = ['Celery']
- _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
- BUILTIN_FIXUPS = {
- 'celery.fixups.django:fixup',
- }
- ERR_ENVVAR_NOT_SET = """\
- The environment variable {0!r} is not set,
- and as such the configuration could not be loaded.
- Please set this variable and make it point to
- a configuration module."""
- _after_fork_registered = False
- def app_has_custom(app, attr):
- return mro_lookup(app.__class__, attr, stop=(Celery, object),
- monkey_patched=[__name__])
- def _unpickle_appattr(reverse_name, args):
- """Given an attribute name and a list of args, gets
- the attribute from the current app and calls it."""
- return get_current_app()._rgetattr(reverse_name)(*args)
- def _global_after_fork(obj):
- # Previously every app would call:
- # `register_after_fork(app, app._after_fork)`
- # but this created a leak as `register_after_fork` stores concrete object
- # references and once registered an object cannot be removed without
- # touching and iterating over the private afterfork registry list.
- #
- # See Issue #1949
- from celery import _state
- from multiprocessing import util as mputil
- for app in _state._apps:
- try:
- app._after_fork(obj)
- except Exception as exc:
- if mputil._logger:
- mputil._logger.info(
- 'after forker raised exception: %r', exc, exc_info=1)
- def _ensure_after_fork():
- global _after_fork_registered
- _after_fork_registered = True
- if register_after_fork is not None:
- register_after_fork(_global_after_fork, _global_after_fork)
- class Celery(object):
- #: This is deprecated, use :meth:`reduce_keys` instead
- Pickler = AppPickler
- SYSTEM = platforms.SYSTEM
- IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
- amqp_cls = 'celery.app.amqp:AMQP'
- backend_cls = None
- events_cls = 'celery.events:Events'
- loader_cls = 'celery.loaders.app:AppLoader'
- log_cls = 'celery.app.log:Logging'
- control_cls = 'celery.app.control:Control'
- task_cls = 'celery.app.task:Task'
- registry_cls = TaskRegistry
- _fixups = None
- _pool = None
- _conf = None
- builtin_fixups = BUILTIN_FIXUPS
- #: Signal sent when app is loading configuration.
- on_configure = None
- #: Signal sent after app has prepared the configuration.
- on_after_configure = None
- #: Signal sent after app has been finalized.
- on_after_finalize = None
- #: ignored
- accept_magic_kwargs = False
- def __init__(self, main=None, loader=None, backend=None,
- amqp=None, events=None, log=None, control=None,
- set_as_current=True, tasks=None, broker=None, include=None,
- changes=None, config_source=None, fixups=None, task_cls=None,
- autofinalize=True, **kwargs):
- self.clock = LamportClock()
- self.main = main
- self.amqp_cls = amqp or self.amqp_cls
- self.events_cls = events or self.events_cls
- self.loader_cls = loader or self.loader_cls
- self.log_cls = log or self.log_cls
- self.control_cls = control or self.control_cls
- self.task_cls = task_cls or self.task_cls
- self.set_as_current = set_as_current
- self.registry_cls = symbol_by_name(self.registry_cls)
- self.user_options = defaultdict(set)
- self.steps = defaultdict(set)
- self.autofinalize = autofinalize
- self.configured = False
- self._config_source = config_source
- self._pending_defaults = deque()
- self._pending_periodic_tasks = deque()
- self.finalized = False
- self._finalize_mutex = threading.Lock()
- self._pending = deque()
- self._tasks = tasks
- if not isinstance(self._tasks, TaskRegistry):
- self._tasks = TaskRegistry(self._tasks or {})
- # If the class defines a custom __reduce_args__ we need to use
- # the old way of pickling apps, which is pickling a list of
- # args instead of the new way that pickles a dict of keywords.
- self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
- # these options are moved to the config to
- # simplify pickling of the app object.
- self._preconf = changes or {}
- if broker:
- self._preconf['BROKER_URL'] = broker
- if backend:
- self._preconf['CELERY_RESULT_BACKEND'] = backend
- if include:
- self._preconf['CELERY_IMPORTS'] = include
- # - Apply fixups.
- self.fixups = set(self.builtin_fixups) if fixups is None else fixups
- # ...store fixup instances in _fixups to keep weakrefs alive.
- self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]
- if self.set_as_current:
- self.set_current()
- # Signals
- if self.on_configure is None:
- # used to be a method pre 3.2
- self.on_configure = Signal()
- self.on_after_configure = Signal()
- self.on_after_finalize = Signal()
- self.on_init()
- _register_app(self)
- def set_current(self):
- _set_current_app(self)
- def set_default(self):
- set_default_app(self)
- def __enter__(self):
- return self
- def __exit__(self, *exc_info):
- self.close()
- def close(self):
- self._maybe_close_pool()
- def on_init(self):
- """Optional callback called at init."""
- pass
- def start(self, argv=None):
- return instantiate(
- 'celery.bin.celery:CeleryCommand',
- app=self).execute_from_commandline(argv)
- def worker_main(self, argv=None):
- return instantiate(
- 'celery.bin.worker:worker',
- app=self).execute_from_commandline(argv)
- def task(self, *args, **opts):
- """Creates new task class from any callable."""
- if _EXECV and opts.get('lazy', True):
- # When using execv the task in the original module will point to a
- # different app, so doing things like 'add.request' will point to
- # a different task instance. This makes sure it will always use
- # the task instance from the current app.
- # Really need a better solution for this :(
- from . import shared_task
- return shared_task(*args, lazy=False, **opts)
- def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
- _filt = filter # stupid 2to3
- def _create_task_cls(fun):
- if shared:
- cons = lambda app: app._task_from_fun(fun, **opts)
- cons.__name__ = fun.__name__
- connect_on_app_finalize(cons)
- if not lazy or self.finalized:
- ret = self._task_from_fun(fun, **opts)
- else:
- # return a proxy object that evaluates on first use
- ret = PromiseProxy(self._task_from_fun, (fun, ), opts,
- __doc__=fun.__doc__)
- self._pending.append(ret)
- if _filt:
- return _filt(ret)
- return ret
- return _create_task_cls
- if len(args) == 1:
- if callable(args[0]):
- return inner_create_task_cls(**opts)(*args)
- raise TypeError('argument 1 to @task() must be a callable')
- if args:
- raise TypeError(
- '@task() takes exactly 1 argument ({0} given)'.format(
- sum([len(args), len(opts)])))
- return inner_create_task_cls(**opts)
- def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
- if not self.finalized and not self.autofinalize:
- raise RuntimeError('Contract breach: app not finalized')
- name = name or self.gen_task_name(fun.__name__, fun.__module__)
- base = base or self.Task
- if name not in self._tasks:
- run = fun if bind else staticmethod(fun)
- task = type(fun.__name__, (base, ), dict({
- 'app': self,
- 'name': name,
- 'run': run,
- '_decorated': True,
- '__doc__': fun.__doc__,
- '__module__': fun.__module__,
- '__header__': staticmethod(head_from_fun(fun, bound=bind)),
- '__wrapped__': run}, **options))()
- self._tasks[task.name] = task
- task.bind(self) # connects task to this app
- else:
- task = self._tasks[name]
- return task
- def gen_task_name(self, name, module):
- return gen_task_name(self, name, module)
- def finalize(self, auto=False):
- with self._finalize_mutex:
- if not self.finalized:
- if auto and not self.autofinalize:
- raise RuntimeError('Contract breach: app not finalized')
- self.finalized = True
- _announce_app_finalized(self)
- pending = self._pending
- while pending:
- maybe_evaluate(pending.popleft())
- for task in values(self._tasks):
- task.bind(self)
- self.on_after_finalize.send(sender=self)
- def add_defaults(self, fun):
- if not callable(fun):
- d, fun = fun, lambda: d
- if self.configured:
- return self._conf.add_defaults(fun())
- self._pending_defaults.append(fun)
- def config_from_object(self, obj, silent=False, force=False):
- self._config_source = obj
- if force or self.configured:
- self._conf = None
- return self.loader.config_from_object(obj, silent=silent)
- def config_from_envvar(self, variable_name, silent=False, force=False):
- module_name = os.environ.get(variable_name)
- if not module_name:
- if silent:
- return False
- raise ImproperlyConfigured(
- ERR_ENVVAR_NOT_SET.format(variable_name))
- return self.config_from_object(module_name, silent=silent, force=force)
- def config_from_cmdline(self, argv, namespace='celery'):
- (self._conf if self.configured else self.conf).update(
- self.loader.cmdline_config_parser(argv, namespace)
- )
- def setup_security(self, allowed_serializers=None, key=None, cert=None,
- store=None, digest='sha1', serializer='json'):
- from celery.security import setup_security
- return setup_security(allowed_serializers, key, cert,
- store, digest, serializer, app=self)
- def autodiscover_tasks(self, packages, related_name='tasks', force=False):
- if force:
- return self._autodiscover_tasks(packages, related_name)
- signals.import_modules.connect(promise(
- self._autodiscover_tasks, (packages, related_name),
- ), weak=False, sender=self)
- def _autodiscover_tasks(self, packages, related_name='tasks', **kwargs):
- # argument may be lazy
- packages = packages() if callable(packages) else packages
- self.loader.autodiscover_tasks(packages, related_name)
- def send_task(self, name, args=None, kwargs=None, countdown=None,
- eta=None, task_id=None, producer=None, connection=None,
- router=None, result_cls=None, expires=None,
- publisher=None, link=None, link_error=None,
- add_to_parent=True, group_id=None, retries=0, chord=None,
- reply_to=None, time_limit=None, soft_time_limit=None,
- root_id=None, parent_id=None, route_name=None,
- shadow=None, **options):
- amqp = self.amqp
- task_id = task_id or uuid()
- producer = producer or publisher # XXX compat
- router = router or amqp.router
- conf = self.conf
- if conf.CELERY_ALWAYS_EAGER: # pragma: no cover
- warnings.warn(AlwaysEagerIgnored(
- 'CELERY_ALWAYS_EAGER has no effect on send_task',
- ), stacklevel=2)
- options = router.route(options, route_name or name, args, kwargs)
- message = amqp.create_task_message(
- task_id, name, args, kwargs, countdown, eta, group_id,
- expires, retries, chord,
- maybe_list(link), maybe_list(link_error),
- reply_to or self.oid, time_limit, soft_time_limit,
- self.conf.CELERY_SEND_TASK_SENT_EVENT,
- root_id, parent_id, shadow,
- )
- if connection:
- producer = amqp.Producer(connection)
- with self.producer_or_acquire(producer) as P:
- self.backend.on_task_call(P, task_id)
- amqp.send_task_message(P, name, message, **options)
- result = (result_cls or self.AsyncResult)(task_id)
- if add_to_parent:
- parent = get_current_worker_task()
- if parent:
- parent.add_trail(result)
- return result
- def connection(self, hostname=None, userid=None, password=None,
- virtual_host=None, port=None, ssl=None,
- connect_timeout=None, transport=None,
- transport_options=None, heartbeat=None,
- login_method=None, failover_strategy=None, **kwargs):
- conf = self.conf
- return self.amqp.Connection(
- hostname or conf.BROKER_URL,
- userid or conf.BROKER_USER,
- password or conf.BROKER_PASSWORD,
- virtual_host or conf.BROKER_VHOST,
- port or conf.BROKER_PORT,
- transport=transport or conf.BROKER_TRANSPORT,
- ssl=self.either('BROKER_USE_SSL', ssl),
- heartbeat=heartbeat,
- login_method=login_method or conf.BROKER_LOGIN_METHOD,
- failover_strategy=(
- failover_strategy or conf.BROKER_FAILOVER_STRATEGY
- ),
- transport_options=dict(
- conf.BROKER_TRANSPORT_OPTIONS, **transport_options or {}
- ),
- connect_timeout=self.either(
- 'BROKER_CONNECTION_TIMEOUT', connect_timeout
- ),
- )
- broker_connection = connection
- def _acquire_connection(self, pool=True):
- """Helper for :meth:`connection_or_acquire`."""
- if pool:
- return self.pool.acquire(block=True)
- return self.connection()
- def connection_or_acquire(self, connection=None, pool=True, *_, **__):
- return FallbackContext(connection, self._acquire_connection, pool=pool)
- default_connection = connection_or_acquire # XXX compat
- def producer_or_acquire(self, producer=None):
- return FallbackContext(
- producer, self.amqp.producer_pool.acquire, block=True,
- )
- default_producer = producer_or_acquire # XXX compat
- def prepare_config(self, c):
- """Prepare configuration before it is merged with the defaults."""
- return find_deprecated_settings(c)
- def now(self):
- return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
- def mail_admins(self, subject, body, fail_silently=False):
- conf = self.conf
- if conf.ADMINS:
- to = [admin_email for _, admin_email in conf.ADMINS]
- return self.loader.mail_admins(
- subject, body, fail_silently, to=to,
- sender=conf.SERVER_EMAIL,
- host=conf.EMAIL_HOST,
- port=conf.EMAIL_PORT,
- user=conf.EMAIL_HOST_USER,
- password=conf.EMAIL_HOST_PASSWORD,
- timeout=conf.EMAIL_TIMEOUT,
- use_ssl=conf.EMAIL_USE_SSL,
- use_tls=conf.EMAIL_USE_TLS,
- charset=conf.EMAIL_CHARSET,
- )
- def select_queues(self, queues=None):
- return self.amqp.queues.select(queues)
- def either(self, default_key, *values):
- """Fallback to the value of a configuration key if none of the
- `*values` are true."""
- return first(None, values) or self.conf.get(default_key)
- def bugreport(self):
- return bugreport(self)
- def _get_backend(self):
- from celery.backends import get_backend_by_url
- backend, url = get_backend_by_url(
- self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
- self.loader)
- return backend(app=self, url=url)
- def _load_config(self):
- if isinstance(self.on_configure, Signal):
- self.on_configure.send(sender=self)
- else:
- # used to be a method pre 3.2
- self.on_configure()
- if self._config_source:
- self.loader.config_from_object(self._config_source)
- defaults = dict(deepcopy(DEFAULTS), **self._preconf)
- self.configured = True
- s = self._conf = Settings(
- {}, [self.prepare_config(self.loader.conf), defaults],
- )
- # load lazy config dict initializers.
- pending_def = self._pending_defaults
- while pending_def:
- s.add_defaults(maybe_evaluate(pending_def.popleft()()))
- # load lazy periodic tasks
- pending_beat = self._pending_periodic_tasks
- while pending_beat:
- pargs, pkwargs = pending_beat.popleft()
- self._add_periodic_task(*pargs, **pkwargs)
- self.on_after_configure.send(sender=self, source=s)
- return s
- def _after_fork(self, obj_):
- self._maybe_close_pool()
- def _maybe_close_pool(self):
- if self._pool:
- self._pool.force_close_all()
- self._pool = None
- amqp = self.__dict__.get('amqp')
- if amqp is not None and amqp._producer_pool is not None:
- amqp._producer_pool.force_close_all()
- amqp._producer_pool = None
- def signature(self, *args, **kwargs):
- kwargs['app'] = self
- return self.canvas.signature(*args, **kwargs)
- def add_periodic_task(self, *args, **kwargs):
- if not self.configured:
- return self._pending_periodic_tasks.append((args, kwargs))
- return self._add_periodic_task(*args, **kwargs)
- def _add_periodic_task(self, schedule, sig,
- args=(), kwargs={}, name=None, **opts):
- from .task import Task
- sig = (self.signature(sig.name, args, kwargs)
- if isinstance(sig, Task) else sig.clone(args, kwargs))
- name = name or ':'.join([sig.name, ','.join(map(str, sig.args))])
- self._conf.CELERYBEAT_SCHEDULE[name] = {
- 'schedule': schedule,
- 'task': sig.name,
- 'args': sig.args,
- 'kwargs': sig.kwargs,
- 'options': dict(sig.options, **opts),
- }
- def create_task_cls(self):
- """Creates a base task class using default configuration
- taken from this app."""
- return self.subclass_with_self(
- self.task_cls, name='Task', attribute='_app',
- keep_reduce=True, abstract=True,
- )
- def subclass_with_self(self, Class, name=None, attribute='app',
- reverse=None, keep_reduce=False, **kw):
- """Subclass an app-compatible class by setting its app attribute
- to be this app instance.
- App-compatible means that the class has a class attribute that
- provides the default app it should use, e.g.
- ``class Foo: app = None``.
- :param Class: The app-compatible class to subclass.
- :keyword name: Custom name for the target class.
- :keyword attribute: Name of the attribute holding the app,
- default is 'app'.
- """
- Class = symbol_by_name(Class)
- reverse = reverse if reverse else Class.__name__
- def __reduce__(self):
- return _unpickle_appattr, (reverse, self.__reduce_args__())
- attrs = dict({attribute: self}, __module__=Class.__module__,
- __doc__=Class.__doc__, **kw)
- if not keep_reduce:
- attrs['__reduce__'] = __reduce__
- return type(name or Class.__name__, (Class, ), attrs)
- def _rgetattr(self, path):
- return attrgetter(path)(self)
- def __repr__(self):
- return '<{0} {1}>'.format(type(self).__name__, appstr(self))
- def __reduce__(self):
- if self._using_v1_reduce:
- return self.__reduce_v1__()
- return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
- def __reduce_v1__(self):
- # Reduce only pickles the configuration changes,
- # so the default configuration doesn't have to be passed
- # between processes.
- return (
- _unpickle_app,
- (self.__class__, self.Pickler) + self.__reduce_args__(),
- )
- def __reduce_keys__(self):
- """Return keyword arguments used to reconstruct the object
- when unpickling."""
- return {
- 'main': self.main,
- 'changes': self._conf.changes if self._conf else self._preconf,
- 'loader': self.loader_cls,
- 'backend': self.backend_cls,
- 'amqp': self.amqp_cls,
- 'events': self.events_cls,
- 'log': self.log_cls,
- 'control': self.control_cls,
- 'fixups': self.fixups,
- 'config_source': self._config_source,
- 'task_cls': self.task_cls,
- }
- def __reduce_args__(self):
- """Deprecated method, please use :meth:`__reduce_keys__` instead."""
- return (self.main, self._conf.changes if self._conf else {},
- self.loader_cls, self.backend_cls, self.amqp_cls,
- self.events_cls, self.log_cls, self.control_cls,
- False, self._config_source)
- @cached_property
- def Worker(self):
- return self.subclass_with_self('celery.apps.worker:Worker')
- @cached_property
- def WorkController(self, **kwargs):
- return self.subclass_with_self('celery.worker:WorkController')
- @cached_property
- def Beat(self, **kwargs):
- return self.subclass_with_self('celery.apps.beat:Beat')
- @cached_property
- def Task(self):
- return self.create_task_cls()
- @cached_property
- def annotations(self):
- return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
- @cached_property
- def AsyncResult(self):
- return self.subclass_with_self('celery.result:AsyncResult')
- @cached_property
- def ResultSet(self):
- return self.subclass_with_self('celery.result:ResultSet')
- @cached_property
- def GroupResult(self):
- return self.subclass_with_self('celery.result:GroupResult')
- @cached_property
- def TaskSet(self): # XXX compat
- """Deprecated! Please use :class:`celery.group` instead."""
- return self.subclass_with_self('celery.task.sets:TaskSet')
- @cached_property
- def TaskSetResult(self): # XXX compat
- """Deprecated! Please use :attr:`GroupResult` instead."""
- return self.subclass_with_self('celery.result:TaskSetResult')
- @property
- def pool(self):
- if self._pool is None:
- _ensure_after_fork()
- limit = self.conf.BROKER_POOL_LIMIT
- self._pool = self.connection().Pool(limit=limit)
- return self._pool
- @property
- def current_task(self):
- return _task_stack.top
- @cached_property
- def oid(self):
- return oid_from(self)
- @cached_property
- def amqp(self):
- return instantiate(self.amqp_cls, app=self)
- @cached_property
- def backend(self):
- return self._get_backend()
- @property
- def conf(self):
- if self._conf is None:
- self._load_config()
- return self._conf
- @conf.setter
- def conf(self, d): # noqa
- self._conf = d
- @cached_property
- def control(self):
- return instantiate(self.control_cls, app=self)
- @cached_property
- def events(self):
- return instantiate(self.events_cls, app=self)
- @cached_property
- def loader(self):
- return get_loader_cls(self.loader_cls)(app=self)
- @cached_property
- def log(self):
- return instantiate(self.log_cls, app=self)
- @cached_property
- def canvas(self):
- from celery import canvas
- return canvas
- @cached_property
- def tasks(self):
- self.finalize(auto=True)
- return self._tasks
- @cached_property
- def timezone(self):
- from celery.utils.timeutils import timezone
- conf = self.conf
- tz = conf.CELERY_TIMEZONE
- if not tz:
- return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
- else timezone.local)
- return timezone.get_timezone(conf.CELERY_TIMEZONE)
- App = Celery # compat
|