job.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. """
  2. Jobs Executable by the Worker Server.
  3. """
  4. import sys
  5. import time
  6. import socket
  7. import warnings
  8. from django.core.mail import mail_admins
  9. from celery import conf
  10. from celery import platform
  11. from celery.log import get_default_logger
  12. from celery.utils import noop, fun_takes_kwargs
  13. from celery.loaders import current_loader
  14. from celery.execute.trace import TaskTrace
  15. from celery.registry import tasks
  16. from celery.datastructures import ExceptionInfo
  17. # pep8.py borks on a inline signature separator and
  18. # says "trailing whitespace" ;)
  19. EMAIL_SIGNATURE_SEP = "-- "
  20. TASK_FAIL_EMAIL_BODY = """
  21. Task %%(name)s with id %%(id)s raised exception: %%(exc)s
  22. Task was called with args: %%(args)s kwargs: %%(kwargs)s.
  23. The contents of the full traceback was:
  24. %%(traceback)s
  25. %(EMAIL_SIGNATURE_SEP)s
  26. Just to let you know,
  27. celeryd at %%(hostname)s.
  28. """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
  29. class AlreadyExecutedError(Exception):
  30. """Tasks can only be executed once, as they might change
  31. world-wide state."""
  32. class WorkerTaskTrace(TaskTrace):
  33. """Wraps the task in a jail, catches all exceptions, and
  34. saves the status and result of the task execution to the task
  35. meta backend.
  36. If the call was successful, it saves the result to the task result
  37. backend, and sets the task status to ``"SUCCESS"``.
  38. If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
  39. the original exception, uses that as the result and sets the task status
  40. to ``"RETRY"``.
  41. If the call results in an exception, it saves the exception as the task
  42. result, and sets the task status to ``"FAILURE"``.
  43. :param task_name: The name of the task to execute.
  44. :param task_id: The unique id of the task.
  45. :param args: List of positional args to pass on to the function.
  46. :param kwargs: Keyword arguments mapping to pass on to the function.
  47. :returns: the function return value on success, or
  48. the exception instance on failure.
  49. """
  50. def __init__(self, *args, **kwargs):
  51. self.loader = kwargs.pop("loader", current_loader)
  52. super(WorkerTaskTrace, self).__init__(*args, **kwargs)
  53. self._store_errors = True
  54. if self.task.ignore_result:
  55. self._store_errors = conf.STORE_ERRORS_EVEN_IF_IGNORED
  56. self.super = super(WorkerTaskTrace, self)
  57. def execute_safe(self, *args, **kwargs):
  58. """Same as :meth:`execute`, but catches errors."""
  59. try:
  60. return self.execute(*args, **kwargs)
  61. except Exception, exc:
  62. _type, _value, _tb = sys.exc_info()
  63. _value = self.task.backend.prepare_exception(exc)
  64. exc_info = ExceptionInfo((_type, _value, _tb))
  65. warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
  66. map(str, (exc.__class__, exc, exc_info.traceback))))
  67. return exc_info
  68. def execute(self):
  69. """Execute, trace and store the result of the task."""
  70. self.loader.on_task_init(self.task_id, self.task)
  71. self.task.backend.process_cleanup()
  72. return self._trace()
  73. def handle_success(self, retval, *args):
  74. """Handle successful execution."""
  75. if not self.task.ignore_result:
  76. self.task.backend.mark_as_done(self.task_id, retval)
  77. return self.super.handle_success(retval, *args)
  78. def handle_retry(self, exc, type_, tb, strtb):
  79. """Handle retry exception."""
  80. message, orig_exc = exc.args
  81. if self._store_errors:
  82. self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
  83. return self.super.handle_retry(exc, type_, tb, strtb)
  84. def handle_failure(self, exc, type_, tb, strtb):
  85. """Handle exception."""
  86. if self._store_errors:
  87. exc = self.task.backend.mark_as_failure(self.task_id, exc, strtb)
  88. else:
  89. exc = self.task.backend.prepare_exception(exc)
  90. return self.super.handle_failure(exc, type_, tb, strtb)
  91. def execute_and_trace(task_name, *args, **kwargs):
  92. platform.set_mp_process_title("celeryd", info=task_name)
  93. try:
  94. return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
  95. finally:
  96. platform.set_mp_process_title("celeryd")
  97. class TaskWrapper(object):
  98. """Class wrapping a task to be passed around and finally
  99. executed inside of the worker.
  100. :param task_name: see :attr:`task_name`.
  101. :param task_id: see :attr:`task_id`.
  102. :param args: see :attr:`args`
  103. :param kwargs: see :attr:`kwargs`.
  104. .. attribute:: task_name
  105. Kind of task. Must be a name registered in the task registry.
  106. .. attribute:: task_id
  107. UUID of the task.
  108. .. attribute:: args
  109. List of positional arguments to apply to the task.
  110. .. attribute:: kwargs
  111. Mapping of keyword arguments to apply to the task.
  112. .. attribute:: message
  113. The original message sent. Used for acknowledging the message.
  114. .. attribute executed
  115. Set if the task has been executed. A task should only be executed
  116. once.
  117. """
  118. success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
  119. fail_msg = """
  120. Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
  121. """
  122. fail_email_subject = """
  123. [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
  124. """
  125. fail_email_body = TASK_FAIL_EMAIL_BODY
  126. def __init__(self, task_name, task_id, args, kwargs,
  127. on_ack=noop, retries=0, **opts):
  128. self.task_name = task_name
  129. self.task_id = task_id
  130. self.retries = retries
  131. self.args = args
  132. self.kwargs = kwargs
  133. self.logger = opts.get("logger")
  134. self.eventer = opts.get("eventer")
  135. self.on_ack = on_ack
  136. self.executed = False
  137. self.time_start = None
  138. for opt in ("success_msg", "fail_msg", "fail_email_subject",
  139. "fail_email_body"):
  140. setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
  141. if not self.logger:
  142. self.logger = get_default_logger()
  143. self.task = tasks[self.task_name]
  144. def __repr__(self):
  145. return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
  146. self.__class__.__name__,
  147. self.task_name, self.task_id,
  148. self.args, self.kwargs)
  149. @classmethod
  150. def from_message(cls, message, message_data, logger=None, eventer=None):
  151. """Create a :class:`TaskWrapper` from a task message sent by
  152. :class:`celery.messaging.TaskPublisher`.
  153. :raises UnknownTaskError: if the message does not describe a task,
  154. the message is also rejected.
  155. :returns: :class:`TaskWrapper` instance.
  156. """
  157. task_name = message_data["task"]
  158. task_id = message_data["id"]
  159. args = message_data["args"]
  160. kwargs = message_data["kwargs"]
  161. retries = message_data.get("retries", 0)
  162. # Convert any unicode keys in the keyword arguments to ascii.
  163. kwargs = dict((key.encode("utf-8"), value)
  164. for key, value in kwargs.items())
  165. return cls(task_name, task_id, args, kwargs,
  166. retries=retries, on_ack=message.ack,
  167. logger=logger, eventer=eventer)
  168. def extend_with_default_kwargs(self, loglevel, logfile):
  169. """Extend the tasks keyword arguments with standard task arguments.
  170. Currently these are ``logfile``, ``loglevel``, ``task_id``,
  171. ``task_name`` and ``task_retries``.
  172. See :meth:`celery.task.base.Task.run` for more information.
  173. """
  174. kwargs = dict(self.kwargs)
  175. default_kwargs = {"logfile": logfile,
  176. "loglevel": loglevel,
  177. "task_id": self.task_id,
  178. "task_name": self.task_name,
  179. "task_retries": self.retries}
  180. fun = self.task.run
  181. supported_keys = fun_takes_kwargs(fun, default_kwargs)
  182. extend_with = dict((key, val) for key, val in default_kwargs.items()
  183. if key in supported_keys)
  184. kwargs.update(extend_with)
  185. return kwargs
  186. def _get_tracer_args(self, loglevel=None, logfile=None):
  187. """Get the :class:`WorkerTaskTrace` tracer for this task."""
  188. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  189. return self.task_name, self.task_id, self.args, task_func_kwargs
  190. def _set_executed_bit(self):
  191. """Set task as executed to make sure it's not executed again."""
  192. if self.executed:
  193. raise AlreadyExecutedError(
  194. "Task %s[%s] has already been executed" % (
  195. self.task_name, self.task_id))
  196. self.executed = True
  197. def execute(self, loglevel=None, logfile=None):
  198. """Execute the task in a :class:`WorkerTaskTrace`.
  199. :keyword loglevel: The loglevel used by the task.
  200. :keyword logfile: The logfile used by the task.
  201. """
  202. # Make sure task has not already been executed.
  203. self._set_executed_bit()
  204. # acknowledge task as being processed.
  205. self.on_ack()
  206. tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile))
  207. return tracer.execute()
  208. def send_event(self, type, **fields):
  209. if self.eventer:
  210. self.eventer.send(type, **fields)
  211. def execute_using_pool(self, pool, loglevel=None, logfile=None):
  212. """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
  213. :param pool: A :class:`multiprocessing.Pool` instance.
  214. :keyword loglevel: The loglevel used by the task.
  215. :keyword logfile: The logfile used by the task.
  216. :returns :class:`multiprocessing.AsyncResult` instance.
  217. """
  218. # Make sure task has not already been executed.
  219. self._set_executed_bit()
  220. self.send_event("task-accepted", uuid=self.task_id)
  221. args = self._get_tracer_args(loglevel, logfile)
  222. self.time_start = time.time()
  223. return pool.apply_async(execute_and_trace, args=args,
  224. callbacks=[self.on_success], errbacks=[self.on_failure],
  225. on_ack=self.on_ack)
  226. def on_success(self, ret_value):
  227. """The handler used if the task was successfully processed (
  228. without raising an exception)."""
  229. runtime = time.time() - self.time_start
  230. self.send_event("task-succeeded", uuid=self.task_id,
  231. result=ret_value, runtime=runtime)
  232. msg = self.success_msg.strip() % {
  233. "id": self.task_id,
  234. "name": self.task_name,
  235. "return_value": ret_value}
  236. self.logger.info(msg)
  237. def on_failure(self, exc_info):
  238. """The handler used if the task raised an exception."""
  239. self.send_event("task-failed", uuid=self.task_id,
  240. exception=exc_info.exception,
  241. traceback=exc_info.traceback)
  242. context = {
  243. "hostname": socket.gethostname(),
  244. "id": self.task_id,
  245. "name": self.task_name,
  246. "exc": exc_info.exception,
  247. "traceback": exc_info.traceback,
  248. "args": self.args,
  249. "kwargs": self.kwargs,
  250. }
  251. self.logger.error(self.fail_msg.strip() % context)
  252. task_obj = tasks.get(self.task_name, object)
  253. send_error_email = conf.CELERY_SEND_TASK_ERROR_EMAILS and not \
  254. task_obj.disable_error_emails
  255. if send_error_email:
  256. subject = self.fail_email_subject.strip() % context
  257. body = self.fail_email_body.strip() % context
  258. mail_admins(subject, body, fail_silently=True)