__init__.py

  1. """
  2. The Multiprocessing Worker Server
  3. Documentation for this module is in ``docs/reference/celery.worker.rst``.
  4. """
  5. from carrot.connection import DjangoBrokerConnection
  6. from celery.worker.controllers import Mediator, PeriodicWorkController
  7. from celery.worker.job import TaskWrapper
  8. from celery.registry import NotRegistered
  9. from celery.messaging import get_consumer_set
  10. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  11. from celery.conf import AMQP_CONNECTION_RETRY, AMQP_CONNECTION_MAX_RETRIES
  12. from celery.log import setup_logger
  13. from celery.pool import TaskPool
  14. from celery.utils import retry_over_time
  15. from celery.datastructures import SharedCounter
  16. from Queue import Queue
  17. import traceback
  18. import logging
  19. import socket
class AMQPListener(object):
    """Listen for messages received from the AMQP broker and
    move them to the bucket queue for task processing.

    :param bucket_queue: See :attr:`bucket_queue`.
    :param hold_queue: See :attr:`hold_queue`.

    .. attribute:: bucket_queue

        The queue that holds tasks ready for immediate processing.

    .. attribute:: hold_queue

        The queue that holds paused tasks. Reasons for being paused include
        a countdown/eta or that it's waiting for retry.

    .. attribute:: logger

        The logger used.

    """

    def __init__(self, bucket_queue, hold_queue, logger,
            initial_prefetch_count=2):
        self.amqp_connection = None
        self.task_consumer = None
        self.bucket_queue = bucket_queue
        self.hold_queue = hold_queue
        self.logger = logger
        self.prefetch_count = SharedCounter(initial_prefetch_count)

    def start(self):
        """Start the consumer.

        If the connection is lost, it tries to re-establish the connection
        over time and restart consuming messages.

        """
        while True:
            self.reset_connection()
            try:
                self.consume_messages()
            except (socket.error,
                    self.amqp_connection.ConnectionException):
                self.logger.error("AMQPListener: Connection to broker lost. "
                        + "Trying to re-establish connection...")

    def consume_messages(self):
        """Consume messages forever (or until an exception is raised)."""
        task_consumer = self.task_consumer

        self.logger.debug("AMQPListener: Starting message consumer...")
        it = task_consumer.iterconsume(limit=None)

        self.logger.debug("AMQPListener: Ready to accept tasks!")

        while True:
            self.task_consumer.qos(prefetch_count=int(self.prefetch_count))
            it.next()

    def stop(self):
        """Stop processing AMQP messages and close the connection
        to the broker."""
        self.close_connection()

    def receive_message(self, message_data, message):
        """The callback called when a new message is received.

        If the message has an ``eta`` we move it to the hold queue,
        otherwise we move it to the bucket queue for immediate processing.

        """
        try:
            task = TaskWrapper.from_message(message, message_data,
                                            logger=self.logger)
        except NotRegistered, exc:
            self.logger.error("Unknown task ignored: %s" % (exc))
            return

        eta = message_data.get("eta")
        if eta:
            self.prefetch_count.increment()
            self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
                    task.task_name, task.task_id, eta))
            self.hold_queue.put((task, eta, self.prefetch_count.decrement))
        else:
            self.logger.info("Got task from broker: %s[%s]" % (
                    task.task_name, task.task_id))
            self.bucket_queue.put(task)

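    # Illustrative note (an assumption, not taken from this module): the
    # ``message_data`` dict is built by the task publisher and looks roughly
    # like ``{"task": "myapp.tasks.add", "id": "<uuid>", "args": [2, 2],
    # "kwargs": {}, "eta": "2009-11-17T12:30:56"}``. A message carrying an
    # ``eta`` is parked on the hold queue until it is due; the prefetch count
    # is bumped so the listener keeps fetching new work while the held task
    # occupies a slot, and the ``decrement`` callback releases that slot
    # once the task is moved on for execution.
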
    def close_connection(self):
        """Close the AMQP connection."""
        if self.task_consumer:
            self.task_consumer.close()
            self.task_consumer = None
        if self.amqp_connection:
            self.logger.debug(
                    "AMQPListener: Closing connection to the broker...")
            self.amqp_connection.close()
            self.amqp_connection = None

    def reset_connection(self):
        """Reset the AMQP connection, and reinitialize the
        :class:`carrot.messaging.ConsumerSet` instance.

        Resets the task consumer in :attr:`task_consumer`.

        """
        self.logger.debug(
                "AMQPListener: Re-establishing connection to the broker...")
        self.close_connection()
        self.amqp_connection = self._open_connection()
        self.task_consumer = get_consumer_set(connection=self.amqp_connection)
        self.task_consumer.register_callback(self.receive_message)

    def _open_connection(self):
        """Retries connecting to the AMQP broker over time.

        See :func:`celery.utils.retry_over_time`.

        """

        def _connection_error_handler(exc, interval):
            """Callback handler for connection errors."""
            self.logger.error("AMQP Listener: Connection Error: %s. " % exc
                    + "Trying again in %d seconds..." % interval)

        def _establish_connection():
            """Establish a connection to the AMQP broker."""
            conn = DjangoBrokerConnection()
            connected = conn.connection  # Connection is established lazily.
            return conn

        if not AMQP_CONNECTION_RETRY:
            return _establish_connection()

        conn = retry_over_time(_establish_connection, socket.error,
                               errback=_connection_error_handler,
                               max_retries=AMQP_CONNECTION_MAX_RETRIES)
        self.logger.debug("AMQPListener: Connection Established.")
        return conn


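# Illustrative sketch only, not part of the original module: how an
# AMQPListener could be wired up by hand. The queue names, logger name and
# prefetch value below are assumptions; in practice WorkController (below)
# does this wiring. The helper is defined but never called here.
def _example_standalone_listener():
    bucket_queue = Queue()      # tasks ready to execute right away
    hold_queue = Queue()        # tasks waiting for an eta or a retry
    listener = AMQPListener(bucket_queue, hold_queue,
                            logger=logging.getLogger("celery.example"),
                            initial_prefetch_count=4)
    listener.start()            # blocks; reconnects if the broker goes away

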
class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.

    .. attribute:: concurrency

        The number of simultaneous processes doing work (default:
        :const:`celery.conf.DAEMON_CONCURRENCY`)

    .. attribute:: loglevel

        The loglevel used (default: :const:`logging.INFO`)

    .. attribute:: logfile

        The logfile used; if no logfile is specified, ``stderr`` is used
        (default: :const:`celery.conf.DAEMON_LOG_FILE`).

    .. attribute:: logger

        The :class:`logging.Logger` instance used for logging.

    .. attribute:: is_detached

        Flag describing whether the worker is running as a daemon or not.

    .. attribute:: pool

        The :class:`multiprocessing.Pool` instance used.

    .. attribute:: bucket_queue

        The :class:`Queue.Queue` that holds tasks ready for immediate
        processing.

    .. attribute:: hold_queue

        The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
        back the task include waiting for ``eta`` to pass or the task is being
        retried.

    .. attribute:: periodic_work_controller

        Instance of :class:`celery.worker.controllers.PeriodicWorkController`.

    .. attribute:: mediator

        Instance of :class:`celery.worker.controllers.Mediator`.

    .. attribute:: amqp_listener

        Instance of :class:`AMQPListener`.

    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    _state = None

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            is_detached=False):

        # Options
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.is_detached = is_detached
        self.logger = setup_logger(loglevel, logfile)

        # Queues
        self.bucket_queue = Queue()
        self.hold_queue = Queue()

        self.logger.debug("Instantiating thread components...")

        # Threads + Pool
        self.periodic_work_controller = PeriodicWorkController(
                                            self.bucket_queue,
                                            self.hold_queue)
        self.pool = TaskPool(self.concurrency, logger=self.logger)
        self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
                                          logger=self.logger,
                                          initial_prefetch_count=concurrency)
        self.mediator = Mediator(self.bucket_queue, self.safe_process_task)

        # The order is important here;
        # the first in the list is the first to start,
        # and they must be stopped in reverse order.
        self.components = [self.pool,
                           self.mediator,
                           self.periodic_work_controller,
                           self.amqp_listener]

    def start(self):
        """Starts the worker's main loop."""
        self._state = "RUN"

        try:
            for component in self.components:
                self.logger.debug("Starting thread %s..." % \
                        component.__class__.__name__)
                component.start()
        finally:
            self.stop()

    def safe_process_task(self, task):
        """Same as :meth:`process_task`, but catches all exceptions
        the task raises and logs them as errors, to make sure the
        worker doesn't die."""
        try:
            try:
                self.process_task(task)
            except Exception, exc:
                self.logger.critical("Internal error %s: %s\n%s" % (
                                exc.__class__, exc, traceback.format_exc()))
        except (SystemExit, KeyboardInterrupt):
            self.stop()

    def process_task(self, task):
        """Process task by sending it to the pool of workers."""
        task.execute_using_pool(self.pool, self.loglevel, self.logfile)

    def stop(self):
        """Gracefully shut down the worker server."""
        if self._state != "RUN":
            return

        # Stop all components, in the reverse order of how they were started.
        [component.stop() for component in reversed(self.components)]


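# Illustrative sketch only, not part of the original module: roughly how the
# worker daemon (celeryd) drives this module. The concurrency, loglevel and
# logfile values are assumptions for the example; the helper is defined but
# never called here.
def _example_run_worker():
    worker = WorkController(concurrency=4,
                            loglevel=logging.INFO,
                            logfile="/var/log/celeryd.log",
                            is_detached=False)
    # start() starts each component in order and then blocks in the AMQP
    # listener's consume loop; stop() runs when start() unwinds.
    worker.start()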