12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262 |
- # -*- coding: utf-8 -*-
- """Actual App instance implementation."""
- from __future__ import absolute_import, unicode_literals
- import os
- import threading
- import warnings
- from collections import defaultdict, deque
- from datetime import datetime
- from operator import attrgetter
- from kombu import pools
- from kombu.clocks import LamportClock
- from kombu.common import oid_from
- from kombu.utils.compat import register_after_fork
- from kombu.utils.objects import cached_property
- from kombu.utils.uuid import uuid
- from vine import starpromise
- from vine.utils import wraps
- from celery import platforms, signals
- from celery._state import (_announce_app_finalized, _deregister_app,
- _register_app, _set_current_app, _task_stack,
- connect_on_app_finalize, get_current_app,
- get_current_worker_task, set_default_app)
- from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
- from celery.five import (UserDict, bytes_if_py2, python_2_unicode_compatible,
- values)
- from celery.loaders import get_loader_cls
- from celery.local import PromiseProxy, maybe_evaluate
- from celery.utils import abstract
- from celery.utils.collections import AttributeDictMixin
- from celery.utils.dispatch import Signal
- from celery.utils.functional import first, head_from_fun, maybe_list
- from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
- from celery.utils.log import get_logger
- from celery.utils.objects import FallbackContext, mro_lookup
- from celery.utils.time import (get_exponential_backoff_interval, timezone,
- to_utc)
- # Load all builtin tasks
- from . import builtins # noqa
- from . import backends
- from .annotations import prepare as prepare_annotations
- from .defaults import find_deprecated_settings
- from .registry import TaskRegistry
- from .utils import (AppPickler, Settings, _new_key_to_old, _old_key_to_new,
- _unpickle_app, _unpickle_app_v2, appstr, bugreport,
- detect_settings)
- __all__ = ('Celery',)
- logger = get_logger(__name__)
- BUILTIN_FIXUPS = {
- 'celery.fixups.django:fixup',
- }
- USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
- ERR_ENVVAR_NOT_SET = """
- The environment variable {0!r} is not set,
- and as such the configuration could not be loaded.
- Please set this variable and make sure it points to
- a valid configuration module.
- Example:
- {0}="proj.celeryconfig"
- """
- def app_has_custom(app, attr):
- """Return true if app has customized method `attr`.
- Note:
- This is used for optimizations in cases where we know
- how the default behavior works, but need to account
- for someone using inheritance to override a method/property.
- """
- return mro_lookup(app.__class__, attr, stop={Celery, object},
- monkey_patched=[__name__])
- def _unpickle_appattr(reverse_name, args):
- """Unpickle app."""
- # Given an attribute name and a list of args, gets
- # the attribute from the current app and calls it.
- return get_current_app()._rgetattr(reverse_name)(*args)
- def _after_fork_cleanup_app(app):
- # This is used with multiprocessing.register_after_fork,
- # so need to be at module level.
- try:
- app._after_fork()
- except Exception as exc: # pylint: disable=broad-except
- logger.info('after forker raised exception: %r', exc, exc_info=1)
- class PendingConfiguration(UserDict, AttributeDictMixin):
- # `app.conf` will be of this type before being explicitly configured,
- # meaning the app can keep any configuration set directly
- # on `app.conf` before the `app.config_from_object` call.
- #
- # accessing any key will finalize the configuration,
- # replacing `app.conf` with a concrete settings object.
- callback = None
- _data = None
- def __init__(self, conf, callback):
- object.__setattr__(self, '_data', conf)
- object.__setattr__(self, 'callback', callback)
- def __setitem__(self, key, value):
- self._data[key] = value
- def clear(self):
- self._data.clear()
- def update(self, *args, **kwargs):
- self._data.update(*args, **kwargs)
- def setdefault(self, *args, **kwargs):
- return self._data.setdefault(*args, **kwargs)
- def __contains__(self, key):
- # XXX will not show finalized configuration
- # setdefault will cause `key in d` to happen,
- # so for setdefault to be lazy, so does contains.
- return key in self._data
- def __len__(self):
- return len(self.data)
- def __repr__(self):
- return repr(self.data)
- @cached_property
- def data(self):
- return self.callback()
- @python_2_unicode_compatible
- class Celery(object):
- """Celery application.
- Arguments:
- main (str): Name of the main module if running as `__main__`.
- This is used as the prefix for auto-generated task names.
- Keyword Arguments:
- broker (str): URL of the default broker used.
- backend (Union[str, type]): The result store backend class,
- or the name of the backend class to use.
- Default is the value of the :setting:`result_backend` setting.
- autofinalize (bool): If set to False a :exc:`RuntimeError`
- will be raised if the task registry or tasks are used before
- the app is finalized.
- set_as_current (bool): Make this the global current app.
- include (List[str]): List of modules every worker should import.
- amqp (Union[str, type]): AMQP object or class name.
- events (Union[str, type]): Events object or class name.
- log (Union[str, type]): Log object or class name.
- control (Union[str, type]): Control object or class name.
- tasks (Union[str, type]): A task registry, or the name of
- a registry class.
- fixups (List[str]): List of fix-up plug-ins (e.g., see
- :mod:`celery.fixups.django`).
- config_source (Union[str, type]): Take configuration from a class,
- or object. Attributes may include any setings described in
- the documentation.
- """
- #: This is deprecated, use :meth:`reduce_keys` instead
- Pickler = AppPickler
- SYSTEM = platforms.SYSTEM
- IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS
- #: Name of the `__main__` module. Required for standalone scripts.
- #:
- #: If set this will be used instead of `__main__` when automatically
- #: generating task names.
- main = None
- #: Custom options for command-line programs.
- #: See :ref:`extending-commandoptions`
- user_options = None
- #: Custom bootsteps to extend and modify the worker.
- #: See :ref:`extending-bootsteps`.
- steps = None
- builtin_fixups = BUILTIN_FIXUPS
- amqp_cls = 'celery.app.amqp:AMQP'
- backend_cls = None
- events_cls = 'celery.app.events:Events'
- loader_cls = None
- log_cls = 'celery.app.log:Logging'
- control_cls = 'celery.app.control:Control'
- task_cls = 'celery.app.task:Task'
- registry_cls = TaskRegistry
- _fixups = None
- _pool = None
- _conf = None
- _after_fork_registered = False
- #: Signal sent when app is loading configuration.
- on_configure = None
- #: Signal sent after app has prepared the configuration.
- on_after_configure = None
- #: Signal sent after app has been finalized.
- on_after_finalize = None
- #: Signal sent by every new process after fork.
- on_after_fork = None
- def __init__(self, main=None, loader=None, backend=None,
- amqp=None, events=None, log=None, control=None,
- set_as_current=True, tasks=None, broker=None, include=None,
- changes=None, config_source=None, fixups=None, task_cls=None,
- autofinalize=True, namespace=None, strict_typing=True,
- **kwargs):
- self.clock = LamportClock()
- self.main = main
- self.amqp_cls = amqp or self.amqp_cls
- self.events_cls = events or self.events_cls
- self.loader_cls = loader or self._get_default_loader()
- self.log_cls = log or self.log_cls
- self.control_cls = control or self.control_cls
- self.task_cls = task_cls or self.task_cls
- self.set_as_current = set_as_current
- self.registry_cls = symbol_by_name(self.registry_cls)
- self.user_options = defaultdict(set)
- self.steps = defaultdict(set)
- self.autofinalize = autofinalize
- self.namespace = namespace
- self.strict_typing = strict_typing
- self.configured = False
- self._config_source = config_source
- self._pending_defaults = deque()
- self._pending_periodic_tasks = deque()
- self.finalized = False
- self._finalize_mutex = threading.Lock()
- self._pending = deque()
- self._tasks = tasks
- if not isinstance(self._tasks, TaskRegistry):
- self._tasks = self.registry_cls(self._tasks or {})
- # If the class defines a custom __reduce_args__ we need to use
- # the old way of pickling apps: pickling a list of
- # args instead of the new way that pickles a dict of keywords.
- self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
- # these options are moved to the config to
- # simplify pickling of the app object.
- self._preconf = changes or {}
- self._preconf_set_by_auto = set()
- self.__autoset('broker_url', broker)
- self.__autoset('result_backend', backend)
- self.__autoset('include', include)
- self._conf = Settings(
- PendingConfiguration(
- self._preconf, self._finalize_pending_conf),
- prefix=self.namespace,
- keys=(_old_key_to_new, _new_key_to_old),
- )
- # - Apply fix-ups.
- self.fixups = set(self.builtin_fixups) if fixups is None else fixups
- # ...store fixup instances in _fixups to keep weakrefs alive.
- self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]
- if self.set_as_current:
- self.set_current()
- # Signals
- if self.on_configure is None:
- # used to be a method pre 4.0
- self.on_configure = Signal(name='app.on_configure')
- self.on_after_configure = Signal(
- name='app.on_after_configure',
- providing_args={'source'},
- )
- self.on_after_finalize = Signal(name='app.on_after_finalize')
- self.on_after_fork = Signal(name='app.on_after_fork')
- self.on_init()
- _register_app(self)
- def _get_default_loader(self):
- # the --loader command-line argument sets the environment variable.
- return (
- os.environ.get('CELERY_LOADER') or
- self.loader_cls or
- 'celery.loaders.app:AppLoader'
- )
- def on_init(self):
- """Optional callback called at init."""
- pass
- def __autoset(self, key, value):
- if value:
- self._preconf[key] = value
- self._preconf_set_by_auto.add(key)
- def set_current(self):
- """Make this the current app for this thread."""
- _set_current_app(self)
- def set_default(self):
- """Make this the default app for all threads."""
- set_default_app(self)
- def _ensure_after_fork(self):
- if not self._after_fork_registered:
- self._after_fork_registered = True
- if register_after_fork is not None:
- register_after_fork(self, _after_fork_cleanup_app)
- def close(self):
- """Clean up after the application.
- Only necessary for dynamically created apps, and you should
- probably use the :keyword:`with` statement instead.
- Example:
- >>> with Celery(set_as_current=False) as app:
- ... with app.connection_for_write() as conn:
- ... pass
- """
- self._pool = None
- _deregister_app(self)
- def start(self, argv=None):
- """Run :program:`celery` using `argv`.
- Uses :data:`sys.argv` if `argv` is not specified.
- """
- return instantiate(
- 'celery.bin.celery:CeleryCommand', app=self
- ).execute_from_commandline(argv)
- def worker_main(self, argv=None):
- """Run :program:`celery worker` using `argv`.
- Uses :data:`sys.argv` if `argv` is not specified.
- """
- return instantiate(
- 'celery.bin.worker:worker', app=self
- ).execute_from_commandline(argv)
- def task(self, *args, **opts):
- """Decorator to create a task class out of any callable.
- Examples:
- .. code-block:: python
- @app.task
- def refresh_feed(url):
- store_feed(feedparser.parse(url))
- with setting extra options:
- .. code-block:: python
- @app.task(exchange='feeds')
- def refresh_feed(url):
- return store_feed(feedparser.parse(url))
- Note:
- App Binding: For custom apps the task decorator will return
- a proxy object, so that the act of creating the task is not
- performed until the task is used or the task registry is accessed.
- If you're depending on binding to be deferred, then you must
- not access any attributes on the returned object until the
- application is fully set up (finalized).
- """
- if USING_EXECV and opts.get('lazy', True):
- # When using execv the task in the original module will point to a
- # different app, so doing things like 'add.request' will point to
- # a different task instance. This makes sure it will always use
- # the task instance from the current app.
- # Really need a better solution for this :(
- from . import shared_task
- return shared_task(*args, lazy=False, **opts)
- def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
- _filt = filter # stupid 2to3
- def _create_task_cls(fun):
- if shared:
- def cons(app):
- return app._task_from_fun(fun, **opts)
- cons.__name__ = fun.__name__
- connect_on_app_finalize(cons)
- if not lazy or self.finalized:
- ret = self._task_from_fun(fun, **opts)
- else:
- # return a proxy object that evaluates on first use
- ret = PromiseProxy(self._task_from_fun, (fun,), opts,
- __doc__=fun.__doc__)
- self._pending.append(ret)
- if _filt:
- return _filt(ret)
- return ret
- return _create_task_cls
- if len(args) == 1:
- if callable(args[0]):
- return inner_create_task_cls(**opts)(*args)
- raise TypeError('argument 1 to @task() must be a callable')
- if args:
- raise TypeError(
- '@task() takes exactly 1 argument ({0} given)'.format(
- sum([len(args), len(opts)])))
- return inner_create_task_cls(**opts)
- def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
- if not self.finalized and not self.autofinalize:
- raise RuntimeError('Contract breach: app not finalized')
- name = name or self.gen_task_name(fun.__name__, fun.__module__)
- base = base or self.Task
- if name not in self._tasks:
- run = fun if bind else staticmethod(fun)
- task = type(fun.__name__, (base,), dict({
- 'app': self,
- 'name': name,
- 'run': run,
- '_decorated': True,
- '__doc__': fun.__doc__,
- '__module__': fun.__module__,
- '__header__': staticmethod(head_from_fun(fun, bound=bind)),
- '__wrapped__': run}, **options))()
- # for some reason __qualname__ cannot be set in type()
- # so we have to set it here.
- try:
- task.__qualname__ = fun.__qualname__
- except AttributeError:
- pass
- self._tasks[task.name] = task
- task.bind(self) # connects task to this app
- autoretry_for = tuple(options.get('autoretry_for', ()))
- retry_kwargs = options.get('retry_kwargs', {})
- retry_backoff = int(options.get('retry_backoff', False))
- retry_backoff_max = int(options.get('retry_backoff_max', 600))
- retry_jitter = options.get('retry_jitter', True)
- if autoretry_for and not hasattr(task, '_orig_run'):
- @wraps(task.run)
- def run(*args, **kwargs):
- try:
- return task._orig_run(*args, **kwargs)
- except autoretry_for as exc:
- if retry_backoff:
- retry_kwargs['countdown'] = \
- get_exponential_backoff_interval(
- factor=retry_backoff,
- retries=task.request.retries,
- maximum=retry_backoff_max,
- full_jitter=retry_jitter)
- raise task.retry(exc=exc, **retry_kwargs)
- task._orig_run, task.run = task.run, run
- else:
- task = self._tasks[name]
- return task
- def register_task(self, task):
- """Utility for registering a task-based class.
- Note:
- This is here for compatibility with old Celery 1.0
- style task classes, you should not need to use this for
- new projects.
- """
- if not task.name:
- task_cls = type(task)
- task.name = self.gen_task_name(
- task_cls.__name__, task_cls.__module__)
- self.tasks[task.name] = task
- task._app = self
- task.bind(self)
- return task
- def gen_task_name(self, name, module):
- return gen_task_name(self, name, module)
- def finalize(self, auto=False):
- """Finalize the app.
- This loads built-in tasks, evaluates pending task decorators,
- reads configuration, etc.
- """
- with self._finalize_mutex:
- if not self.finalized:
- if auto and not self.autofinalize:
- raise RuntimeError('Contract breach: app not finalized')
- self.finalized = True
- _announce_app_finalized(self)
- pending = self._pending
- while pending:
- maybe_evaluate(pending.popleft())
- for task in values(self._tasks):
- task.bind(self)
- self.on_after_finalize.send(sender=self)
- def add_defaults(self, fun):
- """Add default configuration from dict ``d``.
- If the argument is a callable function then it will be regarded
- as a promise, and it won't be loaded until the configuration is
- actually needed.
- This method can be compared to:
- .. code-block:: pycon
- >>> celery.conf.update(d)
- with a difference that 1) no copy will be made and 2) the dict will
- not be transferred when the worker spawns child processes, so
- it's important that the same configuration happens at import time
- when pickle restores the object on the other side.
- """
- if not callable(fun):
- d, fun = fun, lambda: d
- if self.configured:
- return self._conf.add_defaults(fun())
- self._pending_defaults.append(fun)
- def config_from_object(self, obj,
- silent=False, force=False, namespace=None):
- """Read configuration from object.
- Object is either an actual object or the name of a module to import.
- Example:
- >>> celery.config_from_object('myapp.celeryconfig')
- >>> from myapp import celeryconfig
- >>> celery.config_from_object(celeryconfig)
- Arguments:
- silent (bool): If true then import errors will be ignored.
- force (bool): Force reading configuration immediately.
- By default the configuration will be read only when required.
- """
- self._config_source = obj
- self.namespace = namespace or self.namespace
- if force or self.configured:
- self._conf = None
- if self.loader.config_from_object(obj, silent=silent):
- return self.conf
- def config_from_envvar(self, variable_name, silent=False, force=False):
- """Read configuration from environment variable.
- The value of the environment variable must be the name
- of a module to import.
- Example:
- >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
- >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')
- """
- module_name = os.environ.get(variable_name)
- if not module_name:
- if silent:
- return False
- raise ImproperlyConfigured(
- ERR_ENVVAR_NOT_SET.strip().format(variable_name))
- return self.config_from_object(module_name, silent=silent, force=force)
- def config_from_cmdline(self, argv, namespace='celery'):
- self._conf.update(
- self.loader.cmdline_config_parser(argv, namespace)
- )
- def setup_security(self, allowed_serializers=None, key=None, cert=None,
- store=None, digest='sha1', serializer='json'):
- """Setup the message-signing serializer.
- This will affect all application instances (a global operation).
- Disables untrusted serializers and if configured to use the ``auth``
- serializer will register the ``auth`` serializer with the provided
- settings into the Kombu serializer registry.
- Arguments:
- allowed_serializers (Set[str]): List of serializer names, or
- content_types that should be exempt from being disabled.
- key (str): Name of private key file to use.
- Defaults to the :setting:`security_key` setting.
- cert (str): Name of certificate file to use.
- Defaults to the :setting:`security_certificate` setting.
- store (str): Directory containing certificates.
- Defaults to the :setting:`security_cert_store` setting.
- digest (str): Digest algorithm used when signing messages.
- Default is ``sha1``.
- serializer (str): Serializer used to encode messages after
- they've been signed. See :setting:`task_serializer` for
- the serializers supported. Default is ``json``.
- """
- from celery.security import setup_security
- return setup_security(allowed_serializers, key, cert,
- store, digest, serializer, app=self)
- def autodiscover_tasks(self, packages=None,
- related_name='tasks', force=False):
- """Auto-discover task modules.
- Searches a list of packages for a "tasks.py" module (or use
- related_name argument).
- If the name is empty, this will be delegated to fix-ups (e.g., Django).
- For example if you have a directory layout like this:
- .. code-block:: text
- foo/__init__.py
- tasks.py
- models.py
- bar/__init__.py
- tasks.py
- models.py
- baz/__init__.py
- models.py
- Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
- result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.
- Arguments:
- packages (List[str]): List of packages to search.
- This argument may also be a callable, in which case the
- value returned is used (for lazy evaluation).
- related_name (str): The name of the module to find. Defaults
- to "tasks": meaning "look for 'module.tasks' for every
- module in ``packages``."
- force (bool): By default this call is lazy so that the actual
- auto-discovery won't happen until an application imports
- the default modules. Forcing will cause the auto-discovery
- to happen immediately.
- """
- if force:
- return self._autodiscover_tasks(packages, related_name)
- signals.import_modules.connect(starpromise(
- self._autodiscover_tasks, packages, related_name,
- ), weak=False, sender=self)
- def _autodiscover_tasks(self, packages, related_name, **kwargs):
- if packages:
- return self._autodiscover_tasks_from_names(packages, related_name)
- return self._autodiscover_tasks_from_fixups(related_name)
- def _autodiscover_tasks_from_names(self, packages, related_name):
- # packages argument can be lazy
- return self.loader.autodiscover_tasks(
- packages() if callable(packages) else packages, related_name,
- )
- def _autodiscover_tasks_from_fixups(self, related_name):
- return self._autodiscover_tasks_from_names([
- pkg for fixup in self._fixups
- for pkg in fixup.autodiscover_tasks()
- if hasattr(fixup, 'autodiscover_tasks')
- ], related_name=related_name)
- def send_task(self, name, args=None, kwargs=None, countdown=None,
- eta=None, task_id=None, producer=None, connection=None,
- router=None, result_cls=None, expires=None,
- publisher=None, link=None, link_error=None,
- add_to_parent=True, group_id=None, retries=0, chord=None,
- reply_to=None, time_limit=None, soft_time_limit=None,
- root_id=None, parent_id=None, route_name=None,
- shadow=None, chain=None, task_type=None, **options):
- """Send task by name.
- Supports the same arguments as :meth:`@-Task.apply_async`.
- Arguments:
- name (str): Name of task to call (e.g., `"tasks.add"`).
- result_cls (~@AsyncResult): Specify custom result class.
- """
- parent = have_parent = None
- amqp = self.amqp
- task_id = task_id or uuid()
- producer = producer or publisher # XXX compat
- router = router or amqp.router
- conf = self.conf
- if conf.task_always_eager: # pragma: no cover
- warnings.warn(AlwaysEagerIgnored(
- 'task_always_eager has no effect on send_task',
- ), stacklevel=2)
- options = router.route(
- options, route_name or name, args, kwargs, task_type)
- if not root_id or not parent_id:
- parent = self.current_worker_task
- if parent:
- if not root_id:
- root_id = parent.request.root_id or parent.request.id
- if not parent_id:
- parent_id = parent.request.id
- message = amqp.create_task_message(
- task_id, name, args, kwargs, countdown, eta, group_id,
- expires, retries, chord,
- maybe_list(link), maybe_list(link_error),
- reply_to or self.oid, time_limit, soft_time_limit,
- self.conf.task_send_sent_event,
- root_id, parent_id, shadow, chain,
- argsrepr=options.get('argsrepr'),
- kwargsrepr=options.get('kwargsrepr'),
- )
- if connection:
- producer = amqp.Producer(connection, auto_declare=False)
- with self.producer_or_acquire(producer) as P:
- with P.connection._reraise_as_library_errors():
- self.backend.on_task_call(P, task_id)
- amqp.send_task_message(P, name, message, **options)
- result = (result_cls or self.AsyncResult)(task_id)
- if add_to_parent:
- if not have_parent:
- parent, have_parent = self.current_worker_task, True
- if parent:
- parent.add_trail(result)
- return result
- def connection_for_read(self, url=None, **kwargs):
- """Establish connection used for consuming.
- See Also:
- :meth:`connection` for supported arguments.
- """
- return self._connection(url or self.conf.broker_read_url, **kwargs)
- def connection_for_write(self, url=None, **kwargs):
- """Establish connection used for producing.
- See Also:
- :meth:`connection` for supported arguments.
- """
- return self._connection(url or self.conf.broker_write_url, **kwargs)
- def connection(self, hostname=None, userid=None, password=None,
- virtual_host=None, port=None, ssl=None,
- connect_timeout=None, transport=None,
- transport_options=None, heartbeat=None,
- login_method=None, failover_strategy=None, **kwargs):
- """Establish a connection to the message broker.
- Please use :meth:`connection_for_read` and
- :meth:`connection_for_write` instead, to convey the intent
- of use for this connection.
- Arguments:
- url: Either the URL or the hostname of the broker to use.
- hostname (str): URL, Hostname/IP-address of the broker.
- If a URL is used, then the other argument below will
- be taken from the URL instead.
- userid (str): Username to authenticate as.
- password (str): Password to authenticate with
- virtual_host (str): Virtual host to use (domain).
- port (int): Port to connect to.
- ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl`
- setting.
- transport (str): defaults to the :setting:`broker_transport`
- setting.
- transport_options (Dict): Dictionary of transport specific options.
- heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only).
- login_method (str): Custom login method to use (AMQP only).
- failover_strategy (str, Callable): Custom failover strategy.
- **kwargs: Additional arguments to :class:`kombu.Connection`.
- Returns:
- kombu.Connection: the lazy connection instance.
- """
- return self.connection_for_write(
- hostname or self.conf.broker_write_url,
- userid=userid, password=password,
- virtual_host=virtual_host, port=port, ssl=ssl,
- connect_timeout=connect_timeout, transport=transport,
- transport_options=transport_options, heartbeat=heartbeat,
- login_method=login_method, failover_strategy=failover_strategy,
- **kwargs
- )
- def _connection(self, url, userid=None, password=None,
- virtual_host=None, port=None, ssl=None,
- connect_timeout=None, transport=None,
- transport_options=None, heartbeat=None,
- login_method=None, failover_strategy=None, **kwargs):
- conf = self.conf
- return self.amqp.Connection(
- url,
- userid or conf.broker_user,
- password or conf.broker_password,
- virtual_host or conf.broker_vhost,
- port or conf.broker_port,
- transport=transport or conf.broker_transport,
- ssl=self.either('broker_use_ssl', ssl),
- heartbeat=heartbeat or self.conf.broker_heartbeat,
- login_method=login_method or conf.broker_login_method,
- failover_strategy=(
- failover_strategy or conf.broker_failover_strategy
- ),
- transport_options=dict(
- conf.broker_transport_options, **transport_options or {}
- ),
- connect_timeout=self.either(
- 'broker_connection_timeout', connect_timeout
- ),
- )
- broker_connection = connection
- def _acquire_connection(self, pool=True):
- """Helper for :meth:`connection_or_acquire`."""
- if pool:
- return self.pool.acquire(block=True)
- return self.connection_for_write()
- def connection_or_acquire(self, connection=None, pool=True, *_, **__):
- """Context used to acquire a connection from the pool.
- For use within a :keyword:`with` statement to get a connection
- from the pool if one is not already provided.
- Arguments:
- connection (kombu.Connection): If not provided, a connection
- will be acquired from the connection pool.
- """
- return FallbackContext(connection, self._acquire_connection, pool=pool)
- default_connection = connection_or_acquire # XXX compat
- def producer_or_acquire(self, producer=None):
- """Context used to acquire a producer from the pool.
- For use within a :keyword:`with` statement to get a producer
- from the pool if one is not already provided
- Arguments:
- producer (kombu.Producer): If not provided, a producer
- will be acquired from the producer pool.
- """
- return FallbackContext(
- producer, self.producer_pool.acquire, block=True,
- )
- default_producer = producer_or_acquire # XXX compat
- def prepare_config(self, c):
- """Prepare configuration before it is merged with the defaults."""
- return find_deprecated_settings(c)
- def now(self):
- """Return the current time and date as a datetime."""
- now_in_utc = to_utc(datetime.utcnow())
- return now_in_utc.astimezone(self.timezone)
- def select_queues(self, queues=None):
- """Select subset of queues.
- Arguments:
- queues (Sequence[str]): a list of queue names to keep.
- """
- return self.amqp.queues.select(queues)
- def either(self, default_key, *defaults):
- """Get key from configuration or use default values.
- Fallback to the value of a configuration key if none of the
- `*values` are true.
- """
- return first(None, [
- first(None, defaults), starpromise(self.conf.get, default_key),
- ])
- def bugreport(self):
- """Return information useful in bug reports."""
- return bugreport(self)
- def _get_backend(self):
- backend, url = backends.by_url(
- self.backend_cls or self.conf.result_backend,
- self.loader)
- return backend(app=self, url=url)
- def _finalize_pending_conf(self):
- """Get config value by key and finalize loading the configuration.
- Note:
- This is used by PendingConfiguration:
- as soon as you access a key the configuration is read.
- """
- conf = self._conf = self._load_config()
- return conf
- def _load_config(self):
- if isinstance(self.on_configure, Signal):
- self.on_configure.send(sender=self)
- else:
- # used to be a method pre 4.0
- self.on_configure()
- if self._config_source:
- self.loader.config_from_object(self._config_source)
- self.configured = True
- settings = detect_settings(
- self.prepare_config(self.loader.conf), self._preconf,
- ignore_keys=self._preconf_set_by_auto, prefix=self.namespace,
- )
- if self._conf is not None:
- # replace in place, as someone may have referenced app.conf,
- # done some changes, accessed a key, and then try to make more
- # changes to the reference and not the finalized value.
- self._conf.swap_with(settings)
- else:
- self._conf = settings
- # load lazy config dict initializers.
- pending_def = self._pending_defaults
- while pending_def:
- self._conf.add_defaults(maybe_evaluate(pending_def.popleft()()))
- # load lazy periodic tasks
- pending_beat = self._pending_periodic_tasks
- while pending_beat:
- self._add_periodic_task(*pending_beat.popleft())
- self.on_after_configure.send(sender=self, source=self._conf)
- return self._conf
- def _after_fork(self):
- self._pool = None
- try:
- self.__dict__['amqp']._producer_pool = None
- except (AttributeError, KeyError):
- pass
- self.on_after_fork.send(sender=self)
- def signature(self, *args, **kwargs):
- """Return a new :class:`~celery.Signature` bound to this app."""
- kwargs['app'] = self
- return self._canvas.signature(*args, **kwargs)
- def add_periodic_task(self, schedule, sig,
- args=(), kwargs=(), name=None, **opts):
- key, entry = self._sig_to_periodic_task_entry(
- schedule, sig, args, kwargs, name, **opts)
- if self.configured:
- self._add_periodic_task(key, entry)
- else:
- self._pending_periodic_tasks.append((key, entry))
- return key
- def _sig_to_periodic_task_entry(self, schedule, sig,
- args=(), kwargs={}, name=None, **opts):
- sig = (sig.clone(args, kwargs)
- if isinstance(sig, abstract.CallableSignature)
- else self.signature(sig.name, args, kwargs))
- return name or repr(sig), {
- 'schedule': schedule,
- 'task': sig.name,
- 'args': sig.args,
- 'kwargs': sig.kwargs,
- 'options': dict(sig.options, **opts),
- }
- def _add_periodic_task(self, key, entry):
- self._conf.beat_schedule[key] = entry
- def create_task_cls(self):
- """Create a base task class bound to this app."""
- return self.subclass_with_self(
- self.task_cls, name='Task', attribute='_app',
- keep_reduce=True, abstract=True,
- )
- def subclass_with_self(self, Class, name=None, attribute='app',
- reverse=None, keep_reduce=False, **kw):
- """Subclass an app-compatible class.
- App-compatible means that the class has a class attribute that
- provides the default app it should use, for example:
- ``class Foo: app = None``.
- Arguments:
- Class (type): The app-compatible class to subclass.
- name (str): Custom name for the target class.
- attribute (str): Name of the attribute holding the app,
- Default is 'app'.
- reverse (str): Reverse path to this object used for pickling
- purposes. For example, to get ``app.AsyncResult``,
- use ``"AsyncResult"``.
- keep_reduce (bool): If enabled a custom ``__reduce__``
- implementation won't be provided.
- """
- Class = symbol_by_name(Class)
- reverse = reverse if reverse else Class.__name__
- def __reduce__(self):
- return _unpickle_appattr, (reverse, self.__reduce_args__())
- attrs = dict(
- {attribute: self},
- __module__=Class.__module__,
- __doc__=Class.__doc__,
- **kw)
- if not keep_reduce:
- attrs['__reduce__'] = __reduce__
- return type(bytes_if_py2(name or Class.__name__), (Class,), attrs)
- def _rgetattr(self, path):
- return attrgetter(path)(self)
- def __enter__(self):
- return self
- def __exit__(self, *exc_info):
- self.close()
- def __repr__(self):
- return '<{0} {1}>'.format(type(self).__name__, appstr(self))
- def __reduce__(self):
- if self._using_v1_reduce:
- return self.__reduce_v1__()
- return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
- def __reduce_v1__(self):
- # Reduce only pickles the configuration changes,
- # so the default configuration doesn't have to be passed
- # between processes.
- return (
- _unpickle_app,
- (self.__class__, self.Pickler) + self.__reduce_args__(),
- )
- def __reduce_keys__(self):
- """Keyword arguments used to reconstruct the object when unpickling."""
- return {
- 'main': self.main,
- 'changes':
- self._conf.changes if self.configured else self._preconf,
- 'loader': self.loader_cls,
- 'backend': self.backend_cls,
- 'amqp': self.amqp_cls,
- 'events': self.events_cls,
- 'log': self.log_cls,
- 'control': self.control_cls,
- 'fixups': self.fixups,
- 'config_source': self._config_source,
- 'task_cls': self.task_cls,
- 'namespace': self.namespace,
- }
- def __reduce_args__(self):
- """Deprecated method, please use :meth:`__reduce_keys__` instead."""
- return (self.main, self._conf.changes if self.configured else {},
- self.loader_cls, self.backend_cls, self.amqp_cls,
- self.events_cls, self.log_cls, self.control_cls,
- False, self._config_source)
- @cached_property
- def Worker(self):
- """Worker application.
- See Also:
- :class:`~@Worker`.
- """
- return self.subclass_with_self('celery.apps.worker:Worker')
- @cached_property
- def WorkController(self, **kwargs):
- """Embeddable worker.
- See Also:
- :class:`~@WorkController`.
- """
- return self.subclass_with_self('celery.worker:WorkController')
- @cached_property
- def Beat(self, **kwargs):
- """:program:`celery beat` scheduler application.
- See Also:
- :class:`~@Beat`.
- """
- return self.subclass_with_self('celery.apps.beat:Beat')
- @cached_property
- def Task(self):
- """Base task class for this app."""
- return self.create_task_cls()
- @cached_property
- def annotations(self):
- return prepare_annotations(self.conf.task_annotations)
- @cached_property
- def AsyncResult(self):
- """Create new result instance.
- See Also:
- :class:`celery.result.AsyncResult`.
- """
- return self.subclass_with_self('celery.result:AsyncResult')
- @cached_property
- def ResultSet(self):
- return self.subclass_with_self('celery.result:ResultSet')
- @cached_property
- def GroupResult(self):
- """Create new group result instance.
- See Also:
- :class:`celery.result.GroupResult`.
- """
- return self.subclass_with_self('celery.result:GroupResult')
- @property
- def pool(self):
- """Broker connection pool: :class:`~@pool`.
- Note:
- This attribute is not related to the workers concurrency pool.
- """
- if self._pool is None:
- self._ensure_after_fork()
- limit = self.conf.broker_pool_limit
- pools.set_limit(limit)
- self._pool = pools.connections[self.connection_for_write()]
- return self._pool
- @property
- def current_task(self):
- """Instance of task being executed, or :const:`None`."""
- return _task_stack.top
- @property
- def current_worker_task(self):
- """The task currently being executed by a worker or :const:`None`.
- Differs from :data:`current_task` in that it's not affected
- by tasks calling other tasks directly, or eagerly.
- """
- return get_current_worker_task()
- @cached_property
- def oid(self):
- """Universally unique identifier for this app."""
- # since 4.0: thread.get_ident() is not included when
- # generating the process id. This is due to how the RPC
- # backend now dedicates a single thread to receive results,
- # which would not work if each thread has a separate id.
- return oid_from(self, threads=False)
- @cached_property
- def amqp(self):
- """AMQP related functionality: :class:`~@amqp`."""
- return instantiate(self.amqp_cls, app=self)
- @cached_property
- def backend(self):
- """Current backend instance."""
- return self._get_backend()
- @property
- def conf(self):
- """Current configuration."""
- if self._conf is None:
- self._conf = self._load_config()
- return self._conf
- @conf.setter
- def conf(self, d): # noqa
- self._conf = d
- @cached_property
- def control(self):
- """Remote control: :class:`~@control`."""
- return instantiate(self.control_cls, app=self)
- @cached_property
- def events(self):
- """Consuming and sending events: :class:`~@events`."""
- return instantiate(self.events_cls, app=self)
- @cached_property
- def loader(self):
- """Current loader instance."""
- return get_loader_cls(self.loader_cls)(app=self)
- @cached_property
- def log(self):
- """Logging: :class:`~@log`."""
- return instantiate(self.log_cls, app=self)
- @cached_property
- def _canvas(self):
- from celery import canvas
- return canvas
- @cached_property
- def tasks(self):
- """Task registry.
- Warning:
- Accessing this attribute will also auto-finalize the app.
- """
- self.finalize(auto=True)
- return self._tasks
- @property
- def producer_pool(self):
- return self.amqp.producer_pool
- def uses_utc_timezone(self):
- """Check if the application uses the UTC timezone."""
- return self.conf.timezone == 'UTC' or self.conf.timezone is None
- @cached_property
- def timezone(self):
- """Current timezone for this app.
- This is a cached property taking the time zone from the
- :setting:`timezone` setting.
- """
- conf = self.conf
- tz = conf.timezone or 'UTC'
- if not tz:
- if conf.enable_utc:
- return timezone.get_timezone('UTC')
- else:
- if not conf.timezone:
- return timezone.local
- return timezone.get_timezone(tz)
- App = Celery # noqa: E305 XXX compat
|