execute.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. import sys
  2. import inspect
  3. import traceback
  4. from datetime import datetime, timedelta
  5. from carrot.connection import DjangoBrokerConnection
  6. from billiard.utils.functional import curry
  7. from celery import signals
  8. from celery.conf import AMQP_CONNECTION_TIMEOUT
  9. from celery.utils import gen_unique_id, noop, fun_takes_kwargs
  10. from celery.result import AsyncResult, EagerResult
  11. from celery.registry import tasks
  12. from celery.messaging import TaskPublisher
  13. from celery.exceptions import RetryTaskError
  14. from celery.datastructures import ExceptionInfo
  15. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  16. routing_key=None, exchange=None, task_id=None,
  17. immediate=None, mandatory=None, priority=None, connection=None,
  18. connect_timeout=AMQP_CONNECTION_TIMEOUT, serializer=None, **opts):
  19. """Run a task asynchronously by the celery daemon(s).
  20. :param task: The task to run (a callable object, or a :class:`Task`
  21. instance
  22. :param args: The positional arguments to pass on to the task (a ``list``).
  23. :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
  24. :param countdown: Number of seconds into the future that the task should
  25. execute. Defaults to immediate delivery (Do not confuse that with
  26. the ``immediate`` setting, they are unrelated).
  27. :param eta: A :class:`datetime.datetime` object that describes the
  28. absolute time when the task should execute. May not be specified
  29. if ``countdown`` is also supplied. (Do not confuse this with the
  30. ``immediate`` setting, they are unrelated).
  31. :keyword routing_key: The routing key used to route the task to a worker
  32. server.
  33. :keyword exchange: The named exchange to send the task to. Defaults to
  34. :attr:`celery.task.base.Task.exchange`.
  35. :keyword immediate: Request immediate delivery. Will raise an exception
  36. if the task cannot be routed to a worker immediately.
  37. (Do not confuse this parameter with the ``countdown`` and ``eta``
  38. settings, as they are unrelated).
  39. :keyword mandatory: Mandatory routing. Raises an exception if there's
  40. no running workers able to take on this task.
  41. :keyword connection: Re-use existing AMQP connection.
  42. The ``connect_timeout`` argument is not respected if this is set.
  43. :keyword connect_timeout: The timeout in seconds, before we give up
  44. on establishing a connection to the AMQP server.
  45. :keyword priority: The task priority, a number between ``0`` and ``9``.
  46. :keyword serializer: A string identifying the default serialization
  47. method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
  48. Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
  49. methods that have been registered with
  50. :mod:`carrot.serialization.registry`.
  51. **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will be
  52. replaced by a local :func:`apply` call instead.
  53. """
  54. args = args or []
  55. kwargs = kwargs or {}
  56. routing_key = routing_key or getattr(task, "routing_key", None)
  57. exchange = exchange or getattr(task, "exchange", None)
  58. if immediate is None:
  59. immediate = getattr(task, "immediate", None)
  60. if mandatory is None:
  61. mandatory = getattr(task, "mandatory", None)
  62. if priority is None:
  63. priority = getattr(task, "priority", None)
  64. serializer = serializer or getattr(task, "serializer", None)
  65. taskset_id = opts.get("taskset_id")
  66. publisher = opts.get("publisher")
  67. retries = opts.get("retries", 0)
  68. if countdown:
  69. eta = datetime.now() + timedelta(seconds=countdown)
  70. from celery.conf import ALWAYS_EAGER
  71. if ALWAYS_EAGER:
  72. return apply(task, args, kwargs)
  73. need_to_close_connection = False
  74. if not publisher:
  75. if not connection:
  76. connection = DjangoBrokerConnection(
  77. connect_timeout=connect_timeout)
  78. need_to_close_connection = True
  79. publisher = TaskPublisher(connection=connection)
  80. delay_task = publisher.delay_task
  81. if taskset_id:
  82. delay_task = curry(publisher.delay_task_in_set, taskset_id)
  83. task_id = delay_task(task.name, args, kwargs,
  84. task_id=task_id, retries=retries,
  85. routing_key=routing_key, exchange=exchange,
  86. mandatory=mandatory, immediate=immediate,
  87. serializer=serializer, priority=priority,
  88. eta=eta)
  89. if need_to_close_connection:
  90. publisher.close()
  91. connection.close()
  92. return AsyncResult(task_id)
  93. def delay_task(task_name, *args, **kwargs):
  94. """Delay a task for execution by the ``celery`` daemon.
  95. :param task_name: the name of a task registered in the task registry.
  96. :param \*args: positional arguments to pass on to the task.
  97. :param \*\*kwargs: keyword arguments to pass on to the task.
  98. :raises celery.exceptions.NotRegistered: exception if no such task
  99. has been registered in the task registry.
  100. :rtype: :class:`celery.result.AsyncResult`.
  101. Example
  102. >>> r = delay_task("update_record", name="George Constanza", age=32)
  103. >>> r.ready()
  104. True
  105. >>> r.result
  106. "Record was updated"
  107. """
  108. if task_name not in tasks:
  109. raise tasks.NotRegistered(
  110. "Task with name %s not registered in the task registry." % (
  111. task_name))
  112. task = tasks[task_name]
  113. return apply_async(task, args, kwargs)
  114. def apply(task, args, kwargs, **options):
  115. """Apply the task locally.
  116. This will block until the task completes, and returns a
  117. :class:`celery.result.EagerResult` instance.
  118. """
  119. args = args or []
  120. kwargs = kwargs or {}
  121. task_id = gen_unique_id()
  122. retries = options.get("retries", 0)
  123. # If it's a Task class we need to instantiate it, so it's callable.
  124. task = inspect.isclass(task) and task() or task
  125. default_kwargs = {"task_name": task.name,
  126. "task_id": task_id,
  127. "task_retries": retries,
  128. "task_is_eager": True,
  129. "logfile": None,
  130. "loglevel": 0}
  131. supported_keys = fun_takes_kwargs(task.run, default_kwargs)
  132. extend_with = dict((key, val) for key, val in default_kwargs.items()
  133. if key in supported_keys)
  134. kwargs.update(extend_with)
  135. trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
  136. retval = trace.execute()
  137. return EagerResult(task_id, retval, trace.status,
  138. traceback=trace.strtb)
  139. class TraceInfo(object):
  140. def __init__(self, status="PENDING", retval=None, exc_info=None):
  141. self.status = status
  142. self.retval = retval
  143. self.exc_info = exc_info
  144. self.exc_type = None
  145. self.exc_value = None
  146. self.tb = None
  147. self.strtb = None
  148. if self.exc_info:
  149. self.exc_type, self.exc_value, self.tb = exc_info
  150. self.strtb = "\n".join(traceback.format_exception(*exc_info))
  151. @classmethod
  152. def trace(cls, fun, args, kwargs):
  153. """Trace the execution of a function, calling the appropiate callback
  154. if the function raises retry, an failure or returned successfully."""
  155. try:
  156. return cls("SUCCESS", retval=fun(*args, **kwargs))
  157. except (SystemExit, KeyboardInterrupt):
  158. raise
  159. except RetryTaskError, exc:
  160. return cls("RETRY", retval=exc, exc_info=sys.exc_info())
  161. except Exception, exc:
  162. return cls("FAILURE", retval=exc, exc_info=sys.exc_info())
  163. class TaskTrace(object):
  164. def __init__(self, task_name, task_id, args, kwargs, task=None):
  165. self.task_id = task_id
  166. self.task_name = task_name
  167. self.args = args
  168. self.kwargs = kwargs
  169. self.task = task or tasks[self.task_name]
  170. self.status = "PENDING"
  171. self.strtb = None
  172. self._trace_handlers = {"FAILURE": self.handle_failure,
  173. "RETRY": self.handle_retry,
  174. "SUCCESS": self.handle_success}
  175. def __call__(self):
  176. return self.execute()
  177. def execute(self):
  178. signals.task_prerun.send(sender=self.task, task_id=self.task_id,
  179. task=self.task, args=self.args,
  180. kwargs=self.kwargs)
  181. retval = self._trace()
  182. signals.task_postrun.send(sender=self.task, task_id=self.task_id,
  183. task=self.task, args=self.args,
  184. kwargs=self.kwargs, retval=retval)
  185. return retval
  186. def _trace(self):
  187. trace = TraceInfo.trace(self.task, self.args, self.kwargs)
  188. self.status = trace.status
  189. self.strtb = trace.strtb
  190. handler = self._trace_handlers[trace.status]
  191. return handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
  192. def handle_success(self, retval, *args):
  193. """Handle successful execution."""
  194. self.task.on_success(retval, self.task_id, self.args, self.kwargs)
  195. return retval
  196. def handle_retry(self, exc, type_, tb, strtb):
  197. """Handle retry exception."""
  198. self.task.on_retry(exc, self.task_id, self.args, self.kwargs)
  199. # Create a simpler version of the RetryTaskError that stringifies
  200. # the original exception instead of including the exception instance.
  201. # This is for reporting the retry in logs, e-mail etc, while
  202. # guaranteeing pickleability.
  203. message, orig_exc = exc.args
  204. expanded_msg = "%s: %s" % (message, str(orig_exc))
  205. return ExceptionInfo((type_,
  206. type_(expanded_msg, None),
  207. tb))
  208. def handle_failure(self, exc, type_, tb, strtb):
  209. """Handle exception."""
  210. self.task.on_failure(exc, self.task_id, self.args, self.kwargs)
  211. return ExceptionInfo((type_, exc, tb))