  1. """
  2. Jobs Executable by the Worker Server.
  3. """
  4. import sys
  5. import time
  6. import socket
  7. import warnings
  8. from django.core.mail import mail_admins
  9. from celery import conf
  10. from celery import platform
  11. from celery.log import get_default_logger
  12. from celery.utils import noop, fun_takes_kwargs
  13. from celery.loaders import current_loader
  14. from celery.execute.trace import TaskTrace
  15. from celery.registry import tasks
  16. from celery.datastructures import ExceptionInfo

# pep8.py borks on an inline signature separator and
# says "trailing whitespace" ;)
EMAIL_SIGNATURE_SEP = "-- "

TASK_FAIL_EMAIL_BODY = """
Task %%(name)s with id %%(id)s raised exception: %%(exc)s
Task was called with args: %%(args)s kwargs: %%(kwargs)s.
The contents of the full traceback was:
%%(traceback)s
%(EMAIL_SIGNATURE_SEP)s
Just to let you know,
celeryd at %%(hostname)s.
""" % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}

WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )


class InvalidTaskError(Exception):
    """The task has invalid data or is not properly constructed."""


class AlreadyExecutedError(Exception):
    """Tasks can only be executed once, as they might change
    world-wide state."""


class WorkerTaskTrace(TaskTrace):
    """Wraps the task in a jail, catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"SUCCESS"``.

    If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
    the original exception, uses that as the result and sets the task status
    to ``"RETRY"``.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task status to ``"FAILURE"``.

    :param task_name: The name of the task to execute.
    :param task_id: The unique id of the task.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        the exception instance on failure.

    """

    def __init__(self, *args, **kwargs):
        self.loader = kwargs.pop("loader", current_loader())
        super(WorkerTaskTrace, self).__init__(*args, **kwargs)
        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = conf.STORE_ERRORS_EVEN_IF_IGNORED
        self.super = super(WorkerTaskTrace, self)

    def execute_safe(self, *args, **kwargs):
        """Same as :meth:`execute`, but catches errors."""
        try:
            return self.execute(*args, **kwargs)
        except Exception, exc:
            _type, _value, _tb = sys.exc_info()
            _value = self.task.backend.prepare_exception(exc)
            exc_info = ExceptionInfo((_type, _value, _tb))
            warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
                map(str, (exc.__class__, exc, exc_info.traceback))))
            return exc_info

    def execute(self):
        """Execute, trace and store the result of the task."""
        self.loader.on_task_init(self.task_id, self.task)
        self.task.backend.process_cleanup()
        return super(WorkerTaskTrace, self).execute()

    def handle_success(self, retval, *args):
        """Handle successful execution."""
        if not self.task.ignore_result:
            self.task.backend.mark_as_done(self.task_id, retval)
        return self.super.handle_success(retval, *args)

    def handle_retry(self, exc, type_, tb, strtb):
        """Handle retry exception."""
        message, orig_exc = exc.args
        if self._store_errors:
            self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
        self.super.handle_retry(exc, type_, tb, strtb)

    def handle_failure(self, exc, type_, tb, strtb):
        """Handle exception."""
        if self._store_errors:
            exc = self.task.backend.mark_as_failure(self.task_id, exc, strtb)
        else:
            exc = self.task.backend.prepare_exception(exc)
        return self.super.handle_failure(exc, type_, tb, strtb)


def execute_and_trace(task_name, *args, **kwargs):
    """Run the task inside a :class:`WorkerTaskTrace` via
    :meth:`WorkerTaskTrace.execute_safe`, annotating the process title
    with the task name while it runs."""
    platform.set_mp_process_title("celeryd", info=task_name)
    try:
        return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
    finally:
        platform.set_mp_process_title("celeryd")
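
# Illustrative usage sketch, not part of the original module: what a pool
# worker process effectively runs when the main worker dispatches a task
# (see TaskWrapper.execute_using_pool below). The task name, id and
# arguments are made-up example values.
#
#     retval = execute_and_trace(
#             "myapp.tasks.add",                        # registered task name
#             "f27b1d3a-0000-0000-0000-000000000000",   # task id
#             [2, 2],                                   # positional args
#             {})                                       # keyword args
#
# ``retval`` is the task's return value on success, or an ExceptionInfo
# instance if the task raised, since WorkerTaskTrace.execute_safe() never
# lets exceptions propagate out of the worker process.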


class TaskWrapper(object):
    """Class wrapping a task to be passed around and finally
    executed inside of the worker.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name

        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id

        UUID of the task.

    .. attribute:: args

        List of positional arguments to apply to the task.

    .. attribute:: kwargs

        Mapping of keyword arguments to apply to the task.

    .. attribute:: message

        The original message sent. Used for acknowledging the message.

    .. attribute:: executed

        Set if the task has been executed. A task should only be executed
        once.

    """
    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
    fail_msg = """
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """
    fail_email_subject = """
        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
    """
    fail_email_body = TASK_FAIL_EMAIL_BODY
    executed = False
    time_start = None

    def __init__(self, task_name, task_id, args, kwargs,
            on_ack=noop, retries=0, delivery_info=None, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.retries = retries
        self.args = args
        self.kwargs = kwargs
        self.on_ack = on_ack
        self.delivery_info = delivery_info or {}
        self.task = tasks[self.task_name]

        for opt in ("success_msg", "fail_msg", "fail_email_subject",
                "fail_email_body", "logger", "eventer"):
            setattr(self, opt, opts.get(opt, getattr(self, opt, None)))

        if not self.logger:
            self.logger = get_default_logger()

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    @classmethod
    def from_message(cls, message, message_data, logger=None, eventer=None):
        """Create a :class:`TaskWrapper` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises UnknownTaskError: if the message does not describe a task,
            the message is also rejected.

        :returns: :class:`TaskWrapper` instance.

        """
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]
        retries = message_data.get("retries", 0)
        _delivery_info = getattr(message, "delivery_info", {})
        delivery_info = dict((key, _delivery_info.get(key))
                                for key in WANTED_DELIVERY_INFO)

        if not hasattr(kwargs, "items"):
            raise InvalidTaskError("Task kwargs must be a dictionary.")

        # Convert any unicode keys in the keyword arguments to ascii.
        kwargs = dict((key.encode("utf-8"), value)
                        for key, value in kwargs.items())

        return cls(task_name, task_id, args, kwargs,
                   retries=retries, on_ack=message.ack,
                   delivery_info=delivery_info,
                   logger=logger, eventer=eventer)
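
    # Illustrative usage sketch, not part of the original module: the minimal
    # ``message_data`` shape that from_message() expects. The task name and
    # id are made-up example values; ``message`` must provide an ``ack()``
    # method and may provide ``delivery_info``.
    #
    #     message_data = {"task": "myapp.tasks.add",
    #                     "id": "f27b1d3a-0000-0000-0000-000000000000",
    #                     "args": [2, 2],
    #                     "kwargs": {u"verbose": False},
    #                     "retries": 0}
    #     wrapper = TaskWrapper.from_message(message, message_data)
    #
    # Unicode keyword argument keys are encoded to byte strings so they can
    # later be applied with **kwargs, and a non-mapping "kwargs" value raises
    # InvalidTaskError.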

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        Currently these are ``logfile``, ``loglevel``, ``task_id``,
        ``task_name``, ``task_retries``, and ``delivery_info``.

        See :meth:`celery.task.base.Task.run` for more information.

        """
        kwargs = dict(self.kwargs)
        default_kwargs = {"logfile": logfile,
                          "loglevel": loglevel,
                          "task_id": self.task_id,
                          "task_name": self.task_name,
                          "task_retries": self.retries,
                          "task_is_eager": False,
                          "delivery_info": self.delivery_info}
        fun = self.task.run
        supported_keys = fun_takes_kwargs(fun, default_kwargs)
        extend_with = dict((key, val) for key, val in default_kwargs.items()
                                if key in supported_keys)
        kwargs.update(extend_with)
        return kwargs
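
    # Illustrative sketch, not part of the original module: only the default
    # arguments actually accepted by the task's run() method are merged in.
    # For a hypothetical task defined as
    #
    #     class AddTask(Task):
    #         def run(self, x, y, task_id=None, **kwargs):
    #             ...
    #
    # fun_takes_kwargs() reports that run() accepts arbitrary keywords, so
    # all of the defaults above are passed along, while a plain
    # ``def run(self, x, y)`` would receive none of them.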

    def _get_tracer_args(self, loglevel=None, logfile=None):
        """Get the arguments used to build the :class:`WorkerTaskTrace`
        tracer for this task."""
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return self.task_name, self.task_id, self.args, task_func_kwargs

    def _set_executed_bit(self):
        """Set task as executed to make sure it's not executed again."""
        if self.executed:
            raise AlreadyExecutedError(
                    "Task %s[%s] has already been executed" % (
                        self.task_name, self.task_id))
        self.executed = True

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :class:`WorkerTaskTrace`.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        # Make sure task has not already been executed.
        self._set_executed_bit()

        # Acknowledge the task as being processed.
        self.on_ack()

        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile))
        return tracer.execute()

    def send_event(self, type, **fields):
        """Send a monitoring event for this task, if an event dispatcher
        (:attr:`eventer`) has been set."""
        if self.eventer:
            self.eventer.send(type, **fields)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`multiprocessing.Pool` instance.
        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        :returns: :class:`multiprocessing.AsyncResult` instance.

        """
        # Make sure task has not already been executed.
        self._set_executed_bit()

        self.send_event("task-accepted", uuid=self.task_id)

        args = self._get_tracer_args(loglevel, logfile)
        self.time_start = time.time()
        result = pool.apply_async(execute_and_trace, args=args,
                callbacks=[self.on_success], errbacks=[self.on_failure])
        self.on_ack()
        return result
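
    # Illustrative sketch, not part of the original module: the two execution
    # paths. ``wrapper`` is a TaskWrapper and ``pool`` is assumed to be the
    # worker's pool object providing apply_async() with callbacks/errbacks as
    # used above. The calls are alternatives, not a sequence, since a task
    # may only be executed once.
    #
    #     wrapper.execute()                          # run inline, blocking
    #     result = wrapper.execute_using_pool(pool)  # run in a pool process
    #
    # The pool path emits a "task-accepted" event, acks the message and
    # reports back through on_success()/on_failure(); calling a second
    # execution method on the same wrapper raises AlreadyExecutedError.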

    def on_success(self, ret_value):
        """The handler used if the task was successfully processed
        (without raising an exception)."""
        runtime = time.time() - self.time_start
        self.send_event("task-succeeded", uuid=self.task_id,
                        result=ret_value, runtime=runtime)

        msg = self.success_msg.strip() % {
                "id": self.task_id,
                "name": self.task_name,
                "return_value": ret_value}
        self.logger.info(msg)

    def on_failure(self, exc_info):
        """The handler used if the task raised an exception."""
        self.send_event("task-failed", uuid=self.task_id,
                        exception=exc_info.exception,
                        traceback=exc_info.traceback)

        context = {
            "hostname": socket.gethostname(),
            "id": self.task_id,
            "name": self.task_name,
            "exc": exc_info.exception,
            "traceback": exc_info.traceback,
            "args": self.args,
            "kwargs": self.kwargs,
        }
        self.logger.error(self.fail_msg.strip() % context)

        task_obj = tasks.get(self.task_name, object)
        send_error_email = conf.CELERY_SEND_TASK_ERROR_EMAILS and not \
                task_obj.disable_error_emails
        if send_error_email:
            subject = self.fail_email_subject.strip() % context
            body = self.fail_email_body.strip() % context
            mail_admins(subject, body, fail_silently=True)