  1. """celery.worker"""
  2. from carrot.connection import DjangoAMQPConnection
  3. from celery.messaging import TaskConsumer
  4. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  5. from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  6. from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
  7. from celery.log import setup_logger
  8. from celery.registry import tasks
  9. from celery.pool import TaskPool
  10. from celery.datastructures import ExceptionInfo
  11. from celery.backends import default_backend, default_periodic_status_backend
  12. from celery.timer import EventTimer
  13. from django.core.mail import mail_admins
  14. import multiprocessing
  15. import traceback
  16. import logging
  17. import socket
  18. import time
  19. import sys
  20. # pep8.py borks on a inline signature separator and
  21. # says "trailing whitespace" ;)
  22. EMAIL_SIGNATURE_SEP = "-- "
  23. TASK_FAIL_EMAIL_BODY = """
  24. Task %%(name)s with id %%(id)s raised exception: %%(exc)s
  25. The contents of the full traceback was:
  26. %%(traceback)s
  27. %%(EMAIL_SIGNATURE_SEP)s
  28. Just thought I'd let you know!
  29. celeryd at %%(hostname)s.
  30. """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}


class EmptyQueue(Exception):
    """The message queue is currently empty."""


class UnknownTask(Exception):
    """Got an unknown task in the queue. The message is requeued and
    ignored."""


def jail(task_id, func, args, kwargs):
    """Wraps the task in a jail, which catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"DONE"``.

    If the call raises an exception, it saves the exception as the task
    result, and sets the task status to ``"FAILURE"``.

    :param task_id: The id of the task.
    :param func: Callable object to execute.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        an :class:`celery.datastructures.ExceptionInfo` instance on failure.

    """
    # Close the current database connection, so each pool worker gets its
    # own connection after forking. See:
    # http://groups.google.com/group/django-users/browse_thread/
    # thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&
    # q=multiprocessing#38402e76cf3233e8
    from django.db import connection
    connection.close()

    # Convert any unicode keys in the keyword arguments to UTF-8 encoded
    # byte strings, as Python 2 can't expand a **kwargs mapping that has
    # unicode keys.
    kwargs = dict([(k.encode("utf-8"), v)
                        for k, v in kwargs.items()])

    try:
        result = func(*args, **kwargs)
    except Exception, exc:
        default_backend.mark_as_failure(task_id, exc)
        return ExceptionInfo(sys.exc_info())
    else:
        default_backend.mark_as_done(task_id, result)
        return result
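
# Example usage (a sketch; ``add`` stands in for any registered task
# callable and the task id is made up; the result is also written to the
# default backend):
#
#   >>> jail("2cd1ed04-...", add, [2, 4], {})
#   6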


class TaskWrapper(object):
    """Class wrapping a task to be run.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param task_func: see :attr:`task_func`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name

        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id

        UUID of the task.

    .. attribute:: task_func

        The task's callable object.

    .. attribute:: args

        List of positional arguments to apply to the task.

    .. attribute:: kwargs

        Mapping of keyword arguments to apply to the task.

    """
    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
    fail_msg = """
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """
    fail_email_subject = """
        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
    """
    fail_email_body = TASK_FAIL_EMAIL_BODY

    def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.task_func = task_func
        self.args = args
        self.kwargs = kwargs
        for opt in ("success_msg", "fail_msg", "fail_email_subject",
                    "fail_email_body", "logger"):
            setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
        if not self.logger:
            self.logger = multiprocessing.get_logger()

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    @classmethod
    def from_message(cls, message, logger):
        """Create a :class:`TaskWrapper` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises UnknownTask: if the message does not describe a task
            registered in the task registry.

        :returns: :class:`TaskWrapper` instance.

        """
        message_data = message.decode()
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]
        if task_name not in tasks:
            raise UnknownTask(task_name)
        task_func = tasks[task_name]
        return cls(task_name, task_id, task_func, args, kwargs, logger=logger)
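
    # The message body from_message() expects looks roughly like this
    # (a sketch based on the fields read above; the task name and id are
    # made up):
    #
    #   {"task": "myapp.tasks.add", "id": "2cd1ed04-...",
    #    "args": [2, 4], "kwargs": {}}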

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
        The task's own keyword arguments take precedence on collision.

        """
        task_func_kwargs = {"logfile": logfile,
                            "loglevel": loglevel,
                            "task_id": self.task_id,
                            "task_name": self.task_name}
        task_func_kwargs.update(self.kwargs)
        return task_func_kwargs
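
    # For a task invoked with kwargs {"x": 2}, the extended mapping looks
    # roughly like this (a sketch; the id and name are made up):
    #
    #   {"logfile": None, "loglevel": logging.INFO,
    #    "task_id": "2cd1ed04-...", "task_name": "myapp.tasks.add", "x": 2}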

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`jail` and store its return value
        and status in the task meta backend.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return jail(self.task_id, self.task_func, self.args,
                    task_func_kwargs)

    def on_success(self, ret_value, meta):
        """The handler used if the task was successfully processed
        (without raising an exception)."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.success_msg.strip() % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.info(msg)

    def on_failure(self, exc_info, meta):
        """The handler used if the task raised an exception."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        context = {
            "hostname": socket.gethostname(),
            "id": task_id,
            "name": task_name,
            "exc": exc_info.exception,
            "traceback": exc_info.traceback,
        }
        self.logger.error(self.fail_msg.strip() % context)

        if SEND_CELERY_TASK_ERROR_EMAILS:
            subject = self.fail_email_subject.strip() % context
            body = self.fail_email_body.strip() % context
            mail_admins(subject, body, fail_silently=True)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the multiprocessing pool.

        :param pool: A :class:`celery.pool.TaskPool` instance.
        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        :returns: :class:`multiprocessing.pool.AsyncResult` instance.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        jail_args = [self.task_id, self.task_func,
                     self.args, task_func_kwargs]
        return pool.apply_async(jail, args=jail_args,
                callbacks=[self.on_success], errbacks=[self.on_failure],
                meta={"task_id": self.task_id, "task_name": self.task_name})
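
    # Note: the ``callbacks``, ``errbacks`` and ``meta`` keyword arguments
    # are provided by celery's own :class:`celery.pool.TaskPool` wrapper;
    # the stock ``multiprocessing.Pool.apply_async`` only accepts a single
    # ``callback``.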


class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.
    :param queue_wakeup_after: see :attr:`queue_wakeup_after`.

    .. attribute:: concurrency

        The number of simultaneous processes doing work (default:
        :const:`celery.conf.DAEMON_CONCURRENCY`).

    .. attribute:: loglevel

        The loglevel used (default: :const:`logging.ERROR`).

    .. attribute:: logfile

        The logfile used; if no logfile is specified it uses ``stderr``
        (default: :const:`celery.conf.DAEMON_LOG_FILE`).

    .. attribute:: queue_wakeup_after

        The time it takes for the daemon to wake up after the queue is empty,
        so it can check for more work
        (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).

    .. attribute:: empty_msg_emit_every

        How often the daemon emits the ``"Waiting for queue..."`` message.
        If this is ``None``, the message will never be logged
        (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`).

    .. attribute:: logger

        The :class:`logging.Logger` instance used for logging.

    .. attribute:: pool

        The :class:`celery.pool.TaskPool` instance used.

    .. attribute:: task_consumer

        The :class:`celery.messaging.TaskConsumer` instance used.

    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    queue_wakeup_after = QUEUE_WAKEUP_AFTER
    empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            queue_wakeup_after=None, is_detached=False):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.queue_wakeup_after = queue_wakeup_after or \
                                    self.queue_wakeup_after
        # Use the resolved attributes here, so the class defaults apply
        # when no explicit loglevel/logfile was given.
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = TaskPool(self.concurrency, logger=self.logger)
        self.task_consumer = None
        self.is_detached = is_detached
        self.reset_connection()
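
    # Typical usage (a sketch; assumes the Django settings used by
    # DjangoAMQPConnection point at a running AMQP broker):
    #
    #   >>> controller = WorkController(concurrency=4,
    #   ...                             loglevel=logging.INFO)
    #   >>> controller.run()    # blocks, consuming tasks forever.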

    def reset_connection(self):
        """Reset the AMQP connection, and reinitialize the
        :class:`celery.messaging.TaskConsumer` instance.

        Resets the task consumer in :attr:`task_consumer`.

        """
        if self.task_consumer:
            self.task_consumer.connection.close()
        amqp_connection = DjangoAMQPConnection()
        self.task_consumer = TaskConsumer(connection=amqp_connection)
        self.task_consumer_it = self.task_consumer.iterqueue(infinite=True)

    def connection_diagnostics(self):
        """Diagnose the AMQP connection, and reset the connection
        if necessary."""
        connection = self.task_consumer.backend.channel.connection
        if not connection:
            self.logger.info(
                    "AMQP Connection has died, restoring connection.")
            self.reset_connection()

    def receive_message(self):
        """Receive the next message from the message broker.

        :raises EmptyQueue: if there is no message waiting on the queue.

        :rtype: :class:`carrot.messaging.Message` instance.

        """
        message = self.task_consumer_it.next()
        if not message:
            raise EmptyQueue()
        return message
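
    # Note: ``iterqueue(infinite=True)`` (set up in reset_connection) is
    # expected to yield ``None`` rather than raise StopIteration when the
    # queue is empty; receive_message translates that into EmptyQueue.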

    def process_task(self, message):
        """Process a task message by handing it over to the worker pool."""
        task = TaskWrapper.from_message(message, logger=self.logger)
        self.logger.info("Got task from broker: %s[%s]" % (
            task.task_name, task.task_id))
        self.logger.debug("Got a task: %s. Trying to execute it..." % task)

        result = task.execute_using_pool(self.pool, self.loglevel,
                                         self.logfile)

        self.logger.debug("Task %s has been executed asynchronously." % task)
        return result

    def execute_next_task(self):
        """Execute the next task on the queue using the multiprocessing
        pool.

        :raises EmptyQueue: if there is no message waiting on the queue.

        """
        self.process_task(self.receive_message())

    def run_periodic_tasks(self):
        """Schedule all waiting periodic tasks for execution."""
        self.logger.debug("Looking for periodic tasks ready for execution...")
        default_periodic_status_backend.run_periodic_tasks()

    def schedule_retry_tasks(self):
        """Reschedule all requeued tasks waiting for retry."""
        # Currently a no-op placeholder; it is still ticked by the main
        # loop, see run() below.
        pass

    def run(self):
        """Starts the worker's main loop."""
        log_wait = lambda: self.logger.info("Waiting for queue...")
        ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
        # Check for waiting periodic tasks every second, and for tasks to
        # retry every two seconds.
        events = [
            EventTimer(self.run_periodic_tasks, 1),
            EventTimer(self.schedule_retry_tasks, 2),
        ]

        self.pool.run()

        # If DEBUG logging is enabled, print the pool child process PIDs,
        # and, when not running detached, sleep for a second before we
        # start.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Pool child processes: [%s]" % (
                "|".join(map(str, self.pool.get_worker_pids()))))
            if not self.is_detached:
                time.sleep(1)

        while True:
            for event in events:
                event.tick()
            try:
                self.execute_next_task()
            except ValueError:
                # execute_next_task didn't return a r/name/id tuple,
                # probably because it got an exception.
                continue
            except EmptyQueue:
                ev_msg_waiting.tick()
                time.sleep(self.queue_wakeup_after)
                continue
            except UnknownTask, exc:
                self.logger.info("Unknown task ignored: %s" % (exc, ))
                continue
            except Exception, exc:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                    exc.__class__, exc, traceback.format_exc()))
                continue