consumer.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. # -*- coding: utf-8 -*-
  2. """Worker Consumer Blueprint.
  3. This module contains the components responsible for consuming messages
  4. from the broker, processing the messages and keeping the broker connections
  5. up and running.
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import errno
  9. import logging
  10. import os
  11. from collections import defaultdict
  12. from time import sleep
  13. from billiard.common import restart_state
  14. from billiard.exceptions import RestartFreqExceeded
  15. from kombu.asynchronous.semaphore import DummyLock
  16. from kombu.utils.compat import _detect_environment
  17. from kombu.utils.encoding import bytes_t, safe_repr
  18. from kombu.utils.limits import TokenBucket
  19. from vine import ppartial, promise
  20. from celery import bootsteps, signals
  21. from celery.app.trace import build_tracer
  22. from celery.exceptions import InvalidTaskError, NotRegistered
  23. from celery.five import buffer_t, items, python_2_unicode_compatible, values
  24. from celery.utils.functional import noop
  25. from celery.utils.log import get_logger
  26. from celery.utils.nodenames import gethostname
  27. from celery.utils.objects import Bunch
  28. from celery.utils.text import truncate
  29. from celery.utils.time import humanize_seconds, rate
  30. from celery.worker import loops
  31. from celery.worker.state import (maybe_shutdown, reserved_requests,
  32. task_reserved)
  33. __all__ = ('Consumer', 'Evloop', 'dump_body')
  34. CLOSE = bootsteps.CLOSE
  35. TERMINATE = bootsteps.TERMINATE
  36. STOP_CONDITIONS = {CLOSE, TERMINATE}
  37. logger = get_logger(__name__)
  38. debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
  39. logger.error, logger.critical)
#: Warning logged by ``Consumer.on_connection_error_after_connected``.
CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

#: Default "next step" text for ``CONNECTION_ERROR``; ``{when}`` is filled
#: with the humanized retry interval in ``Consumer.ensure_connected``.
CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""

#: Error logged when the broker cannot be reached.  The three ``%s``
#: arguments are the connection URI, the exception and the next-step text.
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

#: "Next step" text used instead of ``CONNECTION_RETRY_STEP`` when the
#: connection has alternates (``conn.alt``) and the retry interval is 0.
CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""

#: Warning logged by ``Consumer.on_unknown_message``; ``%s`` receives the
#: text built by ``Consumer._message_report``.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here:
http://docs.celeryq.org/en/latest/internals/protocol.html
The full contents of the message body was:
%s
"""

#: Critical message logged by ``Consumer.on_decode_error``.  Arguments, in
#: order: the exception, content type, content encoding, repr of the
#: headers, and the dumped body.
MESSAGE_DECODE_ERROR = """\
Can't decode message body: %r [type:%r encoding:%r headers:%s]
body: %s
"""

#: ``str.format`` template used by ``Consumer._message_report``.  Fields
#: 0-4 are the dumped body, content type, content encoding, delivery info
#: and headers; the doubled braces render as literal braces.
MESSAGE_REPORT = """\
body: {0}
{{content_type:{1} content_encoding:{2}
delivery_info:{3} headers={4}}}
"""
  89. def dump_body(m, body):
  90. """Format message body for debugging purposes."""
  91. # v2 protocol does not deserialize body
  92. body = m.body if body is None else body
  93. if isinstance(body, buffer_t):
  94. body = bytes_t(body)
  95. return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
  96. len(m.body))
  97. @python_2_unicode_compatible
  98. class Consumer(object):
  99. """Consumer blueprint."""
  100. Strategies = dict
  101. #: Optional callback called the first time the worker
  102. #: is ready to receive tasks.
  103. init_callback = None
  104. #: The current worker pool instance.
  105. pool = None
  106. #: A timer used for high-priority internal tasks, such
  107. #: as sending heartbeats.
  108. timer = None
  109. restart_count = -1 # first start is the same as a restart
  110. class Blueprint(bootsteps.Blueprint):
  111. """Consumer blueprint."""
  112. name = 'Consumer'
  113. default_steps = [
  114. 'celery.worker.consumer.connection:Connection',
  115. 'celery.worker.consumer.mingle:Mingle',
  116. 'celery.worker.consumer.events:Events',
  117. 'celery.worker.consumer.gossip:Gossip',
  118. 'celery.worker.consumer.heart:Heart',
  119. 'celery.worker.consumer.control:Control',
  120. 'celery.worker.consumer.tasks:Tasks',
  121. 'celery.worker.consumer.consumer:Evloop',
  122. 'celery.worker.consumer.agent:Agent',
  123. ]
  124. def shutdown(self, parent):
  125. self.send_all(parent, 'shutdown')
  126. def __init__(self, on_task_request,
  127. init_callback=noop, hostname=None,
  128. pool=None, app=None,
  129. timer=None, controller=None, hub=None, amqheartbeat=None,
  130. worker_options=None, disable_rate_limits=False,
  131. initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
  132. self.app = app
  133. self.controller = controller
  134. self.init_callback = init_callback
  135. self.hostname = hostname or gethostname()
  136. self.pid = os.getpid()
  137. self.pool = pool
  138. self.timer = timer
  139. self.strategies = self.Strategies()
  140. self.conninfo = self.app.connection_for_read()
  141. self.connection_errors = self.conninfo.connection_errors
  142. self.channel_errors = self.conninfo.channel_errors
  143. self._restart_state = restart_state(maxR=5, maxT=1)
  144. self._does_info = logger.isEnabledFor(logging.INFO)
  145. self._limit_order = 0
  146. self.on_task_request = on_task_request
  147. self.on_task_message = set()
  148. self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
  149. self.disable_rate_limits = disable_rate_limits
  150. self.initial_prefetch_count = initial_prefetch_count
  151. self.prefetch_multiplier = prefetch_multiplier
  152. # this contains a tokenbucket for each task type by name, used for
  153. # rate limits, or None if rate limits are disabled for that task.
  154. self.task_buckets = defaultdict(lambda: None)
  155. self.reset_rate_limits()
  156. self.hub = hub
  157. if self.hub or getattr(self.pool, 'is_green', False):
  158. self.amqheartbeat = amqheartbeat
  159. if self.amqheartbeat is None:
  160. self.amqheartbeat = self.app.conf.broker_heartbeat
  161. else:
  162. self.amqheartbeat = 0
  163. if not hasattr(self, 'loop'):
  164. self.loop = loops.asynloop if hub else loops.synloop
  165. if _detect_environment() == 'gevent':
  166. # there's a gevent bug that causes timeouts to not be reset,
  167. # so if the connection timeout is exceeded once, it can NEVER
  168. # connect again.
  169. self.app.conf.broker_connection_timeout = None
  170. self._pending_operations = []
  171. self.steps = []
  172. self.blueprint = self.Blueprint(
  173. steps=self.app.steps['consumer'],
  174. on_close=self.on_close,
  175. )
  176. self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
  177. def call_soon(self, p, *args, **kwargs):
  178. p = ppartial(p, *args, **kwargs)
  179. if self.hub:
  180. return self.hub.call_soon(p)
  181. self._pending_operations.append(p)
  182. return p
  183. def perform_pending_operations(self):
  184. if not self.hub:
  185. while self._pending_operations:
  186. try:
  187. self._pending_operations.pop()()
  188. except Exception as exc: # pylint: disable=broad-except
  189. logger.exception('Pending callback raised: %r', exc)
  190. def bucket_for_task(self, type):
  191. limit = rate(getattr(type, 'rate_limit', None))
  192. return TokenBucket(limit, capacity=1) if limit else None
  193. def reset_rate_limits(self):
  194. self.task_buckets.update(
  195. (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
  196. )
  197. def _update_prefetch_count(self, index=0):
  198. """Update prefetch count after pool/shrink grow operations.
  199. Index must be the change in number of processes as a positive
  200. (increasing) or negative (decreasing) number.
  201. Note:
  202. Currently pool grow operations will end up with an offset
  203. of +1 if the initial size of the pool was 0 (e.g.
  204. :option:`--autoscale=1,0 <celery worker --autoscale>`).
  205. """
  206. num_processes = self.pool.num_processes
  207. if not self.initial_prefetch_count or not num_processes:
  208. return # prefetch disabled
  209. self.initial_prefetch_count = (
  210. self.pool.num_processes * self.prefetch_multiplier
  211. )
  212. return self._update_qos_eventually(index)
  213. def _update_qos_eventually(self, index):
  214. return (self.qos.decrement_eventually if index < 0
  215. else self.qos.increment_eventually)(
  216. abs(index) * self.prefetch_multiplier)
  217. def _limit_move_to_pool(self, request):
  218. task_reserved(request)
  219. self.on_task_request(request)
  220. def _on_bucket_wakeup(self, bucket, tokens):
  221. try:
  222. request = bucket.pop()
  223. except IndexError:
  224. pass
  225. else:
  226. self._limit_move_to_pool(request)
  227. self._schedule_oldest_bucket_request(bucket, tokens)
  228. def _schedule_oldest_bucket_request(self, bucket, tokens):
  229. try:
  230. request = bucket.pop()
  231. except IndexError:
  232. pass
  233. else:
  234. return self._schedule_bucket_request(request, bucket, tokens)
  235. def _schedule_bucket_request(self, request, bucket, tokens):
  236. bucket.can_consume(tokens)
  237. bucket.add(request)
  238. pri = self._limit_order = (self._limit_order + 1) % 10
  239. hold = bucket.expected_time(tokens)
  240. self.timer.call_after(
  241. hold, self._on_bucket_wakeup, (bucket, tokens),
  242. priority=pri,
  243. )
  244. def _limit_task(self, request, bucket, tokens):
  245. if bucket.contents:
  246. return bucket.add(request)
  247. return self._schedule_bucket_request(request, bucket, tokens)
  248. def _limit_post_eta(self, request, bucket, tokens):
  249. self.qos.decrement_eventually()
  250. if bucket.contents:
  251. return bucket.add(request)
  252. return self._schedule_bucket_request(request, bucket, tokens)
  253. def start(self):
  254. blueprint = self.blueprint
  255. while blueprint.state not in STOP_CONDITIONS:
  256. maybe_shutdown()
  257. if self.restart_count:
  258. try:
  259. self._restart_state.step()
  260. except RestartFreqExceeded as exc:
  261. crit('Frequent restarts detected: %r', exc, exc_info=1)
  262. sleep(1)
  263. self.restart_count += 1
  264. try:
  265. blueprint.start(self)
  266. except self.connection_errors as exc:
  267. # If we're not retrying connections, no need to catch
  268. # connection errors
  269. if not self.app.conf.broker_connection_retry:
  270. raise
  271. if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
  272. raise # Too many open files
  273. maybe_shutdown()
  274. if blueprint.state not in STOP_CONDITIONS:
  275. if self.connection:
  276. self.on_connection_error_after_connected(exc)
  277. else:
  278. self.on_connection_error_before_connected(exc)
  279. self.on_close()
  280. blueprint.restart(self)
  281. def on_connection_error_before_connected(self, exc):
  282. error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
  283. 'Trying to reconnect...')
  284. def on_connection_error_after_connected(self, exc):
  285. warn(CONNECTION_RETRY, exc_info=True)
  286. try:
  287. self.connection.collect()
  288. except Exception: # pylint: disable=broad-except
  289. pass
  290. def register_with_event_loop(self, hub):
  291. self.blueprint.send_all(
  292. self, 'register_with_event_loop', args=(hub,),
  293. description='Hub.register',
  294. )
  295. def shutdown(self):
  296. self.blueprint.shutdown(self)
  297. def stop(self):
  298. self.blueprint.stop(self)
  299. def on_ready(self):
  300. callback, self.init_callback = self.init_callback, None
  301. if callback:
  302. callback(self)
  303. def loop_args(self):
  304. return (self, self.connection, self.task_consumer,
  305. self.blueprint, self.hub, self.qos, self.amqheartbeat,
  306. self.app.clock, self.amqheartbeat_rate)
  307. def on_decode_error(self, message, exc):
  308. """Callback called if an error occurs while decoding a message.
  309. Simply logs the error and acknowledges the message so it
  310. doesn't enter a loop.
  311. Arguments:
  312. message (kombu.Message): The message received.
  313. exc (Exception): The exception being handled.
  314. """
  315. crit(MESSAGE_DECODE_ERROR,
  316. exc, message.content_type, message.content_encoding,
  317. safe_repr(message.headers), dump_body(message, message.body),
  318. exc_info=1)
  319. message.ack()
  320. def on_close(self):
  321. # Clear internal queues to get rid of old messages.
  322. # They can't be acked anyway, as a delivery tag is specific
  323. # to the current channel.
  324. if self.controller and self.controller.semaphore:
  325. self.controller.semaphore.clear()
  326. if self.timer:
  327. self.timer.clear()
  328. for bucket in values(self.task_buckets):
  329. if bucket:
  330. bucket.clear_pending()
  331. reserved_requests.clear()
  332. if self.pool and self.pool.flush:
  333. self.pool.flush()
  334. def connect(self):
  335. """Establish the broker connection used for consuming tasks.
  336. Retries establishing the connection if the
  337. :setting:`broker_connection_retry` setting is enabled
  338. """
  339. conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  340. if self.hub:
  341. conn.transport.register_with_event_loop(conn.connection, self.hub)
  342. return conn
  343. def connection_for_read(self, heartbeat=None):
  344. return self.ensure_connected(
  345. self.app.connection_for_read(heartbeat=heartbeat))
  346. def connection_for_write(self, heartbeat=None):
  347. return self.ensure_connected(
  348. self.app.connection_for_write(heartbeat=heartbeat))
  349. def ensure_connected(self, conn):
  350. # Callback called for each retry while the connection
  351. # can't be established.
  352. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  353. if getattr(conn, 'alt', None) and interval == 0:
  354. next_step = CONNECTION_FAILOVER
  355. error(CONNECTION_ERROR, conn.as_uri(), exc,
  356. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  357. # remember that the connection is lazy, it won't establish
  358. # until needed.
  359. if not self.app.conf.broker_connection_retry:
  360. # retry disabled, just call connect directly.
  361. conn.connect()
  362. return conn
  363. conn = conn.ensure_connection(
  364. _error_handler, self.app.conf.broker_connection_max_retries,
  365. callback=maybe_shutdown,
  366. )
  367. return conn
  368. def _flush_events(self):
  369. if self.event_dispatcher:
  370. self.event_dispatcher.flush()
  371. def on_send_event_buffered(self):
  372. if self.hub:
  373. self.hub._ready.add(self._flush_events)
  374. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  375. routing_key=None, **options):
  376. cset = self.task_consumer
  377. queues = self.app.amqp.queues
  378. # Must use in' here, as __missing__ will automatically
  379. # create queues when :setting:`task_create_missing_queues` is enabled.
  380. # (Issue #1079)
  381. if queue in queues:
  382. q = queues[queue]
  383. else:
  384. exchange = queue if exchange is None else exchange
  385. exchange_type = ('direct' if exchange_type is None
  386. else exchange_type)
  387. q = queues.select_add(queue,
  388. exchange=exchange,
  389. exchange_type=exchange_type,
  390. routing_key=routing_key, **options)
  391. if not cset.consuming_from(queue):
  392. cset.add_queue(q)
  393. cset.consume()
  394. info('Started consuming from %s', queue)
  395. def cancel_task_queue(self, queue):
  396. info('Canceling queue %s', queue)
  397. self.app.amqp.queues.deselect(queue)
  398. self.task_consumer.cancel_by_queue(queue)
  399. def apply_eta_task(self, task):
  400. """Method called by the timer to apply a task with an ETA/countdown."""
  401. task_reserved(task)
  402. self.on_task_request(task)
  403. self.qos.decrement_eventually()
  404. def _message_report(self, body, message):
  405. return MESSAGE_REPORT.format(dump_body(message, body),
  406. safe_repr(message.content_type),
  407. safe_repr(message.content_encoding),
  408. safe_repr(message.delivery_info),
  409. safe_repr(message.headers))
  410. def on_unknown_message(self, body, message):
  411. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  412. message.reject_log_error(logger, self.connection_errors)
  413. signals.task_rejected.send(sender=self, message=message, exc=None)
  414. def on_unknown_task(self, body, message, exc):
  415. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  416. try:
  417. id_, name = message.headers['id'], message.headers['task']
  418. root_id = message.headers.get('root_id')
  419. except KeyError: # proto1
  420. payload = message.payload
  421. id_, name = payload['id'], payload['task']
  422. root_id = None
  423. request = Bunch(
  424. name=name, chord=None, root_id=root_id,
  425. correlation_id=message.properties.get('correlation_id'),
  426. reply_to=message.properties.get('reply_to'),
  427. errbacks=None,
  428. )
  429. message.reject_log_error(logger, self.connection_errors)
  430. self.app.backend.mark_as_failure(
  431. id_, NotRegistered(name), request=request,
  432. )
  433. if self.event_dispatcher:
  434. self.event_dispatcher.send(
  435. 'task-failed', uuid=id_,
  436. exception='NotRegistered({0!r})'.format(name),
  437. )
  438. signals.task_unknown.send(
  439. sender=self, message=message, exc=exc, name=name, id=id_,
  440. )
  441. def on_invalid_task(self, body, message, exc):
  442. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  443. message.reject_log_error(logger, self.connection_errors)
  444. signals.task_rejected.send(sender=self, message=message, exc=exc)
  445. def update_strategies(self):
  446. loader = self.app.loader
  447. for name, task in items(self.app.tasks):
  448. self.strategies[name] = task.start_strategy(self.app, self)
  449. task.__trace__ = build_tracer(name, task, loader, self.hostname,
  450. app=self.app)
  451. def create_task_handler(self, promise=promise):
  452. strategies = self.strategies
  453. on_unknown_message = self.on_unknown_message
  454. on_unknown_task = self.on_unknown_task
  455. on_invalid_task = self.on_invalid_task
  456. callbacks = self.on_task_message
  457. call_soon = self.call_soon
  458. def on_task_received(message):
  459. # payload will only be set for v1 protocol, since v2
  460. # will defer deserializing the message body to the pool.
  461. payload = None
  462. try:
  463. type_ = message.headers['task'] # protocol v2
  464. except TypeError:
  465. return on_unknown_message(None, message)
  466. except KeyError:
  467. try:
  468. payload = message.decode()
  469. except Exception as exc: # pylint: disable=broad-except
  470. return self.on_decode_error(message, exc)
  471. try:
  472. type_, payload = payload['task'], payload # protocol v1
  473. except (TypeError, KeyError):
  474. return on_unknown_message(payload, message)
  475. try:
  476. strategy = strategies[type_]
  477. except KeyError as exc:
  478. return on_unknown_task(None, message, exc)
  479. else:
  480. try:
  481. strategy(
  482. message, payload,
  483. promise(call_soon, (message.ack_log_error,)),
  484. promise(call_soon, (message.reject_log_error,)),
  485. callbacks,
  486. )
  487. except InvalidTaskError as exc:
  488. return on_invalid_task(payload, message, exc)
  489. return on_task_received
  490. def __repr__(self):
  491. """``repr(self)``."""
  492. return '<Consumer: {self.hostname} ({state})>'.format(
  493. self=self, state=self.blueprint.human_state(),
  494. )
  495. class Evloop(bootsteps.StartStopStep):
  496. """Event loop service.
  497. Note:
  498. This is always started last.
  499. """
  500. label = 'event loop'
  501. last = True
  502. def start(self, c):
  503. self.patch_all(c)
  504. c.loop(*c.loop_args())
  505. def patch_all(self, c):
  506. c.qos._mutex = DummyLock()