worker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. """celery.worker"""
  2. from carrot.connection import DjangoAMQPConnection
  3. from celery.messaging import TaskConsumer
  4. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  5. from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  6. from celery.log import setup_logger
  7. from celery.registry import tasks
  8. from celery.datastructures import TaskProcessQueue
  9. from celery.models import PeriodicTaskMeta
  10. from celery.backends import default_backend
  11. from celery.timer import EventTimer
  12. import multiprocessing
  13. import simplejson
  14. import traceback
  15. import logging
  16. import time
  17. class EmptyQueue(Exception):
  18. """The message queue is currently empty."""
  19. class UnknownTask(Exception):
  20. """Got an unknown task in the queue. The message is requeued and
  21. ignored."""
  22. def jail(task_id, func, args, kwargs):
  23. """Wraps the task in a jail, which catches all exceptions, and
  24. saves the status and result of the task execution to the task
  25. meta backend.
  26. If the call was successful, it saves the result to the task result
  27. backend, and sets the task status to ``"DONE"``.
  28. If the call results in an exception, it saves the exception as the task
  29. result, and sets the task status to ``"FAILURE"``.
  30. :param task_id: The id of the task.
  31. :param func: Callable object to execute.
  32. :param args: List of positional args to pass on to the function.
  33. :param kwargs: Keyword arguments mapping to pass on to the function.
  34. :returns: the function return value on success.
  35. :returns: the exception instance on failure.
  36. """
  37. try:
  38. result = func(*args, **kwargs)
  39. except Exception, exc:
  40. default_backend.mark_as_failure(task_id, exc)
  41. return exc
  42. else:
  43. default_backend.mark_as_done(task_id, result)
  44. return result
  45. class TaskWrapper(object):
  46. """Class wrapping a task to be run.
  47. :param task_name: see :attr:`task_name`.
  48. :param task_id: see :attr:`task_id`.
  49. :param task_func: see :attr:`task_func`
  50. :param args: see :attr:`args`
  51. :param kwargs: see :attr:`kwargs`.
  52. .. attribute:: task_name
  53. Kind of task. Must be a name registered in the task registry.
  54. .. attribute:: task_id
  55. UUID of the task.
  56. .. attribute:: task_func
  57. The tasks callable object.
  58. .. attribute:: args
  59. List of positional arguments to apply to the task.
  60. .. attribute:: kwargs
  61. Mapping of keyword arguments to apply to the task.
  62. """
  63. def __init__(self, task_name, task_id, task_func, args, kwargs):
  64. self.task_name = task_name
  65. self.task_id = task_id
  66. self.task_func = task_func
  67. self.args = args
  68. self.kwargs = kwargs
  69. @classmethod
  70. def from_message(cls, message):
  71. """Create a :class:`TaskWrapper` from a task message sent by
  72. :class:`celery.messaging.TaskPublisher`.
  73. :raises UnknownTask: if the message does not describe a task,
  74. the message is also rejected.
  75. :returns: :class:`TaskWrapper` instance.
  76. """
  77. message_data = simplejson.loads(message.body)
  78. task_name = message_data["task"]
  79. task_id = message_data["id"]
  80. args = message_data["args"]
  81. kwargs = message_data["kwargs"]
  82. if task_name not in tasks:
  83. message.reject()
  84. raise UnknownTask(task_name)
  85. task_func = tasks[task_name]
  86. return cls(task_name, task_id, task_func, args, kwargs)
  87. def extend_with_default_kwargs(self, loglevel, logfile):
  88. """Extend the tasks keyword arguments with standard task arguments.
  89. These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
  90. """
  91. task_func_kwargs = {"logfile": logfile,
  92. "loglevel": loglevel,
  93. "task_id": self.task_id,
  94. "task_name": self.task_name}
  95. task_func_kwargs.update(self.kwargs)
  96. return task_func_kwargs
  97. def execute(self, loglevel=None, logfile=None):
  98. """Execute the task in a :func:`jail` and store return value
  99. and status in the task meta backend.
  100. :keyword loglevel: The loglevel used by the task.
  101. :keyword logfile: The logfile used by the task.
  102. """
  103. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  104. return jail(self.task_id, [
  105. self.task_func, self.args, task_func_kwargs])
  106. def execute_using_pool(self, pool, loglevel=None, logfile=None):
  107. """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
  108. :param pool: A :class:`multiprocessing.Pool` instance.
  109. :keyword loglevel: The loglevel used by the task.
  110. :keyword logfile: The logfile used by the task.
  111. :returns :class:`multiprocessing.AsyncResult` instance.
  112. """
  113. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  114. jail_args = [self.task_id, self.task_func,
  115. self.args, task_func_kwargs]
  116. return pool.apply_async(jail, jail_args, {},
  117. self.task_name, self.task_id)
  118. class TaskDaemon(object):
  119. """Executes tasks waiting in the task queue.
  120. :param concurrency: see :attr:`concurrency`.
  121. :param logfile: see :attr:`logfile`.
  122. :param loglevel: see :attr:`loglevel`.
  123. :param queue_wakeup_after: see :attr:`queue_wakeup_after`.
  124. .. attribute:: concurrency
  125. The number of simultaneous processes doing work (default:
  126. :const:`celery.conf.DAEMON_CONCURRENCY`)
  127. .. attribute:: loglevel
  128. The loglevel used (default: :const:`logging.INFO`)
  129. .. attribute:: logfile
  130. The logfile used, if no logfile is specified it uses ``stderr``
  131. (default: :const:`celery.conf.DAEMON_LOG_FILE`).
  132. .. attribute:: queue_wakeup_after
  133. The time it takes for the daemon to wake up after the queue is empty,
  134. so it can check for more work
  135. (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).
  136. .. attribute:: empty_msg_emit_every
  137. How often the daemon emits the ``"Waiting for queue..."`` message.
  138. If this is ``None``, the message will never be logged.
  139. (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`)
  140. .. attribute:: logger
  141. The :class:`logging.Logger` instance used for logging.
  142. .. attribute:: pool
  143. The :class:`multiprocessing.Pool` instance used.
  144. .. attribute:: task_consumer
  145. The :class:`celery.messaging.TaskConsumer` instance used.
  146. """
  147. loglevel = logging.ERROR
  148. concurrency = DAEMON_CONCURRENCY
  149. logfile = DAEMON_LOG_FILE
  150. queue_wakeup_after = QUEUE_WAKEUP_AFTER
  151. empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY
  152. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  153. queue_wakeup_after=None):
  154. self.loglevel = loglevel or self.loglevel
  155. self.concurrency = concurrency or self.concurrency
  156. self.logfile = logfile or self.logfile
  157. self.queue_wakeup_after = queue_wakeup_after or \
  158. self.queue_wakeup_after
  159. self.logger = setup_logger(loglevel, logfile)
  160. self.pool = TaskProcessQueue(self.concurrency, logger=self.logger,
  161. done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
  162. self.task_consumer = None
  163. self.reset_connection()
  164. def reset_connection(self):
  165. """Reset the AMQP connection, and reinitialize the
  166. :class:`celery.messaging.TaskConsumer` instance.
  167. Resets the task consumer in :attr:`task_consumer`.
  168. """
  169. if self.task_consumer:
  170. self.task_consumer.connection.close()
  171. amqp_connection = DjangoAMQPConnection()
  172. self.task_consumer = TaskConsumer(connection=amqp_connection)
  173. def connection_diagnostics(self):
  174. """Diagnose the AMQP connection, and reset connection if
  175. necessary."""
  176. if hasattr(self.task_consumer, "backend"):
  177. connection = self.task_consumer.backend.channel.connection
  178. else:
  179. connection = self.task_consumer.channel.connection
  180. if not connection:
  181. self.logger.info(
  182. "AMQP Connection has died, restoring connection.")
  183. self.reset_connection()
  184. def receive_message(self):
  185. """Receive the next message from the message broker.
  186. Tries to reset the AMQP connection if not available.
  187. Returns ``None`` if no message is waiting on the queue.
  188. :rtype: :class:`carrot.messaging.Message` instance.
  189. """
  190. #self.connection_diagnostics()
  191. message = self.task_consumer.fetch()
  192. if message is not None:
  193. message.ack()
  194. return message
  195. def fetch_next_task(self):
  196. """Fetch the next task from the AMQP broker.
  197. Raises :exc:`EmptyQueue` exception if there is no message
  198. waiting on the queue.
  199. :returns: :class:`TaskWrapper` instance.
  200. """
  201. message = self.receive_message()
  202. if message is None: # No messages waiting.
  203. raise EmptyQueue()
  204. task = TaskWrapper.from_message(message)
  205. self.logger.info("Got task from broker: %s[%s]" % (
  206. task.task_name, task.task_id))
  207. return task, message
  208. def execute_next_task(self):
  209. """Execute the next task on the queue using the multiprocessing pool.
  210. Catches all exceptions and logs them with level
  211. :const:`logging.CRITICAL`.
  212. """
  213. task, message = self.fetch_next_task()
  214. try:
  215. result = task.execute_using_pool(self.pool, self.loglevel,
  216. self.logfile)
  217. except Exception, error:
  218. self.logger.critical("Worker got exception %s: %s\n%s" % (
  219. error.__class__, error, traceback.format_exc()))
  220. return
  221. return result, task.task_name, task.task_id
  222. def run_periodic_tasks(self):
  223. """Schedule all waiting periodic tasks for execution.
  224. :rtype: list of :class:`celery.models.PeriodicTaskMeta` objects.
  225. """
  226. waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
  227. [waiting_task.delay()
  228. for waiting_task in waiting_tasks]
  229. return waiting_tasks
  230. def schedule_retry_tasks(self):
  231. """Reschedule all requeued tasks waiting for retry."""
  232. pass
  233. def run(self):
  234. """Starts the workers main loop."""
  235. log_wait = lambda: self.logger.info("Waiting for queue...")
  236. ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
  237. events = [
  238. EventTimer(self.run_periodic_tasks, 1),
  239. EventTimer(self.schedule_retry_tasks, 2),
  240. ]
  241. while True:
  242. [event.tick() for event in events]
  243. try:
  244. result, task_name, task_id = self.execute_next_task()
  245. except ValueError:
  246. # execute_next_task didn't return a r/name/id tuple,
  247. # probably because it got an exception.
  248. continue
  249. except EmptyQueue:
  250. ev_msg_waiting.tick()
  251. time.sleep(self.queue_wakeup_after)
  252. continue
  253. except UnknownTask, e:
  254. self.logger.info("Unknown task ignored: %s" % (e))
  255. continue
  256. except Exception, e:
  257. self.logger.critical("Message queue raised %s: %s\n%s" % (
  258. e.__class__, e, traceback.format_exc()))
  259. continue