job.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. """
  2. Jobs Executable by the Worker Server.
  3. """
  4. from celery.registry import tasks, NotRegistered
  5. from celery.execute import ExecuteWrapper
  6. from celery.utils import noop
  7. from django.core.mail import mail_admins
  8. import multiprocessing
  9. import socket
  10. import sys
  11. # pep8.py borks on a inline signature separator and
  12. # says "trailing whitespace" ;)
  13. EMAIL_SIGNATURE_SEP = "-- "
  14. TASK_FAIL_EMAIL_BODY = """
  15. Task %%(name)s with id %%(id)s raised exception: %%(exc)s
  16. Task was called with args:%%(args)s kwargs:%%(kwargs)s.
  17. The contents of the full traceback was:
  18. %%(traceback)s
  19. %(EMAIL_SIGNATURE_SEP)s
  20. Just thought I'd let you know!
  21. celeryd at %%(hostname)s.
  22. """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
  23. class TaskWrapper(object):
  24. """Class wrapping a task to be run.
  25. :param task_name: see :attr:`task_name`.
  26. :param task_id: see :attr:`task_id`.
  27. :param task_func: see :attr:`task_func`
  28. :param args: see :attr:`args`
  29. :param kwargs: see :attr:`kwargs`.
  30. .. attribute:: task_name
  31. Kind of task. Must be a name registered in the task registry.
  32. .. attribute:: task_id
  33. UUID of the task.
  34. .. attribute:: task_func
  35. The tasks callable object.
  36. .. attribute:: args
  37. List of positional arguments to apply to the task.
  38. .. attribute:: kwargs
  39. Mapping of keyword arguments to apply to the task.
  40. .. attribute:: message
  41. The original message sent. Used for acknowledging the message.
  42. """
  43. success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
  44. fail_msg = """
  45. Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
  46. """
  47. fail_email_subject = """
  48. [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
  49. """
  50. fail_email_body = TASK_FAIL_EMAIL_BODY
  51. def __init__(self, task_name, task_id, task_func, args, kwargs,
  52. on_ack=noop, retries=0, **opts):
  53. self.task_name = task_name
  54. self.task_id = task_id
  55. self.task_func = task_func
  56. self.retries = retries
  57. self.args = args
  58. self.kwargs = kwargs
  59. self.logger = kwargs.get("logger")
  60. self.on_ack = on_ack
  61. for opt in ("success_msg", "fail_msg", "fail_email_subject",
  62. "fail_email_body"):
  63. setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
  64. if not self.logger:
  65. self.logger = multiprocessing.get_logger()
  66. def __repr__(self):
  67. return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
  68. self.__class__.__name__,
  69. self.task_name, self.task_id,
  70. self.args, self.kwargs)
  71. @classmethod
  72. def from_message(cls, message, message_data, logger=None):
  73. """Create a :class:`TaskWrapper` from a task message sent by
  74. :class:`celery.messaging.TaskPublisher`.
  75. :raises UnknownTaskError: if the message does not describe a task,
  76. the message is also rejected.
  77. :returns: :class:`TaskWrapper` instance.
  78. """
  79. task_name = message_data["task"]
  80. task_id = message_data["id"]
  81. args = message_data["args"]
  82. kwargs = message_data["kwargs"]
  83. retries = message_data.get("retries", 0)
  84. # Convert any unicode keys in the keyword arguments to ascii.
  85. kwargs = dict((key.encode("utf-8"), value)
  86. for key, value in kwargs.items())
  87. if task_name not in tasks:
  88. raise NotRegistered(task_name)
  89. task_func = tasks[task_name]
  90. return cls(task_name, task_id, task_func, args, kwargs,
  91. retries=retries, on_ack=message.ack, logger=logger)
  92. def extend_with_default_kwargs(self, loglevel, logfile):
  93. """Extend the tasks keyword arguments with standard task arguments.
  94. These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
  95. """
  96. kwargs = dict(self.kwargs)
  97. task_func_kwargs = {"logfile": logfile,
  98. "loglevel": loglevel,
  99. "task_id": self.task_id,
  100. "task_name": self.task_name,
  101. "task_retries": self.retries}
  102. kwargs.update(task_func_kwargs)
  103. return kwargs
  104. def _executeable(self, loglevel=None, logfile=None):
  105. """Get the :class:`celery.execute.ExecuteWrapper` for this task."""
  106. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  107. return ExecuteWrapper(self.task_func, self.task_id, self.task_name,
  108. self.args, task_func_kwargs)
  109. def execute(self, loglevel=None, logfile=None):
  110. """Execute the task in a :class:`celery.execute.ExecuteWrapper`.
  111. :keyword loglevel: The loglevel used by the task.
  112. :keyword logfile: The logfile used by the task.
  113. """
  114. # acknowledge task as being processed.
  115. self.on_ack()
  116. return self._executeable(loglevel, logfile)()
  117. def execute_using_pool(self, pool, loglevel=None, logfile=None):
  118. """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
  119. :param pool: A :class:`multiprocessing.Pool` instance.
  120. :keyword loglevel: The loglevel used by the task.
  121. :keyword logfile: The logfile used by the task.
  122. :returns :class:`multiprocessing.AsyncResult` instance.
  123. """
  124. wrapper = self._executeable(loglevel, logfile)
  125. return pool.apply_async(wrapper,
  126. callbacks=[self.on_success], errbacks=[self.on_failure],
  127. on_ack=self.on_ack,
  128. meta={"task_id": self.task_id, "task_name": self.task_name})
  129. def on_success(self, ret_value, meta):
  130. """The handler used if the task was successfully processed (
  131. without raising an exception)."""
  132. task_id = meta.get("task_id")
  133. task_name = meta.get("task_name")
  134. msg = self.success_msg.strip() % {
  135. "id": task_id,
  136. "name": task_name,
  137. "return_value": ret_value}
  138. self.logger.info(msg)
  139. def on_failure(self, exc_info, meta):
  140. """The handler used if the task raised an exception."""
  141. from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
  142. task_id = meta.get("task_id")
  143. task_name = meta.get("task_name")
  144. context = {
  145. "hostname": socket.gethostname(),
  146. "id": task_id,
  147. "name": task_name,
  148. "exc": exc_info.exception,
  149. "traceback": exc_info.traceback,
  150. "args": self.args,
  151. "kwargs": self.kwargs,
  152. }
  153. self.logger.error(self.fail_msg.strip() % context)
  154. task_obj = tasks.get(task_name, object)
  155. send_error_email = SEND_CELERY_TASK_ERROR_EMAILS and not \
  156. getattr(task_obj, "disable_error_emails", False)
  157. if send_error_email:
  158. subject = self.fail_email_subject.strip() % context
  159. body = self.fail_email_body.strip() % context
  160. mail_admins(subject, body, fail_silently=True)