# consumer.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.consumer
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. This module contains the component responsible for consuming messages
  6. from the broker, processing the messages and keeping the broker connections
  7. up and running.
  8. * :meth:`~Consumer.start` is an infinite loop, which only iterates
  9. again if the connection is lost. For each iteration (at start, or if the
  10. connection is lost) it calls :meth:`~Consumer.reset_connection`,
  11. and starts the consumer by calling :meth:`~Consumer.consume_messages`.
  12. * :meth:`~Consumer.reset_connection`, clears the internal queues,
  13. establishes a new connection to the broker, sets up the task
  14. consumer (+ QoS), and the broadcast remote control command consumer.
  15. Also if events are enabled it configures the event dispatcher and starts
  16. up the heartbeat thread.
  17. * Finally it can consume messages. :meth:`~Consumer.consume_messages`
  18. is simply an infinite loop waiting for events on the AMQP channels.
  19. Both the task consumer and the broadcast consumer uses the same
  20. callback: :meth:`~Consumer.receive_message`.
  21. * So for each message received the :meth:`~Consumer.receive_message`
  22. method is called, this checks the payload of the message for either
  23. a `task` key or a `control` key.
  24. If the message is a task, it verifies the validity of the message
  25. converts it to a :class:`celery.worker.job.Request`, and sends
  26. it to :meth:`~Consumer.on_task`.
  27. If the message is a control command the message is passed to
  28. :meth:`~Consumer.on_control`, which in turn dispatches
  29. the control command using the control dispatcher.
It also tries to handle malformed or invalid messages properly,
so the worker doesn't choke on them and die. Invalid messages are
logged and rejected (or acknowledged, when the body cannot be decoded),
so the same message is not resent again and again.
  34. * If the task has an ETA/countdown, the task is moved to the `timer`
  35. so the :class:`timer2.Timer` can schedule it at its
  36. deadline. Tasks without an eta are moved immediately to the `ready_queue`,
  37. so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
  38. to be sent to the pool.
  39. * When a task with an ETA is received the QoS prefetch count is also
  40. incremented, so another message can be reserved. When the ETA is met
  41. the prefetch count is decremented again, though this cannot happen
  42. immediately because most broker clients don't support doing broker
  43. requests across threads. Instead the current prefetch count is kept as a
  44. shared counter, so as soon as :meth:`~Consumer.consume_messages`
  45. detects that the value has changed it will send out the actual
  46. QoS event to the broker.
  47. * Notice that when the connection is lost all internal queues are cleared
  48. because we can no longer ack the messages reserved in memory.
  49. However, this is not dangerous as the broker will resend them
  50. to another worker when the channel is closed.
  51. * **WARNING**: :meth:`~Consumer.stop` does not close the connection!
  52. This is because some pre-acked messages may be in processing,
  53. and they need to be finished before the channel is closed.
  54. For celeryd this means the pool must finish the tasks it has acked
  55. early, *then* close the connection.
  56. """
  57. from __future__ import absolute_import
  58. from __future__ import with_statement
  59. import logging
  60. import socket
  61. import threading
  62. from time import sleep
  63. from Queue import Empty
  64. from kombu.syn import _detect_environment
  65. from kombu.utils.encoding import safe_repr, safe_str
  66. from kombu.utils.eventio import READ, WRITE, ERR
  67. from celery.app import app_or_default
  68. from celery.datastructures import AttributeDict
  69. from celery.exceptions import InvalidTaskError, SystemTerminate
  70. from celery.task.trace import build_tracer
  71. from celery.utils import text
  72. from celery.utils import timer2
  73. from celery.utils.functional import noop
  74. from celery.utils.log import get_logger
  75. from celery.utils.timer2 import to_timestamp
  76. from celery.utils.timeutils import humanize_seconds, timezone
  77. from . import state
  78. from .bootsteps import StartStopComponent
  79. from .control import Panel
  80. from .heartbeat import Heart
# Values stored in ``Consumer._state``.
RUN = 0x1
CLOSE = 0x2

#: Prefetch count can't exceed short.
PREFETCH_COUNT_MAX = 0xFFFF

#: Warning logged by ``Consumer.handle_unknown_message``; takes the
#: message report as its single argument.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
More: http://docs.celeryq.org/en/latest/userguide/tasks.html#names
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task message format:
http://docs.celeryq.org/en/latest/internals/protocol.html
The full contents of the message body was:
%s
"""

#: Template used by ``Consumer._message_report``: body dump, content type,
#: content encoding and delivery info, in that order.
MESSAGE_REPORT = """\
body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
"""

#: Logged by ``Consumer.start`` when a connection or channel error
#: interrupts consuming.
RETRY_CONNECTION = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

#: Logged for every failed connection attempt: broker URI, the exception,
#: and one of the two "next step" messages below.
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

#: Next-step message when the same broker will be retried after a delay.
CONNECTION_RETRY = """\
Trying again %(when)s...\
"""

#: Next-step message when an alternative broker will be tried instead.
CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""

# Module-level alias, also bound as a default argument of ``Consumer.on_task``.
task_reserved = state.task_reserved

logger = get_logger(__name__)
# Short aliases for the logger methods used throughout this module.
info, warn, error, crit = (logger.info, logger.warning,
                           logger.error, logger.critical)
  129. def debug(msg, *args, **kwargs):
  130. logger.debug('consumer: %s' % (msg, ), *args, **kwargs)
  131. def dump_body(m, body):
  132. return "%s (%sb)" % (text.truncate(safe_repr(body), 1024), len(m.body))
  133. class Component(StartStopComponent):
  134. name = 'worker.consumer'
  135. last = True
  136. def Consumer(self, w):
  137. return (w.consumer_cls or
  138. Consumer if w.hub else BlockingConsumer)
  139. def create(self, w):
  140. prefetch_count = w.concurrency * w.prefetch_multiplier
  141. c = w.consumer = self.instantiate(
  142. self.Consumer(w),
  143. w.ready_queue,
  144. hostname=w.hostname,
  145. send_events=w.send_events,
  146. init_callback=w.ready_callback,
  147. initial_prefetch_count=prefetch_count,
  148. pool=w.pool,
  149. timer=w.timer,
  150. app=w.app,
  151. controller=w,
  152. hub=w.hub,
  153. )
  154. return c
  155. class QoS(object):
  156. """Thread safe increment/decrement of a channels prefetch_count.
  157. :param consumer: A :class:`kombu.messaging.Consumer` instance.
  158. :param initial_value: Initial prefetch count value.
  159. """
  160. prev = None
  161. def __init__(self, consumer, initial_value):
  162. self.consumer = consumer
  163. self._mutex = threading.RLock()
  164. self.value = initial_value or 0
  165. def increment_eventually(self, n=1):
  166. """Increment the value, but do not update the channels QoS.
  167. The MainThread will be responsible for calling :meth:`update`
  168. when necessary.
  169. """
  170. with self._mutex:
  171. if self.value:
  172. self.value = self.value + max(n, 0)
  173. return self.value
  174. def decrement_eventually(self, n=1):
  175. """Decrement the value, but do not update the channels QoS.
  176. The MainThread will be responsible for calling :meth:`update`
  177. when necessary.
  178. """
  179. with self._mutex:
  180. if self.value:
  181. self.value -= n
  182. return self.value
  183. def set(self, pcount):
  184. """Set channel prefetch_count setting."""
  185. if pcount != self.prev:
  186. new_value = pcount
  187. if pcount > PREFETCH_COUNT_MAX:
  188. warn('QoS: Disabled: prefetch_count exceeds %r',
  189. PREFETCH_COUNT_MAX)
  190. new_value = 0
  191. debug('basic.qos: prefetch_count->%s', new_value)
  192. self.consumer.qos(prefetch_count=new_value)
  193. self.prev = pcount
  194. return pcount
  195. def update(self):
  196. """Update prefetch count with current value."""
  197. with self._mutex:
  198. return self.set(self.value)
  199. class Consumer(object):
  200. """Listen for messages received from the broker and
  201. move them to the ready queue for task processing.
  202. :param ready_queue: See :attr:`ready_queue`.
  203. :param timer: See :attr:`timer`.
  204. """
  205. #: The queue that holds tasks ready for immediate processing.
  206. ready_queue = None
  207. #: Enable/disable events.
  208. send_events = False
  209. #: Optional callback to be called when the connection is established.
  210. #: Will only be called once, even if the connection is lost and
  211. #: re-established.
  212. init_callback = None
  213. #: The current hostname. Defaults to the system hostname.
  214. hostname = None
  215. #: Initial QoS prefetch count for the task channel.
  216. initial_prefetch_count = 0
  217. #: A :class:`celery.events.EventDispatcher` for sending events.
  218. event_dispatcher = None
  219. #: The thread that sends event heartbeats at regular intervals.
  220. #: The heartbeats are used by monitors to detect that a worker
  221. #: went offline/disappeared.
  222. heart = None
  223. #: The broker connection.
  224. connection = None
  225. #: The consumer used to consume task messages.
  226. task_consumer = None
  227. #: The consumer used to consume broadcast commands.
  228. broadcast_consumer = None
  229. #: The process mailbox (kombu pidbox node).
  230. pidbox_node = None
  231. _pidbox_node_shutdown = None # used for greenlets
  232. _pidbox_node_stopped = None # used for greenlets
  233. #: The current worker pool instance.
  234. pool = None
  235. #: A timer used for high-priority internal tasks, such
  236. #: as sending heartbeats.
  237. timer = None
  238. # Consumer state, can be RUN or CLOSE.
  239. _state = None
  240. restart_count = -1 # first start is the same as a restart
  241. def __init__(self, ready_queue,
  242. init_callback=noop, send_events=False, hostname=None,
  243. initial_prefetch_count=2, pool=None, app=None,
  244. timer=None, controller=None, hub=None, amqheartbeat=None,
  245. **kwargs):
  246. self.app = app_or_default(app)
  247. self.connection = None
  248. self.task_consumer = None
  249. self.controller = controller
  250. self.broadcast_consumer = None
  251. self.ready_queue = ready_queue
  252. self.send_events = send_events
  253. self.init_callback = init_callback
  254. self.hostname = hostname or socket.gethostname()
  255. self.initial_prefetch_count = initial_prefetch_count
  256. self.event_dispatcher = None
  257. self.heart = None
  258. self.pool = pool
  259. self.timer = timer or timer2.default_timer
  260. pidbox_state = AttributeDict(app=self.app,
  261. hostname=self.hostname,
  262. listener=self, # pre 2.2
  263. consumer=self)
  264. self.pidbox_node = self.app.control.mailbox.Node(
  265. safe_str(self.hostname), state=pidbox_state, handlers=Panel.data,
  266. )
  267. conninfo = self.app.connection()
  268. self.connection_errors = conninfo.connection_errors
  269. self.channel_errors = conninfo.channel_errors
  270. self._does_info = logger.isEnabledFor(logging.INFO)
  271. self.strategies = {}
  272. if hub:
  273. hub.on_init.append(self.on_poll_init)
  274. self.hub = hub
  275. self._quick_put = self.ready_queue.put
  276. self.amqheartbeat = amqheartbeat
  277. if self.amqheartbeat is None:
  278. self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
  279. if not hub:
  280. self.amqheartbeat = 0
  281. if _detect_environment() == 'gevent':
  282. # there's a gevent bug that causes timeouts to not be reset,
  283. # so if the connection timeout is exceeded once, it can NEVER
  284. # connect again.
  285. self.app.conf.BROKER_CONNECTION_TIMEOUT = None
  286. def update_strategies(self):
  287. S = self.strategies
  288. app = self.app
  289. loader = app.loader
  290. hostname = self.hostname
  291. for name, task in self.app.tasks.iteritems():
  292. S[name] = task.start_strategy(app, self)
  293. task.__trace__ = build_tracer(name, task, loader, hostname)
  294. def start(self):
  295. """Start the consumer.
  296. Automatically survives intermittent connection failure,
  297. and will retry establishing the connection and restart
  298. consuming messages.
  299. """
  300. self.init_callback(self)
  301. while self._state != CLOSE:
  302. self.restart_count += 1
  303. self.maybe_shutdown()
  304. try:
  305. self.reset_connection()
  306. self.consume_messages()
  307. except self.connection_errors + self.channel_errors:
  308. error(RETRY_CONNECTION, exc_info=True)
  309. def on_poll_init(self, hub):
  310. hub.update_readers(self.connection.eventmap)
  311. self.connection.transport.on_poll_init(hub.poller)
  312. def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
  313. """Consume messages forever (or until an exception is raised)."""
  314. hbrate = self.app.conf.BROKER_HEARTBEAT_CHECKRATE
  315. with self.hub as hub:
  316. qos = self.qos
  317. update_qos = qos.update
  318. update_readers = hub.update_readers
  319. readers, writers = hub.readers, hub.writers
  320. poll = hub.poller.poll
  321. fire_timers = hub.fire_timers
  322. scheduled = hub.timer._queue
  323. connection = self.connection
  324. hb = self.amqheartbeat
  325. hbtick = connection.heartbeat_check
  326. on_poll_start = connection.transport.on_poll_start
  327. on_poll_empty = connection.transport.on_poll_empty
  328. strategies = self.strategies
  329. drain_nowait = connection.drain_nowait
  330. on_task_callbacks = hub.on_task
  331. keep_draining = connection.transport.nb_keep_draining
  332. errors = connection.connection_errors
  333. if hb and connection.supports_heartbeats:
  334. hub.timer.apply_interval(
  335. hb * 1000.0 / hbrate, hbtick, (hbrate, ))
  336. def on_task_received(body, message):
  337. if on_task_callbacks:
  338. [callback() for callback in on_task_callbacks]
  339. try:
  340. name = body['task']
  341. except (KeyError, TypeError):
  342. return self.handle_unknown_message(body, message)
  343. try:
  344. strategies[name](message, body, message.ack_log_error)
  345. except KeyError, exc:
  346. self.handle_unknown_task(body, message, exc)
  347. except InvalidTaskError, exc:
  348. self.handle_invalid_task(body, message, exc)
  349. self.task_consumer.callbacks = [on_task_received]
  350. self.task_consumer.consume()
  351. debug('Ready to accept tasks!')
  352. while self._state != CLOSE and self.connection:
  353. # shutdown if signal handlers told us to.
  354. if state.should_stop:
  355. raise SystemExit()
  356. elif state.should_terminate:
  357. raise SystemTerminate()
  358. # fire any ready timers, this also returns
  359. # the number of seconds until we need to fire timers again.
  360. poll_timeout = (fire_timers(propagate=errors) if scheduled
  361. else 1)
  362. # We only update QoS when there is no more messages to read.
  363. # This groups together qos calls, and makes sure that remote
  364. # control commands will be prioritized over task messages.
  365. if qos.prev != qos.value:
  366. update_qos()
  367. update_readers(on_poll_start())
  368. if readers or writers:
  369. connection.more_to_read = True
  370. while connection.more_to_read:
  371. try:
  372. events = poll(poll_timeout)
  373. except ValueError: # Issue 882
  374. return
  375. if not events:
  376. on_poll_empty()
  377. for fileno, event in events or ():
  378. try:
  379. if event & READ:
  380. readers[fileno](fileno, event)
  381. if event & WRITE:
  382. writers[fileno](fileno, event)
  383. if event & ERR:
  384. for handlermap in readers, writers:
  385. try:
  386. handlermap[fileno](fileno, event)
  387. except KeyError:
  388. pass
  389. except (KeyError, Empty):
  390. continue
  391. except socket.error:
  392. if self._state != CLOSE: # pragma: no cover
  393. raise
  394. if keep_draining:
  395. drain_nowait()
  396. poll_timeout = 0
  397. else:
  398. connection.more_to_read = False
  399. else:
  400. # no sockets yet, startup is probably not done.
  401. sleep(min(poll_timeout, 0.1))
  402. def on_task(self, task, task_reserved=task_reserved,
  403. to_system_tz=timezone.to_system):
  404. """Handle received task.
  405. If the task has an `eta` we enter it into the ETA schedule,
  406. otherwise we move it the ready queue for immediate processing.
  407. """
  408. if task.revoked():
  409. return
  410. if self._does_info:
  411. info('Got task from broker: %s', task)
  412. if self.event_dispatcher.enabled:
  413. self.event_dispatcher.send(
  414. 'task-received',
  415. uuid=task.id, name=task.name,
  416. args=safe_repr(task.args), kwargs=safe_repr(task.kwargs),
  417. retries=task.request_dict.get('retries', 0),
  418. eta=task.eta and task.eta.isoformat(),
  419. expires=task.expires and task.expires.isoformat(),
  420. )
  421. if task.eta:
  422. try:
  423. if task.utc:
  424. eta = to_timestamp(to_system_tz(task.eta))
  425. else:
  426. eta = to_timestamp(task.eta, timezone.local)
  427. except OverflowError, exc:
  428. error("Couldn't convert eta %s to timestamp: %r. Task: %r",
  429. task.eta, exc, task.info(safe=True), exc_info=True)
  430. task.acknowledge()
  431. else:
  432. self.qos.increment_eventually()
  433. self.timer.apply_at(
  434. eta, self.apply_eta_task, (task, ), priority=6,
  435. )
  436. else:
  437. task_reserved(task)
  438. self._quick_put(task)
  439. def on_control(self, body, message):
  440. """Process remote control command message."""
  441. try:
  442. self.pidbox_node.handle_message(body, message)
  443. except KeyError, exc:
  444. error('No such control command: %s', exc)
  445. except Exception, exc:
  446. error('Control command error: %r', exc, exc_info=True)
  447. self.reset_pidbox_node()
  448. def apply_eta_task(self, task):
  449. """Method called by the timer to apply a task with an
  450. ETA/countdown."""
  451. task_reserved(task)
  452. self._quick_put(task)
  453. self.qos.decrement_eventually()
  454. def _message_report(self, body, message):
  455. return MESSAGE_REPORT % (dump_body(message, body),
  456. safe_repr(message.content_type),
  457. safe_repr(message.content_encoding),
  458. safe_repr(message.delivery_info))
  459. def handle_unknown_message(self, body, message):
  460. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  461. message.reject_log_error(logger, self.connection_errors)
  462. def handle_unknown_task(self, body, message, exc):
  463. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  464. message.reject_log_error(logger, self.connection_errors)
  465. def handle_invalid_task(self, body, message, exc):
  466. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  467. message.reject_log_error(logger, self.connection_errors)
  468. def receive_message(self, body, message):
  469. """Handles incoming messages.
  470. :param body: The message body.
  471. :param message: The kombu message object.
  472. """
  473. try:
  474. name = body['task']
  475. except (KeyError, TypeError):
  476. return self.handle_unknown_message(body, message)
  477. try:
  478. self.strategies[name](message, body, message.ack_log_error)
  479. except KeyError, exc:
  480. self.handle_unknown_task(body, message, exc)
  481. except InvalidTaskError, exc:
  482. self.handle_invalid_task(body, message, exc)
  483. def maybe_conn_error(self, fun):
  484. """Applies function but ignores any connection or channel
  485. errors raised."""
  486. try:
  487. fun()
  488. except (AttributeError, ) + \
  489. self.connection_errors + \
  490. self.channel_errors:
  491. pass
  492. def close_connection(self):
  493. """Closes the current broker connection and all open channels."""
  494. # We must set self.connection to None here, so
  495. # that the green pidbox thread exits.
  496. connection, self.connection = self.connection, None
  497. if self.task_consumer:
  498. debug('Closing consumer channel...')
  499. self.task_consumer = self.maybe_conn_error(
  500. self.task_consumer.close)
  501. self.stop_pidbox_node()
  502. if connection:
  503. debug('Closing broker connection...')
  504. self.maybe_conn_error(connection.close)
  505. def stop_consumers(self, close_connection=True, join=True):
  506. """Stop consuming tasks and broadcast commands, also stops
  507. the heartbeat thread and event dispatcher.
  508. :keyword close_connection: Set to False to skip closing the broker
  509. connection.
  510. """
  511. if not self._state == RUN:
  512. return
  513. if self.heart:
  514. # Stop the heartbeat thread if it's running.
  515. debug('Heart: Going into cardiac arrest...')
  516. self.heart = self.heart.stop()
  517. debug('Cancelling task consumer...')
  518. if join and self.task_consumer:
  519. self.maybe_conn_error(self.task_consumer.cancel)
  520. if self.event_dispatcher:
  521. debug('Shutting down event dispatcher...')
  522. self.event_dispatcher = self.maybe_conn_error(
  523. self.event_dispatcher.close)
  524. debug('Cancelling broadcast consumer...')
  525. if join and self.broadcast_consumer:
  526. self.maybe_conn_error(self.broadcast_consumer.cancel)
  527. if close_connection:
  528. self.close_connection()
  529. def on_decode_error(self, message, exc):
  530. """Callback called if an error occurs while decoding
  531. a message received.
  532. Simply logs the error and acknowledges the message so it
  533. doesn't enter a loop.
  534. :param message: The message with errors.
  535. :param exc: The original exception instance.
  536. """
  537. crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
  538. exc, message.content_type, message.content_encoding,
  539. dump_body(message, message.body), exc_info=1)
  540. message.ack()
  541. def reset_pidbox_node(self):
  542. """Sets up the process mailbox."""
  543. self.stop_pidbox_node()
  544. # close previously opened channel if any.
  545. if self.pidbox_node.channel:
  546. try:
  547. self.pidbox_node.channel.close()
  548. except self.connection_errors + self.channel_errors:
  549. pass
  550. if self.pool is not None and self.pool.is_green:
  551. return self.pool.spawn_n(self._green_pidbox_node)
  552. self.pidbox_node.channel = self.connection.channel()
  553. self.broadcast_consumer = self.pidbox_node.listen(
  554. callback=self.on_control,
  555. )
  556. def stop_pidbox_node(self):
  557. if self._pidbox_node_stopped:
  558. self._pidbox_node_shutdown.set()
  559. debug('Waiting for broadcast thread to shutdown...')
  560. self._pidbox_node_stopped.wait()
  561. self._pidbox_node_stopped = self._pidbox_node_shutdown = None
  562. elif self.broadcast_consumer:
  563. debug('Closing broadcast channel...')
  564. self.broadcast_consumer = \
  565. self.maybe_conn_error(self.broadcast_consumer.channel.close)
  566. def _green_pidbox_node(self):
  567. """Sets up the process mailbox when running in a greenlet
  568. environment."""
  569. # THIS CODE IS TERRIBLE
  570. # Luckily work has already started rewriting the Consumer for 4.0.
  571. self._pidbox_node_shutdown = threading.Event()
  572. self._pidbox_node_stopped = threading.Event()
  573. try:
  574. with self._open_connection() as conn:
  575. info('pidbox: Connected to %s.', conn.as_uri())
  576. self.pidbox_node.channel = conn.default_channel
  577. self.broadcast_consumer = self.pidbox_node.listen(
  578. callback=self.on_control,
  579. )
  580. with self.broadcast_consumer:
  581. while not self._pidbox_node_shutdown.isSet():
  582. try:
  583. conn.drain_events(timeout=1.0)
  584. except socket.timeout:
  585. pass
  586. finally:
  587. self._pidbox_node_stopped.set()
  588. def reset_connection(self):
  589. """Re-establish the broker connection and set up consumers,
  590. heartbeat and the event dispatcher."""
  591. debug('Re-establishing connection to the broker...')
  592. self.stop_consumers(join=False)
  593. # Clear internal queues to get rid of old messages.
  594. # They can't be acked anyway, as a delivery tag is specific
  595. # to the current channel.
  596. self.ready_queue.clear()
  597. self.timer.clear()
  598. # Re-establish the broker connection and setup the task consumer.
  599. self.connection = self._open_connection()
  600. info('consumer: Connected to %s.', self.connection.as_uri())
  601. self.task_consumer = self.app.amqp.TaskConsumer(
  602. self.connection, on_decode_error=self.on_decode_error,
  603. )
  604. # QoS: Reset prefetch window.
  605. self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
  606. self.qos.update()
  607. # Setup the process mailbox.
  608. self.reset_pidbox_node()
  609. # Flush events sent while connection was down.
  610. prev_event_dispatcher = self.event_dispatcher
  611. self.event_dispatcher = self.app.events.Dispatcher(
  612. self.connection, hostname=self.hostname, enabled=self.send_events,
  613. )
  614. if prev_event_dispatcher:
  615. self.event_dispatcher.copy_buffer(prev_event_dispatcher)
  616. self.event_dispatcher.flush()
  617. # Restart heartbeat thread.
  618. self.restart_heartbeat()
  619. # reload all task's execution strategies.
  620. self.update_strategies()
  621. # We're back!
  622. self._state = RUN
  623. def restart_heartbeat(self):
  624. """Restart the heartbeat thread.
  625. This thread sends heartbeat events at intervals so monitors
  626. can tell if the worker is off-line/missing.
  627. """
  628. self.heart = Heart(self.timer, self.event_dispatcher)
  629. self.heart.start()
  630. def _open_connection(self):
  631. """Establish the broker connection.
  632. Will retry establishing the connection if the
  633. :setting:`BROKER_CONNECTION_RETRY` setting is enabled
  634. """
  635. conn = self.app.connection(heartbeat=self.amqheartbeat)
  636. # Callback called for each retry while the connection
  637. # can't be established.
  638. def _error_handler(exc, interval, next_step=CONNECTION_RETRY):
  639. if getattr(conn, 'alt', None) and interval == 0:
  640. next_step = CONNECTION_FAILOVER
  641. error(CONNECTION_ERROR, conn.as_uri(), exc,
  642. next_step % {'when': humanize_seconds(interval, 'in', ' ')})
  643. # remember that the connection is lazy, it won't establish
  644. # until it's needed.
  645. if not self.app.conf.BROKER_CONNECTION_RETRY:
  646. # retry disabled, just call connect directly.
  647. conn.connect()
  648. return conn
  649. return conn.ensure_connection(
  650. _error_handler, self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
  651. callback=self.maybe_shutdown,
  652. )
  653. def stop(self):
  654. """Stop consuming.
  655. Does not close the broker connection, so be sure to call
  656. :meth:`close_connection` when you are finished with it.
  657. """
  658. # Notifies other threads that this instance can't be used
  659. # anymore.
  660. self.close()
  661. debug('Stopping consumers...')
  662. self.stop_consumers(close_connection=False, join=True)
  663. def close(self):
  664. self._state = CLOSE
  665. def maybe_shutdown(self):
  666. if state.should_stop:
  667. raise SystemExit()
  668. elif state.should_terminate:
  669. raise SystemTerminate()
  670. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  671. routing_key=None, **options):
  672. cset = self.task_consumer
  673. queues = self.app.amqp.queues
  674. # Must use in' here, as __missing__ will automatically
  675. # create queues when CELERY_CREATE_MISSING_QUEUES is enabled.
  676. # (Issue #1079)
  677. if queue in queues:
  678. q = queues[queue]
  679. else:
  680. exchange = queue if exchange is None else exchange
  681. exchange_type = ('direct' if exchange_type is None
  682. else exchange_type)
  683. q = queues.select_add(queue,
  684. exchange=exchange,
  685. exchange_type=exchange_type,
  686. routing_key=routing_key, **options)
  687. if not cset.consuming_from(queue):
  688. cset.add_queue(q)
  689. cset.consume()
  690. logger.info('Started consuming from %r', queue)
  691. def cancel_task_queue(self, queue):
  692. self.app.amqp.queues.select_remove(queue)
  693. self.task_consumer.cancel_by_queue(queue)
  694. @property
  695. def info(self):
  696. """Returns information about this consumer instance
  697. as a dict.
  698. This is also the consumer related info returned by
  699. ``celeryctl stats``.
  700. """
  701. conninfo = {}
  702. if self.connection:
  703. conninfo = self.connection.info()
  704. conninfo.pop('password', None) # don't send password.
  705. return {'broker': conninfo, 'prefetch_count': self.qos.value}
  706. class BlockingConsumer(Consumer):
  707. def consume_messages(self):
  708. # receive_message handles incoming messages.
  709. self.task_consumer.register_callback(self.receive_message)
  710. self.task_consumer.consume()
  711. debug('Ready to accept tasks!')
  712. while self._state != CLOSE and self.connection:
  713. self.maybe_shutdown()
  714. if self.qos.prev != self.qos.value: # pragma: no cover
  715. self.qos.update()
  716. try:
  717. self.connection.drain_events(timeout=10.0)
  718. except socket.timeout:
  719. pass
  720. except socket.error:
  721. if self._state != CLOSE: # pragma: no cover
  722. raise