listener.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
"""

This module contains the component responsible for consuming messages
from the broker, processing the messages and keeping the broker connections
up and running.

* :meth:`~CarrotListener.start` is an infinite loop, which only iterates
  again if the connection is lost. For each iteration (at start, or if the
  connection is lost) it calls :meth:`~CarrotListener.reset_connection`,
  and starts the consumer by calling :meth:`~CarrotListener.consume_messages`.

* :meth:`~CarrotListener.reset_connection` clears the internal queues,
  establishes a new connection to the broker, sets up the task
  consumer (+ QoS), and the broadcast remote control command consumer.

  It also creates the event dispatcher (which only sends events if events
  are enabled) and starts up the heartbeat thread.

* Finally it can consume messages. :meth:`~CarrotListener.consume_messages`
  is simply an infinite loop waiting for events on the AMQP channels.

  Both the task consumer and the broadcast consumer use the same
  callback: :meth:`~CarrotListener.receive_message`.

  The reason is that some carrot backends don't support consuming
  from several channels simultaneously, so we use a little nasty trick
  (:meth:`~CarrotListener._detect_wait_method`) to select the best
  possible channel distribution depending on the functionality supported
  by the carrot backend.

* So for each message received the :meth:`~CarrotListener.receive_message`
  method is called, which checks the payload of the message for either
  a ``task`` key or a ``control`` key.

  If the message is a task, it verifies the validity of the message,
  converts it to a :class:`celery.worker.job.TaskRequest`, and sends
  it to :meth:`~CarrotListener.on_task`.

  If the message is a control command the message is passed to
  :meth:`~CarrotListener.on_control`, which in turn dispatches
  the control command using the control dispatcher.

  It also tries to handle malformed or invalid messages properly,
  so the worker doesn't choke on them and die. Any invalid messages
  are acknowledged immediately and logged, so the message is not resent
  again and again.

* If the task has an ETA/countdown, the task is moved to the ``eta_schedule``
  so the :class:`timer2.Timer` can schedule it at its
  deadline. Tasks without an ETA are moved immediately to the ``ready_queue``,
  so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
  to be sent to the pool.

* When a task with an ETA is received the QoS prefetch count is also
  incremented, so another message can be reserved. When the ETA is met
  the prefetch count is decremented again, though this cannot happen
  immediately because amqplib doesn't support doing broker requests
  across threads. Instead the current prefetch count is kept as a
  shared counter, so as soon as :meth:`~CarrotListener.consume_messages`
  detects that the value has changed it will send the actual
  ``basic.qos`` request to the broker.

* Notice that when the connection is lost all internal queues are cleared
  because we can no longer ack the messages reserved in memory.
  However, this is not dangerous as the broker will resend them
  to another worker when the channel is closed.

* **WARNING**: :meth:`~CarrotListener.stop` does not close the connection!
  This is because some pre-acked messages may be in processing,
  and they need to be finished before the channel is closed.
  For celeryd this means the pool must finish the tasks it has acked
  early, *then* close the connection.

"""
from __future__ import generators

import socket
import sys
import warnings

from carrot.connection import AMQPConnectionException

from celery import conf
from celery.utils import noop, retry_over_time
from celery.worker.job import TaskRequest, InvalidTaskError
from celery.worker.control import ControlDispatch
from celery.worker.heartbeat import Heart
from celery.events import EventDispatcher
from celery.messaging import establish_connection
from celery.messaging import get_consumer_set, BroadcastConsumer
from celery.exceptions import NotRegistered
from celery.datastructures import SharedCounter
from celery.utils.info import get_broker_info

  74. RUN = 0x1
  75. CLOSE = 0x2
  76. class QoS(object):
  77. """Quality of Service for Channel.
  78. For thread-safe increment/decrement of a channels prefetch count value.
  79. :param consumer: A :class:`carrot.messaging.Consumer` instance.
  80. :param initial_value: Initial prefetch count value.
  81. :param logger: Logger used to log debug messages.
  82. """
  83. prev = None
  84. def __init__(self, consumer, initial_value, logger):
  85. self.consumer = consumer
  86. self.logger = logger
  87. self.value = SharedCounter(initial_value)
  88. def increment(self):
  89. """Increment the current prefetch count value by one."""
  90. if int(self.value):
  91. return self.set(self.value.increment())
  92. def decrement(self):
  93. """Decrement the current prefetch count value by one."""
  94. if int(self.value):
  95. return self.set(self.value.decrement())
  96. def decrement_eventually(self):
  97. """Decrement the value, but do not update the qos.
  98. The MainThread will be responsible for calling :meth:`update`
  99. when necessary.
  100. """
  101. self.value.decrement()
  102. def set(self, pcount):
  103. """Set channel prefetch_count setting."""
  104. self.logger.debug("basic.qos: prefetch_count->%s" % pcount)
  105. self.consumer.qos(prefetch_count=pcount)
  106. self.prev = pcount
  107. return pcount
  108. def update(self):
  109. """Update prefetch count with current value."""
  110. return self.set(self.next)
  111. @property
  112. def next(self):
  113. return int(self.value)
  114. class CarrotListener(object):
  115. """Listen for messages received from the broker and
  116. move them the the ready queue for task processing.
  117. :param ready_queue: See :attr:`ready_queue`.
  118. :param eta_schedule: See :attr:`eta_schedule`.
  119. .. attribute:: ready_queue
  120. The queue that holds tasks ready for immediate processing.
  121. .. attribute:: eta_schedule
  122. Scheduler for paused tasks. Reasons for being paused include
  123. a countdown/eta or that it's waiting for retry.
  124. .. attribute:: send_events
  125. Is events enabled?
  126. .. attribute:: init_callback
  127. Callback to be called the first time the connection is active.
  128. .. attribute:: hostname
  129. Current hostname. Defaults to the system hostname.
  130. .. attribute:: initial_prefetch_count
  131. Initial QoS prefetch count for the task channel.
  132. .. attribute:: control_dispatch
  133. Control command dispatcher.
  134. See :class:`celery.worker.control.ControlDispatch`.
  135. .. attribute:: event_dispatcher
  136. See :class:`celery.events.EventDispatcher`.
  137. .. attribute:: hart
  138. :class:`~celery.worker.heartbeat.Heart` sending out heart beats
  139. if events enabled.
  140. .. attribute:: logger
  141. The logger used.
  142. """
  143. _state = None
  144. def __init__(self, ready_queue, eta_schedule, logger,
  145. init_callback=noop, send_events=False, hostname=None,
  146. initial_prefetch_count=2, pool=None):
  147. self.connection = None
  148. self.task_consumer = None
  149. self.ready_queue = ready_queue
  150. self.eta_schedule = eta_schedule
  151. self.send_events = send_events
  152. self.init_callback = init_callback
  153. self.logger = logger
  154. self.hostname = hostname or socket.gethostname()
  155. self.initial_prefetch_count = initial_prefetch_count
  156. self.event_dispatcher = None
  157. self.heart = None
  158. self.pool = pool
  159. self.control_dispatch = ControlDispatch(logger=logger,
  160. hostname=self.hostname,
  161. listener=self)
  162. def start(self):
  163. """Start the consumer.
  164. If the connection is lost, it tries to re-establish the connection
  165. and restarts consuming messages.
  166. """
  167. self.init_callback(self)
  168. while 1:
  169. self.reset_connection()
  170. try:
  171. self.consume_messages()
  172. except (socket.error, AMQPConnectionException, IOError):
  173. self.logger.error("CarrotListener: Connection to broker lost."
  174. + " Trying to re-establish connection...")
  175. def consume_messages(self):
  176. """Consume messages forever (or until an exception is raised)."""
  177. self.logger.debug("CarrotListener: Starting message consumer...")
  178. wait_for_message = self._detect_wait_method()(limit=None).next
  179. self.logger.debug("CarrotListener: Ready to accept tasks!")
  180. while 1:
  181. if self.qos.prev != self.qos.next:
  182. self.qos.update()
  183. wait_for_message()
  184. def on_task(self, task):
  185. """Handle received task.
  186. If the task has an ``eta`` we enter it into the ETA schedule,
  187. otherwise we move it the ready queue for immediate processing.
  188. """
  189. if task.revoked():
  190. return
  191. self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
  192. self.event_dispatcher.send("task-received", uuid=task.task_id,
  193. name=task.task_name, args=repr(task.args),
  194. kwargs=repr(task.kwargs), retries=task.retries,
  195. eta=task.eta and task.eta.isoformat(),
  196. expires=task.expires and task.expires.isoformat())
  197. if task.eta:
  198. self.qos.increment()
  199. self.eta_schedule.apply_at(task.eta,
  200. self.apply_eta_task, (task, ))
  201. else:
  202. self.ready_queue.put(task)
  203. def apply_eta_task(self, task):
  204. self.ready_queue.put(task)
  205. self.qos.decrement_eventually()
  206. def on_control(self, control):
  207. """Handle received remote control command."""
  208. return self.control_dispatch.dispatch_from_message(control)
  209. def receive_message(self, message_data, message):
  210. """The callback called when a new message is received. """
  211. # Handle task
  212. if message_data.get("task"):
  213. try:
  214. task = TaskRequest.from_message(message, message_data,
  215. logger=self.logger,
  216. hostname=self.hostname,
  217. eventer=self.event_dispatcher)
  218. except NotRegistered, exc:
  219. self.logger.error("Unknown task ignored: %s: %s" % (
  220. str(exc), message_data))
  221. message.ack()
  222. except InvalidTaskError, exc:
  223. self.logger.error("Invalid task ignored: %s: %s" % (
  224. str(exc), message_data))
  225. message.ack()
  226. else:
  227. self.on_task(task)
  228. return
  229. # Handle control command
  230. control = message_data.get("control")
  231. if control:
  232. return self.on_control(control)
  233. warnings.warn(RuntimeWarning(
  234. "Received and deleted unknown message. Wrong destination?!? \
  235. the message was: %s" % message_data))
  236. message.ack()
  237. def maybe_conn_error(self, fun):
  238. try:
  239. fun()
  240. except Exception: # TODO kombu.connection_errors
  241. pass
  242. def close_connection(self):
  243. self.logger.debug("CarrotListener: "
  244. "Closing consumer channel...")
  245. if self.task_consumer:
  246. self.task_consumer = \
  247. self.maybe_conn_error(self.task_consumer.close)
  248. self.logger.debug("CarrotListener: "
  249. "Closing connection to broker...")
  250. if self.connection:
  251. self.connection = self.maybe_conn_error(self.connection.close)
  252. def stop_consumers(self, close=True):
  253. """Stop consuming."""
  254. if not self._state == RUN:
  255. return
  256. self._state = CLOSE
  257. if self.heart:
  258. self.logger.debug("Heart: Going into cardiac arrest...")
  259. self.heart = self.heart.stop()
  260. self.logger.debug("TaskConsumer: Cancelling consumers...")
  261. if self.task_consumer:
  262. self.maybe_conn_error(self.task_consumer.cancel)
  263. if self.event_dispatcher:
  264. self.logger.debug("EventDispatcher: Shutting down...")
  265. self.event_dispatcher = \
  266. self.maybe_conn_error(self.event_dispatcher.close)
  267. if close:
  268. self.close_connection()
  269. def on_decode_error(self, message, exc):
  270. """Callback called if the message had decoding errors.
  271. :param message: The message with errors.
  272. :param exc: The original exception instance.
  273. """
  274. self.logger.critical("Message decoding error: %s "
  275. "(type:%s encoding:%s raw:'%s')" % (
  276. exc, message.content_type,
  277. message.content_encoding, message.body))
  278. message.ack()
  279. def reset_connection(self):
  280. """Re-establish connection and set up consumers."""
  281. self.logger.debug(
  282. "CarrotListener: Re-establishing connection to the broker...")
  283. self.stop_consumers()
  284. # Clear internal queues.
  285. self.ready_queue.clear()
  286. self.eta_schedule.clear()
  287. self.connection = self._open_connection()
  288. self.logger.debug("CarrotListener: Connection Established.")
  289. self.task_consumer = get_consumer_set(connection=self.connection)
  290. # QoS: Reset prefetch window.
  291. self.qos = QoS(self.task_consumer,
  292. self.initial_prefetch_count, self.logger)
  293. self.qos.update() # enable prefetch_count QoS.
  294. self.task_consumer.on_decode_error = self.on_decode_error
  295. self.broadcast_consumer = BroadcastConsumer(self.connection,
  296. hostname=self.hostname)
  297. self.task_consumer.register_callback(self.receive_message)
  298. # Flush events sent while connection was down.
  299. if self.event_dispatcher:
  300. self.event_dispatcher.flush()
  301. self.event_dispatcher = EventDispatcher(self.connection,
  302. hostname=self.hostname,
  303. enabled=self.send_events)
  304. self.restart_heartbeat()
  305. self._state = RUN
  306. def restart_heartbeat(self):
  307. self.heart = Heart(self.event_dispatcher)
  308. self.heart.start()
  309. def _mainloop(self, **kwargs):
  310. while 1:
  311. yield self.connection.drain_events()
  312. def _detect_wait_method(self):
  313. if hasattr(self.connection.connection, "drain_events"):
  314. self.broadcast_consumer.register_callback(self.receive_message)
  315. self.task_consumer.iterconsume()
  316. self.broadcast_consumer.iterconsume()
  317. return self._mainloop
  318. else:
  319. self.task_consumer.add_consumer(self.broadcast_consumer)
  320. return self.task_consumer.iterconsume
  321. def _open_connection(self):
  322. """Retries connecting to the AMQP broker over time.
  323. See :func:`celery.utils.retry_over_time`.
  324. """
  325. def _connection_error_handler(exc, interval):
  326. """Callback handler for connection errors."""
  327. self.logger.error("CarrotListener: Connection Error: %s. " % exc
  328. + "Trying again in %d seconds..." % interval)
  329. def _establish_connection():
  330. """Establish a connection to the broker."""
  331. conn = establish_connection()
  332. conn.connect() # evaluate connection
  333. return conn
  334. if not conf.BROKER_CONNECTION_RETRY:
  335. return _establish_connection()
  336. conn = retry_over_time(_establish_connection, (socket.error, IOError),
  337. errback=_connection_error_handler,
  338. max_retries=conf.BROKER_CONNECTION_MAX_RETRIES)
  339. return conn
  340. def stop(self):
  341. """Stop consuming.
  342. Does not close connection.
  343. """
  344. self.logger.debug("CarrotListener: Stopping consumers...")
  345. self.stop_consumers(close=False)
  346. @property
  347. def info(self):
  348. conninfo = {}
  349. if self.connection:
  350. conninfo = get_broker_info(self.connection)
  351. return {"broker": conninfo,
  352. "prefetch_count": self.qos.next}