execute.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import sys
  2. import inspect
  3. import traceback
  4. from datetime import datetime, timedelta
  5. from carrot.connection import DjangoBrokerConnection
  6. from billiard.utils.functional import curry
  7. from celery import signals
  8. from celery.conf import AMQP_CONNECTION_TIMEOUT
  9. from celery.utils import gen_unique_id, noop, fun_takes_kwargs
  10. from celery.result import AsyncResult, EagerResult
  11. from celery.registry import tasks
  12. from celery.messaging import TaskPublisher, with_connection
  13. from celery.exceptions import RetryTaskError
  14. from celery.datastructures import ExceptionInfo
  15. TASK_EXEC_OPTIONS = ("routing_key", "exchange",
  16. "immediate", "mandatory",
  17. "priority", "serializer")
  18. def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
  19. task_id=None, publisher=None, connection=None, connect_timeout=None,
  20. **options):
  21. """Run a task asynchronously by the celery daemon(s).
  22. :param task: The task to run (a callable object, or a :class:`Task`
  23. instance
  24. :keyword args: The positional arguments to pass on to the
  25. task (a ``list``).
  26. :keyword kwargs: The keyword arguments to pass on to the task (a ``dict``)
  27. :keyword countdown: Number of seconds into the future that the task should
  28. execute. Defaults to immediate delivery (Do not confuse that with
  29. the ``immediate`` setting, they are unrelated).
  30. :keyword eta: A :class:`datetime.datetime` object that describes the
  31. absolute time when the task should execute. May not be specified
  32. if ``countdown`` is also supplied. (Do not confuse this with the
  33. ``immediate`` setting, they are unrelated).
  34. :keyword routing_key: The routing key used to route the task to a worker
  35. server.
  36. :keyword exchange: The named exchange to send the task to. Defaults to
  37. :attr:`celery.task.base.Task.exchange`.
  38. :keyword immediate: Request immediate delivery. Will raise an exception
  39. if the task cannot be routed to a worker immediately.
  40. (Do not confuse this parameter with the ``countdown`` and ``eta``
  41. settings, as they are unrelated).
  42. :keyword mandatory: Mandatory routing. Raises an exception if there's
  43. no running workers able to take on this task.
  44. :keyword connection: Re-use existing AMQP connection.
  45. The ``connect_timeout`` argument is not respected if this is set.
  46. :keyword connect_timeout: The timeout in seconds, before we give up
  47. on establishing a connection to the AMQP server.
  48. :keyword priority: The task priority, a number between ``0`` and ``9``.
  49. :keyword serializer: A string identifying the default serialization
  50. method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
  51. Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
  52. methods that have been registered with
  53. :mod:`carrot.serialization.registry`.
  54. **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will be
  55. replaced by a local :func:`apply` call instead.
  56. """
  57. from celery.conf import ALWAYS_EAGER
  58. if ALWAYS_EAGER:
  59. return apply(task, args, kwargs)
  60. for option_name in TASK_EXEC_OPTIONS:
  61. if option_name not in options:
  62. options[option_name] = getattr(task, option_name, None)
  63. if countdown: # Convert countdown to ETA.
  64. eta = datetime.now() + timedelta(seconds=countdown)
  65. def _delay_task(connection):
  66. publish = publisher or TaskPublisher(connection)
  67. try:
  68. return publish.delay_task(task.name, args or [], kwargs or {},
  69. task_id=task_id,
  70. eta=eta,
  71. **options)
  72. finally:
  73. publisher or publish.close()
  74. task_id = with_connection(_delay_task, connection=connection,
  75. connect_timeout=connect_timeout)
  76. return AsyncResult(task_id)
  77. def delay_task(task_name, *args, **kwargs):
  78. """Delay a task for execution by the ``celery`` daemon.
  79. :param task_name: the name of a task registered in the task registry.
  80. :param \*args: positional arguments to pass on to the task.
  81. :param \*\*kwargs: keyword arguments to pass on to the task.
  82. :raises celery.exceptions.NotRegistered: exception if no such task
  83. has been registered in the task registry.
  84. :rtype: :class:`celery.result.AsyncResult`.
  85. Example
  86. >>> r = delay_task("update_record", name="George Constanza", age=32)
  87. >>> r.ready()
  88. True
  89. >>> r.result
  90. "Record was updated"
  91. """
  92. if task_name not in tasks:
  93. raise tasks.NotRegistered(
  94. "Task with name %s not registered in the task registry." % (
  95. task_name))
  96. task = tasks[task_name]
  97. return apply_async(task, args, kwargs)
  98. def apply(task, args, kwargs, **options):
  99. """Apply the task locally.
  100. This will block until the task completes, and returns a
  101. :class:`celery.result.EagerResult` instance.
  102. """
  103. args = args or []
  104. kwargs = kwargs or {}
  105. task_id = gen_unique_id()
  106. retries = options.get("retries", 0)
  107. # If it's a Task class we need to instantiate it, so it's callable.
  108. task = inspect.isclass(task) and task() or task
  109. default_kwargs = {"task_name": task.name,
  110. "task_id": task_id,
  111. "task_retries": retries,
  112. "task_is_eager": True,
  113. "logfile": None,
  114. "loglevel": 0}
  115. supported_keys = fun_takes_kwargs(task.run, default_kwargs)
  116. extend_with = dict((key, val) for key, val in default_kwargs.items()
  117. if key in supported_keys)
  118. kwargs.update(extend_with)
  119. trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
  120. retval = trace.execute()
  121. return EagerResult(task_id, retval, trace.status,
  122. traceback=trace.strtb)
  123. class TraceInfo(object):
  124. def __init__(self, status="PENDING", retval=None, exc_info=None):
  125. self.status = status
  126. self.retval = retval
  127. self.exc_info = exc_info
  128. self.exc_type = None
  129. self.exc_value = None
  130. self.tb = None
  131. self.strtb = None
  132. if self.exc_info:
  133. self.exc_type, self.exc_value, self.tb = exc_info
  134. self.strtb = "\n".join(traceback.format_exception(*exc_info))
  135. @classmethod
  136. def trace(cls, fun, args, kwargs):
  137. """Trace the execution of a function, calling the appropiate callback
  138. if the function raises retry, an failure or returned successfully."""
  139. try:
  140. return cls("SUCCESS", retval=fun(*args, **kwargs))
  141. except (SystemExit, KeyboardInterrupt):
  142. raise
  143. except RetryTaskError, exc:
  144. return cls("RETRY", retval=exc, exc_info=sys.exc_info())
  145. except Exception, exc:
  146. return cls("FAILURE", retval=exc, exc_info=sys.exc_info())
  147. class TaskTrace(object):
  148. def __init__(self, task_name, task_id, args, kwargs, task=None):
  149. self.task_id = task_id
  150. self.task_name = task_name
  151. self.args = args
  152. self.kwargs = kwargs
  153. self.task = task or tasks[self.task_name]
  154. self.status = "PENDING"
  155. self.strtb = None
  156. self._trace_handlers = {"FAILURE": self.handle_failure,
  157. "RETRY": self.handle_retry,
  158. "SUCCESS": self.handle_success}
  159. def __call__(self):
  160. return self.execute()
  161. def execute(self):
  162. signals.task_prerun.send(sender=self.task, task_id=self.task_id,
  163. task=self.task, args=self.args,
  164. kwargs=self.kwargs)
  165. retval = self._trace()
  166. signals.task_postrun.send(sender=self.task, task_id=self.task_id,
  167. task=self.task, args=self.args,
  168. kwargs=self.kwargs, retval=retval)
  169. return retval
  170. def _trace(self):
  171. trace = TraceInfo.trace(self.task, self.args, self.kwargs)
  172. self.status = trace.status
  173. self.strtb = trace.strtb
  174. handler = self._trace_handlers[trace.status]
  175. return handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
  176. def handle_success(self, retval, *args):
  177. """Handle successful execution."""
  178. self.task.on_success(retval, self.task_id, self.args, self.kwargs)
  179. return retval
  180. def handle_retry(self, exc, type_, tb, strtb):
  181. """Handle retry exception."""
  182. self.task.on_retry(exc, self.task_id, self.args, self.kwargs)
  183. # Create a simpler version of the RetryTaskError that stringifies
  184. # the original exception instead of including the exception instance.
  185. # This is for reporting the retry in logs, e-mail etc, while
  186. # guaranteeing pickleability.
  187. message, orig_exc = exc.args
  188. expanded_msg = "%s: %s" % (message, str(orig_exc))
  189. return ExceptionInfo((type_,
  190. type_(expanded_msg, None),
  191. tb))
  192. def handle_failure(self, exc, type_, tb, strtb):
  193. """Handle exception."""
  194. self.task.on_failure(exc, self.task_id, self.args, self.kwargs)
  195. return ExceptionInfo((type_, exc, tb))