from carrot.connection import DjangoBrokerConnection
from celery.conf import AMQP_CONNECTION_TIMEOUT
from celery.result import AsyncResult, EagerResult
from celery.messaging import TaskPublisher
from celery.registry import tasks
from celery.utils import gen_unique_id, noop
from functools import partial as curry
from datetime import datetime, timedelta
from multiprocessing import get_logger
from celery.exceptions import RetryTaskError
from celery.datastructures import ExceptionInfo
from celery.backends import default_backend
from celery.loaders import current_loader
from celery.monitoring import TaskTimerStats
from celery import signals
import sys
import traceback
import inspect


def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
        routing_key=None, exchange=None, task_id=None,
        immediate=None, mandatory=None, priority=None, connection=None,
        connect_timeout=AMQP_CONNECTION_TIMEOUT, **opts):
    """Run a task asynchronously by the celery daemon(s).

    :param task: The task to run (a callable object, or a :class:`Task`
        instance).

    :param args: The positional arguments to pass on to the task (a ``list``).

    :param kwargs: The keyword arguments to pass on to the task (a ``dict``).

    :param countdown: Number of seconds into the future that the task should
        execute. Defaults to immediate delivery (do not confuse this with
        the ``immediate`` setting, they are unrelated).

    :param eta: A :class:`datetime.datetime` object that describes the
        absolute time when the task should execute. May not be specified
        if ``countdown`` is also supplied. (Do not confuse this with the
        ``immediate`` setting, they are unrelated).

    :keyword routing_key: The routing key used to route the task to a worker
        server.

    :keyword exchange: The named exchange to send the task to. Defaults to
        :attr:`celery.task.base.Task.exchange`.

    :keyword immediate: Request immediate delivery. Will raise an exception
        if the task cannot be routed to a worker immediately.
        (Do not confuse this parameter with the ``countdown`` and ``eta``
        settings, as they are unrelated).

    :keyword mandatory: Mandatory routing. Raises an exception if there are
        no running workers able to take on this task.

    :keyword connection: Re-use an existing AMQP connection.
        The ``connect_timeout`` argument is not respected if this is set.

    :keyword connect_timeout: The timeout in seconds, before we give up
        on establishing a connection to the AMQP server.

    :keyword priority: The task priority, a number between ``0`` and ``9``.

    """
    args = args or []
    kwargs = kwargs or {}
    routing_key = routing_key or getattr(task, "routing_key", None)
    exchange = exchange or getattr(task, "exchange", None)
    immediate = immediate or getattr(task, "immediate", None)
    mandatory = mandatory or getattr(task, "mandatory", None)
    priority = priority or getattr(task, "priority", None)
    taskset_id = opts.get("taskset_id")
    publisher = opts.get("publisher")
    retries = opts.get("retries")
    if countdown:
        eta = datetime.now() + timedelta(seconds=countdown)

    from celery.conf import ALWAYS_EAGER
    if ALWAYS_EAGER:
        return apply(task, args, kwargs)

    need_to_close_connection = False
    if not publisher:
        if not connection:
            connection = DjangoBrokerConnection(
                    connect_timeout=connect_timeout)
            need_to_close_connection = True
        publisher = TaskPublisher(connection=connection)

    delay_task = publisher.delay_task
    if taskset_id:
        delay_task = curry(publisher.delay_task_in_set, taskset_id)

    task_id = delay_task(task.name, args, kwargs,
                         task_id=task_id, retries=retries,
                         routing_key=routing_key, exchange=exchange,
                         mandatory=mandatory, immediate=immediate,
                         priority=priority, eta=eta)

    if need_to_close_connection:
        publisher.close()
        connection.close()

    return AsyncResult(task_id)
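
# Example (illustrative; the task name and arguments are hypothetical):
# assuming a task named "tasks.refresh_feed" is registered in the task
# registry, the call below publishes it to the broker for delivery in ten
# seconds and returns an AsyncResult handle.
#
#     >>> result = apply_async(tasks["tasks.refresh_feed"],
#     ...                      args=["http://example.com/rss"],
#     ...                      countdown=10, priority=5)
#     >>> result.task_id                          # doctest: +SKIP
#     "2dcc955f-4d39-4f17-a3a0-4dcc71b4b4fe"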


def delay_task(task_name, *args, **kwargs):
    """Delay a task for execution by the ``celery`` daemon.

    :param task_name: the name of a task registered in the task registry.

    :param \*args: positional arguments to pass on to the task.

    :param \*\*kwargs: keyword arguments to pass on to the task.

    :raises celery.exceptions.NotRegistered: exception if no such task
        has been registered in the task registry.

    :rtype: :class:`celery.result.AsyncResult`.

    Example

        >>> r = delay_task("update_record", name="George Constanza", age=32)
        >>> r.ready()
        True
        >>> r.result
        "Record was updated"

    """
    if task_name not in tasks:
        raise tasks.NotRegistered(
                "Task with name %s not registered in the task registry." % (
                    task_name))
    task = tasks[task_name]
    return apply_async(task, args, kwargs)


def apply(task, args, kwargs, **options):
    """Apply the task locally.

    This will block until the task completes, and returns a
    :class:`celery.result.EagerResult` instance.

    """
    args = args or []
    kwargs = kwargs or {}
    task_id = gen_unique_id()
    retries = options.get("retries", 0)

    # If it's a Task class we need an instance
    # for it to be callable.
    task = inspect.isclass(task) and task() or task

    kwargs.update({"task_name": task.name,
                   "task_id": task_id,
                   "task_retries": retries,
                   "task_is_eager": True,
                   "logfile": None,
                   "loglevel": 0})

    try:
        ret_value = task(*args, **kwargs)
        status = "DONE"
        strtb = None
    except Exception, exc:
        type_, value_, tb = sys.exc_info()
        strtb = "\n".join(traceback.format_exception(type_, value_, tb))
        ret_value = exc
        status = "FAILURE"

    return EagerResult(task_id, ret_value, status, traceback=strtb)
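
# Example (illustrative; the task name is hypothetical): running a
# registered task eagerly blocks until it finishes and returns an
# EagerResult whose status and result can be inspected directly.
#
#     >>> res = apply(tasks["tasks.add"], [2, 2], {})
#     >>> res.status
#     "DONE"
#     >>> res.result
#     4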


class ExecuteWrapper(object):
    """Wraps the task in a jail, which catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"DONE"``.

    If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
    the original exception, uses that as the result and sets the task status
    to ``"RETRY"``.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task status to ``"FAILURE"``.

    :param fun: Callable object to execute.
    :param task_id: The unique id of the task.
    :param task_name: Name of the task.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        the exception instance on failure.

    """

    def __init__(self, fun, task_id, task_name, args=None, kwargs=None):
        self.fun = fun
        self.task_id = task_id
        self.task_name = task_name
        self.args = args or []
        self.kwargs = kwargs or {}

    def __call__(self, *args, **kwargs):
        return self.execute()

    def execute(self):
        # Convenience variables
        fun = self.fun
        task_id = self.task_id
        task_name = self.task_name
        args = self.args
        kwargs = self.kwargs

        # Run task loader init handler.
        current_loader.on_task_init(task_id, fun)

        # Backend process cleanup
        default_backend.process_cleanup()

        # Send pre-run signal.
        signals.task_prerun.send(sender=fun, task_id=task_id, task=fun,
                                 args=args, kwargs=kwargs)

        retval = None
        timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
        try:
            result = fun(*args, **kwargs)
        except (SystemExit, KeyboardInterrupt):
            raise
        except RetryTaskError, exc:
            retval = self.handle_retry(exc, sys.exc_info())
        except Exception, exc:
            retval = self.handle_failure(exc, sys.exc_info())
        else:
            retval = self.handle_success(result)
        finally:
            timer_stat.stop()

        # Send post-run signal.
        signals.task_postrun.send(sender=fun, task_id=task_id, task=fun,
                                  args=args, kwargs=kwargs, retval=retval)

        return retval

    def handle_success(self, retval):
        """Handle successful execution.

        Saves the result to the current result store (skipped if the callable
        has an ``ignore_result`` attribute set to ``True``).

        If the callable has an ``on_success`` function, it is called with
        ``retval`` as an argument.

        :param retval: The return value.

        """
        if not getattr(self.fun, "ignore_result", False):
            default_backend.mark_as_done(self.task_id, retval)

        # Run success handler last to be sure the status is saved.
        success_handler = getattr(self.fun, "on_success", noop)
        success_handler(retval, self.task_id, self.args, self.kwargs)

        return retval
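
    # Example (illustrative; the function and handler below are
    # hypothetical): ``ignore_result`` and ``on_success`` are plain
    # attributes looked up on the callable, so an ordinary function can
    # opt out of result storage or hook the success path:
    #
    #     >>> def send_email(to, **kwargs):
    #     ...     pass
    #     >>> send_email.ignore_result = True
    #     >>> send_email.on_success = lambda retval, task_id, args, kwargs: None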

    def handle_retry(self, exc, exc_info):
        """Handle retry exception."""
        ### Task is to be retried.
        type_, value_, tb = exc_info
        strtb = "\n".join(traceback.format_exception(type_, value_, tb))

        # RetryTaskError stores both a small message describing the retry
        # and the original exception.
        message, orig_exc = exc.args
        default_backend.mark_as_retry(self.task_id, orig_exc, strtb)

        # Create a simpler version of the RetryTaskError that stringifies
        # the original exception instead of including the exception instance.
        # This is for reporting the retry in logs, e-mail etc, while
        # guaranteeing pickleability.
        expanded_msg = "%s: %s" % (message, str(orig_exc))
        retval = ExceptionInfo((type_,
                                type_(expanded_msg, None),
                                tb))

        # Run retry handler last to be sure the status is saved.
        retry_handler = getattr(self.fun, "on_retry", noop)
        retry_handler(exc, self.task_id, self.args, self.kwargs)

        return retval
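
    # Example (illustrative; ``refresh`` and ``feed_url`` are hypothetical):
    # a task signals a retry by raising RetryTaskError with a short message
    # and the original exception as its two arguments:
    #
    #     >>> import socket
    #     >>> try:
    #     ...     refresh(feed_url)
    #     ... except socket.timeout, exc:
    #     ...     raise RetryTaskError("Retry in 30 seconds.", exc)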

    def handle_failure(self, exc, exc_info):
        """Handle exception."""
        ### Task ended in failure.
        type_, value_, tb = exc_info
        strtb = "\n".join(traceback.format_exception(type_, value_, tb))

        # mark_as_failure returns an exception that is guaranteed to
        # be pickleable.
        stored_exc = default_backend.mark_as_failure(self.task_id, exc, strtb)

        # wrap exception info + traceback and return it to caller.
        retval = ExceptionInfo((type_, stored_exc, tb))

        # Run error handler last to be sure the status is stored.
        error_handler = getattr(self.fun, "on_failure", noop)
        error_handler(stored_exc, self.task_id, self.args, self.kwargs)

        return retval
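

# Example (illustrative; the ``add`` function is hypothetical):
# ExecuteWrapper is normally created by the worker, but it can be driven
# directly. Calling the wrapper runs the task, stores the outcome in the
# default backend, and returns either the return value or an ExceptionInfo
# instance.
#
#     >>> def add(x, y, **kwargs):
#     ...     return x + y
#     >>> wrapper = ExecuteWrapper(add, gen_unique_id(), "tasks.add",
#     ...                          args=[2, 2], kwargs={})
#     >>> wrapper()
#     4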