job.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. """
  2. Jobs Executable by the Worker Server.
  3. """
  4. from celery.registry import tasks
  5. from celery.exceptions import NotRegistered
  6. from celery.execute import ExecuteWrapper
  7. from celery.utils import noop, fun_takes_kwargs
  8. from celery.log import get_default_logger
  9. from django.core.mail import mail_admins
  10. import multiprocessing
  11. import socket
  12. import sys
  13. # pep8.py borks on a inline signature separator and
  14. # says "trailing whitespace" ;)
  15. EMAIL_SIGNATURE_SEP = "-- "
  16. TASK_FAIL_EMAIL_BODY = """
  17. Task %%(name)s with id %%(id)s raised exception: %%(exc)s
  18. Task was called with args: %%(args)s kwargs: %%(kwargs)s.
  19. The contents of the full traceback was:
  20. %%(traceback)s
  21. %(EMAIL_SIGNATURE_SEP)s
  22. Just to let you know,
  23. celeryd at %%(hostname)s.
  24. """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
  25. class AlreadyExecutedError(Exception):
  26. """Tasks can only be executed once, as they might change
  27. world-wide state."""
  28. class TaskWrapper(object):
  29. """Class wrapping a task to be passed around and finally
  30. executed inside of the worker.
  31. :param task_name: see :attr:`task_name`.
  32. :param task_id: see :attr:`task_id`.
  33. :param task_func: see :attr:`task_func`
  34. :param args: see :attr:`args`
  35. :param kwargs: see :attr:`kwargs`.
  36. .. attribute:: task_name
  37. Kind of task. Must be a name registered in the task registry.
  38. .. attribute:: task_id
  39. UUID of the task.
  40. .. attribute:: task_func
  41. The tasks callable object.
  42. .. attribute:: args
  43. List of positional arguments to apply to the task.
  44. .. attribute:: kwargs
  45. Mapping of keyword arguments to apply to the task.
  46. .. attribute:: message
  47. The original message sent. Used for acknowledging the message.
  48. .. attribute executed
  49. Set if the task has been executed. A task should only be executed
  50. once.
  51. """
  52. success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
  53. fail_msg = """
  54. Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
  55. """
  56. fail_email_subject = """
  57. [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
  58. """
  59. fail_email_body = TASK_FAIL_EMAIL_BODY
  60. def __init__(self, task_name, task_id, task_func, args, kwargs,
  61. on_ack=noop, retries=0, **opts):
  62. self.task_name = task_name
  63. self.task_id = task_id
  64. self.task_func = task_func
  65. self.retries = retries
  66. self.args = args
  67. self.kwargs = kwargs
  68. self.logger = kwargs.get("logger")
  69. self.on_ack = on_ack
  70. self.executed = False
  71. for opt in ("success_msg", "fail_msg", "fail_email_subject",
  72. "fail_email_body"):
  73. setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
  74. if not self.logger:
  75. self.logger = get_default_logger()
  76. def __repr__(self):
  77. return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
  78. self.__class__.__name__,
  79. self.task_name, self.task_id,
  80. self.args, self.kwargs)
  81. @classmethod
  82. def from_message(cls, message, message_data, logger=None):
  83. """Create a :class:`TaskWrapper` from a task message sent by
  84. :class:`celery.messaging.TaskPublisher`.
  85. :raises UnknownTaskError: if the message does not describe a task,
  86. the message is also rejected.
  87. :returns: :class:`TaskWrapper` instance.
  88. """
  89. task_name = message_data["task"]
  90. task_id = message_data["id"]
  91. args = message_data["args"]
  92. kwargs = message_data["kwargs"]
  93. retries = message_data.get("retries", 0)
  94. # Convert any unicode keys in the keyword arguments to ascii.
  95. kwargs = dict((key.encode("utf-8"), value)
  96. for key, value in kwargs.items())
  97. if task_name not in tasks:
  98. raise NotRegistered(task_name)
  99. task_func = tasks[task_name]
  100. return cls(task_name, task_id, task_func, args, kwargs,
  101. retries=retries, on_ack=message.ack, logger=logger)
  102. def extend_with_default_kwargs(self, loglevel, logfile):
  103. """Extend the tasks keyword arguments with standard task arguments.
  104. Currently these are ``logfile``, ``loglevel``, ``task_id``,
  105. ``task_name`` and ``task_retries``.
  106. See :meth:`celery.task.base.Task.run` for more information.
  107. """
  108. kwargs = dict(self.kwargs)
  109. default_kwargs = {"logfile": logfile,
  110. "loglevel": loglevel,
  111. "task_id": self.task_id,
  112. "task_name": self.task_name,
  113. "task_retries": self.retries}
  114. fun = getattr(self.task_func, "run", self.task_func)
  115. supported_keys = fun_takes_kwargs(fun, default_kwargs)
  116. extend_with = dict((key, val) for key, val in default_kwargs.items()
  117. if key in supported_keys)
  118. kwargs.update(extend_with)
  119. return kwargs
  120. def _executeable(self, loglevel=None, logfile=None):
  121. """Get the :class:`celery.execute.ExecuteWrapper` for this task."""
  122. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  123. return ExecuteWrapper(self.task_func, self.task_id, self.task_name,
  124. self.args, task_func_kwargs)
  125. def _set_executed_bit(self):
  126. if self.executed:
  127. raise AlreadyExecutedError(
  128. "Task %s[%s] has already been executed" % (
  129. self.task_name, self.task_id))
  130. self.executed = True
  131. def execute(self, loglevel=None, logfile=None):
  132. """Execute the task in a :class:`celery.execute.ExecuteWrapper`.
  133. :keyword loglevel: The loglevel used by the task.
  134. :keyword logfile: The logfile used by the task.
  135. """
  136. # Make sure task has not already been executed.
  137. self._set_executed_bit()
  138. # acknowledge task as being processed.
  139. self.on_ack()
  140. return self._executeable(loglevel, logfile)()
  141. def execute_using_pool(self, pool, loglevel=None, logfile=None):
  142. """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
  143. :param pool: A :class:`multiprocessing.Pool` instance.
  144. :keyword loglevel: The loglevel used by the task.
  145. :keyword logfile: The logfile used by the task.
  146. :returns :class:`multiprocessing.AsyncResult` instance.
  147. """
  148. # Make sure task has not already been executed.
  149. self._set_executed_bit()
  150. wrapper = self._executeable(loglevel, logfile)
  151. return pool.apply_async(wrapper,
  152. callbacks=[self.on_success], errbacks=[self.on_failure],
  153. on_ack=self.on_ack)
  154. def on_success(self, ret_value):
  155. """The handler used if the task was successfully processed (
  156. without raising an exception)."""
  157. msg = self.success_msg.strip() % {
  158. "id": self.task_id,
  159. "name": self.task_name,
  160. "return_value": ret_value}
  161. self.logger.info(msg)
  162. def on_failure(self, exc_info):
  163. """The handler used if the task raised an exception."""
  164. from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
  165. context = {
  166. "hostname": socket.gethostname(),
  167. "id": self.task_id,
  168. "name": self.task_name,
  169. "exc": exc_info.exception,
  170. "traceback": exc_info.traceback,
  171. "args": self.args,
  172. "kwargs": self.kwargs,
  173. }
  174. self.logger.error(self.fail_msg.strip() % context)
  175. task_obj = tasks.get(self.task_name, object)
  176. send_error_email = SEND_CELERY_TASK_ERROR_EMAILS and not \
  177. getattr(task_obj, "disable_error_emails", False)
  178. if send_error_email:
  179. subject = self.fail_email_subject.strip() % context
  180. body = self.fail_email_body.strip() % context
  181. mail_admins(subject, body, fail_silently=True)