listener.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. """
  2. This module contains the component responsible for consuming messages
  3. from the broker, processing the messages and keeping the broker connections
  4. up and running.
  5. * :meth:`~CarrotListener.start` is an infinite loop, which only iterates
  6. again if the connection is lost. For each iteration (at start, or if the
  7. connection is lost) it calls :meth:`~CarrotListener.reset_connection`,
  8. and starts the consumer by calling :meth:`~CarrotListener.consume_messages`.
  9. * :meth:`~CarrotListener.reset_connection` clears the internal queues,
  10. establishes a new connection to the broker, sets up the task
  11. consumer (+ QoS), and the broadcast remote control command consumer.
  12. Also if events are enabled it configures the event dispatcher and starts
  13. up the heartbeat thread.
  14. * Finally it can consume messages. :meth:`~CarrotListener.consume_messages`
  15. is simply an infinite loop waiting for events on the AMQP channels.
  16. Both the task consumer and the broadcast consumer use the same
  17. callback: :meth:`~CarrotListener.receive_message`.
  18. The reason is that some carrot backends don't support consuming
  19. from several channels simultaneously, so we use a little nasty trick
  20. (:meth:`~CarrotListener._detect_wait_method`) to select the best
  21. possible channel distribution depending on the functionality supported
  22. by the carrot backend.
  23. * So for each message received the :meth:`~CarrotListener.receive_message`
  24. method is called, this checks the payload of the message for either
  25. a ``task`` key or a ``control`` key.
  26. If the message is a task, it verifies the validity of the message,
  27. converts it to a :class:`celery.worker.job.TaskRequest`, and sends
  28. it to :meth:`~CarrotListener.on_task`.
  29. If the message is a control command the message is passed to
  30. :meth:`~CarrotListener.on_control`, which in turn dispatches
  31. the control command using the control dispatcher.
  32. It also tries to handle malformed or invalid messages properly,
  33. so the worker doesn't choke on them and die. Any invalid messages
  34. are acknowledged immediately and logged, so the message is not resent
  35. again, and again.
  36. * If the task has an ETA/countdown, the task is moved to the ``eta_schedule``
  37. so the :class:`timer2.Timer` can schedule it at its
  38. deadline. Tasks without an eta are moved immediately to the ``ready_queue``,
  39. so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
  40. to be sent to the pool.
  41. * When a task with an ETA is received the QoS prefetch count is also
  42. incremented, so another message can be reserved. When the ETA is met
  43. the prefetch count is decremented again, though this cannot happen
  44. immediately because amqplib doesn't support doing broker requests
  45. across threads. Instead the current prefetch count is kept as a
  46. shared counter, so as soon as :meth:`~CarrotListener.consume_messages`
  47. detects that the value has changed it will send out the actual
  48. QoS event to the broker.
  49. * Notice that when the connection is lost all internal queues are cleared
  50. because we can no longer ack the messages reserved in memory.
  51. However, this is not dangerous as the broker will resend them
  52. to another worker when the channel is closed.
  53. * **WARNING**: :meth:`~CarrotListener.stop` does not close the connection!
  54. This is because some pre-acked messages may be in processing,
  55. and they need to be finished before the channel is closed.
  56. For celeryd this means the pool must finish the tasks it has acked
  57. early, *then* close the connection.
  58. """
  59. from __future__ import generators
  60. import socket
  61. import warnings
  62. from carrot.connection import AMQPConnectionException
  63. from celery.app import app_or_default
  64. from celery.datastructures import SharedCounter
  65. from celery.events import EventDispatcher
  66. from celery.exceptions import NotRegistered
  67. from celery.pidbox import BroadcastConsumer
  68. from celery.utils import noop, retry_over_time
  69. from celery.worker.job import TaskRequest, InvalidTaskError
  70. from celery.worker.control import ControlDispatch
  71. from celery.worker.heartbeat import Heart
  72. RUN = 0x1
  73. CLOSE = 0x2
  74. class QoS(object):
  75. """Quality of Service for Channel.
  76. For thread-safe increment/decrement of a channels prefetch count value.
  77. :param consumer: A :class:`carrot.messaging.Consumer` instance.
  78. :param initial_value: Initial prefetch count value.
  79. :param logger: Logger used to log debug messages.
  80. """
  81. prev = None
  82. def __init__(self, consumer, initial_value, logger):
  83. self.consumer = consumer
  84. self.logger = logger
  85. self.value = SharedCounter(initial_value)
  86. def increment(self):
  87. """Increment the current prefetch count value by one."""
  88. return self.set(self.value.increment())
  89. def decrement(self):
  90. """Decrement the current prefetch count value by one."""
  91. return self.set(self.value.decrement())
  92. def decrement_eventually(self):
  93. """Decrement the value, but do not update the qos.
  94. The MainThread will be responsible for calling :meth:`update`
  95. when necessary.
  96. """
  97. self.value.decrement()
  98. def set(self, pcount):
  99. """Set channel prefetch_count setting."""
  100. self.logger.debug("basic.qos: prefetch_count->%s" % pcount)
  101. self.consumer.qos(prefetch_count=pcount)
  102. self.prev = pcount
  103. return pcount
  104. def update(self):
  105. """Update prefetch count with current value."""
  106. return self.set(self.next)
  107. @property
  108. def next(self):
  109. return int(self.value)
  110. class CarrotListener(object):
  111. """Listen for messages received from the broker and
  112. move them the the ready queue for task processing.
  113. :param ready_queue: See :attr:`ready_queue`.
  114. :param eta_schedule: See :attr:`eta_schedule`.
  115. .. attribute:: ready_queue
  116. The queue that holds tasks ready for immediate processing.
  117. .. attribute:: eta_schedule
  118. Scheduler for paused tasks. Reasons for being paused include
  119. a countdown/eta or that it's waiting for retry.
  120. .. attribute:: send_events
  121. Is events enabled?
  122. .. attribute:: init_callback
  123. Callback to be called the first time the connection is active.
  124. .. attribute:: hostname
  125. Current hostname. Defaults to the system hostname.
  126. .. attribute:: initial_prefetch_count
  127. Initial QoS prefetch count for the task channel.
  128. .. attribute:: control_dispatch
  129. Control command dispatcher.
  130. See :class:`celery.worker.control.ControlDispatch`.
  131. .. attribute:: event_dispatcher
  132. See :class:`celery.events.EventDispatcher`.
  133. .. attribute:: hart
  134. :class:`~celery.worker.heartbeat.Heart` sending out heart beats
  135. if events enabled.
  136. .. attribute:: logger
  137. The logger used.
  138. """
  139. _state = None
  140. def __init__(self, ready_queue, eta_schedule, logger,
  141. init_callback=noop, send_events=False, hostname=None,
  142. initial_prefetch_count=2, pool=None, queues=None, app=None):
  143. self.app = app_or_default(app)
  144. self.connection = None
  145. self.task_consumer = None
  146. self.ready_queue = ready_queue
  147. self.eta_schedule = eta_schedule
  148. self.send_events = send_events
  149. self.init_callback = init_callback
  150. self.logger = logger
  151. self.hostname = hostname or socket.gethostname()
  152. self.initial_prefetch_count = initial_prefetch_count
  153. self.event_dispatcher = None
  154. self.heart = None
  155. self.pool = pool
  156. self.control_dispatch = ControlDispatch(app=self.app,
  157. logger=logger,
  158. hostname=self.hostname,
  159. listener=self)
  160. self.queues = queues
  161. def start(self):
  162. """Start the consumer.
  163. If the connection is lost, it tries to re-establish the connection
  164. and restarts consuming messages.
  165. """
  166. self.init_callback(self)
  167. while 1:
  168. self.reset_connection()
  169. try:
  170. self.consume_messages()
  171. except (socket.error, AMQPConnectionException, IOError):
  172. self.logger.error("CarrotListener: Connection to broker lost."
  173. + " Trying to re-establish connection...")
  174. def consume_messages(self):
  175. """Consume messages forever (or until an exception is raised)."""
  176. self.logger.debug("CarrotListener: Starting message consumer...")
  177. wait_for_message = self._detect_wait_method()(limit=None).next
  178. self.logger.debug("CarrotListener: Ready to accept tasks!")
  179. while 1:
  180. if self.qos.prev != self.qos.next:
  181. self.qos.update()
  182. wait_for_message()
  183. def on_task(self, task):
  184. """Handle received task.
  185. If the task has an ``eta`` we enter it into the ETA schedule,
  186. otherwise we move it the ready queue for immediate processing.
  187. """
  188. if task.revoked():
  189. return
  190. self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
  191. self.event_dispatcher.send("task-received", uuid=task.task_id,
  192. name=task.task_name, args=repr(task.args),
  193. kwargs=repr(task.kwargs), retries=task.retries,
  194. eta=task.eta and task.eta.isoformat(),
  195. expires=task.expires and task.expires.isoformat())
  196. if task.eta:
  197. self.qos.increment()
  198. self.eta_schedule.apply_at(task.eta,
  199. self.apply_eta_task, (task, ))
  200. else:
  201. self.ready_queue.put(task)
  202. def apply_eta_task(self, task):
  203. self.ready_queue.put(task)
  204. self.qos.decrement_eventually()
  205. def on_control(self, control):
  206. """Handle received remote control command."""
  207. return self.control_dispatch.dispatch_from_message(control)
  208. def receive_message(self, message_data, message):
  209. """The callback called when a new message is received. """
  210. # Handle task
  211. if message_data.get("task"):
  212. try:
  213. task = TaskRequest.from_message(message, message_data,
  214. app=self.app,
  215. logger=self.logger,
  216. hostname=self.hostname,
  217. eventer=self.event_dispatcher)
  218. except NotRegistered, exc:
  219. self.logger.error("Unknown task ignored: %s: %s" % (
  220. str(exc), message_data))
  221. message.ack()
  222. except InvalidTaskError, exc:
  223. self.logger.error("Invalid task ignored: %s: %s" % (
  224. str(exc), message_data))
  225. message.ack()
  226. else:
  227. self.on_task(task)
  228. return
  229. # Handle control command
  230. control = message_data.get("control")
  231. if control:
  232. return self.on_control(control)
  233. warnings.warn(RuntimeWarning(
  234. "Received and deleted unknown message. Wrong destination?!? \
  235. the message was: %s" % message_data))
  236. message.ack()
  237. def maybe_conn_error(self, fun):
  238. try:
  239. fun()
  240. except Exception: # TODO kombu.connection_errors
  241. pass
  242. def close_connection(self):
  243. self.logger.debug("CarrotListener: "
  244. "Closing consumer channel...")
  245. if self.task_consumer:
  246. self.task_consumer = \
  247. self.maybe_conn_error(self.task_consumer.close)
  248. self.logger.debug("CarrotListener: "
  249. "Closing connection to broker...")
  250. if self.connection:
  251. self.connection = self.maybe_conn_error(self.connection.close)
  252. def stop_consumers(self, close=True):
  253. """Stop consuming."""
  254. if not self._state == RUN:
  255. return
  256. self._state = CLOSE
  257. if self.heart:
  258. self.logger.debug("Heart: Going into cardiac arrest...")
  259. self.heart = self.heart.stop()
  260. self.logger.debug("TaskConsumer: Cancelling consumers...")
  261. if self.task_consumer:
  262. self.maybe_conn_error(self.task_consumer.cancel)
  263. if self.event_dispatcher:
  264. self.logger.debug("EventDispatcher: Shutting down...")
  265. self.event_dispatcher = \
  266. self.maybe_conn_error(self.event_dispatcher.close)
  267. if close:
  268. self.close_connection()
  269. def on_decode_error(self, message, exc):
  270. """Callback called if the message had decoding errors.
  271. :param message: The message with errors.
  272. :param exc: The original exception instance.
  273. """
  274. self.logger.critical("Message decoding error: %s "
  275. "(type:%s encoding:%s raw:'%s')" % (
  276. exc, message.content_type,
  277. message.content_encoding, message.body))
  278. message.ack()
  279. def reset_connection(self):
  280. """Re-establish connection and set up consumers."""
  281. self.logger.debug(
  282. "CarrotListener: Re-establishing connection to the broker...")
  283. self.stop_consumers()
  284. # Clear internal queues.
  285. self.ready_queue.clear()
  286. self.eta_schedule.clear()
  287. self.connection = self._open_connection()
  288. self.logger.debug("CarrotListener: Connection Established.")
  289. self.task_consumer = self.app.get_consumer_set(
  290. connection=self.connection,
  291. queues=self.queues)
  292. # QoS: Reset prefetch window.
  293. self.qos = QoS(self.task_consumer,
  294. self.initial_prefetch_count, self.logger)
  295. self.qos.update() # enable prefetch_count QoS.
  296. self.task_consumer.on_decode_error = self.on_decode_error
  297. self.broadcast_consumer = BroadcastConsumer(self.connection,
  298. app=self.app,
  299. hostname=self.hostname)
  300. self.task_consumer.register_callback(self.receive_message)
  301. # Flush events sent while connection was down.
  302. if self.event_dispatcher:
  303. self.event_dispatcher.flush()
  304. self.event_dispatcher = EventDispatcher(self.connection,
  305. app=self.app,
  306. hostname=self.hostname,
  307. enabled=self.send_events)
  308. self.heart = Heart(self.event_dispatcher)
  309. self.heart.start()
  310. self._state = RUN
  311. def _mainloop(self, **kwargs):
  312. while 1:
  313. yield self.connection.drain_events()
  314. def _detect_wait_method(self):
  315. if hasattr(self.connection.connection, "drain_events"):
  316. self.broadcast_consumer.register_callback(self.receive_message)
  317. self.task_consumer.iterconsume()
  318. self.broadcast_consumer.iterconsume()
  319. return self._mainloop
  320. else:
  321. self.task_consumer.add_consumer(self.broadcast_consumer)
  322. return self.task_consumer.iterconsume
  323. def _open_connection(self):
  324. """Retries connecting to the AMQP broker over time.
  325. See :func:`celery.utils.retry_over_time`.
  326. """
  327. def _connection_error_handler(exc, interval):
  328. """Callback handler for connection errors."""
  329. self.logger.error("CarrotListener: Connection Error: %s. " % exc
  330. + "Trying again in %d seconds..." % interval)
  331. def _establish_connection():
  332. """Establish a connection to the broker."""
  333. conn = self.app.broker_connection()
  334. conn.connect() # Connection is established lazily, so connect.
  335. return conn
  336. if not self.app.conf.BROKER_CONNECTION_RETRY:
  337. return _establish_connection()
  338. conn = retry_over_time(_establish_connection, (socket.error, IOError),
  339. errback=_connection_error_handler,
  340. max_retries=self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
  341. return conn
  342. def stop(self):
  343. """Stop consuming.
  344. Does not close connection.
  345. """
  346. self.logger.debug("CarrotListener: Stopping consumers...")
  347. self.stop_consumers(close=False)