consumer.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.consumer
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. This module contains the component responsible for consuming messages
  6. from the broker, processing the messages and keeping the broker connections
  7. up and running.
  8. """
  9. from __future__ import absolute_import
  10. import logging
  11. import socket
  12. from time import sleep
  13. from Queue import Empty
  14. from kombu.syn import _detect_environment
  15. from kombu.utils.encoding import safe_repr
  16. from kombu.utils.eventio import READ, WRITE, ERR
  17. from celery.app import app_or_default
  18. from celery.exceptions import InvalidTaskError, SystemTerminate
  19. from celery.task.trace import build_tracer
  20. from celery.utils.timer2 import default_timer, to_timestamp
  21. from celery.utils.functional import noop
  22. from celery.utils.imports import qualname
  23. from celery.utils.log import get_logger
  24. from celery.utils.text import dump_body
  25. from celery.utils.timeutils import humanize_seconds
  26. from . import state
  27. from .bootsteps import Namespace as _NS, StartStopComponent, CLOSE
  28. logger = get_logger(__name__)
  29. info, warn, error, crit = (logger.info, logger.warn,
  30. logger.error, logger.critical)
  31. task_reserved = state.task_reserved
  32. #: Heartbeat check is called every heartbeat_seconds' / rate'.
  33. AMQHEARTBEAT_RATE = 2.0
  34. CONNECTION_RETRY = """\
  35. consumer: Connection to broker lost. \
  36. Trying to re-establish the connection...\
  37. """
  38. CONNECTION_RETRY_STEP = """\
  39. Trying again {when}...\
  40. """
  41. CONNECTION_ERROR = """\
  42. consumer: Cannot connect to %s: %s.
  43. %s
  44. """
  45. CONNECTION_FAILOVER = """\
  46. Will retry using next failover.\
  47. """
  48. UNKNOWN_FORMAT = """\
  49. Received and deleted unknown message. Wrong destination?!?
  50. The full contents of the message body was: %s
  51. """
  52. #: Error message for when an unregistered task is received.
  53. UNKNOWN_TASK_ERROR = """\
  54. Received unregistered task of type %s.
  55. The message has been ignored and discarded.
  56. Did you remember to import the module containing this task?
  57. Or maybe you are using relative imports?
  58. Please see http://bit.ly/gLye1c for more information.
  59. The full contents of the message body was:
  60. %s
  61. """
  62. #: Error message for when an invalid task message is received.
  63. INVALID_TASK_ERROR = """\
  64. Received invalid task message: %s
  65. The message has been ignored and discarded.
  66. Please ensure your message conforms to the task
  67. message protocol as described here: http://bit.ly/hYj41y
  68. The full contents of the message body was:
  69. %s
  70. """
  71. MESSAGE_REPORT = """\
  72. body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
  73. """
  74. def debug(msg, *args, **kwargs):
  75. logger.debug('consumer: {0}'.format(msg), *args, **kwargs)
  76. class Component(StartStopComponent):
  77. name = 'worker.consumer'
  78. last = True
  79. def Consumer(self, w):
  80. return (w.consumer_cls or
  81. Consumer if w.hub else BlockingConsumer)
  82. def create(self, w):
  83. prefetch_count = w.concurrency * w.prefetch_multiplier
  84. c = w.consumer = self.instantiate(self.Consumer(w),
  85. w.ready_queue,
  86. hostname=w.hostname,
  87. send_events=w.send_events,
  88. init_callback=w.ready_callback,
  89. initial_prefetch_count=prefetch_count,
  90. pool=w.pool,
  91. timer=w.timer,
  92. app=w.app,
  93. controller=w,
  94. hub=w.hub)
  95. return c
  96. class Namespace(_NS):
  97. name = 'consumer'
  98. def shutdown(self, parent):
  99. delayed = self._shutdown_step(parent, parent.components, force=False)
  100. self._shutdown_step(parent, delayed, force=True)
  101. def _shutdown_step(self, parent, components, force=False):
  102. delayed = []
  103. for component in components:
  104. if component:
  105. logger.debug('Shutdown %s...', qualname(component))
  106. if not force and getattr(component, 'delay_shutdown', False):
  107. delayed.append(component)
  108. else:
  109. component.shutdown(parent)
  110. return delayed
  111. def modules(self):
  112. return ('celery.worker.parts', )
  113. class Consumer(object):
  114. """Listen for messages received from the broker and
  115. move them to the ready queue for task processing.
  116. :param ready_queue: See :attr:`ready_queue`.
  117. :param timer: See :attr:`timer`.
  118. """
  119. #: The queue that holds tasks ready for immediate processing.
  120. ready_queue = None
  121. #: Optional callback to be called when the connection is established.
  122. #: Will only be called once, even if the connection is lost and
  123. #: re-established.
  124. init_callback = None
  125. #: The current hostname. Defaults to the system hostname.
  126. hostname = None
  127. #: The current worker pool instance.
  128. pool = None
  129. #: A timer used for high-priority internal tasks, such
  130. #: as sending heartbeats.
  131. timer = None
  132. def __init__(self, ready_queue,
  133. init_callback=noop, hostname=None,
  134. pool=None, app=None,
  135. timer=None, controller=None, hub=None, amqheartbeat=None,
  136. **kwargs):
  137. self.app = app_or_default(app)
  138. self.controller = controller
  139. self.ready_queue = ready_queue
  140. self.init_callback = init_callback
  141. self.hostname = hostname or socket.gethostname()
  142. self.pool = pool
  143. self.timer = timer or default_timer
  144. self.strategies = {}
  145. conninfo = self.app.connection()
  146. self.connection_errors = conninfo.connection_errors
  147. self.channel_errors = conninfo.channel_errors
  148. self._does_info = logger.isEnabledFor(logging.INFO)
  149. if hub:
  150. hub.on_init.append(self.on_poll_init)
  151. self.hub = hub
  152. self._quick_put = self.ready_queue.put
  153. self.amqheartbeat = amqheartbeat
  154. if self.amqheartbeat is None:
  155. self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
  156. if not hub:
  157. self.amqheartbeat = 0
  158. if _detect_environment() == 'gevent':
  159. # there's a gevent bug that causes timeouts to not be reset,
  160. # so if the connection timeout is exceeded once, it can NEVER
  161. # connect again.
  162. self.app.conf.BROKER_CONNECTION_TIMEOUT = None
  163. self.components = []
  164. self.namespace = Namespace(app=self.app,
  165. on_start=self.on_start,
  166. on_close=self.on_close)
  167. self.namespace.apply(self, **kwargs)
  168. def on_start(self):
  169. # reload all task's execution strategies.
  170. self.update_strategies()
  171. self.init_callback(self)
  172. def start(self):
  173. """Start the consumer.
  174. Automatically survives intermittent connection failure,
  175. and will retry establishing the connection and restart
  176. consuming messages.
  177. """
  178. ns = self.namespace
  # Keep going for as long as the namespace state is not CLOSE.
  179. while ns.state != CLOSE:
  # Raises SystemExit/SystemTerminate if a shutdown was requested.
  180. self.maybe_shutdown()
  181. try:
  182. self.namespace.start(self)
  183. self.consume_messages()
  # Connection or channel failure: log it and restart the namespace.
  184. except self.connection_errors + self.channel_errors:
  185. error(CONNECTION_RETRY, exc_info=True)
  186. ns.restart(self)
  # NOTE(review): indentation was lost in this copy of the file, so it is
  # unclear whether the next two statements run after the loop or inside
  # it (or inside the except clause) -- confirm against the original.
  187. ns.close(self)
  188. ns.state = CLOSE
  189. def on_poll_init(self, hub):
  190. hub.update_readers(self.connection.eventmap)
  191. self.connection.transport.on_poll_init(hub.poller)
  192. def maybe_conn_error(self, fun):
  193. """Applies function but ignores any connection or channel
  194. errors raised."""
  195. try:
  196. fun()
  197. except (AttributeError, ) + \
  198. self.connection_errors + \
  199. self.channel_errors:
  200. pass
  201. def shutdown(self):
  202. self.namespace.shutdown(self)
  203. def on_decode_error(self, message, exc):
  204. """Callback called if an error occurs while decoding
  205. a message received.
  206. Simply logs the error and acknowledges the message so it
  207. doesn't enter a loop.
  208. :param message: The message with errors.
  209. :param exc: The original exception instance.
  210. """
  211. crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
  212. exc, message.content_type, message.content_encoding,
  213. dump_body(message, message.body))
  214. message.ack()
  215. def on_close(self):
  216. # Clear internal queues to get rid of old messages.
  217. # They can't be acked anyway, as a delivery tag is specific
  218. # to the current channel.
  219. self.ready_queue.clear()
  220. self.timer.clear()
  221. def _open_connection(self):
  222. """Establish the broker connection.
  223. Will retry establishing the connection if the
  224. :setting:`BROKER_CONNECTION_RETRY` setting is enabled
  225. """
  226. conn = self.app.connection(heartbeat=self.amqheartbeat)
  227. # Callback called for each retry while the connection
  228. # can't be established.
  229. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  230. if getattr(conn, 'alt', None) and interval == 0:
  231. next_step = CONNECTION_FAILOVER
  232. error(CONNECTION_ERROR, conn.as_uri(), exc,
  233. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  234. # remember that the connection is lazy, it won't establish
  235. # until it's needed.
  236. if not self.app.conf.BROKER_CONNECTION_RETRY:
  237. # retry disabled, just call connect directly.
  238. conn.connect()
  239. return conn
  240. return conn.ensure_connection(_error_handler,
  241. self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
  242. callback=self.maybe_shutdown)
  243. def stop(self):
  244. """Stop consuming.
  245. Does not close the broker connection, so be sure to call
  246. :meth:`close_connection` when you are finished with it.
  247. """
  248. self.namespace.stop(self)
  249. def maybe_shutdown(self):
  250. if state.should_stop:
  251. raise SystemExit()
  252. elif state.should_terminate:
  253. raise SystemTerminate()
  254. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  255. routing_key=None, **options):
  256. cset = self.task_consumer
  257. try:
  258. q = self.app.amqp.queues[queue]
  259. except KeyError:
  260. exchange = queue if exchange is None else exchange
  261. exchange_type = 'direct' if exchange_type is None \
  262. else exchange_type
  263. q = self.app.amqp.queues.select_add(queue,
  264. exchange=exchange,
  265. exchange_type=exchange_type,
  266. routing_key=routing_key, **options)
  267. if not cset.consuming_from(queue):
  268. cset.add_queue(q)
  269. cset.consume()
  270. info('Started consuming from %r', queue)
  271. def cancel_task_queue(self, queue):
  272. self.app.amqp.queues.select_remove(queue)
  273. self.task_consumer.cancel_by_queue(queue)
  274. @property
  275. def info(self):
  276. """Returns information about this consumer instance
  277. as a dict.
  278. This is also the consumer related info returned by
  279. ``celeryctl stats``.
  280. """
  281. conninfo = {}
  282. if self.connection:
  283. conninfo = self.connection.info()
  284. conninfo.pop('password', None) # don't send password.
  285. return {'broker': conninfo, 'prefetch_count': self.qos.value}
  286. def consume_messages(self, sleep=sleep, min=min, Empty=Empty,
  287. hbrate=AMQHEARTBEAT_RATE):
  288. """Consume messages forever (or until an exception is raised)."""
  289. with self.hub as hub:
  290. ns = self.namespace
  291. qos = self.qos
  292. update_qos = qos.update
  293. update_readers = hub.update_readers
  294. readers, writers = hub.readers, hub.writers
  295. poll = hub.poller.poll
  296. fire_timers = hub.fire_timers
  297. scheduled = hub.timer._queue
  298. connection = self.connection
  299. hb = self.amqheartbeat
  300. hbtick = connection.heartbeat_check
  301. on_poll_start = connection.transport.on_poll_start
  302. on_poll_empty = connection.transport.on_poll_empty
  303. strategies = self.strategies
  304. drain_nowait = connection.drain_nowait
  305. on_task_callbacks = hub.on_task
  306. keep_draining = connection.transport.nb_keep_draining
  307. if hb and connection.supports_heartbeats:
  308. hub.timer.apply_interval(
  309. hb * 1000.0 / hbrate, hbtick, (hbrate, ))
  310. def on_task_received(body, message):
  311. if on_task_callbacks:
  312. [callback() for callback in on_task_callbacks]
  313. try:
  314. name = body['task']
  315. except (KeyError, TypeError):
  316. return self.handle_unknown_message(body, message)
  317. try:
  318. strategies[name](message, body, message.ack_log_error)
  319. except KeyError as exc:
  320. self.handle_unknown_task(body, message, exc)
  321. except InvalidTaskError as exc:
  322. self.handle_invalid_task(body, message, exc)
  323. #fire_timers()
  324. self.task_consumer.callbacks = [on_task_received]
  325. self.task_consumer.consume()
  326. debug('Ready to accept tasks!')
  327. while ns.state != CLOSE and self.connection:
  328. # shutdown if signal handlers told us to.
  329. if state.should_stop:
  330. raise SystemExit()
  331. elif state.should_terminate:
  332. raise SystemTerminate()
  333. # fire any ready timers, this also returns
  334. # the number of seconds until we need to fire timers again.
  335. poll_timeout = fire_timers() if scheduled else 1
  336. # We only update QoS when there is no more messages to read.
  337. # This groups together qos calls, and makes sure that remote
  338. # control commands will be prioritized over task messages.
  339. if qos.prev != qos.value:
  340. update_qos()
  341. update_readers(on_poll_start())
  342. if readers or writers:
  343. connection.more_to_read = True
  344. while connection.more_to_read:
  345. try:
  346. events = poll(poll_timeout)
  347. except ValueError: # Issue 882
  348. return
  349. if not events:
  350. on_poll_empty()
  351. for fileno, event in events or ():
  352. try:
  353. if event & READ:
  354. readers[fileno](fileno, event)
  355. if event & WRITE:
  356. writers[fileno](fileno, event)
  357. if event & ERR:
  358. for handlermap in readers, writers:
  359. try:
  360. handlermap[fileno](fileno, event)
  361. except KeyError:
  362. pass
  363. except (KeyError, Empty):
  364. continue
  365. except socket.error:
  366. if ns.state != CLOSE: # pragma: no cover
  367. raise
  368. if keep_draining:
  369. drain_nowait()
  370. poll_timeout = 0
  371. else:
  372. connection.more_to_read = False
  373. else:
  374. # no sockets yet, startup is probably not done.
  375. sleep(min(poll_timeout, 0.1))
  376. def on_task(self, task, task_reserved=task_reserved):
  377. """Handle received task.
  378. If the task has an `eta` we enter it into the ETA schedule,
  379. otherwise we move it the ready queue for immediate processing.
  380. """
  381. if task.revoked():
  382. return
  383. if self._does_info:
  384. info('Got task from broker: %s', task)
  385. if self.event_dispatcher.enabled:
  386. self.event_dispatcher.send('task-received', uuid=task.id,
  387. name=task.name, args=safe_repr(task.args),
  388. kwargs=safe_repr(task.kwargs),
  389. retries=task.request_dict.get('retries', 0),
  390. eta=task.eta and task.eta.isoformat(),
  391. expires=task.expires and task.expires.isoformat())
  392. if task.eta:
  393. try:
  394. eta = to_timestamp(task.eta)
  395. except OverflowError as exc:
  396. error("Couldn't convert eta %s to timestamp: %r. Task: %r",
  397. task.eta, exc, task.info(safe=True), exc_info=True)
  398. task.acknowledge()
  399. else:
  400. self.qos.increment_eventually()
  401. self.timer.apply_at(eta, self.apply_eta_task, (task, ),
  402. priority=6)
  403. else:
  404. task_reserved(task)
  405. self._quick_put(task)
  406. def apply_eta_task(self, task):
  407. """Method called by the timer to apply a task with an
  408. ETA/countdown."""
  409. task_reserved(task)
  410. self._quick_put(task)
  411. self.qos.decrement_eventually()
  412. def _message_report(self, body, message):
  413. return MESSAGE_REPORT.format(dump_body(message, body),
  414. safe_repr(message.content_type),
  415. safe_repr(message.content_encoding),
  416. safe_repr(message.delivery_info))
  417. def handle_unknown_message(self, body, message):
  418. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  419. message.reject_log_error(logger, self.connection_errors)
  420. def handle_unknown_task(self, body, message, exc):
  421. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  422. message.reject_log_error(logger, self.connection_errors)
  423. def handle_invalid_task(self, body, message, exc):
  424. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  425. message.reject_log_error(logger, self.connection_errors)
  426. def update_strategies(self):
  427. S = self.strategies
  428. app = self.app
  429. loader = app.loader
  430. hostname = self.hostname
  431. for name, task in self.app.tasks.iteritems():
  432. S[name] = task.start_strategy(app, self)
  433. task.__trace__ = build_tracer(name, task, loader, hostname)
  434. class BlockingConsumer(Consumer):
  435. def consume_messages(self):
  436. # receive_message handles incoming messages.
  437. self.task_consumer.register_callback(self.receive_message)
  438. self.task_consumer.consume()
  439. debug('Ready to accept tasks!')
  440. ns = self.ns
  441. while ns.state != CLOSE and self.connection:
  442. self.maybe_shutdown()
  443. if self.qos.prev != self.qos.value: # pragma: no cover
  444. self.qos.update()
  445. try:
  446. self.connection.drain_events(timeout=10.0)
  447. except socket.timeout:
  448. pass
  449. except socket.error:
  450. if ns.state != CLOSE: # pragma: no cover
  451. raise
  452. def receive_message(self, body, message):
  453. """Handles incoming messages.
  454. :param body: The message body.
  455. :param message: The kombu message object.
  456. """
  457. try:
  458. name = body['task']
  459. except (KeyError, TypeError):
  460. return self.handle_unknown_message(body, message)
  461. try:
  462. self.strategies[name](message, body, message.ack_log_error)
  463. except KeyError as exc:
  464. self.handle_unknown_task(body, message, exc)
  465. except InvalidTaskError as exc:
  466. self.handle_invalid_task(body, message, exc)