12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021 |
- # -*- coding: utf-8 -*-
- """Task implementation: request context and the task base class."""
- from __future__ import absolute_import, unicode_literals
- import sys
- from billiard.einfo import ExceptionInfo
- from kombu.exceptions import OperationalError
- from kombu.utils.uuid import uuid
- from celery import current_app, group, states
- from celery._state import _task_stack
- from celery.canvas import signature
- from celery.exceptions import (Ignore, ImproperlyConfigured,
- MaxRetriesExceededError, Reject, Retry)
- from celery.five import items, python_2_unicode_compatible
- from celery.local import class_property
- from celery.result import EagerResult, denied_join_result
- from celery.utils import abstract
- from celery.utils.functional import mattrgetter, maybe_list
- from celery.utils.imports import instantiate
- from celery.utils.nodenames import gethostname
- from celery.utils.serialization import raise_with_context
- from .annotations import resolve_all as resolve_all_annotations
- from .registry import _unpickle_task_v2
- from .utils import appstr
__all__ = ('Context', 'Task')

#: extracts attributes related to publishing a message from an object.
extract_exec_options = mattrgetter(
    'queue', 'routing_key', 'exchange', 'priority', 'expires',
    'serializer', 'delivery_mode', 'compression', 'time_limit',
    'soft_time_limit', 'immediate', 'mandatory',  # imm+man is deprecated
)

# We take __repr__ very seriously around here ;)
R_BOUND_TASK = '<class {0.__name__} of {app}{flags}>'
R_UNBOUND_TASK = '<unbound {0.__name__}{flags}>'
R_SELF_TASK = '<@task {0.name} bound to other {0.__self__}>'
R_INSTANCE = '<@task: {0.name} of {app}{flags}>'

#: Here for backwards compatibility as tasks no longer use a custom meta-class.
TaskType = type
- def _strflags(flags, default=''):
- if flags:
- return ' ({0})'.format(', '.join(flags))
- return default
def _reprtask(task, fmt=None, flags=None):
    """Build the ``repr()`` string for a task or task class.

    Arguments:
        task: Object providing ``__v2_compat__`` and ``_app`` (plus
            whatever attributes ``fmt`` refers to).
        fmt (str): Format string; when empty the bound/unbound class
            format is chosen depending on whether ``task._app`` is set.
        flags (Iterable[str]): Extra flags to show; copied, never mutated.

    Returns:
        str: The formatted representation.
    """
    flags = list(flags) if flags is not None else []
    if task.__v2_compat__:
        flags.append('v2 compatible')
    if not fmt:
        fmt = R_BOUND_TASK if task._app else R_UNBOUND_TASK
    return fmt.format(
        task, flags=_strflags(flags),
        app=appstr(task._app) if task._app else None,
    )
@python_2_unicode_compatible
class Context(object):
    """Task request variables (Task.request)."""

    # Class-level defaults; per-request values are stored in the
    # instance ``__dict__`` by :meth:`update`.
    logfile = None
    loglevel = None
    hostname = None
    id = None
    args = None
    kwargs = None
    retries = 0
    eta = None
    expires = None
    is_eager = False
    headers = None
    delivery_info = None
    reply_to = None
    root_id = None
    parent_id = None
    correlation_id = None
    taskset = None   # compat alias to group
    group = None
    chord = None
    chain = None
    utc = None
    called_directly = True
    callbacks = None
    errbacks = None
    timelimit = None
    origin = None
    _children = None   # see property
    _protected = 0

    def __init__(self, *args, **kwargs):
        """Initialize the context; arguments are as for :meth:`dict.update`."""
        self.update(*args, **kwargs)

    def update(self, *args, **kwargs):
        """Update the instance attributes, as :meth:`dict.update` would."""
        return self.__dict__.update(*args, **kwargs)

    def clear(self):
        """Remove all instance attributes (class defaults apply again)."""
        return self.__dict__.clear()

    def get(self, key, default=None):
        """Return attribute ``key``, or ``default`` if it doesn't exist."""
        return getattr(self, key, default)

    def __repr__(self):
        """``repr(context)``: shows the instance attributes only."""
        return '<Context: {0!r}>'.format(vars(self))

    def as_execution_options(self):
        """Return this request as a dict of execution option keywords.

        ``timelimit`` is unpacked as ``(hard, soft)``; both are
        :const:`None` when no time limit is set.
        """
        limit_hard, limit_soft = self.timelimit or (None, None)
        return {
            'task_id': self.id,
            'root_id': self.root_id,
            'parent_id': self.parent_id,
            'group_id': self.group,
            'chord': self.chord,
            'chain': self.chain,
            'link': self.callbacks,
            'link_error': self.errbacks,
            'expires': self.expires,
            'soft_time_limit': limit_soft,
            'time_limit': limit_hard,
            'headers': self.headers,
            'retries': self.retries,
            'reply_to': self.reply_to,
            'origin': self.origin,
        }

    @property
    def children(self):
        """List of child results, created on first access."""
        # children must be an empty list for every thread
        if self._children is None:
            self._children = []
        return self._children
@abstract.CallableTask.register
@python_2_unicode_compatible
class Task(object):
    """Task base class.

    Note:
        When called tasks apply the :meth:`run` method.  This method must
        be defined by all tasks (that is unless the :meth:`__call__` method
        is overridden).
    """

    __trace__ = None
    __v2_compat__ = False  # set by old base in celery.task.base

    MaxRetriesExceededError = MaxRetriesExceededError
    OperationalError = OperationalError

    #: Execution strategy used, or the qualified name of one.
    Strategy = 'celery.worker.strategy:default'

    #: Request class used, or the qualified name of one.
    Request = 'celery.worker.request:Request'

    #: This is the instance bound to if the task is a method of a class.
    __self__ = None

    #: The application instance associated with this task class.
    _app = None

    #: Name of the task.
    name = None

    #: Enable argument checking.
    #: You can set this to false if you don't want the signature to be
    #: checked when calling the task.
    #: Defaults to :attr:`app.strict_typing <@Celery.strict_typing>`.
    typing = None

    #: Maximum number of retries before giving up.  If set to :const:`None`,
    #: it will **never** stop retrying.
    max_retries = 3

    #: Default time in seconds before a retry of the task should be
    #: executed.  3 minutes by default.
    default_retry_delay = 3 * 60

    #: Rate limit for this task type.  Examples: :const:`None` (no rate
    #: limit), `'100/s'` (hundred tasks a second), `'100/m'` (hundred tasks
    #: a minute), `'100/h'` (hundred tasks an hour)
    rate_limit = None

    #: If enabled the worker won't store task state and return values
    #: for this task.  Defaults to the :setting:`task_ignore_result`
    #: setting.
    ignore_result = None

    #: If enabled the request will keep track of subtasks started by
    #: this task, and this information will be sent with the result
    #: (``result.children``).
    trail = True

    #: If enabled the worker will send monitoring events related to
    #: this task (but only if the worker is configured to send
    #: task related events).
    #: Note that this has no effect on the task-failure event case
    #: where a task is not registered (as it will have no task class
    #: to check this flag).
    send_events = True

    #: When enabled errors will be stored even if the task is otherwise
    #: configured to ignore results.
    store_errors_even_if_ignored = None

    #: The name of a serializer that is registered with
    #: :mod:`kombu.serialization.registry`.  Default is `'pickle'`.
    serializer = None

    #: Hard time limit.
    #: Defaults to the :setting:`task_time_limit` setting.
    time_limit = None

    #: Soft time limit.
    #: Defaults to the :setting:`task_soft_time_limit` setting.
    soft_time_limit = None

    #: The result store backend used for this task.
    backend = None

    #: If disabled this task won't be registered automatically.
    autoregister = True

    #: If enabled the task will report its status as 'started' when the task
    #: is executed by a worker.  Disabled by default as the normal behavior
    #: is to not report that level of granularity.  Tasks are either pending,
    #: finished, or waiting to be retried.
    #:
    #: Having a 'started' status can be useful for when there are long
    #: running tasks and there's a need to report what task is currently
    #: running.
    #:
    #: The application default can be overridden using the
    #: :setting:`task_track_started` setting.
    track_started = None

    #: When enabled messages for this task will be acknowledged **after**
    #: the task has been executed, and not *just before* (the
    #: default behavior).
    #:
    #: Please note that this means the task may be executed twice if the
    #: worker crashes mid execution.
    #:
    #: The application default can be overridden with the
    #: :setting:`task_acks_late` setting.
    acks_late = None

    #: Even if :attr:`acks_late` is enabled, the worker will
    #: acknowledge tasks when the worker process executing them abruptly
    #: exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).
    #:
    #: Setting this to true allows the message to be re-queued instead,
    #: so that the task will execute again by the same worker, or another
    #: worker.
    #:
    #: Warning: Enabling this can cause message loops; make sure you know
    #: what you're doing.
    reject_on_worker_lost = None

    #: Tuple of expected exceptions.
    #:
    #: These are errors that are expected in normal operation
    #: and that shouldn't be regarded as a real error by the worker.
    #: Currently this means that the state will be updated to an error
    #: state, but the worker won't log the event as an error.
    throws = ()

    #: Default task expiry time.
    expires = None

    #: Max length of result representation used in logs and events.
    resultrepr_maxsize = 1024

    #: Task request stack, the current request will be the topmost.
    request_stack = None

    #: Some may expect a request to exist even if the task hasn't been
    #: called.  This should probably be deprecated.
    _default_request = None

    #: Deprecated attribute ``abstract`` here for compatibility.
    abstract = True

    _exec_options = None

    __bound__ = False

    #: Pairs of ``(attribute name, configuration key)``: attributes that
    #: are still :const:`None` at bind time are filled from the app config.
    from_config = (
        ('serializer', 'task_serializer'),
        ('rate_limit', 'task_default_rate_limit'),
        ('track_started', 'task_track_started'),
        ('acks_late', 'task_acks_late'),
        ('reject_on_worker_lost', 'task_reject_on_worker_lost'),
        ('ignore_result', 'task_ignore_result'),
        ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'),
    )

    _backend = None  # set by backend property.

    # - Tasks are lazily bound, so that configuration is not set
    # - until the task is actually used

    @classmethod
    def bind(cls, app):
        """Bind the task class to ``app``.

        Fills attributes listed in :attr:`from_config` that are still
        :const:`None` from ``app.conf``, and on the first bind applies
        annotations and creates the request stack.

        Returns:
            The ``app`` argument.
        """
        was_bound, cls.__bound__ = cls.__bound__, True
        cls._app = app
        conf = app.conf
        cls._exec_options = None  # clear option cache

        if cls.typing is None:
            cls.typing = app.strict_typing

        for attr_name, config_name in cls.from_config:
            if getattr(cls, attr_name, None) is None:
                setattr(cls, attr_name, conf[config_name])

        # decorate with annotations from config.
        if not was_bound:
            cls.annotate()

            from celery.utils.threads import LocalStack
            cls.request_stack = LocalStack()

        # PeriodicTask uses this to add itself to the PeriodicTask schedule.
        cls.on_bound(app)

        return app

    @classmethod
    def on_bound(cls, app):
        """Called when the task is bound to an app.

        Note:
            This class method can be defined to do additional actions when
            the task class is bound to an app.
        """

    @classmethod
    def _get_app(cls):
        """Return the app for this task class, binding to it if needed."""
        if cls._app is None:
            cls._app = current_app
        if not cls.__bound__:
            # The app property's __set__ method is not called
            # if Task.app is set (on the class), so must bind on use.
            cls.bind(cls._app)
        return cls._app
    app = class_property(_get_app, bind)

    @classmethod
    def annotate(cls):
        """Apply the app's annotations to this task class.

        Keys starting with ``@`` wrap the named attribute using
        :meth:`add_around`; all other keys are set as class attributes.
        """
        for d in resolve_all_annotations(cls.app.annotations, cls):
            for key, value in items(d):
                if key.startswith('@'):
                    cls.add_around(key[1:], value)
                else:
                    setattr(cls, key, value)

    @classmethod
    def add_around(cls, attr, around):
        """Replace attribute ``attr`` with ``around(original)``.

        If the attribute was already wrapped, the previously saved
        ``__wrapped__`` original is wrapped again (wrappers don't stack).
        """
        orig = getattr(cls, attr)
        if getattr(orig, '__wrapped__', None):
            orig = orig.__wrapped__
        meth = around(orig)
        meth.__wrapped__ = orig
        setattr(cls, attr, meth)

    def __call__(self, *args, **kwargs):
        """Run the task body directly, with a request pushed for the call."""
        _task_stack.push(self)
        self.push_request(args=args, kwargs=kwargs)
        try:
            # add self if this is a bound task
            if self.__self__ is not None:
                return self.run(self.__self__, *args, **kwargs)
            return self.run(*args, **kwargs)
        finally:
            self.pop_request()
            _task_stack.pop()

    def __reduce__(self):
        # - tasks are pickled into the name of the task only, and the receiver
        # - simply grabs it from the local registry.
        # - in later versions the module of the task is also included,
        # - and the receiving side tries to import that module so that
        # - it will work even if the task hasn't been registered.
        mod = type(self).__module__
        mod = mod if mod and mod in sys.modules else None
        return (_unpickle_task_v2, (self.name, mod), None)

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')

    def start_strategy(self, app, consumer, **kwargs):
        """Instantiate :attr:`Strategy` for this task, app and consumer."""
        return instantiate(self.Strategy, self, app, consumer, **kwargs)

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

            shadow (str): Override task name used in logs/monitoring.
                Default is retrieved from :meth:`shadow_name`.

            connection (kombu.Connection): Re-use existing broker connection
                instead of acquiring one from the connection pool.

            retry (bool): If enabled sending of the task message will be
                retried in the event of connection loss or failure.
                Default is taken from the :setting:`task_publish_retry`
                setting.  Note that you need to handle the
                producer/connection manually for this to work.

            retry_policy (Mapping): Override the retry policy used.
                See the :setting:`task_publish_retry_policy` setting.

            queue (str, kombu.Queue): The queue to route the task to.
                This must be a key present in :setting:`task_queues`, or
                :setting:`task_create_missing_queues` must be
                enabled.  See :ref:`guide-routing` for more
                information.

            exchange (str, kombu.Exchange): Named custom exchange to send the
                task to.  Usually not used in combination with the ``queue``
                argument.

            routing_key (str): Custom routing key used to route the task to a
                worker server.  If in combination with a ``queue`` argument
                only used to specify custom routing keys to topic exchanges.

            priority (int): The task priority, a number between 0 and 9.
                Defaults to the :attr:`priority` attribute.

            serializer (str): Serialization method to use.
                Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
                serialization method that's been registered
                with :mod:`kombu.serialization.registry`.
                Defaults to the :attr:`serializer` attribute.

            compression (str): Optional compression method
                to use.  Can be one of ``zlib``, ``bzip2``,
                or any custom compression methods registered with
                :func:`kombu.compression.register`.
                Defaults to the :setting:`task_compression` setting.

            link (~@Signature): A single, or a list of tasks signatures
                to apply if the task returns successfully.

            link_error (~@Signature): A single, or a list of task signatures
                to apply if an error occurs while executing the task.

            producer (kombu.Producer): custom producer to use when publishing
                the task.

            add_to_parent (bool): If set to True (default) and the task
                is applied while executing another task, then the result
                will be appended to the parent tasks ``request.children``
                attribute.  Trailing can also be disabled by default using the
                :attr:`trail` attribute

            publisher (kombu.Producer): Deprecated alias to ``producer``.

            headers (Dict): Message headers to be included in the message.

        Returns:
            celery.result.AsyncResult: Promise of future evaluation.

        Raises:
            TypeError: If not enough arguments are passed, or too many
                arguments are passed.  Note that signature checks may
                be disabled by specifying ``@task(typing=False)``.
            kombu.exceptions.OperationalError: If a connection to the
               transport cannot be made, or if the connection is lost.

        Note:
            Also supports all keyword arguments supported by
            :meth:`kombu.Producer.publish`.
        """
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)
        # add 'self' if this is a "task_method".
        if self.__self__ is not None:
            args = args if isinstance(args, tuple) else tuple(args or ())
            args = (self.__self__,) + args

        if self.__v2_compat__:
            # NOTE(review): ``self()`` goes through ``__call__`` and, unless
            # a subclass overrides it, runs the task body with no
            # arguments -- confirm this is intended for v2-compat tasks.
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        # Explicit options take precedence over the task's own defaults.
        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts
        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

    def shadow_name(self, args, kwargs, options):
        """Override for custom task name in worker logs/monitoring.

        Example:
            .. code-block:: python

                from celery.utils.imports import qualname

                def shadow_name(task, args, kwargs, options):
                    return qualname(args[0])

                @app.task(shadow_name=shadow_name, serializer='pickle')
                def apply_function_async(fun, *args, **kwargs):
                    return fun(*args, **kwargs)

        Arguments:
            args (Tuple): Task positional arguments.
            kwargs (Dict): Task keyword arguments.
            options (Dict): Task execution options.
        """

    def signature_from_request(self, request=None, args=None, kwargs=None,
                               queue=None, **extra_options):
        """Create a signature for this task from a request.

        Arguments:
            request (Context): Request to copy execution options from.
                Defaults to the current request.
            args (Tuple): Positional arguments; defaults to ``request.args``.
            kwargs (Dict): Keyword arguments; defaults to ``request.kwargs``.
            queue (str): Queue to use instead of the routing found in
                the request's ``delivery_info``.
            **extra_options (Any): Passed on to :meth:`signature`.

        Returns:
            The value of :meth:`signature` for the collected options.
        """
        request = self.request if request is None else request
        args = request.args if args is None else args
        kwargs = request.kwargs if kwargs is None else kwargs
        options = request.as_execution_options()
        if queue:
            options['queue'] = queue
        else:
            delivery_info = request.delivery_info or {}
            exchange = delivery_info.get('exchange')
            routing_key = delivery_info.get('routing_key')
            if exchange == '' and routing_key:
                # sent to anon-exchange
                options['queue'] = routing_key
            else:
                options.update(delivery_info)
        return self.signature(
            args, kwargs, options, type=self, **extra_options
        )
    subtask_from_request = signature_from_request  # XXX compat

    def retry(self, args=None, kwargs=None, exc=None, throw=True,
              eta=None, countdown=None, max_retries=None, **options):
        """Retry the task.

        Example:
            >>> from imaginary_twitter_lib import Twitter
            >>> from proj.celery import app

            >>> @app.task(bind=True)
            ... def tweet(self, auth, message):
            ...     twitter = Twitter(oauth=auth)
            ...     try:
            ...         twitter.post_status_update(message)
            ...     except twitter.FailWhale as exc:
            ...         # Retry in 5 minutes.
            ...         raise self.retry(countdown=60 * 5, exc=exc)

        Note:
            Although the task will never return above as `retry` raises an
            exception to notify the worker, we use `raise` in front of the
            retry to convey that the rest of the block won't be executed.

        Arguments:
            args (Tuple): Positional arguments to retry with.
            kwargs (Dict): Keyword arguments to retry with.
            exc (Exception): Custom exception to report when the max retry
                limit has been exceeded (default:
                :exc:`~@MaxRetriesExceededError`).

                If this argument is set and retry is called while
                an exception was raised (``sys.exc_info()`` is set)
                it will attempt to re-raise the current exception.

                If no exception was raised it will raise the ``exc``
                argument provided.
            countdown (float): Time in seconds to delay the retry for.
            eta (~datetime.datetime): Explicit time and date to run the
                retry at.
            max_retries (int): If set, overrides the default retry limit for
                this execution.  Changes to this parameter don't propagate to
                subsequent task retry attempts.  A value of :const:`None`,
                means "use the default", so if you want infinite retries you'd
                have to set the :attr:`max_retries` attribute of the task to
                :const:`None` first.
            time_limit (int): If set, overrides the default time limit.
            soft_time_limit (int): If set, overrides the default soft
                time limit.
            throw (bool): If this is :const:`False`, don't raise the
                :exc:`~@Retry` exception, that tells the worker to mark
                the task as being retried.  Note that this means the task
                will be marked as failed if the task raises an exception,
                or successful if it returns after the retry call.
            **options (Any): Extra options to pass on to :meth:`apply_async`.

        Raises:
            celery.exceptions.Retry:
                To tell the worker that the task has been re-sent for retry.
                This always happens, unless the `throw` keyword argument
                has been explicitly set to :const:`False`, and is considered
                normal operation.
        """
        request = self.request
        retries = request.retries + 1
        max_retries = self.max_retries if max_retries is None else max_retries

        # Not in worker or emulated by (apply/always_eager),
        # so just raise the original exception.
        if request.called_directly:
            # raises orig stack if PyErr_Occurred,
            # and augments with exc' if that argument is defined.
            raise_with_context(exc or Retry('Task can be retried', None))

        if not eta and countdown is None:
            countdown = self.default_retry_delay

        is_eager = request.is_eager
        S = self.signature_from_request(
            request, args, kwargs,
            countdown=countdown, eta=eta, retries=retries,
            **options
        )

        if max_retries is not None and retries > max_retries:
            if exc:
                # On Py3: will augment any current exception with
                # the exc' argument provided (raise exc from orig)
                raise_with_context(exc)
            raise self.MaxRetriesExceededError(
                "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
                    self.name, request.id, S.args, S.kwargs))

        ret = Retry(exc=exc, when=eta or countdown)

        if is_eager:
            # if task was executed eagerly using apply(),
            # then the retry must also be executed eagerly.
            S.apply().get()
            if throw:
                raise ret
            return ret

        try:
            S.apply_async()
        except Exception as send_exc:
            # Distinct name so the ``exc`` argument isn't shadowed.
            raise Reject(send_exc, requeue=False)
        if throw:
            raise ret
        return ret

    def apply(self, args=None, kwargs=None,
              link=None, link_error=None,
              task_id=None, retries=None, throw=None,
              logfile=None, loglevel=None, headers=None, **options):
        """Execute this task locally, by blocking until the task returns.

        Arguments:
            args (Tuple): positional arguments passed on to the task.
            kwargs (Dict): keyword arguments passed on to the task.
            throw (bool): Re-raise task exceptions.
                Defaults to the :setting:`task_eager_propagates` setting.

        Returns:
            celery.result.EagerResult: pre-evaluated result.
        """
        # trace imports Task, so need to import inline.
        from celery.app.trace import build_tracer

        app = self._get_app()
        args = args or ()
        # add 'self' if this is a bound method.
        if self.__self__ is not None:
            args = (self.__self__,) + tuple(args)
        kwargs = kwargs or {}
        task_id = task_id or uuid()
        retries = retries or 0
        if throw is None:
            throw = app.conf.task_eager_propagates

        # Make sure we get the task instance, not class.
        task = app._tasks[self.name]

        request = {
            'id': task_id,
            'retries': retries,
            'is_eager': True,
            'logfile': logfile,
            'loglevel': loglevel or 0,
            'hostname': gethostname(),
            'callbacks': maybe_list(link),
            'errbacks': maybe_list(link_error),
            'headers': headers,
            'delivery_info': {'is_eager': True},
        }
        tb = None
        tracer = build_tracer(
            task.name, task, eager=True,
            propagate=throw, app=app,
        )
        ret = tracer(task_id, args, kwargs, request)
        retval = ret.retval
        if isinstance(retval, ExceptionInfo):
            retval, tb = retval.exception, retval.traceback
        state = states.SUCCESS if ret.info is None else ret.info.state
        return EagerResult(task_id, retval, state, traceback=tb)

    def AsyncResult(self, task_id, **kwargs):
        """Get AsyncResult instance for this kind of task.

        Arguments:
            task_id (str): Task id to get result for.
        """
        return self._get_app().AsyncResult(task_id, backend=self.backend,
                                           task_name=self.name, **kwargs)

    def signature(self, args=None, *starargs, **starkwargs):
        """Create signature.

        Returns:
            :class:`~celery.signature`:  object for
                this task, wrapping arguments and execution options
                for a single task invocation.
        """
        starkwargs.setdefault('app', self.app)
        return signature(self, args, *starargs, **starkwargs)
    subtask = signature

    def s(self, *args, **kwargs):
        """Create signature.

        Shortcut for ``.s(*a, **k) -> .signature(a, k)``.
        """
        return self.signature(args, kwargs)

    def si(self, *args, **kwargs):
        """Create immutable signature.

        Shortcut for ``.si(*a, **k) -> .signature(a, k, immutable=True)``.
        """
        return self.signature(args, kwargs, immutable=True)

    def chunks(self, it, n):
        """Create a :class:`~celery.canvas.chunks` task for this task."""
        from celery import chunks
        return chunks(self.s(), it, n, app=self.app)

    def map(self, it):
        """Create a :class:`~celery.canvas.xmap` task from ``it``."""
        from celery import xmap
        return xmap(self.s(), it, app=self.app)

    def starmap(self, it):
        """Create a :class:`~celery.canvas.xstarmap` task from ``it``."""
        from celery import xstarmap
        return xstarmap(self.s(), it, app=self.app)

    def send_event(self, type_, retry=True, retry_policy=None, **fields):
        """Send monitoring event message.

        This can be used to add custom event types in :pypi:`Flower`
        and other monitors.

        Arguments:
            type_ (str):  Type of event, e.g. ``"task-failed"``.

        Keyword Arguments:
            retry (bool):  Retry sending the message
                if the connection is lost.  Default is taken from the
                :setting:`task_publish_retry` setting.
            retry_policy (Mapping): Retry settings.  Default is taken
                from the :setting:`task_publish_retry_policy` setting.
            **fields (Any): Map containing information about the event.
                Must be JSON serializable.
        """
        req = self.request
        if retry_policy is None:
            retry_policy = self.app.conf.task_publish_retry_policy
        with self.app.events.default_dispatcher(hostname=req.hostname) as d:
            return d.send(
                type_,
                uuid=req.id, retry=retry, retry_policy=retry_policy, **fields)

    def replace(self, sig):
        """Replace this task, with a new task inheriting the task id.

        .. versionadded:: 4.0

        Arguments:
            sig (~@Signature): signature to replace with.

        Raises:
            ~@Ignore: This is always raised, so the best practice
            is to always use ``raise self.replace(...)`` to convey
            to the reader that the task won't continue after being replaced.
        """
        chord = self.request.chord
        if 'chord' in sig.options:
            raise ImproperlyConfigured(
                "A signature replacing a task must not be part of a chord"
            )

        if isinstance(sig, group):
            sig |= self.app.tasks['celery.accumulate'].s(index=0).set(
                link=self.request.callbacks,
                link_error=self.request.errbacks,
            )

        if self.request.chain:
            for t in reversed(self.request.chain):
                sig |= signature(t, app=self.app)

        sig.set(
            chord=chord,
            group_id=self.request.group,
            root_id=self.request.root_id,
        )
        sig.freeze(self.request.id)

        sig.delay()
        raise Ignore('Replaced by new task')

    def add_to_chord(self, sig, lazy=False):
        """Add signature to the chord the current task is a member of.

        .. versionadded:: 4.0

        Currently only supported by the Redis result backend.

        Arguments:
            sig (~@Signature): Signature to extend chord with.
            lazy (bool): If enabled the new task won't actually be called,
                and ``sig.delay()`` must be called manually.
        """
        if not self.request.chord:
            raise ValueError('Current task is not member of any chord')
        sig.set(
            group_id=self.request.group,
            chord=self.request.chord,
            root_id=self.request.root_id,
        )
        result = sig.freeze()
        self.backend.add_to_chord(self.request.group, result)
        return sig.delay() if not lazy else sig

    def update_state(self, task_id=None, state=None, meta=None):
        """Update task state.

        Arguments:
            task_id (str): Id of the task to update.
                Defaults to the id of the current task.
            state (str): New state.
            meta (Dict): State meta-data.
        """
        if task_id is None:
            task_id = self.request.id
        self.backend.store_result(task_id, meta, state)

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler.

        Run by the worker if the task executes successfully.

        Arguments:
            retval (Any): The return value of the task.
            task_id (str): Unique id of the executed task.
            args (Tuple): Original arguments for the executed task.
            kwargs (Dict): Original keyword arguments for the executed task.

        Returns:
            None: The return value of this handler is ignored.
        """

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Retry handler.

        This is run by the worker when the task is to be retried.

        Arguments:
            exc (Exception): The exception sent to :meth:`retry`.
            task_id (str): Unique id of the retried task.
            args (Tuple): Original arguments for the retried task.
            kwargs (Dict): Original keyword arguments for the retried task.
            einfo (~billiard.einfo.ExceptionInfo): Exception information.

        Returns:
            None: The return value of this handler is ignored.
        """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler.

        This is run by the worker when the task fails.

        Arguments:
            exc (Exception): The exception raised by the task.
            task_id (str): Unique id of the failed task.
            args (Tuple): Original arguments for the task that failed.
            kwargs (Dict): Original keyword arguments for the task that failed.
            einfo (~billiard.einfo.ExceptionInfo): Exception information.

        Returns:
            None: The return value of this handler is ignored.
        """

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Handler called after the task returns.

        Arguments:
            status (str): Current task state.
            retval (Any): Task return value/exception.
            task_id (str): Unique id of the task.
            args (Tuple): Original arguments for the task.
            kwargs (Dict): Original keyword arguments for the task.
            einfo (~billiard.einfo.ExceptionInfo): Exception information.

        Returns:
            None: The return value of this handler is ignored.
        """

    def add_trail(self, result):
        """Record ``result`` as a child of the current request.

        Only recorded when :attr:`trail` is enabled; ``result`` is
        returned either way.
        """
        if self.trail:
            self.request.children.append(result)
        return result

    def push_request(self, *args, **kwargs):
        """Push a new :class:`Context` built from the arguments."""
        self.request_stack.push(Context(*args, **kwargs))

    def pop_request(self):
        """Pop the topmost request off the request stack."""
        self.request_stack.pop()

    def __repr__(self):
        """``repr(task)``."""
        return _reprtask(self, R_SELF_TASK if self.__self__ else R_INSTANCE)

    def _get_request(self):
        """Get current request object."""
        req = self.request_stack.top
        if req is None:
            # task was not called, but some may still expect a request
            # to be there, perhaps that should be deprecated.
            if self._default_request is None:
                self._default_request = Context()
            return self._default_request
        return req
    request = property(_get_request)

    def _get_exec_options(self):
        """Return the publishing options of this task, cached on first use."""
        if self._exec_options is None:
            self._exec_options = extract_exec_options(self)
        return self._exec_options

    @property
    def backend(self):
        """Result backend: the explicitly set one, else the app's backend."""
        backend = self._backend
        if backend is None:
            return self.app.backend
        return backend

    @backend.setter
    def backend(self, value):  # noqa
        self._backend = value

    @property
    def __name__(self):
        return self.__class__.__name__
BaseTask = Task  # noqa: E305 XXX compat alias
|