consumer.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. # -*- coding: utf-8 -*-
  2. """Worker Consumer Blueprint.
  3. This module contains the components responsible for consuming messages
  4. from the broker, processing the messages and keeping the broker connections
  5. up and running.
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import errno
  9. import logging
  10. import os
  11. from collections import defaultdict
  12. from time import sleep
  13. from billiard.common import restart_state
  14. from billiard.exceptions import RestartFreqExceeded
  15. from kombu.asynchronous.semaphore import DummyLock
  16. from kombu.exceptions import DecodeError
  17. from kombu.utils.compat import _detect_environment
  18. from kombu.utils.encoding import bytes_t, safe_repr
  19. from kombu.utils.limits import TokenBucket
  20. from vine import ppartial, promise
  21. from celery import bootsteps, signals
  22. from celery.app.trace import build_tracer
  23. from celery.exceptions import InvalidTaskError, NotRegistered
  24. from celery.five import buffer_t, items, python_2_unicode_compatible, values
  25. from celery.utils.functional import noop
  26. from celery.utils.log import get_logger
  27. from celery.utils.nodenames import gethostname
  28. from celery.utils.objects import Bunch
  29. from celery.utils.text import truncate
  30. from celery.utils.time import humanize_seconds, rate
  31. from celery.worker import loops
  32. from celery.worker.state import (maybe_shutdown, reserved_requests,
  33. task_reserved)
  34. __all__ = ('Consumer', 'Evloop', 'dump_body')
  35. CLOSE = bootsteps.CLOSE
  36. TERMINATE = bootsteps.TERMINATE
  37. STOP_CONDITIONS = {CLOSE, TERMINATE}
  38. logger = get_logger(__name__)
  39. debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
  40. logger.error, logger.critical)
#: Warning logged by ``Consumer.on_connection_error_after_connected``.
CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

#: Default "next step" text used by the retry error handler in
#: ``Consumer.ensure_connected``; ``{when}`` is filled using ``str.format``.
CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""

#: %-style template taking three arguments; this module passes
#: (broker URI, exception, next-step text).
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

#: "Next step" text used instead of :data:`CONNECTION_RETRY_STEP` when the
#: connection has alternates (``conn.alt``) and the retry interval is 0.
CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""

#: Warning logged by ``Consumer.on_unknown_message``; the single argument
#: is the report built by ``Consumer._message_report``.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here:
http://docs.celeryq.org/en/latest/internals/protocol.html
The full contents of the message body was:
%s
"""

#: Critical message logged by ``Consumer.on_decode_error``. Arguments are
#: (exception, content type, content encoding, headers repr, body dump).
MESSAGE_DECODE_ERROR = """\
Can't decode message body: %r [type:%r encoding:%r headers:%s]
body: %s
"""

#: ``str.format`` template used by ``Consumer._message_report``; the doubled
#: braces are literal braces in the output.
MESSAGE_REPORT = """\
body: {0}
{{content_type:{1} content_encoding:{2}
delivery_info:{3} headers={4}}}
"""
  90. def dump_body(m, body):
  91. """Format message body for debugging purposes."""
  92. # v2 protocol does not deserialize body
  93. body = m.body if body is None else body
  94. if isinstance(body, buffer_t):
  95. body = bytes_t(body)
  96. return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
  97. len(m.body))
@python_2_unicode_compatible
class Consumer(object):
    """Consumer blueprint.

    Holds the consumer-side state of a worker (broker connection settings,
    task strategies, rate-limit buckets, pending ack/reject operations)
    and drives the steps of :class:`Blueprint`, restarting them when a
    broker connection error occurs.

    Note:
        ``connection``, ``qos``, ``task_consumer`` and ``event_dispatcher``
        are read by several methods but are not assigned in this class;
        presumably they're set by the blueprint steps -- confirm against
        the step implementations.
    """

    #: Factory for the mapping of task name to strategy callable
    #: (populated by :meth:`update_strategies`).
    Strategies = dict

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    restart_count = -1  # first start is the same as a restart

    class Blueprint(bootsteps.Blueprint):
        """Consumer blueprint."""

        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer.connection:Connection',
            'celery.worker.consumer.mingle:Mingle',
            'celery.worker.consumer.events:Events',
            'celery.worker.consumer.gossip:Gossip',
            'celery.worker.consumer.heart:Heart',
            'celery.worker.consumer.control:Control',
            'celery.worker.consumer.tasks:Tasks',
            'celery.worker.consumer.consumer:Evloop',
            'celery.worker.consumer.agent:Agent',
        ]

        def shutdown(self, parent):
            """Send ``'shutdown'`` to all steps on behalf of ``parent``."""
            self.send_all(parent, 'shutdown')

    def __init__(self, on_task_request,
                 init_callback=noop, hostname=None,
                 pool=None, app=None,
                 timer=None, controller=None, hub=None, amqheartbeat=None,
                 worker_options=None, disable_rate_limits=False,
                 initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
        """Set up consumer state and apply the blueprint steps.

        Arguments:
            on_task_request (Callable): Called with a request when it's
                handed over for execution (see :meth:`apply_eta_task`
                and :meth:`_limit_move_to_pool`).
            init_callback (Callable): Called once with this consumer
                by :meth:`on_ready`.
            hostname (str): Node name; defaults to ``gethostname()``.
            pool: The worker pool instance.
            app: The app instance; must not be None as it's used
                immediately to create the connection info.
            timer: Timer used to schedule internal callbacks.
            controller: Owning controller; its ``semaphore`` is cleared
                in :meth:`on_close`.
            hub: Event loop hub, if any.  Selects ``loops.asynloop``
                over ``loops.synloop``.
            amqheartbeat: Heartbeat value passed to the connection.  Only
                honored when there's a hub or the pool is green, in which
                case None falls back to ``app.conf.broker_heartbeat``;
                otherwise the heartbeat is forced to 0.
            worker_options (Mapping): Merged with ``kwargs`` and passed
                to ``blueprint.apply``.
            disable_rate_limits (bool): Stored as an attribute.
            initial_prefetch_count (int): Stored as an attribute; updated
                by :meth:`_update_prefetch_count`.
            prefetch_multiplier (int): Multiplier used when computing
                prefetch counts.
        """
        self.app = app
        self.controller = controller
        self.init_callback = init_callback
        self.hostname = hostname or gethostname()
        self.pid = os.getpid()
        self.pool = pool
        self.timer = timer
        self.strategies = self.Strategies()
        self.conninfo = self.app.connection_for_read()
        self.connection_errors = self.conninfo.connection_errors
        self.channel_errors = self.conninfo.channel_errors
        # Stepped on every restart in start() to detect frequent restarts.
        self._restart_state = restart_state(maxR=5, maxT=1)

        self._does_info = logger.isEnabledFor(logging.INFO)
        # Rotating priority (0-9) used when rescheduling rate-limit buckets.
        self._limit_order = 0
        self.on_task_request = on_task_request
        self.on_task_message = set()
        self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
        self.disable_rate_limits = disable_rate_limits
        self.initial_prefetch_count = initial_prefetch_count
        self.prefetch_multiplier = prefetch_multiplier

        # this contains a tokenbucket for each task type by name, used for
        # rate limits, or None if rate limits are disabled for that task.
        self.task_buckets = defaultdict(lambda: None)
        self.reset_rate_limits()

        self.hub = hub
        if self.hub or getattr(self.pool, 'is_green', False):
            self.amqheartbeat = amqheartbeat
            if self.amqheartbeat is None:
                self.amqheartbeat = self.app.conf.broker_heartbeat
        else:
            self.amqheartbeat = 0

        # A subclass (or class attribute) may already provide ``loop``.
        if not hasattr(self, 'loop'):
            self.loop = loops.asynloop if hub else loops.synloop

        if _detect_environment() == 'gevent':
            # there's a gevent bug that causes timeouts to not be reset,
            # so if the connection timeout is exceeded once, it can NEVER
            # connect again.
            self.app.conf.broker_connection_timeout = None

        # Operations queued by call_soon() when there's no hub.
        self._pending_operations = []

        self.steps = []
        self.blueprint = self.Blueprint(
            steps=self.app.steps['consumer'],
            on_close=self.on_close,
        )
        self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))

    def call_soon(self, p, *args, **kwargs):
        """Schedule ``p(*args, **kwargs)`` to be called later.

        With a hub the call is delegated to ``hub.call_soon`` and its
        result returned; otherwise the partial is queued for
        :meth:`perform_pending_operations` and returned.
        """
        p = ppartial(p, *args, **kwargs)
        if self.hub:
            return self.hub.call_soon(p)
        self._pending_operations.append(p)
        return p

    def perform_pending_operations(self):
        """Run the operations queued by :meth:`call_soon` (no-hub mode only).

        Operations are popped from the end of the queue, so the most
        recently queued one runs first.  An exception raised by an
        operation is logged and doesn't stop the remaining ones.
        """
        if not self.hub:
            while self._pending_operations:
                try:
                    self._pending_operations.pop()()
                except Exception as exc:  # pylint: disable=broad-except
                    logger.exception('Pending callback raised: %r', exc)

    def bucket_for_task(self, type):
        """Return a rate-limit bucket for the task ``type``, or None.

        The task's ``rate_limit`` attribute (None if missing) is converted
        by ``rate()``; a falsy result means no bucket.
        """
        limit = rate(getattr(type, 'rate_limit', None))
        return TokenBucket(limit, capacity=1) if limit else None

    def reset_rate_limits(self):
        """Recreate the bucket of every task registered in ``app.tasks``."""
        self.task_buckets.update(
            (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
        )

    def _update_prefetch_count(self, index=0):
        """Update prefetch count after pool grow/shrink operations.

        Index must be the change in number of processes as a positive
        (increasing) or negative (decreasing) number.

        Note:
            Currently pool grow operations will end up with an offset
            of +1 if the initial size of the pool was 0 (e.g.
            :option:`--autoscale=1,0 <celery worker --autoscale>`).
        """
        num_processes = self.pool.num_processes
        if not self.initial_prefetch_count or not num_processes:
            return  # prefetch disabled
        self.initial_prefetch_count = (
            self.pool.num_processes * self.prefetch_multiplier
        )
        return self._update_qos_eventually(index)

    def _update_qos_eventually(self, index):
        """Adjust QoS by ``abs(index) * prefetch_multiplier``.

        Uses ``qos.decrement_eventually`` when ``index`` is negative and
        ``qos.increment_eventually`` otherwise.
        """
        return (self.qos.decrement_eventually if index < 0
                else self.qos.increment_eventually)(
            abs(index) * self.prefetch_multiplier)

    def _limit_move_to_pool(self, request):
        """Mark ``request`` as reserved and pass it to ``on_task_request``."""
        task_reserved(request)
        self.on_task_request(request)

    def _schedule_bucket_request(self, bucket):
        """Release as many requests from ``bucket`` as its tokens allow.

        Requests are released until the bucket is empty.  When the next
        request can't be consumed yet it's put back at the head of the
        bucket and this method is rescheduled on the timer after the
        bucket's expected wait time.
        """
        while True:
            try:
                request, tokens = bucket.pop()
            except IndexError:
                # no request, break
                break

            if bucket.can_consume(tokens):
                self._limit_move_to_pool(request)
                continue
            else:
                # requeue to head, keep the order.
                bucket.contents.appendleft((request, tokens))

                # Rotate the timer priority through 0-9 for each reschedule.
                pri = self._limit_order = (self._limit_order + 1) % 10
                hold = bucket.expected_time(tokens)
                self.timer.call_after(
                    hold, self._schedule_bucket_request, (bucket,),
                    priority=pri,
                )
                # no tokens, break
                break

    def _limit_task(self, request, bucket, tokens):
        """Add ``request`` to the rate-limit ``bucket`` and try to drain it."""
        bucket.add((request, tokens))
        return self._schedule_bucket_request(bucket)

    def _limit_post_eta(self, request, bucket, tokens):
        """Like :meth:`_limit_task`, but decrements QoS first.

        NOTE(review): the name suggests this is used for requests whose
        ETA has passed -- confirm against the caller.
        """
        self.qos.decrement_eventually()
        bucket.add((request, tokens))
        return self._schedule_bucket_request(bucket)

    def start(self):
        """Start the blueprint, restarting it after connection errors.

        Loops until the blueprint state is one of ``STOP_CONDITIONS``.
        Connection errors are re-raised when
        ``broker_connection_retry`` is disabled or when the error is
        ``EMFILE``; otherwise the error is reported, internal state is
        cleared with :meth:`on_close` and the blueprint is restarted.
        """
        blueprint = self.blueprint
        while blueprint.state not in STOP_CONDITIONS:
            maybe_shutdown()
            # restart_count starts at -1, so this also runs on first start.
            if self.restart_count:
                try:
                    self._restart_state.step()
                except RestartFreqExceeded as exc:
                    crit('Frequent restarts detected: %r', exc, exc_info=1)
                    sleep(1)
            self.restart_count += 1
            try:
                blueprint.start(self)
            except self.connection_errors as exc:
                # If we're not retrying connections, no need to catch
                # connection errors
                if not self.app.conf.broker_connection_retry:
                    raise
                if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                    raise  # Too many open files
                maybe_shutdown()
                if blueprint.state not in STOP_CONDITIONS:
                    if self.connection:
                        self.on_connection_error_after_connected(exc)
                    else:
                        self.on_connection_error_before_connected(exc)
                    self.on_close()
                    blueprint.restart(self)

    def on_connection_error_before_connected(self, exc):
        """Log that connecting to the broker failed and will be retried."""
        error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
              'Trying to reconnect...')

    def on_connection_error_after_connected(self, exc):
        """Log the lost connection and try to collect it.

        Errors raised by ``connection.collect()`` are deliberately ignored.
        """
        warn(CONNECTION_RETRY, exc_info=True)
        try:
            self.connection.collect()
        except Exception:  # pylint: disable=broad-except
            pass

    def register_with_event_loop(self, hub):
        """Send ``register_with_event_loop(hub)`` to all blueprint steps."""
        self.blueprint.send_all(
            self, 'register_with_event_loop', args=(hub,),
            description='Hub.register',
        )

    def shutdown(self):
        """Shut down the blueprint."""
        self.blueprint.shutdown(self)

    def stop(self):
        """Stop the blueprint."""
        self.blueprint.stop(self)

    def on_ready(self):
        """Call ``init_callback`` with this consumer, at most once.

        The callback is reset to None before being called.
        """
        callback, self.init_callback = self.init_callback, None
        if callback:
            callback(self)

    def loop_args(self):
        """Return the positional arguments passed to ``self.loop``."""
        return (self, self.connection, self.task_consumer,
                self.blueprint, self.hub, self.qos, self.amqheartbeat,
                self.app.clock, self.amqheartbeat_rate)

    def on_decode_error(self, message, exc):
        """Callback called if an error occurs while decoding a message.

        Simply logs the error and acknowledges the message so it
        doesn't enter a loop.

        Arguments:
            message (kombu.Message): The message received.
            exc (Exception): The exception being handled.
        """
        crit(MESSAGE_DECODE_ERROR,
             exc, message.content_type, message.content_encoding,
             safe_repr(message.headers), dump_body(message, message.body),
             exc_info=1)
        message.ack()

    def on_close(self):
        """Clear in-memory state that belongs to the current connection."""
        # Clear internal queues to get rid of old messages.
        # They can't be acked anyway, as a delivery tag is specific
        # to the current channel.
        if self.controller and self.controller.semaphore:
            self.controller.semaphore.clear()
        if self.timer:
            self.timer.clear()
        for bucket in values(self.task_buckets):
            if bucket:
                bucket.clear_pending()
        reserved_requests.clear()
        if self.pool and self.pool.flush:
            self.pool.flush()

    def connect(self):
        """Establish the broker connection used for consuming tasks.

        Retries establishing the connection if the
        :setting:`broker_connection_retry` setting is enabled.

        When there's a hub, the connection's transport is also registered
        with it.
        """
        conn = self.connection_for_read(heartbeat=self.amqheartbeat)
        if self.hub:
            conn.transport.register_with_event_loop(conn.connection, self.hub)
        return conn

    def connection_for_read(self, heartbeat=None):
        """Return an established ``app.connection_for_read`` connection."""
        return self.ensure_connected(
            self.app.connection_for_read(heartbeat=heartbeat))

    def connection_for_write(self, heartbeat=None):
        """Return an established ``app.connection_for_write`` connection."""
        return self.ensure_connected(
            self.app.connection_for_write(heartbeat=heartbeat))

    def ensure_connected(self, conn):
        """Establish ``conn`` and return the connected connection.

        With :setting:`broker_connection_retry` disabled this is a single
        ``conn.connect()`` call; otherwise ``conn.ensure_connection`` is
        used with :setting:`broker_connection_max_retries`, logging each
        failed attempt.
        """
        # Callback called for each retry while the connection
        # can't be established.
        def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
            if getattr(conn, 'alt', None) and interval == 0:
                next_step = CONNECTION_FAILOVER
            error(CONNECTION_ERROR, conn.as_uri(), exc,
                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))

        # remember that the connection is lazy, it won't establish
        # until needed.
        if not self.app.conf.broker_connection_retry:
            # retry disabled, just call connect directly.
            conn.connect()
            return conn

        conn = conn.ensure_connection(
            _error_handler, self.app.conf.broker_connection_max_retries,
            callback=maybe_shutdown,
        )
        return conn

    def _flush_events(self):
        """Flush the event dispatcher, if there is one."""
        if self.event_dispatcher:
            self.event_dispatcher.flush()

    def on_send_event_buffered(self):
        """Ask the hub (if any) to run :meth:`_flush_events`."""
        if self.hub:
            self.hub._ready.add(self._flush_events)

    def add_task_queue(self, queue, exchange=None, exchange_type=None,
                       routing_key=None, **options):
        """Start consuming from the queue named ``queue``.

        A queue already known to ``app.amqp.queues`` is used as is;
        otherwise one is declared with ``exchange`` defaulting to the
        queue name and ``exchange_type`` defaulting to ``'direct'``.
        Nothing is done if the task consumer already consumes from it.
        """
        cset = self.task_consumer
        queues = self.app.amqp.queues
        # Must use 'in' here, as __missing__ will automatically
        # create queues when :setting:`task_create_missing_queues` is enabled.
        # (Issue #1079)
        if queue in queues:
            q = queues[queue]
        else:
            exchange = queue if exchange is None else exchange
            exchange_type = ('direct' if exchange_type is None
                             else exchange_type)
            q = queues.select_add(queue,
                                  exchange=exchange,
                                  exchange_type=exchange_type,
                                  routing_key=routing_key, **options)
        if not cset.consuming_from(queue):
            cset.add_queue(q)
            cset.consume()
            info('Started consuming from %s', queue)

    def cancel_task_queue(self, queue):
        """Deselect ``queue`` and cancel the task consumer for it."""
        info('Canceling queue %s', queue)
        self.app.amqp.queues.deselect(queue)
        self.task_consumer.cancel_by_queue(queue)

    def apply_eta_task(self, task):
        """Method called by the timer to apply a task with an ETA/countdown."""
        task_reserved(task)
        self.on_task_request(task)
        self.qos.decrement_eventually()

    def _message_report(self, body, message):
        """Return a :data:`MESSAGE_REPORT` describing ``message``."""
        return MESSAGE_REPORT.format(dump_body(message, body),
                                     safe_repr(message.content_type),
                                     safe_repr(message.content_encoding),
                                     safe_repr(message.delivery_info),
                                     safe_repr(message.headers))

    def on_unknown_message(self, body, message):
        """Log, reject and signal a message that isn't a task message."""
        warn(UNKNOWN_FORMAT, self._message_report(body, message))
        message.reject_log_error(logger, self.connection_errors)
        signals.task_rejected.send(sender=self, message=message, exc=None)

    def on_unknown_task(self, body, message, exc):
        """Handle a message for a task name with no registered strategy.

        Logs the error, rejects the message, marks the task as failed
        with :exc:`NotRegistered` in the result backend, sends a
        ``task-failed`` event (if there's an event dispatcher) and
        finally sends the ``task_unknown`` signal.
        """
        error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
        try:
            id_, name = message.headers['id'], message.headers['task']
            root_id = message.headers.get('root_id')
        except KeyError:  # proto1
            payload = message.payload
            id_, name = payload['id'], payload['task']
            root_id = None
        # Minimal stand-in for a request object, passed to the backend.
        request = Bunch(
            name=name, chord=None, root_id=root_id,
            correlation_id=message.properties.get('correlation_id'),
            reply_to=message.properties.get('reply_to'),
            errbacks=None,
        )
        message.reject_log_error(logger, self.connection_errors)
        self.app.backend.mark_as_failure(
            id_, NotRegistered(name), request=request,
        )
        if self.event_dispatcher:
            self.event_dispatcher.send(
                'task-failed', uuid=id_,
                exception='NotRegistered({0!r})'.format(name),
            )
        signals.task_unknown.send(
            sender=self, message=message, exc=exc, name=name, id=id_,
        )

    def on_invalid_task(self, body, message, exc):
        """Log, reject and signal a task message that failed validation."""
        error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
        message.reject_log_error(logger, self.connection_errors)
        signals.task_rejected.send(sender=self, message=message, exc=exc)

    def update_strategies(self):
        """Create the strategy and tracer of every task in ``app.tasks``."""
        loader = self.app.loader
        for name, task in items(self.app.tasks):
            self.strategies[name] = task.start_strategy(self.app, self)
            task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                          app=self.app)

    def create_task_handler(self, promise=promise):
        """Return the callback used to dispatch received task messages.

        The returned ``on_task_received(message)`` looks up the task name
        (message headers for protocol v2, decoded payload for protocol
        v1) and calls the matching strategy.  Unknown messages, unknown
        tasks, invalid tasks and decode errors are routed to the
        corresponding ``on_*`` handler.
        """
        # Bound to locals so the closure doesn't look them up on self
        # for every message.
        strategies = self.strategies
        on_unknown_message = self.on_unknown_message
        on_unknown_task = self.on_unknown_task
        on_invalid_task = self.on_invalid_task
        callbacks = self.on_task_message
        call_soon = self.call_soon

        def on_task_received(message):
            # payload will only be set for v1 protocol, since v2
            # will defer deserializing the message body to the pool.
            payload = None
            try:
                type_ = message.headers['task']                # protocol v2
            except TypeError:
                return on_unknown_message(None, message)
            except KeyError:
                try:
                    payload = message.decode()
                except Exception as exc:  # pylint: disable=broad-except
                    return self.on_decode_error(message, exc)
                try:
                    type_, payload = payload['task'], payload  # protocol v1
                except (TypeError, KeyError):
                    return on_unknown_message(payload, message)
            try:
                strategy = strategies[type_]
            except KeyError as exc:
                return on_unknown_task(None, message, exc)
            else:
                try:
                    # ack/reject are wrapped in promises that go through
                    # call_soon instead of being called directly.
                    strategy(
                        message, payload,
                        promise(call_soon, (message.ack_log_error,)),
                        promise(call_soon, (message.reject_log_error,)),
                        callbacks,
                    )
                except InvalidTaskError as exc:
                    return on_invalid_task(payload, message, exc)
                except DecodeError as exc:
                    return self.on_decode_error(message, exc)

        return on_task_received

    def __repr__(self):
        """``repr(self)``."""
        return '<Consumer: {self.hostname} ({state})>'.format(
            self=self, state=self.blueprint.human_state(),
        )
  493. class Evloop(bootsteps.StartStopStep):
  494. """Event loop service.
  495. Note:
  496. This is always started last.
  497. """
  498. label = 'event loop'
  499. last = True
  500. def start(self, c):
  501. self.patch_all(c)
  502. c.loop(*c.loop_args())
  503. def patch_all(self, c):
  504. c.qos._mutex = DummyLock()