# consumer.py (21 KB)
  1. # -*- coding: utf-8 -*-
  2. """Worker Consumer Blueprint.
  3. This module contains the components responsible for consuming messages
  4. from the broker, processing the messages and keeping the broker connections
  5. up and running.
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import errno
  9. import logging
  10. import os
  11. from collections import defaultdict
  12. from time import sleep
  13. from billiard.common import restart_state
  14. from billiard.exceptions import RestartFreqExceeded
  15. from kombu.async.semaphore import DummyLock
  16. from kombu.utils.compat import _detect_environment
  17. from kombu.utils.encoding import safe_repr, bytes_t
  18. from kombu.utils.limits import TokenBucket
  19. from vine import ppartial, promise
  20. from celery import bootsteps
  21. from celery import signals
  22. from celery.app.trace import build_tracer
  23. from celery.exceptions import InvalidTaskError, NotRegistered
  24. from celery.five import buffer_t, items, python_2_unicode_compatible, values
  25. from celery.utils.functional import noop
  26. from celery.utils.log import get_logger
  27. from celery.utils.nodenames import gethostname
  28. from celery.utils.objects import Bunch
  29. from celery.utils.text import truncate
  30. from celery.utils.time import humanize_seconds, rate
  31. from celery.worker import loops
  32. from celery.worker.state import (
  33. task_reserved, maybe_shutdown, reserved_requests,
  34. )
  35. __all__ = ['Consumer', 'Evloop', 'dump_body']
  36. CLOSE = bootsteps.CLOSE
  37. logger = get_logger(__name__)
  38. debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
  39. logger.error, logger.critical)
#: Warning logged by ``Consumer.on_connection_error_after_connected``
#: when an established broker connection fails.
CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""
#: Default "next step" text for a failed connection attempt; rendered with
#: ``str.format`` using the ``when`` keyword.
CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""
#: %-style template taking three arguments: the broker URI, the exception
#: and a text describing what happens next.
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""
#: "Next step" text used by ``Consumer.ensure_connected`` instead of
#: CONNECTION_RETRY_STEP when the connection has alternates and the retry
#: interval is zero.
CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""
#: Warning for a message no task name could be extracted from; the single
#: %-argument is the report built by ``Consumer._message_report``.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""
#: Error message for when an unregistered task is received.
#: %-arguments: the lookup exception and the dumped message body.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
%s
"""
#: Error message for when an invalid task message is received.
#: %-arguments: the validation exception and the dumped message body.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here:
http://docs.celeryq.org/en/latest/internals/protocol.html
The full contents of the message body was:
%s
"""
#: Critical message logged by ``Consumer.on_decode_error``.
#: %-arguments: exception, content type, content encoding, repr of the
#: headers and the dumped message body.
MESSAGE_DECODE_ERROR = """\
Can't decode message body: %r [type:%r encoding:%r headers:%s]
body: %s
"""
#: ``str.format`` template used by ``Consumer._message_report``.
#: Positional fields 0-4: dumped body, content type, content encoding,
#: delivery info and headers.  The doubled braces render as literal braces.
MESSAGE_REPORT = """\
body: {0}
{{content_type:{1} content_encoding:{2}
delivery_info:{3} headers={4}}}
"""
  89. def dump_body(m, body):
  90. """Format message body for debugging purposes."""
  91. # v2 protocol does not deserialize body
  92. body = m.body if body is None else body
  93. if isinstance(body, buffer_t):
  94. body = bytes_t(body)
  95. return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
  96. len(m.body))
  97. @python_2_unicode_compatible
  98. class Consumer(object):
  99. """Consumer blueprint."""
  100. Strategies = dict
  101. #: Optional callback called the first time the worker
  102. #: is ready to receive tasks.
  103. init_callback = None
  104. #: The current worker pool instance.
  105. pool = None
  106. #: A timer used for high-priority internal tasks, such
  107. #: as sending heartbeats.
  108. timer = None
  109. restart_count = -1 # first start is the same as a restart
  110. class Blueprint(bootsteps.Blueprint):
  111. """Consumer blueprint."""
  112. name = 'Consumer'
  113. default_steps = [
  114. 'celery.worker.consumer.connection:Connection',
  115. 'celery.worker.consumer.mingle:Mingle',
  116. 'celery.worker.consumer.events:Events',
  117. 'celery.worker.consumer.gossip:Gossip',
  118. 'celery.worker.consumer.heart:Heart',
  119. 'celery.worker.consumer.control:Control',
  120. 'celery.worker.consumer.tasks:Tasks',
  121. 'celery.worker.consumer.consumer:Evloop',
  122. 'celery.worker.consumer.agent:Agent',
  123. ]
  124. def shutdown(self, parent):
  125. self.send_all(parent, 'shutdown')
  126. def __init__(self, on_task_request,
  127. init_callback=noop, hostname=None,
  128. pool=None, app=None,
  129. timer=None, controller=None, hub=None, amqheartbeat=None,
  130. worker_options=None, disable_rate_limits=False,
  131. initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
  132. self.app = app
  133. self.controller = controller
  134. self.init_callback = init_callback
  135. self.hostname = hostname or gethostname()
  136. self.pid = os.getpid()
  137. self.pool = pool
  138. self.timer = timer
  139. self.strategies = self.Strategies()
  140. self.conninfo = self.app.connection_for_read()
  141. self.connection_errors = self.conninfo.connection_errors
  142. self.channel_errors = self.conninfo.channel_errors
  143. self._restart_state = restart_state(maxR=5, maxT=1)
  144. self._does_info = logger.isEnabledFor(logging.INFO)
  145. self._limit_order = 0
  146. self.on_task_request = on_task_request
  147. self.on_task_message = set()
  148. self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
  149. self.disable_rate_limits = disable_rate_limits
  150. self.initial_prefetch_count = initial_prefetch_count
  151. self.prefetch_multiplier = prefetch_multiplier
  152. # this contains a tokenbucket for each task type by name, used for
  153. # rate limits, or None if rate limits are disabled for that task.
  154. self.task_buckets = defaultdict(lambda: None)
  155. self.reset_rate_limits()
  156. self.hub = hub
  157. if self.hub or getattr(self.pool, 'is_green', False):
  158. self.amqheartbeat = amqheartbeat
  159. if self.amqheartbeat is None:
  160. self.amqheartbeat = self.app.conf.broker_heartbeat
  161. else:
  162. self.amqheartbeat = 0
  163. if not hasattr(self, 'loop'):
  164. self.loop = loops.asynloop if hub else loops.synloop
  165. if _detect_environment() == 'gevent':
  166. # there's a gevent bug that causes timeouts to not be reset,
  167. # so if the connection timeout is exceeded once, it can NEVER
  168. # connect again.
  169. self.app.conf.broker_connection_timeout = None
  170. self._pending_operations = []
  171. self.steps = []
  172. self.blueprint = self.Blueprint(
  173. steps=self.app.steps['consumer'],
  174. on_close=self.on_close,
  175. )
  176. self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
  177. def call_soon(self, p, *args, **kwargs):
  178. p = ppartial(p, *args, **kwargs)
  179. if self.hub:
  180. return self.hub.call_soon(p)
  181. self._pending_operations.append(p)
  182. return p
  183. def perform_pending_operations(self):
  184. if not self.hub:
  185. while self._pending_operations:
  186. try:
  187. self._pending_operations.pop()()
  188. except Exception as exc: # pylint: disable=broad-except
  189. logger.exception('Pending callback raised: %r', exc)
  190. def bucket_for_task(self, type):
  191. limit = rate(getattr(type, 'rate_limit', None))
  192. return TokenBucket(limit, capacity=1) if limit else None
  193. def reset_rate_limits(self):
  194. self.task_buckets.update(
  195. (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
  196. )
  197. def _update_prefetch_count(self, index=0):
  198. """Update prefetch count after pool/shrink grow operations.
  199. Index must be the change in number of processes as a positive
  200. (increasing) or negative (decreasing) number.
  201. Note:
  202. Currently pool grow operations will end up with an offset
  203. of +1 if the initial size of the pool was 0 (e.g.
  204. :option:`--autoscale=1,0 <celery worker --autoscale>`).
  205. """
  206. num_processes = self.pool.num_processes
  207. if not self.initial_prefetch_count or not num_processes:
  208. return # prefetch disabled
  209. self.initial_prefetch_count = (
  210. self.pool.num_processes * self.prefetch_multiplier
  211. )
  212. return self._update_qos_eventually(index)
  213. def _update_qos_eventually(self, index):
  214. return (self.qos.decrement_eventually if index < 0
  215. else self.qos.increment_eventually)(
  216. abs(index) * self.prefetch_multiplier)
  217. def _limit_move_to_pool(self, request):
  218. task_reserved(request)
  219. self.on_task_request(request)
  220. def _on_bucket_wakeup(self, bucket, tokens):
  221. try:
  222. request = bucket.pop()
  223. except IndexError:
  224. pass
  225. else:
  226. self._limit_move_to_pool(request)
  227. self._schedule_oldest_bucket_request(bucket, tokens)
  228. def _schedule_oldest_bucket_request(self, bucket, tokens):
  229. try:
  230. request = bucket.pop()
  231. except IndexError:
  232. pass
  233. else:
  234. return self._schedule_bucket_request(request, bucket, tokens)
  235. def _schedule_bucket_request(self, request, bucket, tokens):
  236. bucket.can_consume(tokens)
  237. bucket.add(request)
  238. pri = self._limit_order = (self._limit_order + 1) % 10
  239. hold = bucket.expected_time(tokens)
  240. self.timer.call_after(
  241. hold, self._on_bucket_wakeup, (bucket, tokens),
  242. priority=pri,
  243. )
  244. def _limit_task(self, request, bucket, tokens):
  245. if bucket.contents:
  246. return bucket.add(request)
  247. return self._schedule_bucket_request(request, bucket, tokens)
  248. def start(self):
  249. blueprint = self.blueprint
  250. while blueprint.state != CLOSE:
  251. maybe_shutdown()
  252. if self.restart_count:
  253. try:
  254. self._restart_state.step()
  255. except RestartFreqExceeded as exc:
  256. crit('Frequent restarts detected: %r', exc, exc_info=1)
  257. sleep(1)
  258. self.restart_count += 1
  259. try:
  260. blueprint.start(self)
  261. except self.connection_errors as exc:
  262. # If we're not retrying connections, no need to catch
  263. # connection errors
  264. if not self.app.conf.broker_connection_retry:
  265. raise
  266. if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
  267. raise # Too many open files
  268. maybe_shutdown()
  269. if blueprint.state != CLOSE:
  270. if self.connection:
  271. self.on_connection_error_after_connected(exc)
  272. else:
  273. self.on_connection_error_before_connected(exc)
  274. self.on_close()
  275. blueprint.restart(self)
  276. def on_connection_error_before_connected(self, exc):
  277. error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
  278. 'Trying to reconnect...')
  279. def on_connection_error_after_connected(self, exc):
  280. warn(CONNECTION_RETRY, exc_info=True)
  281. try:
  282. self.connection.collect()
  283. except Exception: # pylint: disable=broad-except
  284. pass
  285. def register_with_event_loop(self, hub):
  286. self.blueprint.send_all(
  287. self, 'register_with_event_loop', args=(hub,),
  288. description='Hub.register',
  289. )
  290. def shutdown(self):
  291. self.blueprint.shutdown(self)
  292. def stop(self):
  293. self.blueprint.stop(self)
  294. def on_ready(self):
  295. callback, self.init_callback = self.init_callback, None
  296. if callback:
  297. callback(self)
  298. def loop_args(self):
  299. return (self, self.connection, self.task_consumer,
  300. self.blueprint, self.hub, self.qos, self.amqheartbeat,
  301. self.app.clock, self.amqheartbeat_rate)
  302. def on_decode_error(self, message, exc):
  303. """Callback called if an error occurs while decoding a message.
  304. Simply logs the error and acknowledges the message so it
  305. doesn't enter a loop.
  306. Arguments:
  307. message (kombu.Message): The message received.
  308. exc (Exception): The exception being handled.
  309. """
  310. crit(MESSAGE_DECODE_ERROR,
  311. exc, message.content_type, message.content_encoding,
  312. safe_repr(message.headers), dump_body(message, message.body),
  313. exc_info=1)
  314. message.ack()
  315. def on_close(self):
  316. # Clear internal queues to get rid of old messages.
  317. # They can't be acked anyway, as a delivery tag is specific
  318. # to the current channel.
  319. if self.controller and self.controller.semaphore:
  320. self.controller.semaphore.clear()
  321. if self.timer:
  322. self.timer.clear()
  323. for bucket in values(self.task_buckets):
  324. if bucket:
  325. bucket.clear_pending()
  326. reserved_requests.clear()
  327. if self.pool and self.pool.flush:
  328. self.pool.flush()
  329. def connect(self):
  330. """Establish the broker connection used for consuming tasks.
  331. Retries establishing the connection if the
  332. :setting:`broker_connection_retry` setting is enabled
  333. """
  334. return self.connection_for_read(heartbeat=self.amqheartbeat)
  335. def connection_for_read(self, heartbeat=None):
  336. return self.ensure_connected(
  337. self.app.connection_for_read(heartbeat=heartbeat))
  338. def connection_for_write(self, heartbeat=None):
  339. return self.ensure_connected(
  340. self.app.connection_for_write(heartbeat=heartbeat))
  341. def ensure_connected(self, conn):
  342. # Callback called for each retry while the connection
  343. # can't be established.
  344. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  345. if getattr(conn, 'alt', None) and interval == 0:
  346. next_step = CONNECTION_FAILOVER
  347. error(CONNECTION_ERROR, conn.as_uri(), exc,
  348. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  349. # remember that the connection is lazy, it won't establish
  350. # until needed.
  351. if not self.app.conf.broker_connection_retry:
  352. # retry disabled, just call connect directly.
  353. conn.connect()
  354. return conn
  355. conn = conn.ensure_connection(
  356. _error_handler, self.app.conf.broker_connection_max_retries,
  357. callback=maybe_shutdown,
  358. )
  359. if self.hub:
  360. conn.transport.register_with_event_loop(conn.connection, self.hub)
  361. return conn
  362. def _flush_events(self):
  363. if self.event_dispatcher:
  364. self.event_dispatcher.flush()
  365. def on_send_event_buffered(self):
  366. if self.hub:
  367. self.hub._ready.add(self._flush_events)
  368. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  369. routing_key=None, **options):
  370. cset = self.task_consumer
  371. queues = self.app.amqp.queues
  372. # Must use in' here, as __missing__ will automatically
  373. # create queues when :setting:`task_create_missing_queues` is enabled.
  374. # (Issue #1079)
  375. if queue in queues:
  376. q = queues[queue]
  377. else:
  378. exchange = queue if exchange is None else exchange
  379. exchange_type = ('direct' if exchange_type is None
  380. else exchange_type)
  381. q = queues.select_add(queue,
  382. exchange=exchange,
  383. exchange_type=exchange_type,
  384. routing_key=routing_key, **options)
  385. if not cset.consuming_from(queue):
  386. cset.add_queue(q)
  387. cset.consume()
  388. info('Started consuming from %s', queue)
  389. def cancel_task_queue(self, queue):
  390. info('Canceling queue %s', queue)
  391. self.app.amqp.queues.deselect(queue)
  392. self.task_consumer.cancel_by_queue(queue)
  393. def apply_eta_task(self, task):
  394. """Method called by the timer to apply a task with an ETA/countdown."""
  395. task_reserved(task)
  396. self.on_task_request(task)
  397. self.qos.decrement_eventually()
  398. def _message_report(self, body, message):
  399. return MESSAGE_REPORT.format(dump_body(message, body),
  400. safe_repr(message.content_type),
  401. safe_repr(message.content_encoding),
  402. safe_repr(message.delivery_info),
  403. safe_repr(message.headers))
  404. def on_unknown_message(self, body, message):
  405. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  406. message.reject_log_error(logger, self.connection_errors)
  407. signals.task_rejected.send(sender=self, message=message, exc=None)
  408. def on_unknown_task(self, body, message, exc):
  409. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  410. try:
  411. id_, name = message.headers['id'], message.headers['task']
  412. root_id = message.headers.get('root_id')
  413. except KeyError: # proto1
  414. payload = message.payload
  415. id_, name = payload['id'], payload['task']
  416. root_id = None
  417. request = Bunch(
  418. name=name, chord=None, root_id=root_id,
  419. correlation_id=message.properties.get('correlation_id'),
  420. reply_to=message.properties.get('reply_to'),
  421. errbacks=None,
  422. )
  423. message.reject_log_error(logger, self.connection_errors)
  424. self.app.backend.mark_as_failure(
  425. id_, NotRegistered(name), request=request,
  426. )
  427. if self.event_dispatcher:
  428. self.event_dispatcher.send(
  429. 'task-failed', uuid=id_,
  430. exception='NotRegistered({0!r})'.format(name),
  431. )
  432. signals.task_unknown.send(
  433. sender=self, message=message, exc=exc, name=name, id=id_,
  434. )
  435. def on_invalid_task(self, body, message, exc):
  436. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  437. message.reject_log_error(logger, self.connection_errors)
  438. signals.task_rejected.send(sender=self, message=message, exc=exc)
  439. def update_strategies(self):
  440. loader = self.app.loader
  441. for name, task in items(self.app.tasks):
  442. self.strategies[name] = task.start_strategy(self.app, self)
  443. task.__trace__ = build_tracer(name, task, loader, self.hostname,
  444. app=self.app)
  445. def create_task_handler(self, promise=promise):
  446. strategies = self.strategies
  447. on_unknown_message = self.on_unknown_message
  448. on_unknown_task = self.on_unknown_task
  449. on_invalid_task = self.on_invalid_task
  450. callbacks = self.on_task_message
  451. call_soon = self.call_soon
  452. def on_task_received(message):
  453. # payload will only be set for v1 protocol, since v2
  454. # will defer deserializing the message body to the pool.
  455. payload = None
  456. try:
  457. type_ = message.headers['task'] # protocol v2
  458. except TypeError:
  459. return on_unknown_message(None, message)
  460. except KeyError:
  461. try:
  462. payload = message.decode()
  463. except Exception as exc: # pylint: disable=broad-except
  464. return self.on_decode_error(message, exc)
  465. try:
  466. type_, payload = payload['task'], payload # protocol v1
  467. except (TypeError, KeyError):
  468. return on_unknown_message(payload, message)
  469. try:
  470. strategy = strategies[type_]
  471. except KeyError as exc:
  472. return on_unknown_task(None, message, exc)
  473. else:
  474. try:
  475. strategy(
  476. message, payload,
  477. promise(call_soon, (message.ack_log_error,)),
  478. promise(call_soon, (message.reject_log_error,)),
  479. callbacks,
  480. )
  481. except InvalidTaskError as exc:
  482. return on_invalid_task(payload, message, exc)
  483. return on_task_received
  484. def __repr__(self):
  485. """``repr(self)``."""
  486. return '<Consumer: {self.hostname} ({state})>'.format(
  487. self=self, state=self.blueprint.human_state(),
  488. )
  489. class Evloop(bootsteps.StartStopStep):
  490. """Event loop service.
  491. Note:
  492. This is always started last.
  493. """
  494. label = 'event loop'
  495. last = True
  496. def start(self, c):
  497. self.patch_all(c)
  498. c.loop(*c.loop_args())
  499. def patch_all(self, c):
  500. c.qos._mutex = DummyLock()