consumer.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. """
  2. This module contains the component responsible for consuming messages
  3. from the broker, processing the messages and keeping the broker connections
  4. up and running.
  5. * :meth:`~Consumer.start` is an infinite loop, which only iterates
  6. again if the connection is lost. For each iteration (at start, or if the
  7. connection is lost) it calls :meth:`~Consumer.reset_connection`,
  8. and starts the consumer by calling :meth:`~Consumer.consume_messages`.
  9. * :meth:`~Consumer.reset_connection`, clears the internal queues,
  10. establishes a new connection to the broker, sets up the task
  11. consumer (+ QoS), and the broadcast remote control command consumer.
  12. Also if events are enabled it configures the event dispatcher and starts
  13. up the heartbeat thread.
  14. * Finally it can consume messages. :meth:`~Consumer.consume_messages`
  15. is simply an infinite loop waiting for events on the AMQP channels.
  16. Both the task consumer and the broadcast consumer use the same
  17. callback: :meth:`~Consumer.receive_message`.
  18. * So for each message received the :meth:`~Consumer.receive_message`
  19. method is called, this checks the payload of the message for either
  20. a `task` key or a `control` key.
  21. If the message is a task, it verifies the validity of the message
  22. converts it to a :class:`celery.worker.job.TaskRequest`, and sends
  23. it to :meth:`~Consumer.on_task`.
  24. If the message is a control command the message is passed to
  25. :meth:`~Consumer.on_control`, which in turn dispatches
  26. the control command using the control dispatcher.
  27. It also tries to handle malformed or invalid messages properly,
  28. so the worker doesn't choke on them and die. Any invalid messages
  29. are acknowledged immediately and logged, so the message is not resent
  30. again, and again.
  31. * If the task has an ETA/countdown, the task is moved to the `eta_schedule`
  32. so the :class:`timer2.Timer` can schedule it at its
  33. deadline. Tasks without an eta are moved immediately to the `ready_queue`,
  34. so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
  35. to be sent to the pool.
  36. * When a task with an ETA is received the QoS prefetch count is also
  37. incremented, so another message can be reserved. When the ETA is met
  38. the prefetch count is decremented again, though this cannot happen
  39. immediately because amqplib doesn't support doing broker requests
  40. across threads. Instead the current prefetch count is kept as a
  41. shared counter, so as soon as :meth:`~Consumer.consume_messages`
  42. detects that the value has changed it will send out the actual
  43. QoS event to the broker.
  44. * Notice that when the connection is lost all internal queues are cleared
  45. because we can no longer ack the messages reserved in memory.
  46. However, this is not dangerous as the broker will resend them
  47. to another worker when the channel is closed.
  48. * **WARNING**: :meth:`~Consumer.stop` does not close the connection!
  49. This is because some pre-acked messages may be in processing,
  50. and they need to be finished before the channel is closed.
  51. For celeryd this means the pool must finish the tasks it has acked
  52. early, *then* close the connection.
  53. """
  54. from __future__ import generators
  55. import socket
  56. import sys
  57. import threading
  58. import traceback
  59. import warnings
  60. from celery.app import app_or_default
  61. from celery.datastructures import AttributeDict
  62. from celery.exceptions import NotRegistered
  63. from celery.utils import noop
  64. from celery.utils import timer2
  65. from celery.utils.encoding import safe_repr, safe_str
  66. from celery.worker import state
  67. from celery.worker.job import TaskRequest, InvalidTaskError
  68. from celery.worker.control.registry import Panel
  69. from celery.worker.heartbeat import Heart
  70. RUN = 0x1
  71. CLOSE = 0x2
  72. #: Prefetch count can't exceed short.
  73. PREFETCH_COUNT_MAX = 0xFFFF
  74. #: Error message for when an unregistered task is received.
  75. UNKNOWN_TASK_ERROR = """\
  76. Received unregistered task of type %s.
  77. The message has been ignored and discarded.
  78. Did you remember to import the module containing this task?
  79. Or maybe you are using relative imports?
  80. Please see http://bit.ly/gLye1c for more information.
  81. The full contents of the message body was:
  82. %s
  83. """
  84. #: Error message for when an invalid task message is received.
  85. INVALID_TASK_ERROR = """\
  86. Received invalid task message: %s
  87. The message has been ignored and discarded.
  88. Please ensure your message conforms to the task
  89. message protocol as described here: http://bit.ly/hYj41y
  90. The full contents of the message body was:
  91. %s
  92. """
  93. class QoS(object):
  94. """Quality of Service for Channel.
  95. For thread-safe increment/decrement of a channels prefetch count value.
  96. :param consumer: A :class:`kombu.messaging.Consumer` instance.
  97. :param initial_value: Initial prefetch count value.
  98. :param logger: Logger used to log debug messages.
  99. """
  100. prev = None
  101. def __init__(self, consumer, initial_value, logger):
  102. self.consumer = consumer
  103. self.logger = logger
  104. self._mutex = threading.RLock()
  105. self.value = initial_value
  106. def increment(self, n=1):
  107. """Increment the current prefetch count value by one."""
  108. self._mutex.acquire()
  109. try:
  110. if self.value:
  111. new_value = self.value + max(n, 0)
  112. self.value = self.set(new_value)
  113. return self.value
  114. finally:
  115. self._mutex.release()
  116. def _sub(self, n=1):
  117. assert self.value - n > 1
  118. self.value -= n
  119. def decrement(self, n=1):
  120. """Decrement the current prefetch count value by one."""
  121. self._mutex.acquire()
  122. try:
  123. if self.value:
  124. self._sub(n)
  125. self.set(self.value)
  126. return self.value
  127. finally:
  128. self._mutex.release()
  129. def decrement_eventually(self, n=1):
  130. """Decrement the value, but do not update the qos.
  131. The MainThread will be responsible for calling :meth:`update`
  132. when necessary.
  133. """
  134. self._mutex.acquire()
  135. try:
  136. if self.value:
  137. self._sub(n)
  138. finally:
  139. self._mutex.release()
  140. def set(self, pcount):
  141. """Set channel prefetch_count setting."""
  142. if pcount != self.prev:
  143. new_value = pcount
  144. if pcount > PREFETCH_COUNT_MAX:
  145. self.logger.warning(
  146. "QoS: Disabled: prefetch_count exceeds %r" % (
  147. PREFETCH_COUNT_MAX, ))
  148. new_value = 0
  149. self.logger.debug("basic.qos: prefetch_count->%s" % new_value)
  150. self.consumer.qos(prefetch_count=new_value)
  151. self.prev = pcount
  152. return pcount
  153. def update(self):
  154. """Update prefetch count with current value."""
  155. self._mutex.acquire()
  156. try:
  157. return self.set(self.value)
  158. finally:
  159. self._mutex.release()
  160. class Consumer(object):
  161. """Listen for messages received from the broker and
  162. move them the the ready queue for task processing.
  163. :param ready_queue: See :attr:`ready_queue`.
  164. :param eta_schedule: See :attr:`eta_schedule`.
  165. """
  166. #: The queue that holds tasks ready for immediate processing.
  167. ready_queue = None
  168. #: Timer for tasks with an ETA/countdown.
  169. eta_schedule = None
  170. #: Enable/disable events.
  171. send_events = False
  172. #: Optional callback to be called when the connection is established.
  173. #: Will only be called once, even if the connection is lost and
  174. #: re-established.
  175. init_callback = None
  176. #: The current hostname. Defaults to the system hostname.
  177. hostname = None
  178. #: Initial QoS prefetch count for the task channel.
  179. initial_prefetch_count = 0
  180. #: A :class:`celery.events.EventDispatcher` for sending events.
  181. event_dispatcher = None
  182. #: The thread that sends event heartbeats at regular intervals.
  183. #: The heartbeats are used by monitors to detect that a worker
  184. #: went offline/disappeared.
  185. heart = None
  186. #: The logger instance to use. Defaults to the default Celery logger.
  187. logger = None
  188. #: The broker connection.
  189. connection = None
  190. #: The consumer used to consume task messages.
  191. task_consumer = None
  192. #: The consumer used to consume broadcast commands.
  193. broadcast_consumer = None
  194. #: The process mailbox (kombu pidbox node).
  195. pidbox_node = None
  196. #: The current worker pool instance.
  197. pool = None
  198. #: A timer used for high-priority internal tasks, such
  199. #: as sending heartbeats.
  200. priority_timer = None
  201. # Consumer state, can be RUN or CLOSE.
  202. _state = None
  203. def __init__(self, ready_queue, eta_schedule, logger,
  204. init_callback=noop, send_events=False, hostname=None,
  205. initial_prefetch_count=2, pool=None, app=None,
  206. priority_timer=None):
  207. self.app = app_or_default(app)
  208. self.connection = None
  209. self.task_consumer = None
  210. self.broadcast_consumer = None
  211. self.ready_queue = ready_queue
  212. self.eta_schedule = eta_schedule
  213. self.send_events = send_events
  214. self.init_callback = init_callback
  215. self.logger = logger
  216. self.hostname = hostname or socket.gethostname()
  217. self.initial_prefetch_count = initial_prefetch_count
  218. self.event_dispatcher = None
  219. self.heart = None
  220. self.pool = pool
  221. self.priority_timer = priority_timer or timer2.default_timer
  222. pidbox_state = AttributeDict(app=self.app,
  223. logger=logger,
  224. hostname=self.hostname,
  225. listener=self, # pre 2.2
  226. consumer=self)
  227. self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
  228. state=pidbox_state,
  229. handlers=Panel.data)
  230. conninfo = self.app.broker_connection()
  231. self.connection_errors = conninfo.connection_errors
  232. self.channel_errors = conninfo.channel_errors
  233. def start(self):
  234. """Start the consumer.
  235. Automatically surivives intermittent connection failure,
  236. and will retry establishing the connection and restart
  237. consuming messages.
  238. """
  239. self.init_callback(self)
  240. while self._state != CLOSE:
  241. try:
  242. self.reset_connection()
  243. self.consume_messages()
  244. except self.connection_errors:
  245. self.logger.error("Consumer: Connection to broker lost."
  246. + " Trying to re-establish connection...",
  247. exc_info=sys.exc_info())
  248. def consume_messages(self):
  249. """Consume messages forever (or until an exception is raised)."""
  250. self._debug("Starting message consumer...")
  251. self.task_consumer.consume()
  252. self._debug("Ready to accept tasks!")
  253. while self._state != CLOSE and self.connection:
  254. if self.qos.prev != self.qos.value:
  255. self.qos.update()
  256. try:
  257. self.connection.drain_events(timeout=1)
  258. except socket.timeout:
  259. pass
  260. except socket.error:
  261. if self._state != CLOSE:
  262. raise
  263. def on_task(self, task):
  264. """Handle received task.
  265. If the task has an `eta` we enter it into the ETA schedule,
  266. otherwise we move it the ready queue for immediate processing.
  267. """
  268. if task.revoked():
  269. return
  270. self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
  271. if self.event_dispatcher.enabled:
  272. self.event_dispatcher.send("task-received", uuid=task.task_id,
  273. name=task.task_name, args=safe_repr(task.args),
  274. kwargs=safe_repr(task.kwargs), retries=task.retries,
  275. eta=task.eta and task.eta.isoformat(),
  276. expires=task.expires and task.expires.isoformat())
  277. if task.eta:
  278. try:
  279. eta = timer2.to_timestamp(task.eta)
  280. except OverflowError, exc:
  281. self.logger.error(
  282. "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
  283. task.eta, exc, task.info(safe=True)),
  284. exc_info=sys.exc_info())
  285. task.acknowledge()
  286. else:
  287. self.qos.increment()
  288. self.eta_schedule.apply_at(eta,
  289. self.apply_eta_task, (task, ))
  290. else:
  291. state.task_reserved(task)
  292. self.ready_queue.put(task)
  293. def on_control(self, body, message):
  294. """Process remote control command message."""
  295. try:
  296. self.pidbox_node.handle_message(body, message)
  297. except KeyError, exc:
  298. self.logger.error("No such control command: %s" % exc)
  299. except Exception, exc:
  300. self.logger.error(
  301. "Error occurred while handling control command: %r\n%r" % (
  302. exc, traceback.format_exc()), exc_info=sys.exc_info())
  303. self.reset_pidbox_node()
  304. def apply_eta_task(self, task):
  305. """Method called by the timer to apply a task with an
  306. ETA/countdown."""
  307. state.task_reserved(task)
  308. self.ready_queue.put(task)
  309. self.qos.decrement_eventually()
  310. def receive_message(self, body, message):
  311. """Handles incoming messages.
  312. :param body: The message body.
  313. :param message: The kombu message object.
  314. """
  315. # Handle task
  316. if body.get("task"):
  317. # need to guard against errors occuring while acking the message.
  318. def ack():
  319. try:
  320. message.ack()
  321. except self.connection_errors + (AttributeError, ), exc:
  322. self.logger.critical(
  323. "Couldn't ack %r: body:%r reason:%r" % (
  324. message.delivery_tag, safe_str(body), exc))
  325. try:
  326. task = TaskRequest.from_message(message, body, ack,
  327. app=self.app,
  328. logger=self.logger,
  329. hostname=self.hostname,
  330. eventer=self.event_dispatcher)
  331. except NotRegistered, exc:
  332. self.logger.error(UNKNOWN_TASK_ERROR % (
  333. exc, safe_str(body)), exc_info=sys.exc_info())
  334. message.ack()
  335. except InvalidTaskError, exc:
  336. self.logger.error(INVALID_TASK_ERROR % (
  337. str(exc), safe_str(body)), exc_info=sys.exc_info())
  338. message.ack()
  339. else:
  340. self.on_task(task)
  341. return
  342. warnings.warn(RuntimeWarning(
  343. "Received and deleted unknown message. Wrong destination?!? \
  344. the full contents of the message body was: %s" % (
  345. safe_str(body), )))
  346. message.ack()
  347. def maybe_conn_error(self, fun):
  348. """Applies function but ignores any connection or channel
  349. errors raised."""
  350. try:
  351. fun()
  352. except (AttributeError, ) + \
  353. self.connection_errors + \
  354. self.channel_errors:
  355. pass
  356. def close_connection(self):
  357. """Closes the current broker connection and all open channels."""
  358. if self.task_consumer:
  359. self._debug("Closing consumer channel...")
  360. self.task_consumer = \
  361. self.maybe_conn_error(self.task_consumer.close)
  362. if self.broadcast_consumer:
  363. self._debug("Closing broadcast channel...")
  364. self.broadcast_consumer = \
  365. self.maybe_conn_error(self.broadcast_consumer.channel.close)
  366. if self.connection:
  367. self._debug("Closing broker connection...")
  368. self.connection = self.maybe_conn_error(self.connection.close)
  369. def stop_consumers(self, close_connection=True):
  370. """Stop consuming tasks and broadcast commands, also stops
  371. the heartbeat thread and event dispatcher.
  372. :keyword close_connection: Set to False to skip closing the broker
  373. connection.
  374. """
  375. if not self._state == RUN:
  376. return
  377. if self.heart:
  378. # Stop the heartbeat thread if it's running.
  379. self.logger.debug("Heart: Going into cardiac arrest...")
  380. self.heart = self.heart.stop()
  381. self._debug("Cancelling task consumer...")
  382. if self.task_consumer:
  383. self.maybe_conn_error(self.task_consumer.cancel)
  384. if self.event_dispatcher:
  385. self._debug("Shutting down event dispatcher...")
  386. self.event_dispatcher = \
  387. self.maybe_conn_error(self.event_dispatcher.close)
  388. self._debug("Cancelling broadcast consumer...")
  389. if self.broadcast_consumer:
  390. self.maybe_conn_error(self.broadcast_consumer.cancel)
  391. if close_connection:
  392. self.close_connection()
  393. def on_decode_error(self, message, exc):
  394. """Callback called if an error occurs while decoding
  395. a message received.
  396. Simply logs the error and acknowledges the message so it
  397. doesn't enter a loop.
  398. :param message: The message with errors.
  399. :param exc: The original exception instance.
  400. """
  401. self.logger.critical(
  402. "Can't decode message body: %r (type:%r encoding:%r raw:%r')" % (
  403. exc, message.content_type, message.content_encoding,
  404. safe_str(message.body)))
  405. message.ack()
  406. def reset_pidbox_node(self):
  407. """Sets up the process mailbox."""
  408. # close previously opened channel if any.
  409. if self.pidbox_node.channel:
  410. try:
  411. self.pidbox_node.channel.close()
  412. except self.connection_errors + self.channel_errors:
  413. pass
  414. if self.pool.is_green:
  415. return self.pool.spawn_n(self._green_pidbox_node)
  416. self.pidbox_node.channel = self.connection.channel()
  417. self.broadcast_consumer = self.pidbox_node.listen(
  418. callback=self.on_control)
  419. self.broadcast_consumer.consume()
  420. def _green_pidbox_node(self):
  421. """Sets up the process mailbox when running in a greenlet
  422. environment."""
  423. conn = self._open_connection()
  424. self.pidbox_node.channel = conn.channel()
  425. self.broadcast_consumer = self.pidbox_node.listen(
  426. callback=self.on_control)
  427. self.broadcast_consumer.consume()
  428. try:
  429. while self.connection: # main connection still open?
  430. conn.drain_events()
  431. finally:
  432. conn.close()
  433. def reset_connection(self):
  434. """Re-establish the broker connection and set up consumers,
  435. heartbeat and the event dispatcher."""
  436. self._debug("Re-establishing connection to the broker...")
  437. self.stop_consumers()
  438. # Clear internal queues to get rid of old messages.
  439. # They can't be acked anyway, as a delivery tag is specific
  440. # to the current channel.
  441. self.ready_queue.clear()
  442. self.eta_schedule.clear()
  443. # Re-establish the broker connection and setup the task consumer.
  444. self.connection = self._open_connection()
  445. self._debug("Connection established.")
  446. self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
  447. on_decode_error=self.on_decode_error)
  448. # QoS: Reset prefetch window.
  449. self.qos = QoS(self.task_consumer,
  450. self.initial_prefetch_count, self.logger)
  451. self.qos.update()
  452. # receive_message handles incomsing messages.
  453. self.task_consumer.register_callback(self.receive_message)
  454. # Setup the process mailbox.
  455. self.reset_pidbox_node()
  456. # Flush events sent while connection was down.
  457. prev_event_dispatcher = self.event_dispatcher
  458. self.event_dispatcher = self.app.events.Dispatcher(self.connection,
  459. hostname=self.hostname,
  460. enabled=self.send_events)
  461. if prev_event_dispatcher:
  462. self.event_dispatcher.copy_buffer(prev_event_dispatcher)
  463. self.event_dispatcher.flush()
  464. # Restart heartbeat thread.
  465. self.restart_heartbeat()
  466. # We're back!
  467. self._state = RUN
  468. def restart_heartbeat(self):
  469. """Restart the heartbeat thread.
  470. This thread sends heartbeat events at intervals so monitors
  471. can tell if the worker is offline/missing.
  472. """
  473. self.heart = Heart(self.priority_timer, self.event_dispatcher)
  474. self.heart.start()
  475. def _open_connection(self):
  476. """Establish the broker connection.
  477. Will retry establishing the connection if the
  478. :setting:`BROKER_CONNECTION_RETRY` setting is enabled
  479. """
  480. def _connection_error_handler(exc, interval):
  481. # Callback called for each retry when the connection
  482. # can't be established.
  483. self.logger.error("Consumer: Connection Error: %s. " % exc
  484. + "Trying again in %d seconds..." % interval)
  485. # remember that the connection is lazy, it won't establish
  486. # until it's needed.
  487. conn = self.app.broker_connection()
  488. if not self.app.conf.BROKER_CONNECTION_RETRY:
  489. # retry disabled, just call connect directly.
  490. conn.connect()
  491. return conn
  492. return conn.ensure_connection(_connection_error_handler,
  493. self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
  494. def stop(self):
  495. """Stop consuming.
  496. Does not close the broker connection, so be sure to call
  497. :meth:`close_connection` when you are finished with it.
  498. """
  499. # Notifies other threads that this instance can't be used
  500. # anymore.
  501. self._state = CLOSE
  502. self._debug("Stopping consumers...")
  503. self.stop_consumers(close_connection=False)
  504. @property
  505. def info(self):
  506. """Returns information about this consumer instance
  507. as a dict.
  508. This is also the consumer related info returned by
  509. ``celeryctl stats``.
  510. """
  511. conninfo = {}
  512. if self.connection:
  513. conninfo = self.connection.info()
  514. conninfo.pop("password", None) # don't send password.
  515. return {"broker": conninfo, "prefetch_count": self.qos.value}
  516. def _debug(self, msg, **kwargs):
  517. self.logger.debug("Consumer: %s" % (msg, ), **kwargs)