consumer.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. """
  2. This module contains the component responsible for consuming messages
  3. from the broker, processing the messages and keeping the broker connections
  4. up and running.
  5. * :meth:`~Consumer.start` is an infinite loop, which only iterates
  6. again if the connection is lost. For each iteration (at start, or if the
  7. connection is lost) it calls :meth:`~Consumer.reset_connection`,
  8. and starts the consumer by calling :meth:`~Consumer.consume_messages`.
  9. * :meth:`~Consumer.reset_connection`, clears the internal queues,
  10. establishes a new connection to the broker, sets up the task
  11. consumer (+ QoS), and the broadcast remote control command consumer.
  12. Also if events are enabled it configures the event dispatcher and starts
  13. up the heartbeat thread.
  14. * Finally it can consume messages. :meth:`~Consumer.consume_messages`
  15. is simply an infinite loop waiting for events on the AMQP channels.
  16. Both the task consumer and the broadcast consumer use the same
  17. callback: :meth:`~Consumer.receive_message`.
  18. * So for each message received the :meth:`~Consumer.receive_message`
  19. method is called, this checks the payload of the message for either
  20. a `task` key or a `control` key.
  21. If the message is a task, it verifies the validity of the message
  22. converts it to a :class:`celery.worker.job.TaskRequest`, and sends
  23. it to :meth:`~Consumer.on_task`.
  24. If the message is a control command the message is passed to
  25. :meth:`~Consumer.on_control`, which in turn dispatches
  26. the control command using the control dispatcher.
  27. It also tries to handle malformed or invalid messages properly,
  28. so the worker doesn't choke on them and die. Any invalid messages
  29. are acknowledged immediately and logged, so the message is not resent
  30. again, and again.
  31. * If the task has an ETA/countdown, the task is moved to the `eta_schedule`
  32. so the :class:`timer2.Timer` can schedule it at its
  33. deadline. Tasks without an eta are moved immediately to the `ready_queue`,
  34. so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
  35. to be sent to the pool.
  36. * When a task with an ETA is received the QoS prefetch count is also
  37. incremented, so another message can be reserved. When the ETA is met
  38. the prefetch count is decremented again, though this cannot happen
  39. immediately because amqplib doesn't support doing broker requests
  40. across threads. Instead the current prefetch count is kept as a
  41. shared counter, so as soon as :meth:`~Consumer.consume_messages`
  42. detects that the value has changed it will send out the actual
  43. QoS event to the broker.
  44. * Notice that when the connection is lost all internal queues are cleared
  45. because we can no longer ack the messages reserved in memory.
  46. However, this is not dangerous as the broker will resend them
  47. to another worker when the channel is closed.
  48. * **WARNING**: :meth:`~Consumer.stop` does not close the connection!
  49. This is because some pre-acked messages may be in processing,
  50. and they need to be finished before the channel is closed.
  51. For celeryd this means the pool must finish the tasks it has acked
  52. early, *then* close the connection.
  53. """
  54. from __future__ import generators
  55. import socket
  56. import sys
  57. import threading
  58. import traceback
  59. import warnings
  60. from celery.app import app_or_default
  61. from celery.datastructures import AttributeDict
  62. from celery.exceptions import NotRegistered
  63. from celery.utils import noop
  64. from celery.utils import timer2
  65. from celery.utils.encoding import safe_repr, safe_str
  66. from celery.worker import state
  67. from celery.worker.job import TaskRequest, InvalidTaskError
  68. from celery.worker.control.registry import Panel
  69. from celery.worker.heartbeat import Heart
#: Consumer state: assigned at the end of :meth:`Consumer.reset_connection`,
#: once the consumers have been set up.
RUN = 0x1
#: Consumer state: assigned by :meth:`Consumer.stop`; the loops in
#: :meth:`Consumer.start` and :meth:`Consumer.consume_messages` exit
#: when they see it.
CLOSE = 0x2
  72. #: Prefetch count can't exceed short.
  73. PREFETCH_COUNT_MAX = 0xFFFF
  74. class QoS(object):
  75. """Quality of Service for Channel.
  76. For thread-safe increment/decrement of a channels prefetch count value.
  77. :param consumer: A :class:`kombu.messaging.Consumer` instance.
  78. :param initial_value: Initial prefetch count value.
  79. :param logger: Logger used to log debug messages.
  80. """
  81. prev = None
  82. def __init__(self, consumer, initial_value, logger):
  83. self.consumer = consumer
  84. self.logger = logger
  85. self._mutex = threading.RLock()
  86. self.value = initial_value
  87. def increment(self, n=1):
  88. """Increment the current prefetch count value by one."""
  89. self._mutex.acquire()
  90. try:
  91. if self.value:
  92. new_value = self.value + max(n, 0)
  93. self.value = self.set(new_value)
  94. return self.value
  95. finally:
  96. self._mutex.release()
  97. def _sub(self, n=1):
  98. assert self.value - n > 1
  99. self.value -= n
  100. def decrement(self, n=1):
  101. """Decrement the current prefetch count value by one."""
  102. self._mutex.acquire()
  103. try:
  104. if self.value:
  105. self._sub(n)
  106. self.set(self.value)
  107. return self.value
  108. finally:
  109. self._mutex.release()
  110. def decrement_eventually(self, n=1):
  111. """Decrement the value, but do not update the qos.
  112. The MainThread will be responsible for calling :meth:`update`
  113. when necessary.
  114. """
  115. self._mutex.acquire()
  116. try:
  117. if self.value:
  118. self._sub(n)
  119. finally:
  120. self._mutex.release()
  121. def set(self, pcount):
  122. """Set channel prefetch_count setting."""
  123. if pcount != self.prev:
  124. new_value = pcount
  125. if pcount > PREFETCH_COUNT_MAX:
  126. self.logger.warning(
  127. "QoS: Disabled: prefetch_count exceeds %r" % (
  128. PREFETCH_COUNT_MAX, ))
  129. new_value = 0
  130. self.logger.debug("basic.qos: prefetch_count->%s" % new_value)
  131. self.consumer.qos(prefetch_count=new_value)
  132. self.prev = pcount
  133. return pcount
  134. def update(self):
  135. """Update prefetch count with current value."""
  136. self._mutex.acquire()
  137. try:
  138. return self.set(self.value)
  139. finally:
  140. self._mutex.release()
  141. class Consumer(object):
  142. """Listen for messages received from the broker and
  143. move them the the ready queue for task processing.
  144. :param ready_queue: See :attr:`ready_queue`.
  145. :param eta_schedule: See :attr:`eta_schedule`.
  146. .. attribute:: ready_queue
  147. The queue that holds tasks ready for immediate processing.
  148. .. attribute:: eta_schedule
  149. Scheduler for paused tasks. Reasons for being paused include
  150. a countdown/eta or that it's waiting for retry.
  151. .. attribute:: send_events
  152. Is events enabled?
  153. .. attribute:: init_callback
  154. Callback to be called the first time the connection is active.
  155. .. attribute:: hostname
  156. Current hostname. Defaults to the system hostname.
  157. .. attribute:: initial_prefetch_count
  158. Initial QoS prefetch count for the task channel.
  159. .. attribute:: control_dispatch
  160. Control command dispatcher.
  161. See :class:`celery.worker.control.ControlDispatch`.
  162. .. attribute:: event_dispatcher
  163. See :class:`celery.events.EventDispatcher`.
  164. .. attribute:: hart
  165. :class:`~celery.worker.heartbeat.Heart` sending out heart beats
  166. if events enabled.
  167. .. attribute:: logger
  168. The logger used.
  169. """
  170. _state = None
  171. def __init__(self, ready_queue, eta_schedule, logger,
  172. init_callback=noop, send_events=False, hostname=None,
  173. initial_prefetch_count=2, pool=None, app=None,
  174. priority_timer=None):
  175. self.app = app_or_default(app)
  176. self.connection = None
  177. self.task_consumer = None
  178. self.broadcast_consumer = None
  179. self.ready_queue = ready_queue
  180. self.eta_schedule = eta_schedule
  181. self.send_events = send_events
  182. self.init_callback = init_callback
  183. self.logger = logger
  184. self.hostname = hostname or socket.gethostname()
  185. self.initial_prefetch_count = initial_prefetch_count
  186. self.event_dispatcher = None
  187. self.heart = None
  188. self.pool = pool
  189. self.priority_timer = priority_timer or timer2.Timer()
  190. pidbox_state = AttributeDict(app=self.app,
  191. logger=logger,
  192. hostname=self.hostname,
  193. listener=self, # pre 2.2
  194. consumer=self)
  195. self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
  196. state=pidbox_state,
  197. handlers=Panel.data)
  198. conninfo = self.app.broker_connection()
  199. self.connection_errors = conninfo.connection_errors
  200. self.channel_errors = conninfo.channel_errors
  201. def start(self):
  202. """Start the consumer.
  203. If the connection is lost, it tries to re-establish the connection
  204. and restarts consuming messages.
  205. """
  206. self.init_callback(self)
  207. while self._state != CLOSE:
  208. try:
  209. self.reset_connection()
  210. self.consume_messages()
  211. except self.connection_errors:
  212. self.logger.error("Consumer: Connection to broker lost."
  213. + " Trying to re-establish connection...",
  214. exc_info=sys.exc_info())
  215. def consume_messages(self):
  216. """Consume messages forever (or until an exception is raised)."""
  217. self.logger.debug("Consumer: Starting message consumer...")
  218. self.task_consumer.consume()
  219. self.logger.debug("Consumer: Ready to accept tasks!")
  220. while self._state != CLOSE and self.connection:
  221. if self.qos.prev != self.qos.value:
  222. self.qos.update()
  223. try:
  224. self.connection.drain_events(timeout=1)
  225. except socket.timeout:
  226. pass
  227. except socket.error:
  228. if self._state != CLOSE:
  229. raise
  230. def on_task(self, task):
  231. """Handle received task.
  232. If the task has an `eta` we enter it into the ETA schedule,
  233. otherwise we move it the ready queue for immediate processing.
  234. """
  235. if task.revoked():
  236. return
  237. self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
  238. self.event_dispatcher.send("task-received", uuid=task.task_id,
  239. name=task.task_name, args=safe_repr(task.args),
  240. kwargs=safe_repr(task.kwargs), retries=task.retries,
  241. eta=task.eta and task.eta.isoformat(),
  242. expires=task.expires and task.expires.isoformat())
  243. if task.eta:
  244. try:
  245. eta = timer2.to_timestamp(task.eta)
  246. except OverflowError, exc:
  247. self.logger.error(
  248. "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
  249. task.eta, exc, task.info(safe=True)),
  250. exc_info=sys.exc_info())
  251. task.acknowledge()
  252. else:
  253. self.qos.increment()
  254. self.eta_schedule.apply_at(eta,
  255. self.apply_eta_task, (task, ))
  256. else:
  257. state.task_reserved(task)
  258. self.ready_queue.put(task)
  259. def on_control(self, body, message):
  260. try:
  261. self.pidbox_node.handle_message(body, message)
  262. except KeyError, exc:
  263. self.logger.error("No such control command: %s" % exc)
  264. except Exception, exc:
  265. self.logger.error(
  266. "Error occurred while handling control command: %r\n%r" % (
  267. exc, traceback.format_exc()), exc_info=sys.exc_info())
  268. self.reset_pidbox_node()
  269. def apply_eta_task(self, task):
  270. state.task_reserved(task)
  271. self.ready_queue.put(task)
  272. self.qos.decrement_eventually()
  273. def receive_message(self, body, message):
  274. """The callback called when a new message is received. """
  275. # Handle task
  276. if body.get("task"):
  277. def ack():
  278. try:
  279. message.ack()
  280. except self.connection_errors + (AttributeError, ), exc:
  281. self.logger.critical(
  282. "Couldn't ack %r: message:%r reason:%r" % (
  283. message.delivery_tag, body, exc))
  284. try:
  285. task = TaskRequest.from_message(message, body, ack,
  286. app=self.app,
  287. logger=self.logger,
  288. hostname=self.hostname,
  289. eventer=self.event_dispatcher)
  290. except NotRegistered, exc:
  291. self.logger.error("Unknown task ignored: %r Body->%r" % (
  292. exc, body), exc_info=sys.exc_info())
  293. message.ack()
  294. except InvalidTaskError, exc:
  295. self.logger.error("Invalid task ignored: %s: %s" % (
  296. str(exc), body), exc_info=sys.exc_info())
  297. message.ack()
  298. else:
  299. self.on_task(task)
  300. return
  301. warnings.warn(RuntimeWarning(
  302. "Received and deleted unknown message. Wrong destination?!? \
  303. the message was: %s" % body))
  304. message.ack()
  305. def maybe_conn_error(self, fun):
  306. try:
  307. fun()
  308. except (AttributeError, ) + \
  309. self.connection_errors + \
  310. self.channel_errors:
  311. pass
  312. def close_connection(self):
  313. if self.task_consumer:
  314. self.logger.debug("Consumer: " "Closing consumer channel...")
  315. self.task_consumer = \
  316. self.maybe_conn_error(self.task_consumer.close)
  317. if self.broadcast_consumer:
  318. self.logger.debug("CarrotListener: Closing broadcast channel...")
  319. self.broadcast_consumer = \
  320. self.maybe_conn_error(self.broadcast_consumer.channel.close)
  321. if self.connection:
  322. self.logger.debug("Consumer: " "Closing connection to broker...")
  323. self.connection = self.maybe_conn_error(self.connection.close)
  324. def stop_consumers(self, close=True):
  325. """Stop consuming."""
  326. if not self._state == RUN:
  327. return
  328. if self.heart:
  329. self.logger.debug("Heart: Going into cardiac arrest...")
  330. self.heart = self.heart.stop()
  331. self.logger.debug("TaskConsumer: Cancelling consumers...")
  332. if self.task_consumer:
  333. self.maybe_conn_error(self.task_consumer.cancel)
  334. if self.event_dispatcher:
  335. self.logger.debug("EventDispatcher: Shutting down...")
  336. self.event_dispatcher = \
  337. self.maybe_conn_error(self.event_dispatcher.close)
  338. self.logger.debug("BroadcastConsumer: Cancelling consumer...")
  339. if self.broadcast_consumer:
  340. self.maybe_conn_error(self.broadcast_consumer.cancel)
  341. if close:
  342. self.close_connection()
  343. def on_decode_error(self, message, exc):
  344. """Callback called if the message had decoding errors.
  345. :param message: The message with errors.
  346. :param exc: The original exception instance.
  347. """
  348. self.logger.critical("Can't decode message body: %r "
  349. "(type:%r encoding:%r raw:%r')" % (
  350. exc, message.content_type,
  351. message.content_encoding,
  352. safe_str(message.body)))
  353. message.ack()
  354. def reset_pidbox_node(self):
  355. if self.pidbox_node.channel:
  356. try:
  357. self.pidbox_node.channel.close()
  358. except self.connection_errors + self.channel_errors:
  359. pass
  360. if self.pool.is_green:
  361. return self.pool.spawn_n(self._green_pidbox_node)
  362. self.pidbox_node.channel = self.connection.channel()
  363. self.broadcast_consumer = self.pidbox_node.listen(
  364. callback=self.on_control)
  365. self.broadcast_consumer.consume()
  366. def _green_pidbox_node(self):
  367. conn = self._open_connection()
  368. self.pidbox_node.channel = conn.channel()
  369. self.broadcast_consumer = self.pidbox_node.listen(
  370. callback=self.on_control)
  371. self.broadcast_consumer.consume()
  372. try:
  373. while self.connection: # main connection still open?
  374. conn.drain_events()
  375. finally:
  376. conn.close()
  377. def reset_connection(self):
  378. """Re-establish connection and set up consumers."""
  379. self.logger.debug(
  380. "Consumer: Re-establishing connection to the broker...")
  381. self.stop_consumers()
  382. # Clear internal queues.
  383. self.ready_queue.clear()
  384. self.eta_schedule.clear()
  385. self.connection = self._open_connection()
  386. self.logger.debug("Consumer: Connection Established.")
  387. self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
  388. on_decode_error=self.on_decode_error)
  389. # QoS: Reset prefetch window.
  390. self.qos = QoS(self.task_consumer,
  391. self.initial_prefetch_count, self.logger)
  392. self.qos.update() # enable prefetch_count
  393. self.task_consumer.register_callback(self.receive_message)
  394. # Pidbox
  395. self.reset_pidbox_node()
  396. # Flush events sent while connection was down.
  397. prev_event_dispatcher = self.event_dispatcher
  398. self.event_dispatcher = self.app.events.Dispatcher(self.connection,
  399. hostname=self.hostname,
  400. enabled=self.send_events)
  401. if prev_event_dispatcher:
  402. self.event_dispatcher.copy_buffer(prev_event_dispatcher)
  403. self.event_dispatcher.flush()
  404. self.restart_heartbeat()
  405. self._state = RUN
  406. def restart_heartbeat(self):
  407. self.heart = Heart(self.priority_timer, self.event_dispatcher)
  408. self.heart.start()
  409. def _open_connection(self):
  410. """Open connection. May retry opening the connection if configuration
  411. allows that."""
  412. def _connection_error_handler(exc, interval):
  413. """Callback handler for connection errors."""
  414. self.logger.error("Consumer: Connection Error: %s. " % exc
  415. + "Trying again in %d seconds..." % interval)
  416. conn = self.app.broker_connection()
  417. if not self.app.conf.BROKER_CONNECTION_RETRY:
  418. conn.connect()
  419. return conn
  420. return conn.ensure_connection(_connection_error_handler,
  421. self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
  422. def stop(self):
  423. """Stop consuming.
  424. Does not close connection.
  425. """
  426. self._state = CLOSE
  427. self.logger.debug("Consumer: Stopping consumers...")
  428. self.stop_consumers(close=False)
  429. @property
  430. def info(self):
  431. conninfo = {}
  432. if self.connection:
  433. conninfo = self.connection.info()
  434. conninfo.pop("password", None) # don't send password.
  435. return {"broker": conninfo,
  436. "prefetch_count": self.qos.value}