job.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
"""
Jobs Executable by the Worker Server.

Provides :func:`jail`, which runs a task callable and records its outcome
in the result backend, and :class:`TaskWrapper`, which builds a task from
a queue message and executes it either inline or through a worker pool.
"""
  4. from celery.registry import tasks, NotRegistered
  5. from celery.datastructures import ExceptionInfo
  6. from celery.backends import default_backend
  7. from django.core.mail import mail_admins
  8. from celery.monitoring import TaskTimerStats
  9. from celery.task.base import RetryTaskError
  10. import multiprocessing
  11. import traceback
  12. import socket
  13. import sys
  14. # pep8.py borks on a inline signature separator and
  15. # says "trailing whitespace" ;)
  16. EMAIL_SIGNATURE_SEP = "-- "
  17. TASK_FAIL_EMAIL_BODY = """
  18. Task %%(name)s with id %%(id)s raised exception: %%(exc)s
  19. Task was called with args:%%(args)s kwargs:%%(kwargs)s.
  20. The contents of the full traceback was:
  21. %%(traceback)s
  22. %(EMAIL_SIGNATURE_SEP)s
  23. Just thought I'd let you know!
  24. celeryd at %%(hostname)s.
  25. """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
  26. def jail(task_id, task_name, func, args, kwargs):
  27. """Wraps the task in a jail, which catches all exceptions, and
  28. saves the status and result of the task execution to the task
  29. meta backend.
  30. If the call was successful, it saves the result to the task result
  31. backend, and sets the task status to ``"DONE"``.
  32. If the call results in an exception, it saves the exception as the task
  33. result, and sets the task status to ``"FAILURE"``.
  34. :param task_id: The id of the task.
  35. :param task_name: The name of the task.
  36. :param func: Callable object to execute.
  37. :param args: List of positional args to pass on to the function.
  38. :param kwargs: Keyword arguments mapping to pass on to the function.
  39. :returns: the function return value on success, or
  40. the exception instance on failure.
  41. """
  42. ignore_result = getattr(func, "ignore_result", False)
  43. timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
  44. # See: http://groups.google.com/group/django-users/browse_thread/
  45. # thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&
  46. # q=multiprocessing#38402e76cf3233e8
  47. from django.db import connection
  48. connection.close()
  49. # Reset cache connection only if using memcached/libmemcached
  50. from django.core import cache
  51. # XXX At Opera we use a custom memcached backend that uses libmemcached
  52. # instead of libmemcache (cmemcache). Should find a better solution for
  53. # this, but for now "memcached" should probably be unique enough of a
  54. # string to not make problems.
  55. cache_backend = cache.settings.CACHE_BACKEND
  56. if hasattr(cache, "parse_backend_uri"):
  57. cache_scheme = cache.parse_backend_uri(cache_backend)[0]
  58. else:
  59. # Django <= 1.0.2
  60. cache_scheme = cache_backend.split(":", 1)[0]
  61. if "memcached" in cache_scheme:
  62. cache.cache.close()
  63. # Backend process cleanup
  64. default_backend.process_cleanup()
  65. try:
  66. result = func(*args, **kwargs)
  67. except (SystemExit, KeyboardInterrupt):
  68. raise
  69. except RetryTaskError, exc:
  70. ### Task is to be retried.
  71. # RetryTaskError stores both a small message describing the retry
  72. # and the original exception.
  73. message, orig_exc = exc.args
  74. default_backend.mark_as_retry(task_id, orig_exc)
  75. # Create a simpler version of the RetryTaskError that stringifies
  76. # the original exception instead of including the exception instance.
  77. # This is for reporting the retry in logs, e-mail etc, while
  78. # guaranteeing pickleability.
  79. expanded_msg = "%s: %s" % (message, str(orig_exc))
  80. type_, _, tb = sys.exc_info()
  81. retval = ExceptionInfo((type_,
  82. type_(expanded_msg, None),
  83. tb))
  84. except Exception, exc:
  85. ### Task ended in failure.
  86. # mark_as_failure returns an exception that is guaranteed to
  87. # be pickleable.
  88. stored_exc = default_backend.mark_as_failure(task_id, exc)
  89. # wrap exception info + traceback and return it to caller.
  90. type_, _, tb = sys.exc_info()
  91. retval = ExceptionInfo((type_, stored_exc, tb))
  92. else:
  93. ### Task executed successfully.
  94. if not ignore_result:
  95. default_backend.mark_as_done(task_id, result)
  96. retval = result
  97. finally:
  98. timer_stat.stop()
  99. return retval
  100. class TaskWrapper(object):
  101. """Class wrapping a task to be run.
  102. :param task_name: see :attr:`task_name`.
  103. :param task_id: see :attr:`task_id`.
  104. :param task_func: see :attr:`task_func`
  105. :param args: see :attr:`args`
  106. :param kwargs: see :attr:`kwargs`.
  107. .. attribute:: task_name
  108. Kind of task. Must be a name registered in the task registry.
  109. .. attribute:: task_id
  110. UUID of the task.
  111. .. attribute:: task_func
  112. The tasks callable object.
  113. .. attribute:: args
  114. List of positional arguments to apply to the task.
  115. .. attribute:: kwargs
  116. Mapping of keyword arguments to apply to the task.
  117. .. attribute:: message
  118. The original message sent. Used for acknowledging the message.
  119. """
  120. success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
  121. fail_msg = """
  122. Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
  123. """
  124. fail_email_subject = """
  125. [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
  126. """
  127. fail_email_body = TASK_FAIL_EMAIL_BODY
  128. def __init__(self, task_name, task_id, task_func, args, kwargs,
  129. on_ack=None, retries=0, **opts):
  130. self.task_name = task_name
  131. self.task_id = task_id
  132. self.task_func = task_func
  133. self.retries = retries
  134. self.args = args
  135. self.kwargs = kwargs
  136. self.logger = kwargs.get("logger")
  137. self.on_ack = on_ack
  138. for opt in ("success_msg", "fail_msg", "fail_email_subject",
  139. "fail_email_body"):
  140. setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
  141. if not self.logger:
  142. self.logger = multiprocessing.get_logger()
  143. def __repr__(self):
  144. return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
  145. self.__class__.__name__,
  146. self.task_name, self.task_id,
  147. self.args, self.kwargs)
  148. @classmethod
  149. def from_message(cls, message, message_data, logger=None):
  150. """Create a :class:`TaskWrapper` from a task message sent by
  151. :class:`celery.messaging.TaskPublisher`.
  152. :raises UnknownTaskError: if the message does not describe a task,
  153. the message is also rejected.
  154. :returns: :class:`TaskWrapper` instance.
  155. """
  156. task_name = message_data["task"]
  157. task_id = message_data["id"]
  158. args = message_data["args"]
  159. kwargs = message_data["kwargs"]
  160. retries = message_data.get("retries", 0)
  161. # Convert any unicode keys in the keyword arguments to ascii.
  162. kwargs = dict((key.encode("utf-8"), value)
  163. for key, value in kwargs.items())
  164. if task_name not in tasks:
  165. raise NotRegistered(task_name)
  166. task_func = tasks[task_name]
  167. return cls(task_name, task_id, task_func, args, kwargs,
  168. retries=retries, on_ack=message.ack, logger=logger)
  169. def extend_with_default_kwargs(self, loglevel, logfile):
  170. """Extend the tasks keyword arguments with standard task arguments.
  171. These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
  172. """
  173. kwargs = dict(self.kwargs)
  174. task_func_kwargs = {"logfile": logfile,
  175. "loglevel": loglevel,
  176. "task_id": self.task_id,
  177. "task_name": self.task_name,
  178. "task_retries": self.retries}
  179. kwargs.update(task_func_kwargs)
  180. return kwargs
  181. def execute(self, loglevel=None, logfile=None):
  182. """Execute the task in a :func:`jail` and store return value
  183. and status in the task meta backend.
  184. :keyword loglevel: The loglevel used by the task.
  185. :keyword logfile: The logfile used by the task.
  186. """
  187. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  188. # acknowledge task as being processed.
  189. if self.on_ack:
  190. self.on_ack()
  191. return jail(self.task_id, self.task_name, self.task_func,
  192. self.args, task_func_kwargs)
  193. def on_success(self, ret_value, meta):
  194. """The handler used if the task was successfully processed (
  195. without raising an exception)."""
  196. task_id = meta.get("task_id")
  197. task_name = meta.get("task_name")
  198. msg = self.success_msg.strip() % {
  199. "id": task_id,
  200. "name": task_name,
  201. "return_value": ret_value}
  202. self.logger.info(msg)
  203. def on_failure(self, exc_info, meta):
  204. """The handler used if the task raised an exception."""
  205. from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
  206. task_id = meta.get("task_id")
  207. task_name = meta.get("task_name")
  208. context = {
  209. "hostname": socket.gethostname(),
  210. "id": task_id,
  211. "name": task_name,
  212. "exc": exc_info.exception,
  213. "traceback": exc_info.traceback,
  214. "args": self.args,
  215. "kwargs": self.kwargs,
  216. }
  217. self.logger.error(self.fail_msg.strip() % context)
  218. task_obj = tasks.get(task_name, object)
  219. send_error_email = SEND_CELERY_TASK_ERROR_EMAILS and not \
  220. getattr(task_obj, "disable_error_emails", False)
  221. if send_error_email:
  222. subject = self.fail_email_subject.strip() % context
  223. body = self.fail_email_body.strip() % context
  224. mail_admins(subject, body, fail_silently=True)
  225. def execute_using_pool(self, pool, loglevel=None, logfile=None):
  226. """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
  227. :param pool: A :class:`multiprocessing.Pool` instance.
  228. :keyword loglevel: The loglevel used by the task.
  229. :keyword logfile: The logfile used by the task.
  230. :returns :class:`multiprocessing.AsyncResult` instance.
  231. """
  232. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  233. jail_args = [self.task_id, self.task_name, self.task_func,
  234. self.args, task_func_kwargs]
  235. return pool.apply_async(jail, args=jail_args,
  236. callbacks=[self.on_success], errbacks=[self.on_failure],
  237. on_ack=self.on_ack,
  238. meta={"task_id": self.task_id, "task_name": self.task_name})