__init__.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. """
  2. The Multiprocessing Worker Server
  3. Documentation for this module is in ``docs/reference/celery.worker.rst``.
  4. """
  5. from carrot.connection import DjangoAMQPConnection
  6. from celery.worker.controllers import Mediator, PeriodicWorkController
  7. from celery.worker.job import TaskWrapper
  8. from celery.registry import NotRegistered
  9. from celery.messaging import get_consumer_set
  10. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  11. from celery.conf import AMQP_CONNECTION_RETRY, AMQP_CONNECTION_MAX_RETRIES
  12. from celery.log import setup_logger
  13. from celery.pool import TaskPool
  14. from celery.utils import retry_over_time
  15. from Queue import Queue
  16. import traceback
  17. import logging
  18. import socket
  19. class AMQPListener(object):
  20. """Listen for messages received from the AMQP broker and
  21. move them the the bucket queue for task processing.
  22. :param bucket_queue: See :attr:`bucket_queue`.
  23. :param hold_queue: See :attr:`hold_queue`.
  24. .. attribute:: bucket_queue
  25. The queue that holds tasks ready for processing immediately.
  26. .. attribute:: hold_queue
  27. The queue that holds paused tasks. Reasons for being paused include
  28. a countdown/eta or that it's waiting for retry.
  29. .. attribute:: logger
  30. The logger used.
  31. """
  32. def __init__(self, bucket_queue, hold_queue, logger):
  33. self.amqp_connection = None
  34. self.task_consumer = None
  35. self.bucket_queue = bucket_queue
  36. self.hold_queue = hold_queue
  37. self.logger = logger
  38. def start(self):
  39. """Start the consumer.
  40. If the connection is lost, it tries to re-establish the connection
  41. over time and restart consuming messages.
  42. """
  43. while True:
  44. self.reset_connection()
  45. try:
  46. self.consume_messages()
  47. except (socket.error,
  48. self.amqp_connection.ConnectionException):
  49. self.logger.error("AMQPListener: Connection to broker lost. "
  50. + "Trying to re-establish connection...")
  51. def consume_messages(self):
  52. """Consume messages forever (or until an exception is raised)."""
  53. task_consumer = self.task_consumer
  54. self.logger.debug("AMQPListener: Starting message consumer...")
  55. it = task_consumer.iterconsume(limit=None)
  56. self.logger.debug("AMQPListener: Ready to accept tasks!")
  57. while True:
  58. it.next()
  59. def stop(self):
  60. """Stop processing AMQP messages and close the connection
  61. to the broker."""
  62. self.close_connection()
  63. def receive_message(self, message_data, message):
  64. """The callback called when a new message is received.
  65. If the message has an ``eta`` we move it to the hold queue,
  66. otherwise we move it the bucket queue for immediate processing.
  67. """
  68. try:
  69. task = TaskWrapper.from_message(message, message_data,
  70. logger=self.logger)
  71. except NotRegistered, exc:
  72. self.logger.error("Unknown task ignored: %s" % (exc))
  73. return
  74. eta = message_data.get("eta")
  75. if eta:
  76. self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
  77. task.task_name, task.task_id, eta))
  78. self.hold_queue.put((task, eta))
  79. else:
  80. self.logger.info("Got task from broker: %s[%s]" % (
  81. task.task_name, task.task_id))
  82. self.bucket_queue.put(task)
  83. def close_connection(self):
  84. """Close the AMQP connection."""
  85. if self.task_consumer:
  86. self.task_consumer.close()
  87. self.task_consumer = None
  88. if self.amqp_connection:
  89. self.logger.debug(
  90. "AMQPListener: Closing connection to the broker...")
  91. self.amqp_connection.close()
  92. self.amqp_connection = None
  93. def reset_connection(self):
  94. """Reset the AMQP connection, and reinitialize the
  95. :class:`carrot.messaging.ConsumerSet` instance.
  96. Resets the task consumer in :attr:`task_consumer`.
  97. """
  98. self.logger.debug(
  99. "AMQPListener: Re-establishing connection to the broker...")
  100. self.close_connection()
  101. self.amqp_connection = self._open_connection()
  102. self.task_consumer = get_consumer_set(connection=self.amqp_connection)
  103. self.task_consumer.register_callback(self.receive_message)
  104. def _open_connection(self):
  105. """Retries connecting to the AMQP broker over time.
  106. See :func:`celery.utils.retry_over_time`.
  107. """
  108. def _connection_error_handler(exc, interval):
  109. """Callback handler for connection errors."""
  110. self.logger.error("AMQP Listener: Connection Error: %s. " % exc
  111. + "Trying again in %d seconds..." % interval)
  112. def _establish_connection():
  113. """Establish a connection to the AMQP broker."""
  114. conn = DjangoAMQPConnection()
  115. connected = conn.connection # Connection is established lazily.
  116. return conn
  117. if not AMQP_CONNECTION_RETRY:
  118. return _establish_connection()
  119. conn = retry_over_time(_establish_connection, socket.error,
  120. errback=_connection_error_handler,
  121. max_retries=AMQP_CONNECTION_MAX_RETRIES)
  122. self.logger.debug("AMQPListener: Connection Established.")
  123. return conn
  124. class WorkController(object):
  125. """Executes tasks waiting in the task queue.
  126. :param concurrency: see :attr:`concurrency`.
  127. :param logfile: see :attr:`logfile`.
  128. :param loglevel: see :attr:`loglevel`.
  129. .. attribute:: concurrency
  130. The number of simultaneous processes doing work (default:
  131. :const:`celery.conf.DAEMON_CONCURRENCY`)
  132. .. attribute:: loglevel
  133. The loglevel used (default: :const:`logging.INFO`)
  134. .. attribute:: logfile
  135. The logfile used, if no logfile is specified it uses ``stderr``
  136. (default: :const:`celery.conf.DAEMON_LOG_FILE`).
  137. .. attribute:: logger
  138. The :class:`logging.Logger` instance used for logging.
  139. .. attribute:: is_detached
  140. Flag describing if the worker is running as a daemon or not.
  141. .. attribute:: pool
  142. The :class:`multiprocessing.Pool` instance used.
  143. .. attribute:: bucket_queue
  144. The :class:`Queue.Queue` that holds tasks ready for immediate
  145. processing.
  146. .. attribute:: hold_queue
  147. The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
  148. back the task include waiting for ``eta`` to pass or the task is being
  149. retried.
  150. .. attribute:: periodic_work_controller
  151. Instance of :class:`celery.worker.controllers.PeriodicWorkController`.
  152. .. attribute:: mediator
  153. Instance of :class:`celery.worker.controllers.Mediator`.
  154. .. attribute:: amqp_listener
  155. Instance of :class:`AMQPListener`.
  156. """
  157. loglevel = logging.ERROR
  158. concurrency = DAEMON_CONCURRENCY
  159. logfile = DAEMON_LOG_FILE
  160. _state = None
  161. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  162. is_detached=False):
  163. # Options
  164. self.loglevel = loglevel or self.loglevel
  165. self.concurrency = concurrency or self.concurrency
  166. self.logfile = logfile or self.logfile
  167. self.is_detached = is_detached
  168. self.logger = setup_logger(loglevel, logfile)
  169. # Queues
  170. self.bucket_queue = Queue()
  171. self.hold_queue = Queue()
  172. self.logger.debug("Instantiating thread components...")
  173. # Threads+Pool
  174. self.periodic_work_controller = PeriodicWorkController(
  175. self.bucket_queue,
  176. self.hold_queue)
  177. self.pool = TaskPool(self.concurrency, logger=self.logger)
  178. self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
  179. logger=self.logger)
  180. self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
  181. # The order is important here;
  182. # the first in the list is the first to start,
  183. # and they must be stopped in reverse order.
  184. self.components = [self.pool,
  185. self.mediator,
  186. self.periodic_work_controller,
  187. self.amqp_listener]
  188. def start(self):
  189. """Starts the workers main loop."""
  190. self._state = "RUN"
  191. try:
  192. for component in self.components:
  193. self.logger.debug("Starting thread %s..." % \
  194. component.__class__.__name__)
  195. component.start()
  196. finally:
  197. self.stop()
  198. def safe_process_task(self, task):
  199. """Same as :meth:`process_task`, but catches all exceptions
  200. the task raises and log them as errors, to make sure the
  201. worker doesn't die."""
  202. try:
  203. try:
  204. self.process_task(task)
  205. except Exception, exc:
  206. self.logger.critical("Internal error %s: %s\n%s" % (
  207. exc.__class__, exc, traceback.format_exc()))
  208. except (SystemExit, KeyboardInterrupt):
  209. self.stop()
  210. def process_task(self, task):
  211. """Process task by sending it to the pool of workers."""
  212. task.execute_using_pool(self.pool, self.loglevel, self.logfile)
  213. def stop(self):
  214. """Gracefully shutdown the worker server."""
  215. # shut down the periodic work controller thread
  216. if self._state != "RUN":
  217. return
  218. [component.stop() for component in reversed(self.components)]