- from carrot.connection import DjangoBrokerConnection
- from celery.conf import AMQP_CONNECTION_TIMEOUT
- from celery.result import AsyncResult, EagerResult
- from celery.messaging import TaskPublisher
- from celery.registry import tasks
- from celery.utils import gen_unique_id, noop, fun_takes_kwargs
- from functools import partial as curry
- from datetime import datetime, timedelta
- from celery.exceptions import RetryTaskError
- from celery.datastructures import ExceptionInfo
- from celery.backends import default_backend
- from celery.loaders import current_loader
- from celery.monitoring import TaskTimerStats
- from celery import signals
- import sys
- import inspect
- import warnings
- import traceback
- def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
- routing_key=None, exchange=None, task_id=None,
- immediate=None, mandatory=None, priority=None, connection=None,
- connect_timeout=AMQP_CONNECTION_TIMEOUT, serializer=None, **opts):
- """Run a task asynchronously by the celery daemon(s).
- :param task: The task to run (a callable object, or a :class:`Task`
- instance).
- :param args: The positional arguments to pass on to the task (a ``list``).
- :param kwargs: The keyword arguments to pass on to the task (a ``dict``).
- :param countdown: Number of seconds into the future that the task should
- execute. Defaults to immediate delivery (Do not confuse that with
- the ``immediate`` setting, as they are unrelated).
- :param eta: A :class:`datetime.datetime` object that describes the
- absolute time when the task should execute. May not be specified
- if ``countdown`` is also supplied. (Do not confuse this with the
- ``immediate`` setting, as they are unrelated).
- :keyword routing_key: The routing key used to route the task to a worker
- server.
- :keyword exchange: The named exchange to send the task to. Defaults to
- :attr:`celery.task.base.Task.exchange`.
- :keyword immediate: Request immediate delivery. Will raise an exception
- if the task cannot be routed to a worker immediately.
- (Do not confuse this parameter with the ``countdown`` and ``eta``
- settings, as they are unrelated).
- :keyword mandatory: Mandatory routing. Raises an exception if there are
- no running workers able to take on this task.
- :keyword connection: Re-use existing AMQP connection.
- The ``connect_timeout`` argument is not respected if this is set.
- :keyword connect_timeout: The timeout in seconds, before we give up
- on establishing a connection to the AMQP server.
- :keyword priority: The task priority, a number between ``0`` and ``9``.
- :keyword serializer: A string identifying the default serialization
- method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
- Can be ``pickle``, ``json``, ``yaml``, or any custom serialization
- methods that have been registered with
- :mod:`carrot.serialization.registry`.
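- Example (an illustrative sketch; ``RefreshFeedTask`` is a hypothetical
- registered task, and actual delivery requires a broker connection and
- a running worker):
- >>> result = apply_async(RefreshFeedTask,
- ...     kwargs={"feed_url": "http://example.com/rss"}, countdown=10)
- >>> result.task_id # doctest: +SKIP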
- """
- args = args or []
- kwargs = kwargs or {}
- routing_key = routing_key or getattr(task, "routing_key", None)
- exchange = exchange or getattr(task, "exchange", None)
- if immediate is None:
- immediate = getattr(task, "immediate", None)
- if mandatory is None:
- mandatory = getattr(task, "mandatory", None)
- if priority is None:
- priority = getattr(task, "priority", None)
- serializer = serializer or getattr(task, "serializer", None)
- taskset_id = opts.get("taskset_id")
- publisher = opts.get("publisher")
- retries = opts.get("retries")
- if countdown:
- eta = datetime.now() + timedelta(seconds=countdown)
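- # If the always-eager setting is enabled, execute the task locally and
- # synchronously instead of sending it to a worker (see apply() below).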
- from celery.conf import ALWAYS_EAGER
- if ALWAYS_EAGER:
- return apply(task, args, kwargs)
- need_to_close_connection = False
- if not publisher:
- if not connection:
- connection = DjangoBrokerConnection(
- connect_timeout=connect_timeout)
- need_to_close_connection = True
- publisher = TaskPublisher(connection=connection)
- delay_task = publisher.delay_task
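- # If the task belongs to a taskset, publish it through
- # delay_task_in_set so it is associated with the taskset id.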
- if taskset_id:
- delay_task = curry(publisher.delay_task_in_set, taskset_id)
- task_id = delay_task(task.name, args, kwargs,
- task_id=task_id, retries=retries,
- routing_key=routing_key, exchange=exchange,
- mandatory=mandatory, immediate=immediate,
- serializer=serializer, priority=priority,
- eta=eta)
- if need_to_close_connection:
- publisher.close()
- connection.close()
- return AsyncResult(task_id)
- def delay_task(task_name, *args, **kwargs):
- """Delay a task for execution by the ``celery`` daemon.
- :param task_name: the name of a task registered in the task registry.
- :param \*args: positional arguments to pass on to the task.
- :param \*\*kwargs: keyword arguments to pass on to the task.
- :raises celery.exceptions.NotRegistered: if no such task has been
- registered in the task registry.
- :rtype: :class:`celery.result.AsyncResult`.
- Example
- >>> r = delay_task("update_record", name="George Constanza", age=32)
- >>> r.ready()
- True
- >>> r.result
- "Record was updated"
- """
- if task_name not in tasks:
- raise tasks.NotRegistered(
- "Task with name %s not registered in the task registry." % (
- task_name))
- task = tasks[task_name]
- return apply_async(task, args, kwargs)
- def apply(task, args, kwargs, **options):
- """Apply the task locally.
- This will block until the task completes, and returns a
- :class:`celery.result.EagerResult` instance.
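- Example (an illustrative sketch; ``AddTask`` is a hypothetical task
- whose ``run`` method adds its two arguments):
- >>> result = apply(AddTask, [2, 2], {})
- >>> result.result
- 4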
- """
- args = args or []
- kwargs = kwargs or {}
- task_id = gen_unique_id()
- retries = options.get("retries", 0)
- # If it's a Task class we need an instance of it
- # for it to be callable.
- task = inspect.isclass(task) and task() or task
- default_kwargs = {"task_name": task.name,
- "task_id": task_id,
- "task_retries": retries,
- "task_is_eager": True,
- "logfile": None,
- "loglevel": 0}
- fun = getattr(task, "run", task)
- supported_keys = fun_takes_kwargs(fun, default_kwargs)
- extend_with = dict((key, val) for key, val in default_kwargs.items()
- if key in supported_keys)
- kwargs.update(extend_with)
- try:
- ret_value = task(*args, **kwargs)
- status = "DONE"
- strtb = None
- except Exception, exc:
- type_, value_, tb = sys.exc_info()
- strtb = "\n".join(traceback.format_exception(type_, value_, tb))
- ret_value = exc
- status = "FAILURE"
- return EagerResult(task_id, ret_value, status, traceback=strtb)
- class ExecuteWrapper(object):
- """Wraps the task in a jail, which catches all exceptions, and
- saves the status and result of the task execution to the task
- meta backend.
- If the call was successful, it saves the result to the task result
- backend, and sets the task status to ``"DONE"``.
- If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
- the original exception, uses that as the result and sets the task status
- to ``"RETRY"``.
- If the call results in an exception, it saves the exception as the task
- result, and sets the task status to ``"FAILURE"``.
- :param fun: Callable object to execute.
- :param task_id: The unique id of the task.
- :param task_name: Name of the task.
- :param args: List of positional args to pass on to the function.
- :param kwargs: Keyword arguments mapping to pass on to the function.
- :returns: the function return value on success, or
- the exception instance on failure.
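- Example (an illustrative sketch; assumes a configured loader and
- result backend, and ``mytask`` is a hypothetical registered task):
- >>> wrapper = ExecuteWrapper(mytask.run, gen_unique_id(),
- ...     mytask.name, args=[2, 2], kwargs={})
- >>> wrapper() # doctest: +SKIP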
- """
- def __init__(self, fun, task_id, task_name, args=None, kwargs=None):
- self.fun = fun
- self.task_id = task_id
- self.task_name = task_name
- self.args = args or []
- self.kwargs = kwargs or {}
- def __call__(self, *args, **kwargs):
- return self.execute_safe()
- def execute_safe(self, *args, **kwargs):
- try:
- return self.execute(*args, **kwargs)
- except Exception, exc:
- type_, value_, tb = sys.exc_info()
- exc = default_backend.prepare_exception(exc)
- warnings.warn("Exception happened outside of task body: %s: %s" % (
- str(exc.__class__), str(exc)))
- return ExceptionInfo((type_, exc, tb))
- def execute(self):
- # Convenience variables
- fun = self.fun
- task_id = self.task_id
- task_name = self.task_name
- args = self.args
- kwargs = self.kwargs
- # Run task loader init handler.
- current_loader.on_task_init(task_id, fun)
- # Backend process cleanup
- default_backend.process_cleanup()
- # Send pre-run signal.
- signals.task_prerun.send(sender=fun, task_id=task_id, task=fun,
- args=args, kwargs=kwargs)
- retval = None
- timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
- try:
- result = fun(*args, **kwargs)
- except (SystemExit, KeyboardInterrupt):
- raise
- except RetryTaskError, exc:
- retval = self.handle_retry(exc, sys.exc_info())
- except Exception, exc:
- retval = self.handle_failure(exc, sys.exc_info())
- else:
- retval = self.handle_success(result)
- finally:
- timer_stat.stop()
- # Send post-run signal.
- signals.task_postrun.send(sender=fun, task_id=task_id, task=fun,
- args=args, kwargs=kwargs, retval=retval)
- return retval
- def handle_success(self, retval):
- """Handle successful execution.
- Saves the result to the current result store (skipped if the callable
- has an ``ignore_result`` attribute set to ``True``).
- If the callable has an ``on_success`` function, it is called with
- ``retval`` as its argument.
- :param retval: The return value.
- """
- if not getattr(self.fun, "ignore_result", False):
- default_backend.mark_as_done(self.task_id, retval)
- # Run success handler last to be sure the status is saved.
- success_handler = getattr(self.fun, "on_success", noop)
- success_handler(retval, self.task_id, self.args, self.kwargs)
- return retval
- def handle_retry(self, exc, exc_info):
- """Handle retry exception."""
- ### Task is to be retried.
- type_, value_, tb = exc_info
- strtb = "\n".join(traceback.format_exception(type_, value_, tb))
- # RetryTaskError stores both a small message describing the retry
- # and the original exception.
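- # (Illustrative: such an exception is typically raised as
- # ``RetryTaskError("Retry in 2 seconds", original_exception)``, so
- # ``exc.args`` unpacks into the message and the original exception.)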
- message, orig_exc = exc.args
- default_backend.mark_as_retry(self.task_id, orig_exc, strtb)
- # Create a simpler version of the RetryTaskError that stringifies
- # the original exception instead of including the exception instance.
- # This is for reporting the retry in logs, e-mail etc, while
- # guaranteeing pickleability.
- expanded_msg = "%s: %s" % (message, str(orig_exc))
- retval = ExceptionInfo((type_,
- type_(expanded_msg, None),
- tb))
- # Run retry handler last to be sure the status is saved.
- retry_handler = getattr(self.fun, "on_retry", noop)
- retry_handler(exc, self.task_id, self.args, self.kwargs)
- return retval
- def handle_failure(self, exc, exc_info):
- """Handle exception."""
- ### Task ended in failure.
- type_, value_, tb = exc_info
- strtb = "\n".join(traceback.format_exception(type_, value_, tb))
- # mark_as_failure returns an exception that is guaranteed to
- # be pickleable.
- stored_exc = default_backend.mark_as_failure(self.task_id, exc, strtb)
- # wrap exception info + traceback and return it to caller.
- retval = ExceptionInfo((type_, stored_exc, tb))
- # Run error handler last to be sure the status is stored.
- error_handler = getattr(self.fun, "on_failure", noop)
- error_handler(stored_exc, self.task_id, self.args, self.kwargs)
- return retval