consumer.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. """
  2. This module contains the component responsible for consuming messages
  3. from the broker, processing the messages and keeping the broker connections
  4. up and running.
  5. * :meth:`~Consumer.start` is an infinite loop, which only iterates
  6. again if the connection is lost. For each iteration (at start, or if the
  7. connection is lost) it calls :meth:`~Consumer.reset_connection`,
  8. and starts the consumer by calling :meth:`~Consumer.consume_messages`.
  9. * :meth:`~Consumer.reset_connection`, clears the internal queues,
  10. establishes a new connection to the broker, sets up the task
  11. consumer (+ QoS), and the broadcast remote control command consumer.
  12. Also if events are enabled it configures the event dispatcher and starts
  13. up the heartbeat thread.
  14. * Finally it can consume messages. :meth:`~Consumer.consume_messages`
  15. is simply an infinite loop waiting for events on the AMQP channels.
  16. Both the task consumer and the broadcast consumer use the same
  17. callback: :meth:`~Consumer.receive_message`.
  18. * So for each message received the :meth:`~Consumer.receive_message`
  19. method is called, this checks the payload of the message for either
  20. a `task` key or a `control` key.
  21. If the message is a task, it verifies the validity of the message
  22. converts it to a :class:`celery.worker.job.TaskRequest`, and sends
  23. it to :meth:`~Consumer.on_task`.
  24. If the message is a control command the message is passed to
  25. :meth:`~Consumer.on_control`, which in turn dispatches
  26. the control command using the control dispatcher.
  27. It also tries to handle malformed or invalid messages properly,
  28. so the worker doesn't choke on them and die. Any invalid messages
  29. are acknowledged immediately and logged, so the message is not resent
  30. again, and again.
  31. * If the task has an ETA/countdown, the task is moved to the `eta_schedule`
  32. so the :class:`timer2.Timer` can schedule it at its
  33. deadline. Tasks without an eta are moved immediately to the `ready_queue`,
  34. so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
  35. to be sent to the pool.
  36. * When a task with an ETA is received the QoS prefetch count is also
  37. incremented, so another message can be reserved. When the ETA is met
  38. the prefetch count is decremented again, though this cannot happen
  39. immediately because amqplib doesn't support doing broker requests
  40. across threads. Instead the current prefetch count is kept as a
  41. shared counter, so as soon as :meth:`~Consumer.consume_messages`
  42. detects that the value has changed it will send out the actual
  43. QoS event to the broker.
  44. * Notice that when the connection is lost all internal queues are cleared
  45. because we can no longer ack the messages reserved in memory.
  46. However, this is not dangerous as the broker will resend them
  47. to another worker when the channel is closed.
  48. * **WARNING**: :meth:`~Consumer.stop` does not close the connection!
  49. This is because some pre-acked messages may be in processing,
  50. and they need to be finished before the channel is closed.
  51. For celeryd this means the pool must finish the tasks it has acked
  52. early, *then* close the connection.
  53. """
  54. from __future__ import generators
  55. import socket
  56. import sys
  57. import threading
  58. import traceback
  59. import warnings
  60. from celery.app import app_or_default
  61. from celery.datastructures import AttributeDict
  62. from celery.exceptions import NotRegistered
  63. from celery.utils import noop
  64. from celery.utils.timer2 import to_timestamp
  65. from celery.worker import state
  66. from celery.worker.job import TaskRequest, InvalidTaskError
  67. from celery.worker.control.registry import Panel
  68. from celery.worker.heartbeat import Heart
# Consumer life-cycle states, stored in ``Consumer._state``.
# RUN is assigned at the end of ``Consumer.reset_connection``.
RUN = 0x1
# CLOSE is assigned by ``Consumer.stop_consumers`` (which is a no-op unless
# the current state is RUN).
CLOSE = 0x2
  71. class QoS(object):
  72. """Quality of Service for Channel.
  73. For thread-safe increment/decrement of a channels prefetch count value.
  74. :param consumer: A :class:`kombu.messaging.Consumer` instance.
  75. :param initial_value: Initial prefetch count value.
  76. :param logger: Logger used to log debug messages.
  77. """
  78. prev = None
  79. def __init__(self, consumer, initial_value, logger):
  80. self.consumer = consumer
  81. self.logger = logger
  82. self._mutex = threading.RLock()
  83. self.value = initial_value
  84. def increment(self, n=1):
  85. """Increment the current prefetch count value by one."""
  86. self._mutex.acquire()
  87. try:
  88. if self.value:
  89. self.value += max(n, 0)
  90. self.set(self.value)
  91. return self.value
  92. finally:
  93. self._mutex.release()
  94. def _sub(self, n=1):
  95. assert self.value - n > 1
  96. self.value -= n
  97. def decrement(self, n=1):
  98. """Decrement the current prefetch count value by one."""
  99. self._mutex.acquire()
  100. try:
  101. if self.value:
  102. self._sub(n)
  103. self.set(self.value)
  104. return self.value
  105. finally:
  106. self._mutex.release()
  107. def decrement_eventually(self, n=1):
  108. """Decrement the value, but do not update the qos.
  109. The MainThread will be responsible for calling :meth:`update`
  110. when necessary.
  111. """
  112. self._mutex.acquire()
  113. try:
  114. if self.value:
  115. self._sub(n)
  116. finally:
  117. self._mutex.release()
  118. def set(self, pcount):
  119. """Set channel prefetch_count setting."""
  120. if pcount != self.prev:
  121. self.logger.debug("basic.qos: prefetch_count->%s" % pcount)
  122. self.consumer.qos(prefetch_count=pcount)
  123. self.prev = pcount
  124. return pcount
  125. def update(self):
  126. """Update prefetch count with current value."""
  127. self._mutex.acquire()
  128. try:
  129. return self.set(self.value)
  130. finally:
  131. self._mutex.release()
  132. class Consumer(object):
  133. """Listen for messages received from the broker and
  134. move them the the ready queue for task processing.
  135. :param ready_queue: See :attr:`ready_queue`.
  136. :param eta_schedule: See :attr:`eta_schedule`.
  137. .. attribute:: ready_queue
  138. The queue that holds tasks ready for immediate processing.
  139. .. attribute:: eta_schedule
  140. Scheduler for paused tasks. Reasons for being paused include
  141. a countdown/eta or that it's waiting for retry.
  142. .. attribute:: send_events
  143. Is events enabled?
  144. .. attribute:: init_callback
  145. Callback to be called the first time the connection is active.
  146. .. attribute:: hostname
  147. Current hostname. Defaults to the system hostname.
  148. .. attribute:: initial_prefetch_count
  149. Initial QoS prefetch count for the task channel.
  150. .. attribute:: control_dispatch
  151. Control command dispatcher.
  152. See :class:`celery.worker.control.ControlDispatch`.
  153. .. attribute:: event_dispatcher
  154. See :class:`celery.events.EventDispatcher`.
  155. .. attribute:: hart
  156. :class:`~celery.worker.heartbeat.Heart` sending out heart beats
  157. if events enabled.
  158. .. attribute:: logger
  159. The logger used.
  160. """
  161. _state = None
  162. def __init__(self, ready_queue, eta_schedule, logger,
  163. init_callback=noop, send_events=False, hostname=None,
  164. initial_prefetch_count=2, pool=None, queues=None, app=None):
  165. self.app = app_or_default(app)
  166. self.connection = None
  167. self.task_consumer = None
  168. self.broadcast_consumer = None
  169. self.ready_queue = ready_queue
  170. self.eta_schedule = eta_schedule
  171. self.send_events = send_events
  172. self.init_callback = init_callback
  173. self.logger = logger
  174. self.hostname = hostname or socket.gethostname()
  175. self.initial_prefetch_count = initial_prefetch_count
  176. self.event_dispatcher = None
  177. self.heart = None
  178. self.pool = pool
  179. pidbox_state = AttributeDict(app=self.app,
  180. logger=logger,
  181. hostname=self.hostname,
  182. listener=self, # pre 2.2
  183. consumer=self)
  184. self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
  185. state=pidbox_state,
  186. handlers=Panel.data)
  187. conninfo = self.app.broker_connection()
  188. self.connection_errors = conninfo.connection_errors
  189. self.channel_errors = conninfo.channel_errors
  190. self.queues = queues
  191. def start(self):
  192. """Start the consumer.
  193. If the connection is lost, it tries to re-establish the connection
  194. and restarts consuming messages.
  195. """
  196. self.init_callback(self)
  197. while 1:
  198. try:
  199. self.reset_connection()
  200. self.consume_messages()
  201. except self.connection_errors:
  202. self.logger.error("Consumer: Connection to broker lost."
  203. + " Trying to re-establish connection...")
  204. def consume_messages(self):
  205. """Consume messages forever (or until an exception is raised)."""
  206. self.logger.debug("Consumer: Starting message consumer...")
  207. self.task_consumer.consume()
  208. self.broadcast_consumer.consume()
  209. self.logger.debug("Consumer: Ready to accept tasks!")
  210. while 1:
  211. if not self.connection:
  212. break
  213. if self.qos.prev != self.qos.value:
  214. self.qos.update()
  215. self.connection.drain_events()
  216. def on_task(self, task):
  217. """Handle received task.
  218. If the task has an `eta` we enter it into the ETA schedule,
  219. otherwise we move it the ready queue for immediate processing.
  220. """
  221. if task.revoked():
  222. return
  223. self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
  224. self.event_dispatcher.send("task-received", uuid=task.task_id,
  225. name=task.task_name, args=repr(task.args),
  226. kwargs=repr(task.kwargs), retries=task.retries,
  227. eta=task.eta and task.eta.isoformat(),
  228. expires=task.expires and task.expires.isoformat())
  229. if task.eta:
  230. try:
  231. eta = to_timestamp(task.eta)
  232. except OverflowError, exc:
  233. self.logger.error(
  234. "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
  235. task.eta, exc, task.info(safe=True)),
  236. exc_info=sys.exc_info())
  237. task.acknowledge()
  238. else:
  239. self.qos.increment()
  240. self.eta_schedule.apply_at(eta,
  241. self.apply_eta_task, (task, ))
  242. else:
  243. state.task_reserved(task)
  244. self.ready_queue.put(task)
  245. def on_control(self, body, message):
  246. try:
  247. self.pidbox_node.handle_message(body, message)
  248. except KeyError, exc:
  249. self.logger.error("No such control command: %s" % exc)
  250. except Exception, exc:
  251. self.logger.error(
  252. "Error occurred while handling control command: %r\n%r" % (
  253. exc, traceback.format_exc()), exc_info=sys.exc_info())
  254. self.reset_pidbox_node()
  255. def apply_eta_task(self, task):
  256. state.task_reserved(task)
  257. self.ready_queue.put(task)
  258. self.qos.decrement_eventually()
  259. def receive_message(self, body, message):
  260. """The callback called when a new message is received. """
  261. # Handle task
  262. if body.get("task"):
  263. def ack():
  264. try:
  265. message.ack()
  266. except self.connection_errors, exc:
  267. self.logger.critical(
  268. "Couldn't ack %r: message:%r reason:%r" % (
  269. message.delivery_tag, body, exc))
  270. try:
  271. task = TaskRequest.from_message(message, body, ack,
  272. app=self.app,
  273. logger=self.logger,
  274. hostname=self.hostname,
  275. eventer=self.event_dispatcher)
  276. except NotRegistered, exc:
  277. self.logger.error("Unknown task ignored: %r Body->%r" % (
  278. exc, body), exc_info=sys.exc_info())
  279. message.ack()
  280. except InvalidTaskError, exc:
  281. self.logger.error("Invalid task ignored: %s: %s" % (
  282. str(exc), body), exc_info=sys.exc_info())
  283. message.ack()
  284. else:
  285. self.on_task(task)
  286. return
  287. warnings.warn(RuntimeWarning(
  288. "Received and deleted unknown message. Wrong destination?!? \
  289. the message was: %s" % body))
  290. message.ack()
  291. def maybe_conn_error(self, fun):
  292. try:
  293. fun()
  294. except (AttributeError, ) + \
  295. self.connection_errors + \
  296. self.channel_errors:
  297. pass
  298. def close_connection(self):
  299. self.logger.debug("Consumer: "
  300. "Closing consumer channel...")
  301. if self.task_consumer:
  302. self.task_consumer = \
  303. self.maybe_conn_error(self.task_consumer.close)
  304. self.logger.debug("Consumer: "
  305. "Closing connection to broker...")
  306. self.logger.debug("CarrotListener: Closing broadcast channel...")
  307. if self.broadcast_consumer:
  308. self.broadcast_consumer = \
  309. self.maybe_conn_error(self.broadcast_consumer.channel.close)
  310. if self.connection:
  311. self.connection = self.maybe_conn_error(self.connection.close)
  312. def stop_consumers(self, close=True):
  313. """Stop consuming."""
  314. if not self._state == RUN:
  315. return
  316. self._state = CLOSE
  317. if self.heart:
  318. self.logger.debug("Heart: Going into cardiac arrest...")
  319. self.heart = self.heart.stop()
  320. self.logger.debug("TaskConsumer: Cancelling consumers...")
  321. if self.task_consumer:
  322. self.maybe_conn_error(self.task_consumer.cancel)
  323. if self.event_dispatcher:
  324. self.logger.debug("EventDispatcher: Shutting down...")
  325. self.event_dispatcher = \
  326. self.maybe_conn_error(self.event_dispatcher.close)
  327. self.logger.debug("BroadcastConsumer: Cancelling consumer...")
  328. if self.broadcast_consumer:
  329. self.maybe_conn_error(self.broadcast_consumer.cancel)
  330. if close:
  331. self.close_connection()
  332. def on_decode_error(self, message, exc):
  333. """Callback called if the message had decoding errors.
  334. :param message: The message with errors.
  335. :param exc: The original exception instance.
  336. """
  337. self.logger.critical("Can't decode message body: %r "
  338. "(type:%r encoding:%r raw:%r')" % (
  339. exc, message.content_type,
  340. message.content_encoding, message.body))
  341. message.ack()
  342. def reset_pidbox_node(self):
  343. if self.pidbox_node.channel:
  344. try:
  345. self.pidbox_node.channel.close()
  346. except self.connection_errors + self.channel_errors:
  347. pass
  348. self.pidbox_node.channel = self.connection.channel()
  349. self.broadcast_consumer = self.pidbox_node.listen(
  350. callback=self.on_control)
  351. def reset_connection(self):
  352. """Re-establish connection and set up consumers."""
  353. self.logger.debug(
  354. "Consumer: Re-establishing connection to the broker...")
  355. self.stop_consumers()
  356. # Clear internal queues.
  357. self.ready_queue.clear()
  358. self.eta_schedule.clear()
  359. self.connection = self._open_connection()
  360. self.logger.debug("Consumer: Connection Established.")
  361. self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
  362. queues=self.queues,
  363. on_decode_error=self.on_decode_error)
  364. # QoS: Reset prefetch window.
  365. self.qos = QoS(self.task_consumer,
  366. self.initial_prefetch_count, self.logger)
  367. self.qos.update() # enable prefetch_count
  368. self.task_consumer.register_callback(self.receive_message)
  369. # Pidbox
  370. self.reset_pidbox_node()
  371. # Flush events sent while connection was down.
  372. prev_event_dispatcher = self.event_dispatcher
  373. self.event_dispatcher = self.app.events.Dispatcher(self.connection,
  374. hostname=self.hostname,
  375. enabled=self.send_events)
  376. if prev_event_dispatcher:
  377. self.event_dispatcher.copy_buffer(prev_event_dispatcher)
  378. self.event_dispatcher.flush()
  379. self.restart_heartbeat()
  380. self._state = RUN
  381. def restart_heartbeat(self):
  382. self.heart = Heart(self.event_dispatcher)
  383. self.heart.start()
  384. def _open_connection(self):
  385. """Open connection. May retry opening the connection if configuration
  386. allows that."""
  387. def _connection_error_handler(exc, interval):
  388. """Callback handler for connection errors."""
  389. self.logger.error("Consumer: Connection Error: %s. " % exc
  390. + "Trying again in %d seconds..." % interval)
  391. conn = self.app.broker_connection()
  392. if not self.app.conf.BROKER_CONNECTION_RETRY:
  393. conn.connect()
  394. return conn
  395. return conn.ensure_connection(_connection_error_handler,
  396. self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
  397. def stop(self):
  398. """Stop consuming.
  399. Does not close connection.
  400. """
  401. self.logger.debug("Consumer: Stopping consumers...")
  402. self.stop_consumers(close=False)
  403. @property
  404. def info(self):
  405. conninfo = {}
  406. if self.connection:
  407. conninfo = self.connection.info()
  408. conninfo.pop("password", None) # don't send password.
  409. return {"broker": conninfo,
  410. "prefetch_count": self.qos.value}