consumer.py

  1. """
  2. This module contains the component responsible for consuming messages
  3. from the broker, processing the messages and keeping the broker connections
  4. up and running.
  5. * :meth:`~Consumer.start` is an infinite loop, which only iterates
  6. again if the connection is lost. For each iteration (at start, or if the
  7. connection is lost) it calls :meth:`~Consumer.reset_connection`,
  8. and starts the consumer by calling :meth:`~Consumer.consume_messages`.
  9. * :meth:`~Consumer.reset_connection`, clears the internal queues,
  10. establishes a new connection to the broker, sets up the task
  11. consumer (+ QoS), and the broadcast remote control command consumer.
  12. Also if events are enabled it configures the event dispatcher and starts
  13. up the hartbeat thread.
  14. * Finally it can consume messages. :meth:`~Consumer.consume_messages`
  15. is simply an infinite loop waiting for events on the AMQP channels.
  16. Both the task consumer and the broadcast consumer uses the same
  17. callback: :meth:`~Consumer.receive_message`.
  18. * So for each message received the :meth:`~Consumer.receive_message`
  19. method is called, this checks the payload of the message for either
  20. a ``task`` key or a ``control`` key.
  21. If the message is a task, it verifies the validity of the message
  22. converts it to a :class:`celery.worker.job.TaskRequest`, and sends
  23. it to :meth:`~Consumer.on_task`.
  24. If the message is a control command the message is passed to
  25. :meth:`~Consumer.on_control`, which in turn dispatches
  26. the control command using the control dispatcher.
  27. It also tries to handle malformed or invalid messages properly,
  28. so the worker doesn't choke on them and die. Any invalid messages
  29. are acknowledged immediately and logged, so the message is not resent
  30. again, and again.
  31. * If the task has an ETA/countdown, the task is moved to the ``eta_schedule``
  32. so the :class:`timer2.Timer` can schedule it at its
  33. deadline. Tasks without an eta are moved immediately to the ``ready_queue``,
  34. so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
  35. to be sent to the pool.
  36. * When a task with an ETA is received the QoS prefetch count is also
  37. incremented, so another message can be reserved. When the ETA is met
  38. the prefetch count is decremented again, though this cannot happen
  39. immediately because amqplib doesn't support doing broker requests
  40. across threads. Instead the current prefetch count is kept as a
  41. shared counter, so as soon as :meth:`~Consumer.consume_messages`
  42. detects that the value has changed it will send out the actual
  43. QoS event to the broker.
  44. * Notice that when the connection is lost all internal queues are cleared
  45. because we can no longer ack the messages reserved in memory.
  46. Hoever, this is not dangerous as the broker will resend them
  47. to another worker when the channel is closed.
  48. * **WARNING**: :meth:`~Consumer.stop` does not close the connection!
  49. This is because some pre-acked messages may be in processing,
  50. and they need to be finished before the channel is closed.
  51. For celeryd this means the pool must finish the tasks it has acked
  52. early, *then* close the connection.
  53. """
from __future__ import generators

import socket
import sys
import warnings

from celery.app import app_or_default
from celery.datastructures import AttributeDict, SharedCounter
from celery.events import EventDispatcher
from celery.exceptions import NotRegistered
from celery.utils import noop
from celery.utils.timer2 import to_timestamp
from celery.worker.job import TaskRequest, InvalidTaskError
from celery.worker.control.registry import Panel
from celery.worker.heartbeat import Heart

RUN = 0x1
CLOSE = 0x2


class QoS(object):
    """Quality of Service for Channel.

    For thread-safe increment/decrement of a channel's prefetch count value.

    :param consumer: A :class:`kombu.messaging.Consumer` instance.
    :param initial_value: Initial prefetch count value.
    :param logger: Logger used to log debug messages.

    """
    prev = None

    def __init__(self, consumer, initial_value, logger):
        self.consumer = consumer
        self.logger = logger
        self.value = SharedCounter(initial_value)

    def increment(self):
        """Increment the current prefetch count value by one."""
        if int(self.value):
            return self.set(self.value.increment())

    def decrement(self):
        """Decrement the current prefetch count value by one."""
        if int(self.value):
            return self.set(self.value.decrement())

    def decrement_eventually(self):
        """Decrement the value, but do not update the qos.

        The MainThread will be responsible for calling :meth:`update`
        when necessary.

        """
        self.value.decrement()

    def set(self, pcount):
        """Set channel prefetch_count setting."""
        self.logger.debug("basic.qos: prefetch_count->%s" % pcount)
        self.consumer.qos(prefetch_count=pcount)
        self.prev = pcount
        return pcount

    def update(self):
        """Update prefetch count with current value."""
        return self.set(self.next)

    @property
    def next(self):
        return int(self.value)
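

# The QoS helper above exists so that the timer thread can request prefetch
# changes without talking to the broker itself (as the module docstring
# notes, amqplib can't do broker requests across threads).  A simplified
# sketch of the intended flow, using only names defined in this module:
#
#     qos = QoS(task_consumer, initial_prefetch_count, logger)
#     qos.increment()              # consumer thread: sends basic.qos now
#     qos.decrement_eventually()   # timer thread: only bumps the counter
#     if qos.prev != qos.next:     # the main loop notices the change...
#         qos.update()             # ...and sends the pending basic.qos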


class Consumer(object):
    """Listen for messages received from the broker and
    move them to the ready queue for task processing.

    :param ready_queue: See :attr:`ready_queue`.
    :param eta_schedule: See :attr:`eta_schedule`.

    .. attribute:: ready_queue

        The queue that holds tasks ready for immediate processing.

    .. attribute:: eta_schedule

        Scheduler for paused tasks.  Reasons for being paused include
        a countdown/ETA or that it's waiting for retry.

    .. attribute:: send_events

        Is sending events enabled?

    .. attribute:: init_callback

        Callback to be called the first time the connection is active.

    .. attribute:: hostname

        Current hostname.  Defaults to the system hostname.

    .. attribute:: initial_prefetch_count

        Initial QoS prefetch count for the task channel.

    .. attribute:: control_dispatch

        Control command dispatcher.
        See :class:`celery.worker.control.ControlDispatch`.

    .. attribute:: event_dispatcher

        See :class:`celery.events.EventDispatcher`.

    .. attribute:: heart

        :class:`~celery.worker.heartbeat.Heart` sending out heart beats
        if events are enabled.

    .. attribute:: logger

        The logger used.

    """
    _state = None

    def __init__(self, ready_queue, eta_schedule, logger,
            init_callback=noop, send_events=False, hostname=None,
            initial_prefetch_count=2, pool=None, queues=None, app=None):
        self.app = app_or_default(app)
        self.connection = None
        self.task_consumer = None
        self.ready_queue = ready_queue
        self.eta_schedule = eta_schedule
        self.send_events = send_events
        self.init_callback = init_callback
        self.logger = logger
        self.hostname = hostname or socket.gethostname()
        self.initial_prefetch_count = initial_prefetch_count
        self.event_dispatcher = None
        self.heart = None
        self.pool = pool
        pidbox_state = AttributeDict(app=self.app,
                                     logger=logger,
                                     hostname=self.hostname,
                                     listener=self,     # pre 2.2
                                     consumer=self)
        self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
                                                         state=pidbox_state,
                                                         handlers=Panel.data)
        self.connection_errors = \
                self.app.broker_connection().connection_errors
        self.queues = queues

    def start(self):
        """Start the consumer.

        If the connection is lost, it tries to re-establish the connection
        and restarts consuming messages.

        """
        self.init_callback(self)

        while 1:
            self.reset_connection()
            try:
                self.consume_messages()
            except self.connection_errors:
                self.logger.error("Consumer: Connection to broker lost."
                                  " Trying to re-establish connection...")

    def consume_messages(self):
        """Consume messages forever (or until an exception is raised)."""
        self.logger.debug("Consumer: Starting message consumer...")
        self.task_consumer.consume()
        self.broadcast_consumer.consume()
        wait_for_message = self._mainloop().next
        self.logger.debug("Consumer: Ready to accept tasks!")

        while 1:
            if self.qos.prev != self.qos.next:
                self.qos.update()
            wait_for_message()

    def on_task(self, task):
        """Handle received task.

        If the task has an ``eta`` we enter it into the ETA schedule,
        otherwise we move it to the ready queue for immediate processing.

        """
        if task.revoked():
            return

        self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))

        self.event_dispatcher.send("task-received", uuid=task.task_id,
                name=task.task_name, args=repr(task.args),
                kwargs=repr(task.kwargs), retries=task.retries,
                eta=task.eta and task.eta.isoformat(),
                expires=task.expires and task.expires.isoformat())

        if task.eta:
            try:
                eta = to_timestamp(task.eta)
            except OverflowError, exc:
                self.logger.error(
                    "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
                        task.eta, exc, task.info(safe=True)),
                    exc_info=sys.exc_info())
                task.acknowledge()
            else:
                self.qos.increment()
                self.eta_schedule.apply_at(eta,
                                           self.apply_eta_task, (task, ))
        else:
            self.ready_queue.put(task)

    def on_control(self, message, message_data):
        try:
            self.pidbox_node.handle_message(message, message_data)
        except KeyError, exc:
            self.logger.error("No such control command: %s" % exc)

    def apply_eta_task(self, task):
        self.ready_queue.put(task)
        self.qos.decrement_eventually()
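
    # Together, on_task() and apply_eta_task() implement the ETA hand-off
    # described in the module docstring: on_task() raises the prefetch
    # count and schedules apply_eta_task() on the timer; when the deadline
    # is reached, apply_eta_task() moves the task to the ready queue and
    # only marks the prefetch count for a later decrement, which
    # consume_messages() then flushes to the broker from the main thread.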

    def receive_message(self, message_data, message):
        """The callback called when a new message is received."""

        # Handle task
        if message_data.get("task"):
            try:
                task = TaskRequest.from_message(message, message_data,
                                                app=self.app,
                                                logger=self.logger,
                                                hostname=self.hostname,
                                                eventer=self.event_dispatcher)
            except NotRegistered, exc:
                self.logger.error("Unknown task ignored: %s: %s" % (
                        str(exc), message_data), exc_info=sys.exc_info())
                message.ack()
            except InvalidTaskError, exc:
                self.logger.error("Invalid task ignored: %s: %s" % (
                        str(exc), message_data), exc_info=sys.exc_info())
                message.ack()
            else:
                self.on_task(task)
            return

        warnings.warn(RuntimeWarning(
            "Received and deleted unknown message. Wrong destination?!? "
            "The message was: %s" % message_data))
        message.ack()
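
    # For illustration only: a task message body (``message_data``) looks
    # roughly like the dict below (field values made up), which is why the
    # ``"task"`` key is enough to tell task messages apart from anything
    # else arriving on the channel:
    #
    #     {"task": "tasks.add", "id": "4cc7438e-afd4-4f8f-a2f3-...",
    #      "args": [2, 2], "kwargs": {}, "retries": 0,
    #      "eta": None, "expires": None}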

    def maybe_conn_error(self, fun):
        try:
            fun()
        except self.connection_errors:
            pass

    def close_connection(self):
        self.logger.debug("Consumer: "
                          "Closing consumer channel...")
        if self.task_consumer:
            self.task_consumer = \
                    self.maybe_conn_error(self.task_consumer.close)

        self.logger.debug("Consumer: "
                          "Closing connection to broker...")
        if self.connection:
            self.connection = self.maybe_conn_error(self.connection.close)

    def stop_consumers(self, close=True):
        """Stop consuming."""
        if not self._state == RUN:
            return
        self._state = CLOSE

        if self.heart:
            self.logger.debug("Heart: Going into cardiac arrest...")
            self.heart = self.heart.stop()

        self.logger.debug("TaskConsumer: Cancelling consumers...")
        if self.task_consumer:
            self.maybe_conn_error(self.task_consumer.cancel)

        if self.event_dispatcher:
            self.logger.debug("EventDispatcher: Shutting down...")
            self.event_dispatcher = \
                    self.maybe_conn_error(self.event_dispatcher.close)

        if self.broadcast_consumer:
            self.broadcast_consumer.channel.close()

        if close:
            self.close_connection()

    def on_decode_error(self, message, exc):
        """Callback called if the message had decoding errors.

        :param message: The message with errors.
        :param exc: The original exception instance.

        """
        self.logger.critical("Message decoding error: %s "
                             "(type:%s encoding:%s raw:'%s')" % (
                                 exc, message.content_type,
                                 message.content_encoding, message.body))
        message.ack()

    def reset_connection(self):
        """Re-establish connection and set up consumers."""
        self.logger.debug(
            "Consumer: Re-establishing connection to the broker...")
        self.stop_consumers()

        # Clear internal queues.
        self.ready_queue.clear()
        self.eta_schedule.clear()

        self.connection = self._open_connection()
        self.logger.debug("Consumer: Connection Established.")
        self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
                                                             queues=self.queues)
        # QoS: Reset prefetch window.
        self.qos = QoS(self.task_consumer,
                       self.initial_prefetch_count, self.logger)
        self.qos.update()                   # enable prefetch_count

        self.task_consumer.on_decode_error = self.on_decode_error
        self.task_consumer.register_callback(self.receive_message)

        self.pidbox_node.channel = self.connection.channel()
        self.broadcast_consumer = self.pidbox_node.listen(
                                        callback=self.on_control)

        # Flush events sent while connection was down.
        if self.event_dispatcher:
            self.event_dispatcher.flush()
        self.event_dispatcher = EventDispatcher(self.connection,
                                                app=self.app,
                                                hostname=self.hostname,
                                                enabled=self.send_events)

        self.restart_heartbeat()

        self._state = RUN

    def restart_heartbeat(self):
        self.heart = Heart(self.event_dispatcher)
        self.heart.start()

    def _mainloop(self):
        while 1:
            yield self.connection.drain_events()

    def _open_connection(self):
        """Open connection.  May retry opening the connection if
        configuration allows that."""

        def _connection_error_handler(exc, interval):
            """Callback handler for connection errors."""
            self.logger.error("Consumer: Connection Error: %s. " % exc
                              + "Trying again in %d seconds..." % interval)

        conn = self.app.broker_connection()
        if not self.app.conf.BROKER_CONNECTION_RETRY:
            conn.connect()
            return conn

        return conn.ensure_connection(
            _connection_error_handler,
            self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
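
    # The two settings consulted above are the broker options from the
    # Celery configuration.  Shown only as an illustration of how retrying
    # is switched on and bounded (the values are just examples):
    #
    #     BROKER_CONNECTION_RETRY = True
    #     BROKER_CONNECTION_MAX_RETRIES = 100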

    def stop(self):
        """Stop consuming.

        Does not close the connection.

        """
        self.logger.debug("Consumer: Stopping consumers...")
        self.stop_consumers(close=False)

    @property
    def info(self):
        conninfo = {}
        if self.connection:
            conninfo = self.app.amqp.get_broker_info(self.connection)
        return {"broker": conninfo,
                "prefetch_count": self.qos.next}