12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827 |
- """
- celery.app.task
- ~~~~~~~~~~~~~~~
- Tasks Implementation.
- :copyright: (c) 2009 - 2012 by Ask Solem.
- :license: BSD, see LICENSE for more details.
- """
- from __future__ import absolute_import
- import logging
- import sys
- import threading
- from kombu import Exchange
- from kombu.utils import cached_property
- from celery import current_app
- from celery import states
- from celery.datastructures import ExceptionInfo
- from celery.exceptions import MaxRetriesExceededError, RetryTaskError
- from celery.result import EagerResult
- from celery.utils import fun_takes_kwargs, uuid, maybe_reraise
- from celery.utils.functional import mattrgetter, maybe_list
- from celery.utils.imports import instantiate
- from celery.utils.log import get_logger
- from celery.utils.mail import ErrorMail
- from .annotations import resolve_all as resolve_all_annotations
- from .state import get_current_task
- from .registry import _unpickle_task
#: Callable collecting a task's default execution options.  Its result is
#: fed to ``dict(...)`` in ``apply_async`` and merged with per-call options.
#: NOTE(review): presumably missing attributes come back as ``None`` --
#: confirm against ``celery.utils.functional.mattrgetter``.
extract_exec_options = mattrgetter(
    "queue", "routing_key", "exchange", "immediate", "mandatory",
    "priority", "serializer", "delivery_mode", "compression", "expires")
class Context(threading.local):
    """Task request context.

    A bag of attributes describing a task request.  Because this derives
    from :class:`threading.local`, values assigned on an instance are kept
    per thread; the class attributes below are the defaults that show
    through whenever a field has not been set (or after :meth:`clear`).

    """
    # Defaults for every request field.
    logfile = None
    loglevel = None
    hostname = None
    id = None
    args = None
    kwargs = None
    retries = 0
    is_eager = False
    delivery_info = None
    taskset = None
    chord = None
    called_directly = True
    callbacks = None
    errbacks = None
    _children = None   # lazily created by the ``children`` property

    def update(self, d, **kwargs):
        """Set several fields at once from mapping `d` and keyword args."""
        self.__dict__.update(d, **kwargs)

    def clear(self):
        """Drop every field set on this instance, restoring the defaults."""
        self.__dict__.clear()

    def get(self, key, default=None):
        """Return the field named `key`, or `default` if there is no such
        attribute."""
        return getattr(self, key, default)

    def __repr__(self):
        # The 1-tuple is deliberate: a bare dict on the right-hand side of
        # ``%`` is interpreted as a mapping of format keys.
        return "<Context: %r>" % (vars(self), )

    @property
    def children(self):
        """List of child results, created empty on first access."""
        if self._children is None:
            self._children = []
        return self._children
class TaskType(type):
    """Meta class for tasks.

    Automatically registers the task in the task registry, except
    if the `abstract` attribute is set.

    If no `name` attribute is provided, then the name is automatically
    set to the name of the module it was defined in, and the class name.

    """

    def __new__(cls, name, bases, attrs):
        make_class = super(TaskType, cls).__new__
        defining_module = attrs.get("__module__") or "__main__"

        # Abstract classes, and classes that opt out through
        # ``autoregister = False``, are created but never registered.
        # Note that ``abstract`` is always removed from the attributes.
        is_abstract = attrs.pop("abstract", None)
        if is_abstract or not attrs.get("autoregister", True):
            return make_class(cls, name, bases, attrs)

        # Both spellings are removed from the class body; the app is then
        # stored as ``_app``, falling back to the current app.
        private_app = attrs.pop("_app", None)
        public_app = attrs.pop("app", None)
        app = attrs["_app"] = private_app or public_app or current_app

        # No explicit name: generate "<module name>.<class name>".
        name_was_generated = not attrs.get("name")
        if name_was_generated:
            try:
                module_name = sys.modules[defining_module].__name__
            except KeyError:
                # Module is not in sys.modules; use the raw module string.
                module_name = defining_module
            attrs["name"] = '.'.join([module_name, name])

        # A generated name for a class defined in ``__main__`` is prefixed
        # with the app's main name instead, when the app has one.
        registry = app._tasks
        if name_was_generated and defining_module == "__main__" and app.main:
            attrs["name"] = '.'.join([app.main, name])

        # Register only once per name; a second class with the same name
        # resolves to the class of the instance already in the registry.
        task_name = attrs["name"]
        if task_name not in registry:
            registry.register(make_class(cls, name, bases, attrs))
        registered = registry[task_name]
        registered.bind(app)
        return registered.__class__

    def __repr__(cls):
        if not cls._app:
            return "<unbound %s>" % (cls.__name__, )
        return "<class %s of %s>" % (cls.__name__, cls._app, )
class BaseTask(object):
    """Task base class.

    When called tasks apply the :meth:`run` method.  This method must
    be defined by all tasks (that is unless the :meth:`__call__` method
    is overridden).

    """
    __metaclass__ = TaskType
    __tracer__ = None

    ErrorMail = ErrorMail
    MaxRetriesExceededError = MaxRetriesExceededError

    #: Execution strategy; handed to ``instantiate`` by
    #: :meth:`start_strategy`.
    Strategy = "celery.worker.strategy:default"

    #: The application instance this task is bound to (set by :meth:`bind`).
    _app = None

    #: Name of the task; used for registry lookup and when publishing.
    name = None

    #: Consumed by :class:`TaskType`: classes with a true ``abstract``
    #: attribute are not registered.
    abstract = True

    #: When true, :meth:`apply` injects the "magic" keyword arguments
    #: (``task_id``, ``task_name``, ...) accepted by :meth:`run`.
    #: Overwritten with the app's value by :meth:`bind`.
    accept_magic_kwargs = False

    #: Request context.  A thread-local :class:`Context` shared at class
    #: level.
    request = Context()

    #: Destination queue; used by :meth:`get_consumer` and as a default
    #: execution option.
    queue = None

    #: Default routing key (see :meth:`apply_async`).
    routing_key = None

    #: Default exchange name (see :meth:`apply_async`).
    exchange = None

    #: Default exchange type.  Taken from
    #: :setting:`CELERY_DEFAULT_EXCHANGE_TYPE` at bind time when ``None``.
    exchange_type = None

    #: Default delivery mode.  Taken from
    #: :setting:`CELERY_DEFAULT_DELIVERY_MODE` at bind time when ``None``.
    delivery_mode = None

    #: Mandatory message routing (see :meth:`apply_async`).
    mandatory = False

    #: Request immediate delivery (see :meth:`apply_async`).
    immediate = False

    #: Default message priority (see :meth:`apply_async`).
    priority = None

    #: Maximum number of retries before :meth:`retry` gives up.
    #: ``None`` disables the limit.
    max_retries = 3

    #: Countdown in seconds used by :meth:`retry` when neither `eta`
    #: nor `countdown` is given.
    default_retry_delay = 3 * 60

    #: Rate limit.  Taken from :setting:`CELERY_DEFAULT_RATE_LIMIT` at
    #: bind time when ``None``.
    rate_limit = None

    # NOTE(review): ``bind`` only copies a setting when the attribute is
    # ``None``.  The attributes below that default to ``False`` or ``()``
    # (ignore_result, store_errors_even_if_ignored, send_error_emails,
    # error_whitelist, track_started) are therefore never populated from
    # the configuration -- confirm whether that is intended.

    #: Listed in ``from_config`` (:setting:`CELERY_IGNORE_RESULT`).
    ignore_result = False

    #: Listed in ``from_config``
    #: (:setting:`CELERY_STORE_ERRORS_EVEN_IF_IGNORED`).
    store_errors_even_if_ignored = False

    #: :meth:`send_error_email` only sends when this is true and
    #: ``disable_error_emails`` is false.
    send_error_emails = False
    disable_error_emails = False

    #: Listed in ``from_config`` (:setting:`CELERY_TASK_ERROR_WHITELIST`).
    error_whitelist = ()

    #: Default serializer name.  Taken from
    #: :setting:`CELERY_TASK_SERIALIZER` at bind time when ``None``.
    serializer = None

    #: Hard time limit.  Not consulted in this class.
    time_limit = None

    #: Soft time limit.  Not consulted in this class.
    soft_time_limit = None

    #: Result backend; defaults to the app's backend at bind time.
    backend = None

    #: Consumed by :class:`TaskType`: classes setting this to a false
    #: value are not registered.
    autoregister = True

    #: Listed in ``from_config`` (:setting:`CELERY_TRACK_STARTED`).
    track_started = False

    #: Taken from :setting:`CELERY_ACKS_LATE` at bind time when ``None``.
    acks_late = None

    #: Default expiry, sent as the ``expires`` execution option.
    expires = None

    #: Task type label.
    type = "regular"

    #: Pairs of ``(attribute name, setting name)``; :meth:`bind` fills an
    #: attribute from the setting when the attribute is ``None``.
    from_config = (
        ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
        ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
        ("send_error_emails", "CELERY_SEND_TASK_ERROR_EMAILS"),
        ("error_whitelist", "CELERY_TASK_ERROR_WHITELIST"),
        ("serializer", "CELERY_TASK_SERIALIZER"),
        ("rate_limit", "CELERY_DEFAULT_RATE_LIMIT"),
        ("track_started", "CELERY_TRACK_STARTED"),
        ("acks_late", "CELERY_ACKS_LATE"),
        ("ignore_result", "CELERY_IGNORE_RESULT"),
        ("store_errors_even_if_ignored",
            "CELERY_STORE_ERRORS_EVEN_IF_IGNORED"),
    )

    #: Set to true once :meth:`bind` has run.
    __bound__ = False

    def bind(self, app):
        """Bind this task to `app`.

        Stores the app, fills every ``from_config`` attribute that is
        still ``None`` from the app configuration, copies the app's
        ``accept_magic_kwargs`` and (if unset) ``backend``, applies
        annotations and finally calls :meth:`on_bound`.

        :param app: The application instance to bind to.
        :returns: `app`.

        """
        self.__bound__ = True
        self._app = app
        conf = app.conf

        for attr_name, config_name in self.from_config:
            if getattr(self, attr_name, None) is None:
                setattr(self, attr_name, conf[config_name])
        # NOTE(review): the app's value always replaces whatever the task
        # declared -- confirm that a per-task setting should not win.
        self.accept_magic_kwargs = app.accept_magic_kwargs
        if self.backend is None:
            self.backend = app.backend

        self.annotate()
        self.on_bound(app)

        return app

    def on_bound(self, app):
        """This method can be defined to do additional actions when the
        task class is bound to an app."""
        pass

    def _get_app(self):
        """Return the bound app, binding to the current app first if this
        task has not been bound yet."""
        if not self.__bound__ or self._app is None:
            self.bind(current_app)
        return self._app
    app = property(_get_app, bind)

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def __reduce__(self):
        # Pickled by name only; ``_unpickle_task`` receives the name.
        return (_unpickle_task, (self.name, ), None)

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError("Tasks must define the run method.")

    def start_strategy(self, app, consumer):
        """Instantiate :attr:`Strategy` with this task, `app` and
        `consumer`."""
        return instantiate(self.Strategy, self, app, consumer)

    def get_logger(self, **kwargs):
        """Get task-aware logger object."""
        logger = get_logger(self.name)
        # Loggers hanging directly off the root logger are re-parented
        # under the "celery.task" logger.
        if logger.parent is logging.root:
            logger.parent = get_logger("celery.task")
        return logger

    def establish_connection(self, connect_timeout=None):
        """Establish a connection to the message broker."""
        return self._get_app().broker_connection(
                connect_timeout=connect_timeout)

    def get_publisher(self, connection=None, exchange=None,
            connect_timeout=None, exchange_type=None, **options):
        """Get a celery task message publisher.

        :rtype :class:`~celery.app.amqp.TaskProducer`:

        .. warning::

            If you don't specify a connection, one will automatically
            be established for you, in that case you need to close this
            connection after use::

                >>> publisher = self.get_publisher()
                >>> # ... do something with publisher
                >>> publisher.connection.close()

        """
        exchange = self.exchange if exchange is None else exchange
        if exchange_type is None:
            exchange_type = self.exchange_type
        connection = connection or self.establish_connection(connect_timeout)
        return self._get_app().amqp.TaskProducer(connection,
                exchange=exchange and Exchange(exchange, exchange_type),
                routing_key=self.routing_key, **options)

    def get_consumer(self, connection=None, queues=None, **kwargs):
        """Get message consumer.

        :rtype :class:`kombu.messaging.Consumer`:

        .. warning::

            If you don't specify a connection, one will automatically
            be established for you, in that case you need to close this
            connection after use::

                >>> consumer = self.get_consumer()
                >>> # do something with consumer
                >>> consumer.close()
                >>> consumer.connection.close()

        """
        app = self._get_app()
        connection = connection or self.establish_connection()
        return app.amqp.TaskConsumer(connection,
            queues or app.amqp.queue_or_default(self.queue), **kwargs)

    def delay(self, *args, **kwargs):
        r"""Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        :param \*args: positional arguments passed on to the task.
        :param \*\*kwargs: keyword arguments passed on to the task.

        :returns :class:`celery.result.AsyncResult`:

        """
        return self.apply_async(args, kwargs)

    def apply_async(self, args=None, kwargs=None,
            task_id=None, publisher=None, connection=None,
            router=None, link=None, link_error=None, **options):
        """Apply tasks asynchronously by sending a message.

        :keyword args: The positional arguments to pass on to the
                       task (a :class:`list` or :class:`tuple`).

        :keyword kwargs: The keyword arguments to pass on to the
                         task (a :class:`dict`)

        :keyword countdown: Number of seconds into the future that the
                            task should execute. Defaults to immediate
                            execution (do not confuse with the
                            `immediate` flag, as they are unrelated).

        :keyword eta: A :class:`~datetime.datetime` object describing
                      the absolute time and date of when the task should
                      be executed.  May not be specified if `countdown`
                      is also supplied.  (Do not confuse this with the
                      `immediate` flag, as they are unrelated).

        :keyword expires: Either a :class:`int`, describing the number of
                          seconds, or a :class:`~datetime.datetime` object
                          that describes the absolute time and date of when
                          the task should expire.  The task will not be
                          executed after the expiration time.

        :keyword connection: Re-use existing broker connection instead
                             of establishing a new one.

        :keyword retry: If enabled sending of the task message will be retried
                        in the event of connection loss or failure.  Default
                        is taken from the :setting:`CELERY_TASK_PUBLISH_RETRY`
                        setting.  Note you need to handle the
                        publisher/connection manually for this to work.

        :keyword retry_policy:  Override the retry policy used.  See the
                                :setting:`CELERY_TASK_PUBLISH_RETRY` setting.

        :keyword routing_key: The routing key used to route the task to a
                              worker server.  Defaults to the
                              :attr:`routing_key` attribute.

        :keyword exchange: The named exchange to send the task to.
                           Defaults to the :attr:`exchange` attribute.

        :keyword exchange_type: The exchange type to initialize the exchange
                                if not already declared.  Defaults to the
                                :attr:`exchange_type` attribute.

        :keyword immediate: Request immediate delivery.  Will raise an
                            exception if the task cannot be routed to a worker
                            immediately.  (Do not confuse this parameter with
                            the `countdown` and `eta` settings, as they are
                            unrelated).  Defaults to the :attr:`immediate`
                            attribute.

        :keyword mandatory: Mandatory routing. Raises an exception if
                            there's no running workers able to take on this
                            task.  Defaults to the :attr:`mandatory`
                            attribute.

        :keyword priority: The task priority, a number between 0 and 9.
                           Defaults to the :attr:`priority` attribute.

        :keyword serializer: A string identifying the default
                             serialization method to use.  Can be `pickle`,
                             `json`, `yaml`, `msgpack` or any custom
                             serialization method that has been registered
                             with :mod:`kombu.serialization.registry`.
                             Defaults to the :attr:`serializer` attribute.

        :keyword compression: A string identifying the compression method
                              to use.  Can be one of ``zlib``, ``bzip2``,
                              or any custom compression methods registered with
                              :func:`kombu.compression.register`. Defaults to
                              the :setting:`CELERY_MESSAGE_COMPRESSION`
                              setting.

        :keyword link: A single, or a list of subtasks to apply if the
                       task exits successfully.

        :keyword link_error: A single, or a list of subtasks to apply
                             if an error occurs while executing the task.

        :returns: The :meth:`AsyncResult` for the sent task, or the result
                  of :meth:`apply` when running eagerly.

        .. note::
            If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will
            be replaced by a local :func:`apply` call instead.

        """
        app = self._get_app()
        router = router or app.amqp.router
        conf = app.conf

        if conf.CELERY_ALWAYS_EAGER:
            return self.apply(args, kwargs, task_id=task_id, **options)

        # Task-level defaults first, explicit call options override them,
        # then the router has the final say.
        options = dict(extract_exec_options(self), **options)
        options = router.route(options, self.name, args, kwargs)

        # Only a publisher acquired here is released here; a publisher
        # passed in by the caller stays the caller's responsibility.
        publish = publisher or app.amqp.publisher_pool.acquire(block=True)
        try:
            # The dispatcher is created inside the ``try`` so the acquired
            # publisher is still released if its construction fails.
            evd = None
            if conf.CELERY_SEND_TASK_SENT_EVENT:
                evd = app.events.Dispatcher(channel=publish.channel,
                                            buffer_while_offline=False)
            task_id = publish.delay_task(self.name, args, kwargs,
                                         task_id=task_id,
                                         event_dispatcher=evd,
                                         callbacks=maybe_list(link),
                                         errbacks=maybe_list(link_error),
                                         **options)
        finally:
            if not publisher:
                publish.release()

        result = self.AsyncResult(task_id)
        # When sent from inside another task, record the result as a child
        # of that task's request.
        parent = get_current_task()
        if parent:
            parent.request.children.append(result)
        return result

    def retry(self, args=None, kwargs=None, exc=None, throw=True,
            eta=None, countdown=None, max_retries=None, **options):
        r"""Retry the task.

        :param args: Positional arguments to retry with.
        :param kwargs: Keyword arguments to retry with.
        :keyword exc: Optional exception to raise instead of
                      :exc:`~celery.exceptions.MaxRetriesExceededError`
                      when the max restart limit has been exceeded.
        :keyword countdown: Time in seconds to delay the retry for.
        :keyword eta: Explicit time and date to run the retry at
                      (must be a :class:`~datetime.datetime` instance).
        :keyword max_retries: If set, overrides the default retry limit.
        :keyword \*\*options: Any extra options to pass on to
                              :meth:`apply_async`.
        :keyword throw: If this is :const:`False`, do not raise the
                        :exc:`~celery.exceptions.RetryTaskError` exception,
                        that tells the worker to mark the task as being
                        retried.  Note that this means the task will be
                        marked as failed if the task raises an exception,
                        or successful if it returns.

        :raises celery.exceptions.RetryTaskError: To tell the worker that
            the task has been re-sent for retry. This always happens,
            unless the `throw` keyword argument has been explicitly set
            to :const:`False`, and is considered normal operation.

        **Example**

        .. code-block:: python

            >>> @task
            >>> def tweet(auth, message):
            ...     twitter = Twitter(oauth=auth)
            ...     try:
            ...         twitter.post_status_update(message)
            ...     except twitter.FailWhale as exc:
            ...         # Retry in 5 minutes.
            ...         return tweet.retry(countdown=60 * 5, exc=exc)

        Although the task will never return above as `retry` raises an
        exception to notify the worker, we use `return` in front of the retry
        to convey that the rest of the block will not be executed.

        """
        request = self.request
        max_retries = self.max_retries if max_retries is None else max_retries
        args = request.args if args is None else args
        kwargs = request.kwargs if kwargs is None else kwargs
        delivery_info = request.delivery_info

        # Called as a plain function (not by a worker): nothing is sent,
        # an exception is raised straight away.
        # NOTE(review): ``maybe_reraise`` presumably re-raises the exception
        # currently being handled, if any -- confirm in celery.utils.
        if request.called_directly:
            maybe_reraise()
            raise exc or RetryTaskError("Task can be retried", None)

        # Default to the exchange/routing key the original message used.
        if delivery_info:
            options.setdefault("exchange", delivery_info.get("exchange"))
            options.setdefault("routing_key", delivery_info.get("routing_key"))

        if not eta and countdown is None:
            countdown = self.default_retry_delay

        options.update({"retries": request.retries + 1,
                        "task_id": request.id,
                        "countdown": countdown,
                        "eta": eta})

        if max_retries is not None and options["retries"] > max_retries:
            if exc:
                maybe_reraise()
            raise self.MaxRetriesExceededError(
                    "Can't retry %s[%s] args:%s kwargs:%s" % (
                        self.name, options["task_id"], args, kwargs))

        # An eager task is retried locally and its result returned.
        if request.is_eager:
            return self.apply(args=args, kwargs=kwargs, **options).get()

        self.apply_async(args=args, kwargs=kwargs, **options)
        if throw:
            when = ("Retry at %s" % (eta, ) if eta
                    else "Retry in %s secs." % (countdown, ))
            raise RetryTaskError(when, exc)

    def apply(self, args=None, kwargs=None, **options):
        """Execute this task locally, by blocking until the task returns.

        :param args: positional arguments passed on to the task.
        :param kwargs: keyword arguments passed on to the task.  The
                       mapping is copied, so the caller's dict is never
                       modified.
        :keyword throw: Re-raise task exceptions.  Defaults to
                        the :setting:`CELERY_EAGER_PROPAGATES_EXCEPTIONS`
                        setting.

        :rtype :class:`celery.result.EagerResult`:

        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from celery.task.trace import eager_trace_task

        app = self._get_app()
        args = args or []
        # Copy: the magic kwargs injected below must not leak into the
        # dict owned by the caller.
        kwargs = dict(kwargs) if kwargs else {}
        task_id = options.get("task_id") or uuid()
        retries = options.get("retries", 0)
        throw = app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
                           options.pop("throw", None))

        # Use the instance registered under this name.
        task = app._tasks[self.name]

        request = {"id": task_id,
                   "retries": retries,
                   "is_eager": True,
                   "logfile": options.get("logfile"),
                   "loglevel": options.get("loglevel", 0),
                   "delivery_info": {"is_eager": True}}
        if self.accept_magic_kwargs:
            default_kwargs = {"task_name": task.name,
                              "task_id": task_id,
                              "task_retries": retries,
                              "task_is_eager": True,
                              "logfile": options.get("logfile"),
                              "loglevel": options.get("loglevel", 0),
                              "delivery_info": {"is_eager": True}}
            # Only pass the magic kwargs that ``run`` can accept.
            supported_keys = fun_takes_kwargs(task.run, default_kwargs)
            extend_with = dict((key, val)
                                    for key, val in default_kwargs.items()
                                        if key in supported_keys)
            kwargs.update(extend_with)

        retval, info = eager_trace_task(task, task_id, args, kwargs,
                                        request=request, propagate=throw)
        if isinstance(retval, ExceptionInfo):
            retval = retval.exception
        state, tb = states.SUCCESS, ''
        if info is not None:
            state, tb = info.state, info.strtb
        return EagerResult(task_id, retval, state, traceback=tb)

    def AsyncResult(self, task_id):
        """Get AsyncResult instance for this kind of task.

        :param task_id: Task id to get result for.

        """
        return self._get_app().AsyncResult(task_id, backend=self.backend,
                                                    task_name=self.name)

    def subtask(self, *args, **kwargs):
        """Returns :class:`~celery.subtask` object for
        this task, wrapping arguments and execution options
        for a single task invocation."""
        from celery.canvas import subtask
        return subtask(self, *args, **kwargs)

    def s(self, *args, **kwargs):
        """``.s(*a, **k) -> .subtask(a, k)``"""
        return self.subtask(args, kwargs)

    def update_state(self, task_id=None, state=None, meta=None):
        """Update task state.

        :param task_id: Id of the task to update.  Defaults to the id of
                        the current request.
        :param state: New state (:class:`str`).
        :param meta: State metadata (:class:`dict`).

        """
        if task_id is None:
            task_id = self.request.id
        self.backend.store_result(task_id, meta, state)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Retry handler.

        This is run by the worker when the task is to be retried.

        :param exc: The exception sent to :meth:`retry`.
        :param task_id: Unique id of the retried task.
        :param args: Original arguments for the retried task.
        :param kwargs: Original keyword arguments for the retried task.

        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
                        instance, containing the traceback.

        The return value of this handler is ignored.

        """
        pass

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Handler called after the task returns.

        :param status: Current task state.
        :param retval: Task return value/exception.
        :param task_id: Unique id of the task.
        :param args: Original arguments for the task that failed.
        :param kwargs: Original keyword arguments for the task
                       that failed.

        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
                        instance, containing the traceback (if any).

        The return value of this handler is ignored.

        """
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler.

        This is run by the worker when the task fails.

        :param exc: The exception raised by the task.
        :param task_id: Unique id of the failed task.
        :param args: Original arguments for the task that failed.
        :param kwargs: Original keyword arguments for the task
                       that failed.

        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
                        instance, containing the traceback.

        The return value of this handler is ignored.

        """
        pass

    def send_error_email(self, context, exc, **kwargs):
        """Send an error e-mail through :attr:`ErrorMail`, but only when
        ``send_error_emails`` is enabled and ``disable_error_emails`` is
        not."""
        if self.send_error_emails and not self.disable_error_emails:
            sender = self.ErrorMail(self, **kwargs)
            sender.send(context, exc)

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler.

        Run by the worker if the task executes successfully.

        :param retval: The return value of the task.
        :param task_id: Unique id of the executed task.
        :param args: Original arguments for the executed task.
        :param kwargs: Original keyword arguments for the executed task.

        The return value of this handler is ignored.

        """
        pass

    def execute(self, request, pool, loglevel, logfile, **kwargs):
        """The method the worker calls to execute the task.

        :param request: A :class:`~celery.worker.job.Request`.
        :param pool: A task pool.
        :param loglevel: Current loglevel.
        :param logfile: Name of the currently used logfile.

        :keyword consumer: The :class:`~celery.worker.consumer.Consumer`.

        """
        request.execute_using_pool(pool, loglevel, logfile)

    def annotate(self):
        """Set every attribute found in the annotations that the app
        resolves for this task."""
        for d in resolve_all_annotations(self.app.annotations, self):
            for key, value in d.items():
                setattr(self, key, value)

    def __repr__(self):
        """`repr(task)`"""
        return "<@task: %s>" % (self.name, )

    @cached_property
    def logger(self):
        return self.get_logger()

    @property
    def __name__(self):
        return self.__class__.__name__
|