consumer.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.consumer
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. This module contains the component responsible for consuming messages
  6. from the broker, processing the messages and keeping the broker connections
  7. up and running.
  8. :copyright: (c) 2009 - 2012 by Ask Solem.
  9. :license: BSD, see LICENSE for more details.
  10. * :meth:`~Consumer.start` is an infinite loop, which only iterates
  11. again if the connection is lost. For each iteration (at start, or if the
  12. connection is lost) it calls :meth:`~Consumer.reset_connection`,
  13. and starts the consumer by calling :meth:`~Consumer.consume_messages`.
  14. * :meth:`~Consumer.reset_connection`, clears the internal queues,
  15. establishes a new connection to the broker, sets up the task
  16. consumer (+ QoS), and the broadcast remote control command consumer.
  17. Also if events are enabled it configures the event dispatcher and starts
  18. up the heartbeat thread.
  19. * Finally it can consume messages. :meth:`~Consumer.consume_messages`
  20. is simply an infinite loop waiting for events on the AMQP channels.
  21. Both the task consumer and the broadcast consumer use the same
  22. callback: :meth:`~Consumer.receive_message`.
  23. * So for each message received the :meth:`~Consumer.receive_message`
  24. method is called, this checks the payload of the message for either
  25. a `task` key or a `control` key.
  26. If the message is a task, it verifies the validity of the message,
  27. converts it to a :class:`celery.worker.job.Request`, and sends
  28. it to :meth:`~Consumer.on_task`.
  29. If the message is a control command the message is passed to
  30. :meth:`~Consumer.on_control`, which in turn dispatches
  31. the control command using the control dispatcher.
  32. It also tries to handle malformed or invalid messages properly,
  33. so the worker doesn't choke on them and die. Any invalid messages
  34. are acknowledged immediately and logged, so the message is not resent
  35. again, and again.
  36. * If the task has an ETA/countdown, the task is moved to the `timer`
  37. so the :class:`timer2.Timer` can schedule it at its
  38. deadline. Tasks without an eta are moved immediately to the `ready_queue`,
  39. so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
  40. to be sent to the pool.
  41. * When a task with an ETA is received the QoS prefetch count is also
  42. incremented, so another message can be reserved. When the ETA is met
  43. the prefetch count is decremented again, though this cannot happen
  44. immediately because amqplib doesn't support doing broker requests
  45. across threads. Instead the current prefetch count is kept as a
  46. shared counter, so as soon as :meth:`~Consumer.consume_messages`
  47. detects that the value has changed it will send out the actual
  48. QoS event to the broker.
  49. * Notice that when the connection is lost all internal queues are cleared
  50. because we can no longer ack the messages reserved in memory.
  51. However, this is not dangerous as the broker will resend them
  52. to another worker when the channel is closed.
  53. * **WARNING**: :meth:`~Consumer.stop` does not close the connection!
  54. This is because some pre-acked messages may be in processing,
  55. and they need to be finished before the channel is closed.
  56. For celeryd this means the pool must finish the tasks it has acked
  57. early, *then* close the connection.
  58. """
  59. from __future__ import absolute_import
  60. from __future__ import with_statement
  61. import logging
  62. import socket
  63. import threading
  64. from time import sleep
  65. from Queue import Empty
  66. from billiard.exceptions import WorkerLostError
  67. from kombu.utils.encoding import safe_repr
  68. from celery.app import app_or_default
  69. from celery.datastructures import AttributeDict
  70. from celery.exceptions import InvalidTaskError, SystemTerminate
  71. from celery.utils import timer2
  72. from celery.utils.functional import noop
  73. from celery.utils.log import get_logger
  74. from . import state
  75. from .abstract import StartStopComponent
  76. from .control import Panel
  77. from .heartbeat import Heart
  78. from .hub import Hub
#: Consumer state: running.  ``Consumer._state`` is set to this at the
#: end of a successful ``reset_connection``.
RUN = 0x1
#: Consumer state: closed.  Set by ``Consumer.close``; the consume loops
#: exit once they see it.
CLOSE = 0x2

#: Prefetch count can't exceed short.  ``QoS.set`` sends 0 (QoS disabled)
#: instead of any count above this.
PREFETCH_COUNT_MAX = 0xFFFF

#: Logged (followed by a report of the message) when a message without a
#: usable ``task`` key is received; the message is then rejected.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
#: Format args: the KeyError for the task name, then the message body.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
#: Format args: the error text, then the message body.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here: http://bit.ly/hYj41y
The full contents of the message body was:
%s
"""

#: One-line message summary used by ``Consumer._message_report``
#: (body, content type, content encoding, delivery info).  The trailing
#: backslash suppresses the final newline.
MESSAGE_REPORT_FMT = """\
body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
"""

#: Logged (with traceback) by ``Consumer.start`` when a connection or
#: channel error interrupts consuming.  Backslashes keep it on one line.
RETRY_CONNECTION = """\
Consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

logger = get_logger(__name__)
# Short module-level aliases for the logger methods used throughout.
info, warn, error, crit = (logger.info, logger.warn,
                           logger.error, logger.critical)
  116. def debug(msg, *args, **kwargs):
  117. logger.debug("Consumer: %s" % (msg, ), *args, **kwargs)
  118. class Component(StartStopComponent):
  119. name = "worker.consumer"
  120. last = True
  121. def Consumer(self, w):
  122. return (w.consumer_cls or
  123. Consumer if w.use_eventloop else BlockingConsumer)
  124. def create(self, w):
  125. prefetch_count = w.concurrency * w.prefetch_multiplier
  126. c = w.consumer = self.instantiate(self.Consumer(w),
  127. w.ready_queue,
  128. hostname=w.hostname,
  129. send_events=w.send_events,
  130. init_callback=w.ready_callback,
  131. initial_prefetch_count=prefetch_count,
  132. pool=w.pool,
  133. timer=w.timer,
  134. app=w.app,
  135. controller=w,
  136. use_eventloop=w.use_eventloop)
  137. return c
  138. class QoS(object):
  139. """Quality of Service for Channel.
  140. For thread-safe increment/decrement of a channels prefetch count value.
  141. :param consumer: A :class:`kombu.messaging.Consumer` instance.
  142. :param initial_value: Initial prefetch count value.
  143. """
  144. prev = None
  145. def __init__(self, consumer, initial_value):
  146. self.consumer = consumer
  147. self._mutex = threading.RLock()
  148. self.value = initial_value
  149. def increment(self, n=1):
  150. """Increment the current prefetch count value by n."""
  151. with self._mutex:
  152. if self.value:
  153. new_value = self.value + max(n, 0)
  154. self.value = self.set(new_value)
  155. return self.value
  156. def _sub(self, n=1):
  157. assert self.value - n > 1
  158. self.value -= n
  159. def decrement(self, n=1):
  160. """Decrement the current prefetch count value by n."""
  161. with self._mutex:
  162. if self.value:
  163. self._sub(n)
  164. self.set(self.value)
  165. return self.value
  166. def decrement_eventually(self, n=1):
  167. """Decrement the value, but do not update the qos.
  168. The MainThread will be responsible for calling :meth:`update`
  169. when necessary.
  170. """
  171. with self._mutex:
  172. if self.value:
  173. self._sub(n)
  174. def set(self, pcount):
  175. """Set channel prefetch_count setting."""
  176. if pcount != self.prev:
  177. new_value = pcount
  178. if pcount > PREFETCH_COUNT_MAX:
  179. warn("QoS: Disabled: prefetch_count exceeds %r",
  180. PREFETCH_COUNT_MAX)
  181. new_value = 0
  182. debug("basic.qos: prefetch_count->%s", new_value)
  183. self.consumer.qos(prefetch_count=new_value)
  184. self.prev = pcount
  185. return pcount
  186. def update(self):
  187. """Update prefetch count with current value."""
  188. with self._mutex:
  189. return self.set(self.value)
class Consumer(object):
    """Listen for messages received from the broker and
    move them to the ready queue for task processing.

    This is the event-loop variant: :meth:`consume_messages` runs on a
    :class:`~celery.worker.hub.Hub`, which is only created when
    ``use_eventloop`` is true.

    :param ready_queue: See :attr:`ready_queue`.
    :param timer: See :attr:`timer`.

    """

    #: The queue that holds tasks ready for immediate processing.
    ready_queue = None

    #: Enable/disable events.
    send_events = False

    #: Optional callback to be called when the connection is established.
    #: Will only be called once, even if the connection is lost and
    #: re-established.
    init_callback = None

    #: The current hostname.  Defaults to the system hostname.
    hostname = None

    #: Initial QoS prefetch count for the task channel.
    initial_prefetch_count = 0

    #: A :class:`celery.events.EventDispatcher` for sending events.
    event_dispatcher = None

    #: The thread that sends event heartbeats at regular intervals.
    #: The heartbeats are used by monitors to detect that a worker
    #: went offline/disappeared.
    heart = None

    #: The broker connection.
    connection = None

    #: The consumer used to consume task messages.
    task_consumer = None

    #: The consumer used to consume broadcast commands.
    broadcast_consumer = None

    #: The process mailbox (kombu pidbox node).
    pidbox_node = None
    _pidbox_node_shutdown = None   # used for greenlets
    _pidbox_node_stopped = None    # used for greenlets

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    # Consumer state, can be RUN or CLOSE.
    # RUN is set at the end of reset_connection(); CLOSE by close().
    _state = None

    def __init__(self, ready_queue,
            init_callback=noop, send_events=False, hostname=None,
            initial_prefetch_count=2, pool=None, app=None,
            timer=None, controller=None, use_eventloop=False, **kwargs):
        self.app = app_or_default(app)
        self.connection = None
        self.task_consumer = None
        self.controller = controller
        self.broadcast_consumer = None
        self.ready_queue = ready_queue
        self.send_events = send_events
        self.init_callback = init_callback
        self.hostname = hostname or socket.gethostname()
        self.initial_prefetch_count = initial_prefetch_count
        self.event_dispatcher = None
        self.heart = None
        self.pool = pool
        self.timer = timer or timer2.default_timer
        self.use_eventloop = use_eventloop
        # State handed to remote control command handlers (Panel.data).
        pidbox_state = AttributeDict(app=self.app,
                                     hostname=self.hostname,
                                     listener=self,     # pre 2.2
                                     consumer=self)
        self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
                                                         state=pidbox_state,
                                                         handlers=Panel.data)
        # This connection object is only used to read the transport's
        # error tuples; it is not stored.
        conninfo = self.app.broker_connection()
        self.connection_errors = conninfo.connection_errors
        self.channel_errors = conninfo.channel_errors

        # Cached so on_task() can skip building INFO log arguments.
        self._does_info = logger.isEnabledFor(logging.INFO)
        # task name -> execution strategy; filled by update_strategies().
        self.strategies = {}
        if self.use_eventloop:
            self.hub = Hub(self.timer)

    def update_strategies(self):
        """(Re)build :attr:`strategies` for every registered task."""
        S = self.strategies
        for task in self.app.tasks.itervalues():
            S[task.name] = task.start_strategy(self.app, self)

    def start(self):
        """Start the consumer.

        Automatically survives intermittent connection failure,
        and will retry establishing the connection and restart
        consuming messages.

        ``init_callback`` is called once before the loop.  Each iteration
        checks for a requested shutdown, reconnects and consumes.
        Connection/channel errors are logged and retried; any other
        exception propagates.

        """

        self.init_callback(self)

        while self._state != CLOSE:
            self.maybe_shutdown()
            try:
                self.reset_connection()
                self.consume_messages()
            except self.connection_errors + self.channel_errors:
                error(RETRY_CONNECTION, exc_info=True)

    def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
        """Consume messages forever (or until an exception is raised).

        Runs the hub event loop.  Task messages are buffered by
        ``on_task_received`` and handed to their task's execution strategy
        by ``flush_buffer``: when four are buffered, or at the end of a
        poll pass.  Timers, QoS updates and the pool/connection readers
        are serviced on every iteration.  The loop ends when the consumer
        is closed or :attr:`connection` is cleared.

        The keyword defaults bind ``sleep``, ``min`` and ``Empty`` as
        locals (presumably for faster lookup inside the loop).

        """
        with self.hub as hub:
            qos = self.qos
            # NOTE(review): ``concurrency`` is not used below.
            concurrency = self.pool.num_processes
            update_qos = qos.update
            update_readers = hub.update_readers
            fdmap = hub.fdmap
            poll = hub.poller.poll
            fire_timers = hub.fire_timers
            # Pending timer entries; fire_timers() is only called when
            # this is non-empty.
            scheduled = hub.timer._queue
            transport = self.connection.transport
            on_poll_start = transport.on_poll_start

            strategies = self.strategies
            # Shared by flush_buffer() and on_task_received().
            buffer = []

            def flush_buffer():
                # Hand every buffered message to its task strategy, then
                # empty the buffer in place (it is shared, not rebound).
                for name, body, message in buffer:
                    try:
                        strategies[name](message, body, message.ack_log_error)
                    except KeyError, exc:
                        # No strategy for this name.  A KeyError raised
                        # inside a strategy is reported the same way.
                        self.handle_unknown_task(body, message, exc)
                    except InvalidTaskError, exc:
                        self.handle_invalid_task(body, message, exc)
                buffer[:] = []

            def on_task_received(body, message):
                # Task consumer callback: messages without a usable
                # ``task`` key are rejected as unknown.
                try:
                    name = body["task"]
                except (KeyError, TypeError):
                    return self.handle_unknown_message(body, message)
                bufferlen = len(buffer)
                buffer.append((name, body, message))
                # Flush once four messages are buffered.
                if bufferlen + 1 >= 4:
                    flush_buffer()
                # The buffer already held messages: also service due
                # timers (presumably so they are not starved in bursts).
                if bufferlen:
                    fire_timers()

            if not self.pool.did_start_ok():
                raise WorkerLostError("Could not start worker processes")

            # Register the connection's and the pool's readers with the hub.
            update_readers(self.connection.eventmap, self.pool.readers)
            # NOTE(review): pool timer intervals are multiplied by 1000 —
            # presumably seconds -> milliseconds for apply_interval; confirm.
            for handler, interval in self.pool.timers.iteritems():
                self.timer.apply_interval(interval * 1000.0, handler)

            # Watch each pool process's sentinel in the hub, with
            # maintain_pool as its callback; unregister it when the
            # process goes down.
            def on_process_started(w):
                hub.add(w._popen.sentinel, self.pool._pool.maintain_pool)
            self.pool.on_process_started = on_process_started

            def on_process_down(w):
                hub.remove(w._popen.sentinel)
            self.pool.on_process_down = on_process_down

            transport.on_poll_init(hub.poller)
            self.task_consumer.callbacks = [on_task_received]
            self.task_consumer.consume()

            debug("Ready to accept tasks!")

            while self._state != CLOSE and self.connection:
                # shutdown if signal handlers told us to.
                if state.should_stop:
                    raise SystemExit()
                elif state.should_terminate:
                    raise SystemTerminate()

                # fire any ready timers, this also determines
                # when we need to wake up next.
                time_to_sleep = fire_timers() if scheduled else 1

                # The prefetch count may have been changed from another
                # thread (QoS.decrement_eventually); apply it here.
                if qos.prev != qos.value:
                    update_qos()

                update_readers(on_poll_start())
                if fdmap:
                    # Two poll passes: the first waits up to time_to_sleep,
                    # the second only 1 ms.  A handler raising Empty ends
                    # the current pass early.
                    for timeout in (time_to_sleep, 0.001):
                        for fileno, event in poll(timeout) or ():
                            try:
                                fdmap[fileno](fileno, event)
                            except Empty:
                                break
                            except socket.error:
                                if self._state != CLOSE:  # pragma: no cover
                                    raise
                        # Dispatch whatever this pass left in the buffer.
                        if buffer:
                            flush_buffer()
                else:
                    # Nothing to poll: just wait (at most 0.1) for timers.
                    sleep(min(time_to_sleep, 0.1))

    def on_task(self, task):
        """Handle received task.

        If the task has an `eta` we enter it into the ETA schedule,
        otherwise we move it to the ready queue for immediate processing.

        Revoked tasks are dropped.  Scheduling an ETA task raises the QoS
        prefetch count by one; :meth:`apply_eta_task` lowers it again.  A
        task whose ETA cannot be converted to a timestamp is logged and
        acknowledged.

        """

        if task.revoked():
            return

        if self._does_info:
            info("Got task from broker: %s", task.shortinfo())

        if self.event_dispatcher.enabled:
            self.event_dispatcher.send("task-received", uuid=task.id,
                    name=task.name, args=safe_repr(task.args),
                    kwargs=safe_repr(task.kwargs),
                    retries=task.request_dict.get("retries", 0),
                    eta=task.eta and task.eta.isoformat(),
                    expires=task.expires and task.expires.isoformat())

        if task.eta:
            try:
                eta = timer2.to_timestamp(task.eta)
            except OverflowError, exc:
                error("Couldn't convert eta %s to timestamp: %r. Task: %r",
                      task.eta, exc, task.info(safe=True), exc_info=True)
                task.acknowledge()
            else:
                self.qos.increment()
                self.timer.apply_at(eta, self.apply_eta_task, (task, ),
                                    priority=6)
        else:
            state.task_reserved(task)
            self.ready_queue.put(task)

    def on_control(self, body, message):
        """Process remote control command message.

        Unknown commands (KeyError) are only logged.  Any other error is
        logged with traceback and the mailbox is set up again.
        """
        try:
            self.pidbox_node.handle_message(body, message)
        except KeyError, exc:
            error("No such control command: %s", exc)
        except Exception, exc:
            error("Control command error: %r", exc, exc_info=True)
            self.reset_pidbox_node()

    def apply_eta_task(self, task):
        """Method called by the timer to apply a task with an
        ETA/countdown.

        Moves the task to the ready queue and lowers the prefetch count
        without contacting the broker; the consume loop sends the update.
        """
        state.task_reserved(task)
        self.ready_queue.put(task)
        self.qos.decrement_eventually()

    def _message_report(self, body, message):
        """Return a one-line, repr-safe summary of *message* for logs."""
        return MESSAGE_REPORT_FMT % (safe_repr(body),
                                     safe_repr(message.content_type),
                                     safe_repr(message.content_encoding),
                                     safe_repr(message.delivery_info))

    def handle_unknown_message(self, body, message):
        """Warn about a message that is not a task message, and reject it."""
        warn(UNKNOWN_FORMAT, self._message_report(body, message))
        message.reject_log_error(logger, self.connection_errors)

    def handle_unknown_task(self, body, message, exc):
        """Log an unregistered task name and reject the message."""
        error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
        message.reject_log_error(logger, self.connection_errors)

    def handle_invalid_task(self, body, message, exc):
        """Log an invalid task message and reject it."""
        error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
        message.reject_log_error(logger, self.connection_errors)

    def receive_message(self, body, message):
        """Handles incoming messages.

        Dispatches the message to the execution strategy registered for
        its task name (unbuffered; used by :class:`BlockingConsumer`).

        :param body: The message body.
        :param message: The kombu message object.

        """
        try:
            name = body["task"]
        except (KeyError, TypeError):
            return self.handle_unknown_message(body, message)

        try:
            self.strategies[name](message, body, message.ack_log_error)
        except KeyError, exc:
            self.handle_unknown_task(body, message, exc)
        except InvalidTaskError, exc:
            self.handle_invalid_task(body, message, exc)

    def maybe_conn_error(self, fun):
        """Applies function but ignores any connection or channel
        errors raised.

        AttributeError is ignored as well.  Always returns ``None``;
        callers use that to clear the attribute they just closed.
        """
        try:
            fun()
        except (AttributeError, ) + \
                self.connection_errors + \
                self.channel_errors:
            pass

    def close_connection(self):
        """Closes the current broker connection and all open channels."""

        # We must set self.connection to None here, so
        # that the green pidbox thread exits.
        connection, self.connection = self.connection, None

        if self.task_consumer:
            debug("Closing consumer channel...")
            # maybe_conn_error() returns None, clearing task_consumer.
            self.task_consumer = \
                    self.maybe_conn_error(self.task_consumer.close)

        self.stop_pidbox_node()

        if connection:
            debug("Closing broker connection...")
            self.maybe_conn_error(connection.close)

    def stop_consumers(self, close_connection=True):
        """Stop consuming tasks and broadcast commands, also stops
        the heartbeat thread and event dispatcher.

        Does nothing unless the consumer is in the RUN state.

        :keyword close_connection: Set to False to skip closing the broker
            connection.

        """
        if not self._state == RUN:
            return

        if self.heart:
            # Stop the heartbeat thread if it's running.
            debug("Heart: Going into cardiac arrest...")
            self.heart = self.heart.stop()

        debug("Cancelling task consumer...")
        if self.task_consumer:
            self.maybe_conn_error(self.task_consumer.cancel)

        if self.event_dispatcher:
            debug("Shutting down event dispatcher...")
            # maybe_conn_error() returns None, clearing event_dispatcher.
            self.event_dispatcher = \
                    self.maybe_conn_error(self.event_dispatcher.close)

        debug("Cancelling broadcast consumer...")
        if self.broadcast_consumer:
            self.maybe_conn_error(self.broadcast_consumer.cancel)

        if close_connection:
            self.close_connection()

    def on_decode_error(self, message, exc):
        """Callback called if an error occurs while decoding
        a message received.

        Simply logs the error and acknowledges the message so it
        doesn't enter a loop.

        :param message: The message with errors.
        :param exc: The original exception instance.

        """
        # NOTE(review): the format string has a stray ``'`` after the
        # last ``%r``.
        crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
             exc, message.content_type, message.content_encoding,
             safe_repr(message.body))
        message.ack()

    def reset_pidbox_node(self):
        """Sets up the process mailbox.

        With a green pool the mailbox runs in its own greenlet on a
        separate connection (see :meth:`_green_pidbox_node`).  Otherwise it
        gets a fresh channel on :attr:`connection`.
        """
        self.stop_pidbox_node()
        # close previously opened channel if any.
        if self.pidbox_node.channel:
            try:
                self.pidbox_node.channel.close()
            except self.connection_errors + self.channel_errors:
                pass

        if self.pool is not None and self.pool.is_green:
            return self.pool.spawn_n(self._green_pidbox_node)
        self.pidbox_node.channel = self.connection.channel()
        self.broadcast_consumer = self.pidbox_node.listen(
                                        callback=self.on_control)
        self.broadcast_consumer.consume()

    def stop_pidbox_node(self):
        """Stop the broadcast (remote control) consumer.

        In green mode, signal the mailbox greenlet and wait until it has
        stopped.  Otherwise close the broadcast channel and clear
        :attr:`broadcast_consumer`.
        """
        if self._pidbox_node_stopped:
            self._pidbox_node_shutdown.set()
            debug("Waiting for broadcast thread to shutdown...")
            self._pidbox_node_stopped.wait()
            self._pidbox_node_stopped = self._pidbox_node_shutdown = None
        elif self.broadcast_consumer:
            debug("Closing broadcast channel...")
            self.broadcast_consumer = \
                self.maybe_conn_error(self.broadcast_consumer.channel.close)

    def _green_pidbox_node(self):
        """Sets up the process mailbox when running in a greenlet
        environment.

        Drains its own connection in 1-second slices until
        ``_pidbox_node_shutdown`` is set.  ``_pidbox_node_stopped`` is set
        on exit, even on error.
        """
        # THIS CODE IS TERRIBLE
        # Luckily work has already started rewriting the Consumer for 3.0.
        self._pidbox_node_shutdown = threading.Event()
        self._pidbox_node_stopped = threading.Event()
        try:
            with self._open_connection() as conn:
                self.pidbox_node.channel = conn.default_channel
                self.broadcast_consumer = self.pidbox_node.listen(
                                            callback=self.on_control)
                with self.broadcast_consumer:
                    while not self._pidbox_node_shutdown.isSet():
                        try:
                            conn.drain_events(timeout=1.0)
                        except socket.timeout:
                            pass
        finally:
            self._pidbox_node_stopped.set()

    def reset_connection(self):
        """Re-establish the broker connection and set up consumers,
        heartbeat and the event dispatcher."""
        debug("Re-establishing connection to the broker...")
        self.stop_consumers()

        # Clear internal queues to get rid of old messages.
        # They can't be acked anyway, as a delivery tag is specific
        # to the current channel.
        self.ready_queue.clear()
        self.timer.clear()

        # Re-establish the broker connection and setup the task consumer.
        self.connection = self._open_connection()
        debug("Connection established.")
        self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
                                    on_decode_error=self.on_decode_error)
        # QoS: Reset prefetch window.
        self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
        self.qos.update()

        # Setup the process mailbox.
        self.reset_pidbox_node()

        # Flush events sent while connection was down.
        # NOTE(review): when _state is RUN, stop_consumers() above has
        # already set event_dispatcher to None, so on a reconnect
        # prev_event_dispatcher is None and no buffered events are copied.
        prev_event_dispatcher = self.event_dispatcher
        self.event_dispatcher = self.app.events.Dispatcher(self.connection,
                                                hostname=self.hostname,
                                                enabled=self.send_events)
        if prev_event_dispatcher:
            self.event_dispatcher.copy_buffer(prev_event_dispatcher)
            self.event_dispatcher.flush()

        # Restart heartbeat thread.
        self.restart_heartbeat()

        # reload all task's execution strategies.
        self.update_strategies()

        # We're back!
        self._state = RUN

    def restart_heartbeat(self):
        """Restart the heartbeat thread.

        This thread sends heartbeat events at intervals so monitors
        can tell if the worker is off-line/missing.
        """
        self.heart = Heart(self.timer, self.event_dispatcher)
        self.heart.start()

    def _open_connection(self):
        """Establish the broker connection.

        Will retry establishing the connection if the
        :setting:`BROKER_CONNECTION_RETRY` setting is enabled.  While
        retrying, :meth:`maybe_shutdown` is passed as the retry callback.

        """

        # Callback called for each retry while the connection
        # can't be established.
        def _error_handler(exc, interval):
            error("Consumer: Connection Error: %s. "
                  "Trying again in %d seconds...", exc, interval)

        # remember that the connection is lazy, it won't establish
        # until it's needed.
        conn = self.app.broker_connection()
        if not self.app.conf.BROKER_CONNECTION_RETRY:
            # retry disabled, just call connect directly.
            conn.connect()
            return conn

        return conn.ensure_connection(_error_handler,
                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
                    callback=self.maybe_shutdown)

    def stop(self):
        """Stop consuming.

        Does not close the broker connection, so be sure to call
        :meth:`close_connection` when you are finished with it.

        """
        # Notifies other threads that this instance can't be used
        # anymore.
        self.close()
        debug("Stopping consumers...")
        self.stop_consumers(close_connection=False)

    def close(self):
        """Mark the consumer as closed; the consume loops exit on it."""
        self._state = CLOSE

    def maybe_shutdown(self):
        """Raise SystemExit/SystemTerminate if the worker state asks for it."""
        if state.should_stop:
            raise SystemExit()
        elif state.should_terminate:
            raise SystemTerminate()

    @property
    def info(self):
        """Returns information about this consumer instance
        as a dict.

        This is also the consumer related info returned by
        ``celeryctl stats``.

        """
        # NOTE: this property does not shadow the module-level ``info``
        # log helper inside methods (e.g. on_task); that name still
        # resolves to the global.
        # NOTE(review): ``self.qos`` only exists after reset_connection().
        conninfo = {}
        if self.connection:
            conninfo = self.connection.info()
            conninfo.pop("password", None)  # don't send password.
        return {"broker": conninfo, "prefetch_count": self.qos.value}
  625. class BlockingConsumer(Consumer):
  626. def consume_messages(self):
  627. # receive_message handles incoming messages.
  628. self.task_consumer.register_callback(self.receive_message)
  629. self.task_consumer.consume()
  630. debug("Ready to accept tasks!")
  631. while self._state != CLOSE and self.connection:
  632. self.maybe_shutdown()
  633. if self.qos.prev != self.qos.value: # pragma: no cover
  634. self.qos.update()
  635. try:
  636. self.connection.drain_events(timeout=10.0)
  637. except socket.timeout:
  638. pass
  639. except socket.error:
  640. if self._state != CLOSE: # pragma: no cover
  641. raise