# execute.py
  1. from carrot.connection import DjangoBrokerConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.result import AsyncResult, EagerResult
  4. from celery.messaging import TaskPublisher
  5. from celery.registry import tasks
  6. from celery.utils import gen_unique_id, noop, fun_takes_kwargs
  7. from functools import partial as curry
  8. from datetime import datetime, timedelta
  9. from celery.exceptions import RetryTaskError
  10. from celery.datastructures import ExceptionInfo
  11. from celery.backends import default_backend
  12. from celery.loaders import current_loader
  13. from celery.monitoring import TaskTimerStats
  14. from celery import signals
  15. import sys
  16. import inspect
  17. import warnings
  18. import traceback
  19. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  20. routing_key=None, exchange=None, task_id=None,
  21. immediate=None, mandatory=None, priority=None, connection=None,
  22. connect_timeout=AMQP_CONNECTION_TIMEOUT, serializer=None, **opts):
  23. """Run a task asynchronously by the celery daemon(s).
  24. :param task: The task to run (a callable object, or a :class:`Task`
  25. instance
  26. :param args: The positional arguments to pass on to the task (a ``list``).
  27. :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
  28. :param countdown: Number of seconds into the future that the task should
  29. execute. Defaults to immediate delivery (Do not confuse that with
  30. the ``immediate`` setting, they are unrelated).
  31. :param eta: A :class:`datetime.datetime` object that describes the
  32. absolute time when the task should execute. May not be specified
  33. if ``countdown`` is also supplied. (Do not confuse this with the
  34. ``immediate`` setting, they are unrelated).
  35. :keyword routing_key: The routing key used to route the task to a worker
  36. server.
  37. :keyword exchange: The named exchange to send the task to. Defaults to
  38. :attr:`celery.task.base.Task.exchange`.
  39. :keyword immediate: Request immediate delivery. Will raise an exception
  40. if the task cannot be routed to a worker immediately.
  41. (Do not confuse this parameter with the ``countdown`` and ``eta``
  42. settings, as they are unrelated).
  43. :keyword mandatory: Mandatory routing. Raises an exception if there's
  44. no running workers able to take on this task.
  45. :keyword connection: Re-use existing AMQP connection.
  46. The ``connect_timeout`` argument is not respected if this is set.
  47. :keyword connect_timeout: The timeout in seconds, before we give up
  48. on establishing a connection to the AMQP server.
  49. :keyword priority: The task priority, a number between ``0`` and ``9``.
  50. :keyword serializer: A string identifying the default serialization
  51. method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
  52. Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
  53. methods that have been registered with
  54. :mod:`carrot.serialization.registry`.
  55. """
  56. args = args or []
  57. kwargs = kwargs or {}
  58. routing_key = routing_key or getattr(task, "routing_key", None)
  59. exchange = exchange or getattr(task, "exchange", None)
  60. if immediate is None:
  61. immediate = getattr(task, "immediate", None)
  62. if mandatory is None:
  63. mandatory = getattr(task, "mandatory", None)
  64. if priority is None:
  65. priority = getattr(task, "priority", None)
  66. serializer = serializer or getattr(task, "serializer", None)
  67. taskset_id = opts.get("taskset_id")
  68. publisher = opts.get("publisher")
  69. retries = opts.get("retries")
  70. if countdown:
  71. eta = datetime.now() + timedelta(seconds=countdown)
  72. from celery.conf import ALWAYS_EAGER
  73. if ALWAYS_EAGER:
  74. return apply(task, args, kwargs)
  75. need_to_close_connection = False
  76. if not publisher:
  77. if not connection:
  78. connection = DjangoBrokerConnection(
  79. connect_timeout=connect_timeout)
  80. need_to_close_connection = True
  81. publisher = TaskPublisher(connection=connection)
  82. delay_task = publisher.delay_task
  83. if taskset_id:
  84. delay_task = curry(publisher.delay_task_in_set, taskset_id)
  85. task_id = delay_task(task.name, args, kwargs,
  86. task_id=task_id, retries=retries,
  87. routing_key=routing_key, exchange=exchange,
  88. mandatory=mandatory, immediate=immediate,
  89. serializer=serializer, priority=priority,
  90. eta=eta)
  91. if need_to_close_connection:
  92. publisher.close()
  93. connection.close()
  94. return AsyncResult(task_id)
  95. def delay_task(task_name, *args, **kwargs):
  96. """Delay a task for execution by the ``celery`` daemon.
  97. :param task_name: the name of a task registered in the task registry.
  98. :param \*args: positional arguments to pass on to the task.
  99. :param \*\*kwargs: keyword arguments to pass on to the task.
  100. :raises celery.exceptions.NotRegistered: exception if no such task
  101. has been registered in the task registry.
  102. :rtype: :class:`celery.result.AsyncResult`.
  103. Example
  104. >>> r = delay_task("update_record", name="George Constanza", age=32)
  105. >>> r.ready()
  106. True
  107. >>> r.result
  108. "Record was updated"
  109. """
  110. if task_name not in tasks:
  111. raise tasks.NotRegistered(
  112. "Task with name %s not registered in the task registry." % (
  113. task_name))
  114. task = tasks[task_name]
  115. return apply_async(task, args, kwargs)
  116. def apply(task, args, kwargs, **options):
  117. """Apply the task locally.
  118. This will block until the task completes, and returns a
  119. :class:`celery.result.EagerResult` instance.
  120. """
  121. args = args or []
  122. kwargs = kwargs or {}
  123. task_id = gen_unique_id()
  124. retries = options.get("retries", 0)
  125. # If it's a Task class we need to have to instance
  126. # for it to be callable.
  127. task = inspect.isclass(task) and task() or task
  128. default_kwargs = {"task_name": task.name,
  129. "task_id": task_id,
  130. "task_retries": retries,
  131. "task_is_eager": True,
  132. "logfile": None,
  133. "loglevel": 0}
  134. fun = getattr(task, "run", task)
  135. supported_keys = fun_takes_kwargs(fun, default_kwargs)
  136. extend_with = dict((key, val) for key, val in default_kwargs.items()
  137. if key in supported_keys)
  138. kwargs.update(extend_with)
  139. try:
  140. ret_value = task(*args, **kwargs)
  141. status = "DONE"
  142. strtb = None
  143. except Exception, exc:
  144. type_, value_, tb = sys.exc_info()
  145. strtb = "\n".join(traceback.format_exception(type_, value_, tb))
  146. ret_value = exc
  147. status = "FAILURE"
  148. return EagerResult(task_id, ret_value, status, traceback=strtb)
  149. class ExecuteWrapper(object):
  150. """Wraps the task in a jail, which catches all exceptions, and
  151. saves the status and result of the task execution to the task
  152. meta backend.
  153. If the call was successful, it saves the result to the task result
  154. backend, and sets the task status to ``"DONE"``.
  155. If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
  156. the original exception, uses that as the result and sets the task status
  157. to ``"RETRY"``.
  158. If the call results in an exception, it saves the exception as the task
  159. result, and sets the task status to ``"FAILURE"``.
  160. :param fun: Callable object to execute.
  161. :param task_id: The unique id of the task.
  162. :param task_name: Name of the task.
  163. :param args: List of positional args to pass on to the function.
  164. :param kwargs: Keyword arguments mapping to pass on to the function.
  165. :returns: the function return value on success, or
  166. the exception instance on failure.
  167. """
  168. def __init__(self, fun, task_id, task_name, args=None, kwargs=None):
  169. self.fun = fun
  170. self.task_id = task_id
  171. self.task_name = task_name
  172. self.args = args or []
  173. self.kwargs = kwargs or {}
  174. def __call__(self, *args, **kwargs):
  175. return self.execute_safe()
  176. def execute_safe(self, *args, **kwargs):
  177. try:
  178. return self.execute(*args, **kwargs)
  179. except Exception, exc:
  180. type_, value_, tb = sys.exc_info()
  181. exc = default_backend.prepare_exception(exc)
  182. warnings.warn("Exception happend outside of task body: %s: %s" % (
  183. str(exc.__class__), str(exc)))
  184. return ExceptionInfo((type_, exc, tb))
  185. def execute(self):
  186. # Convenience variables
  187. fun = self.fun
  188. task_id = self.task_id
  189. task_name = self.task_name
  190. args = self.args
  191. kwargs = self.kwargs
  192. # Run task loader init handler.
  193. current_loader.on_task_init(task_id, fun)
  194. # Backend process cleanup
  195. default_backend.process_cleanup()
  196. # Send pre-run signal.
  197. signals.task_prerun.send(sender=fun, task_id=task_id, task=fun,
  198. args=args, kwargs=kwargs)
  199. retval = None
  200. timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
  201. try:
  202. result = fun(*args, **kwargs)
  203. except (SystemExit, KeyboardInterrupt):
  204. raise
  205. except RetryTaskError, exc:
  206. retval = self.handle_retry(exc, sys.exc_info())
  207. except Exception, exc:
  208. retval = self.handle_failure(exc, sys.exc_info())
  209. else:
  210. retval = self.handle_success(result)
  211. finally:
  212. timer_stat.stop()
  213. # Send post-run signal.
  214. signals.task_postrun.send(sender=fun, task_id=task_id, task=fun,
  215. args=args, kwargs=kwargs, retval=retval)
  216. return retval
  217. def handle_success(self, retval):
  218. """Handle successful execution.
  219. Saves the result to the current result store (skipped if the callable
  220. has a ``ignore_result`` attribute set to ``True``).
  221. If the callable has a ``on_success`` function, it as called with
  222. ``retval`` as argument.
  223. :param retval: The return value.
  224. """
  225. if not getattr(self.fun, "ignore_result", False):
  226. default_backend.mark_as_done(self.task_id, retval)
  227. # Run success handler last to be sure the status is saved.
  228. success_handler = getattr(self.fun, "on_success", noop)
  229. success_handler(retval, self.task_id, self.args, self.kwargs)
  230. return retval
  231. def handle_retry(self, exc, exc_info):
  232. """Handle retry exception."""
  233. ### Task is to be retried.
  234. type_, value_, tb = exc_info
  235. strtb = "\n".join(traceback.format_exception(type_, value_, tb))
  236. # RetryTaskError stores both a small message describing the retry
  237. # and the original exception.
  238. message, orig_exc = exc.args
  239. default_backend.mark_as_retry(self.task_id, orig_exc, strtb)
  240. # Create a simpler version of the RetryTaskError that stringifies
  241. # the original exception instead of including the exception instance.
  242. # This is for reporting the retry in logs, e-mail etc, while
  243. # guaranteeing pickleability.
  244. expanded_msg = "%s: %s" % (message, str(orig_exc))
  245. retval = ExceptionInfo((type_,
  246. type_(expanded_msg, None),
  247. tb))
  248. # Run retry handler last to be sure the status is saved.
  249. retry_handler = getattr(self.fun, "on_retry", noop)
  250. retry_handler(exc, self.task_id, self.args, self.kwargs)
  251. return retval
  252. def handle_failure(self, exc, exc_info):
  253. """Handle exception."""
  254. ### Task ended in failure.
  255. type_, value_, tb = exc_info
  256. strtb = "\n".join(traceback.format_exception(type_, value_, tb))
  257. # mark_as_failure returns an exception that is guaranteed to
  258. # be pickleable.
  259. stored_exc = default_backend.mark_as_failure(self.task_id, exc, strtb)
  260. # wrap exception info + traceback and return it to caller.
  261. retval = ExceptionInfo((type_, stored_exc, tb))
  262. # Run error handler last to be sure the status is stored.
  263. error_handler = getattr(self.fun, "on_failure", noop)
  264. error_handler(stored_exc, self.task_id, self.args, self.kwargs)
  265. return retval