consumer.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. # -*- coding: utf-8 -*-
  2. """Worker Consumer Blueprint.
  3. This module contains the components responsible for consuming messages
  4. from the broker, processing the messages and keeping the broker connections
  5. up and running.
  6. """
  7. import errno
  8. import logging
  9. import os
  10. from collections import defaultdict
  11. from time import sleep
  12. from billiard.common import restart_state
  13. from billiard.exceptions import RestartFreqExceeded
  14. from kombu.async.semaphore import DummyLock
  15. from kombu.utils.compat import _detect_environment
  16. from kombu.utils.encoding import safe_repr
  17. from kombu.utils.limits import TokenBucket
  18. from vine import ppartial, promise
  19. from celery import bootsteps
  20. from celery import signals
  21. from celery.app.trace import build_tracer
  22. from celery.exceptions import InvalidTaskError, NotRegistered
  23. from celery.utils.functional import noop
  24. from celery.utils.log import get_logger
  25. from celery.utils.nodenames import gethostname
  26. from celery.utils.objects import Bunch
  27. from celery.utils.text import truncate
  28. from celery.utils.time import humanize_seconds, rate
  29. from celery.worker import loops
  30. from celery.worker.state import (
  31. task_reserved, maybe_shutdown, reserved_requests,
  32. )
  33. __all__ = ['Consumer', 'Evloop', 'dump_body']
  34. CLOSE = bootsteps.CLOSE
  35. logger = get_logger(__name__)
  36. debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
  37. logger.error, logger.critical)
  38. CONNECTION_RETRY = """\
  39. consumer: Connection to broker lost. \
  40. Trying to re-establish the connection...\
  41. """
  42. CONNECTION_RETRY_STEP = """\
  43. Trying again {when}...\
  44. """
  45. CONNECTION_ERROR = """\
  46. consumer: Cannot connect to %s: %s.
  47. %s
  48. """
  49. CONNECTION_FAILOVER = """\
  50. Will retry using next failover.\
  51. """
  52. UNKNOWN_FORMAT = """\
  53. Received and deleted unknown message. Wrong destination?!?
  54. The full contents of the message body was: %s
  55. """
  56. #: Error message for when an unregistered task is received.
  57. UNKNOWN_TASK_ERROR = """\
  58. Received unregistered task of type %s.
  59. The message has been ignored and discarded.
  60. Did you remember to import the module containing this task?
  61. Or maybe you're using relative imports?
  62. Please see
  63. http://docs.celeryq.org/en/latest/internals/protocol.html
  64. for more information.
  65. The full contents of the message body was:
  66. %s
  67. """
  68. #: Error message for when an invalid task message is received.
  69. INVALID_TASK_ERROR = """\
  70. Received invalid task message: %s
  71. The message has been ignored and discarded.
  72. Please ensure your message conforms to the task
  73. message protocol as described here:
  74. http://docs.celeryq.org/en/latest/internals/protocol.html
  75. The full contents of the message body was:
  76. %s
  77. """
  78. MESSAGE_DECODE_ERROR = """\
  79. Can't decode message body: %r [type:%r encoding:%r headers:%s]
  80. body: %s
  81. """
  82. MESSAGE_REPORT = """\
  83. body: {0}
  84. {{content_type:{1} content_encoding:{2}
  85. delivery_info:{3} headers={4}}}
  86. """
  87. def dump_body(m, body):
  88. """Format message body for debugging purposes."""
  89. # v2 protocol does not deserialize body
  90. body = m.body if body is None else body
  91. return '{0} ({1}b)'.format(
  92. truncate(safe_repr(body), 1024), len(m.body))
  93. class Consumer:
  94. """Consumer blueprint."""
  95. Strategies = dict
  96. #: Optional callback called the first time the worker
  97. #: is ready to receive tasks.
  98. init_callback = None
  99. #: The current worker pool instance.
  100. pool = None
  101. #: A timer used for high-priority internal tasks, such
  102. #: as sending heartbeats.
  103. timer = None
  104. restart_count = -1 # first start is the same as a restart
  105. class Blueprint(bootsteps.Blueprint):
  106. """Consumer blueprint."""
  107. name = 'Consumer'
  108. default_steps = [
  109. 'celery.worker.consumer.connection:Connection',
  110. 'celery.worker.consumer.mingle:Mingle',
  111. 'celery.worker.consumer.events:Events',
  112. 'celery.worker.consumer.gossip:Gossip',
  113. 'celery.worker.consumer.heart:Heart',
  114. 'celery.worker.consumer.control:Control',
  115. 'celery.worker.consumer.tasks:Tasks',
  116. 'celery.worker.consumer.consumer:Evloop',
  117. 'celery.worker.consumer.agent:Agent',
  118. ]
  119. def shutdown(self, parent):
  120. self.send_all(parent, 'shutdown')
  121. def __init__(self, on_task_request,
  122. init_callback=noop, hostname=None,
  123. pool=None, app=None,
  124. timer=None, controller=None, hub=None, amqheartbeat=None,
  125. worker_options=None, disable_rate_limits=False,
  126. initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
  127. self.app = app
  128. self.controller = controller
  129. self.init_callback = init_callback
  130. self.hostname = hostname or gethostname()
  131. self.pid = os.getpid()
  132. self.pool = pool
  133. self.timer = timer
  134. self.strategies = self.Strategies()
  135. self.conninfo = self.app.connection_for_read()
  136. self.connection_errors = self.conninfo.connection_errors
  137. self.channel_errors = self.conninfo.channel_errors
  138. self._restart_state = restart_state(maxR=5, maxT=1)
  139. self._does_info = logger.isEnabledFor(logging.INFO)
  140. self._limit_order = 0
  141. self.on_task_request = on_task_request
  142. self.on_task_message = set()
  143. self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
  144. self.disable_rate_limits = disable_rate_limits
  145. self.initial_prefetch_count = initial_prefetch_count
  146. self.prefetch_multiplier = prefetch_multiplier
  147. # this contains a tokenbucket for each task type by name, used for
  148. # rate limits, or None if rate limits are disabled for that task.
  149. self.task_buckets = defaultdict(lambda: None)
  150. self.reset_rate_limits()
  151. self.hub = hub
  152. if self.hub or getattr(self.pool, 'is_green', False):
  153. self.amqheartbeat = amqheartbeat
  154. if self.amqheartbeat is None:
  155. self.amqheartbeat = self.app.conf.broker_heartbeat
  156. else:
  157. self.amqheartbeat = 0
  158. if not hasattr(self, 'loop'):
  159. self.loop = loops.asynloop if hub else loops.synloop
  160. if _detect_environment() == 'gevent':
  161. # there's a gevent bug that causes timeouts to not be reset,
  162. # so if the connection timeout is exceeded once, it can NEVER
  163. # connect again.
  164. self.app.conf.broker_connection_timeout = None
  165. self._pending_operations = []
  166. self.steps = []
  167. self.blueprint = self.Blueprint(
  168. steps=self.app.steps['consumer'],
  169. on_close=self.on_close,
  170. )
  171. self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
  172. def call_soon(self, p, *args, **kwargs):
  173. p = ppartial(p, *args, **kwargs)
  174. if self.hub:
  175. return self.hub.call_soon(p)
  176. self._pending_operations.append(p)
  177. return p
  178. def perform_pending_operations(self):
  179. if not self.hub:
  180. while self._pending_operations:
  181. try:
  182. self._pending_operations.pop()()
  183. except Exception as exc: # pylint: disable=broad-except
  184. logger.exception('Pending callback raised: %r', exc)
  185. def bucket_for_task(self, type):
  186. limit = rate(getattr(type, 'rate_limit', None))
  187. return TokenBucket(limit, capacity=1) if limit else None
  188. def reset_rate_limits(self):
  189. self.task_buckets.update(
  190. (n, self.bucket_for_task(t)) for n, t in self.app.tasks.items()
  191. )
  192. def _update_prefetch_count(self, index=0):
  193. """Update prefetch count after pool/shrink grow operations.
  194. Index must be the change in number of processes as a positive
  195. (increasing) or negative (decreasing) number.
  196. Note:
  197. Currently pool grow operations will end up with an offset
  198. of +1 if the initial size of the pool was 0 (e.g.
  199. :option:`--autoscale=1,0 <celery worker --autoscale>`).
  200. """
  201. num_processes = self.pool.num_processes
  202. if not self.initial_prefetch_count or not num_processes:
  203. return # prefetch disabled
  204. self.initial_prefetch_count = (
  205. self.pool.num_processes * self.prefetch_multiplier
  206. )
  207. return self._update_qos_eventually(index)
  208. def _update_qos_eventually(self, index):
  209. return (self.qos.decrement_eventually if index < 0
  210. else self.qos.increment_eventually)(
  211. abs(index) * self.prefetch_multiplier)
  212. def _limit_move_to_pool(self, request):
  213. task_reserved(request)
  214. self.on_task_request(request)
  215. def _on_bucket_wakeup(self, bucket, tokens):
  216. try:
  217. request = bucket.pop()
  218. except IndexError:
  219. pass
  220. else:
  221. self._limit_move_to_pool(request)
  222. self._schedule_oldest_bucket_request(bucket, tokens)
  223. def _schedule_oldest_bucket_request(self, bucket, tokens):
  224. try:
  225. request = bucket.pop()
  226. except IndexError:
  227. pass
  228. else:
  229. return self._schedule_bucket_request(request, bucket, tokens)
  230. def _schedule_bucket_request(self, request, bucket, tokens):
  231. bucket.can_consume(tokens)
  232. bucket.add(request)
  233. pri = self._limit_order = (self._limit_order + 1) % 10
  234. hold = bucket.expected_time(tokens)
  235. self.timer.call_after(
  236. hold, self._on_bucket_wakeup, (bucket, tokens),
  237. priority=pri,
  238. )
  239. def _limit_task(self, request, bucket, tokens):
  240. if bucket.contents:
  241. return bucket.add(request)
  242. return self._schedule_bucket_request(request, bucket, tokens)
  243. def start(self):
  244. blueprint = self.blueprint
  245. while blueprint.state != CLOSE:
  246. maybe_shutdown()
  247. if self.restart_count:
  248. try:
  249. self._restart_state.step()
  250. except RestartFreqExceeded as exc:
  251. crit('Frequent restarts detected: %r', exc, exc_info=1)
  252. sleep(1)
  253. self.restart_count += 1
  254. try:
  255. blueprint.start(self)
  256. except self.connection_errors as exc:
  257. # If we're not retrying connections, no need to catch
  258. # connection errors
  259. if not self.app.conf.broker_connection_retry:
  260. raise
  261. if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
  262. raise # Too many open files
  263. maybe_shutdown()
  264. if blueprint.state != CLOSE:
  265. if self.connection:
  266. self.on_connection_error_after_connected(exc)
  267. else:
  268. self.on_connection_error_before_connected(exc)
  269. self.on_close()
  270. blueprint.restart(self)
  271. def on_connection_error_before_connected(self, exc):
  272. error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
  273. 'Trying to reconnect...')
  274. def on_connection_error_after_connected(self, exc):
  275. warn(CONNECTION_RETRY, exc_info=True)
  276. try:
  277. self.connection.collect()
  278. except Exception: # pylint: disable=broad-except
  279. pass
  280. def register_with_event_loop(self, hub):
  281. self.blueprint.send_all(
  282. self, 'register_with_event_loop', args=(hub,),
  283. description='Hub.register',
  284. )
  285. def shutdown(self):
  286. self.blueprint.shutdown(self)
  287. def stop(self):
  288. self.blueprint.stop(self)
  289. def on_ready(self):
  290. callback, self.init_callback = self.init_callback, None
  291. if callback:
  292. callback(self)
  293. def loop_args(self):
  294. return (self, self.connection, self.task_consumer,
  295. self.blueprint, self.hub, self.qos, self.amqheartbeat,
  296. self.app.clock, self.amqheartbeat_rate)
  297. def on_decode_error(self, message, exc):
  298. """Callback called if an error occurs while decoding a message.
  299. Simply logs the error and acknowledges the message so it
  300. doesn't enter a loop.
  301. Arguments:
  302. message (kombu.Message): The message received.
  303. exc (Exception): The exception being handled.
  304. """
  305. crit(MESSAGE_DECODE_ERROR,
  306. exc, message.content_type, message.content_encoding,
  307. safe_repr(message.headers), dump_body(message, message.body),
  308. exc_info=1)
  309. message.ack()
  310. def on_close(self):
  311. # Clear internal queues to get rid of old messages.
  312. # They can't be acked anyway, as a delivery tag is specific
  313. # to the current channel.
  314. if self.controller and self.controller.semaphore:
  315. self.controller.semaphore.clear()
  316. if self.timer:
  317. self.timer.clear()
  318. for bucket in self.task_buckets.values():
  319. if bucket:
  320. bucket.clear_pending()
  321. reserved_requests.clear()
  322. if self.pool and self.pool.flush:
  323. self.pool.flush()
  324. def connect(self):
  325. """Establish the broker connection.
  326. Retries establishing the connection if the
  327. :setting:`broker_connection_retry` setting is enabled
  328. """
  329. conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)
  330. # Callback called for each retry while the connection
  331. # can't be established.
  332. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  333. if getattr(conn, 'alt', None) and interval == 0:
  334. next_step = CONNECTION_FAILOVER
  335. error(CONNECTION_ERROR, conn.as_uri(), exc,
  336. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  337. # remember that the connection is lazy, it won't establish
  338. # until needed.
  339. if not self.app.conf.broker_connection_retry:
  340. # retry disabled, just call connect directly.
  341. conn.connect()
  342. return conn
  343. conn = conn.ensure_connection(
  344. _error_handler, self.app.conf.broker_connection_max_retries,
  345. callback=maybe_shutdown,
  346. )
  347. if self.hub:
  348. conn.transport.register_with_event_loop(conn.connection, self.hub)
  349. return conn
  350. def _flush_events(self):
  351. if self.event_dispatcher:
  352. self.event_dispatcher.flush()
  353. def on_send_event_buffered(self):
  354. if self.hub:
  355. self.hub._ready.add(self._flush_events)
  356. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  357. routing_key=None, **options):
  358. cset = self.task_consumer
  359. queues = self.app.amqp.queues
  360. # Must use in' here, as __missing__ will automatically
  361. # create queues when :setting:`task_create_missing_queues` is enabled.
  362. # (Issue #1079)
  363. if queue in queues:
  364. q = queues[queue]
  365. else:
  366. exchange = queue if exchange is None else exchange
  367. exchange_type = ('direct' if exchange_type is None
  368. else exchange_type)
  369. q = queues.select_add(queue,
  370. exchange=exchange,
  371. exchange_type=exchange_type,
  372. routing_key=routing_key, **options)
  373. if not cset.consuming_from(queue):
  374. cset.add_queue(q)
  375. cset.consume()
  376. info('Started consuming from %s', queue)
  377. def cancel_task_queue(self, queue):
  378. info('Canceling queue %s', queue)
  379. self.app.amqp.queues.deselect(queue)
  380. self.task_consumer.cancel_by_queue(queue)
  381. def apply_eta_task(self, task):
  382. """Method called by the timer to apply a task with an ETA/countdown."""
  383. task_reserved(task)
  384. self.on_task_request(task)
  385. self.qos.decrement_eventually()
  386. def _message_report(self, body, message):
  387. return MESSAGE_REPORT.format(dump_body(message, body),
  388. safe_repr(message.content_type),
  389. safe_repr(message.content_encoding),
  390. safe_repr(message.delivery_info),
  391. safe_repr(message.headers))
  392. def on_unknown_message(self, body, message):
  393. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  394. message.reject_log_error(logger, self.connection_errors)
  395. signals.task_rejected.send(sender=self, message=message, exc=None)
  396. def on_unknown_task(self, body, message, exc):
  397. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  398. try:
  399. id_, name = message.headers['id'], message.headers['task']
  400. root_id = message.headers.get('root_id')
  401. except KeyError: # proto1
  402. payload = message.payload
  403. id_, name = payload['id'], payload['task']
  404. root_id = None
  405. request = Bunch(
  406. name=name, chord=None, root_id=root_id,
  407. correlation_id=message.properties.get('correlation_id'),
  408. reply_to=message.properties.get('reply_to'),
  409. errbacks=None,
  410. )
  411. message.reject_log_error(logger, self.connection_errors)
  412. self.app.backend.mark_as_failure(
  413. id_, NotRegistered(name), request=request,
  414. )
  415. if self.event_dispatcher:
  416. self.event_dispatcher.send(
  417. 'task-failed', uuid=id_,
  418. exception='NotRegistered({0!r})'.format(name),
  419. )
  420. signals.task_unknown.send(
  421. sender=self, message=message, exc=exc, name=name, id=id_,
  422. )
  423. def on_invalid_task(self, body, message, exc):
  424. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  425. message.reject_log_error(logger, self.connection_errors)
  426. signals.task_rejected.send(sender=self, message=message, exc=exc)
  427. def update_strategies(self):
  428. loader = self.app.loader
  429. for name, task in self.app.tasks.items():
  430. self.strategies[name] = task.start_strategy(self.app, self)
  431. task.__trace__ = build_tracer(name, task, loader, self.hostname,
  432. app=self.app)
  433. def create_task_handler(self, promise=promise):
  434. strategies = self.strategies
  435. on_unknown_message = self.on_unknown_message
  436. on_unknown_task = self.on_unknown_task
  437. on_invalid_task = self.on_invalid_task
  438. callbacks = self.on_task_message
  439. call_soon = self.call_soon
  440. def on_task_received(message):
  441. # payload will only be set for v1 protocol, since v2
  442. # will defer deserializing the message body to the pool.
  443. payload = None
  444. try:
  445. type_ = message.headers['task'] # protocol v2
  446. except TypeError:
  447. return on_unknown_message(None, message)
  448. except KeyError:
  449. try:
  450. payload = message.decode()
  451. except Exception as exc: # pylint: disable=broad-except
  452. return self.on_decode_error(message, exc)
  453. try:
  454. type_, payload = payload['task'], payload # protocol v1
  455. except (TypeError, KeyError):
  456. return on_unknown_message(payload, message)
  457. try:
  458. strategy = strategies[type_]
  459. except KeyError as exc:
  460. return on_unknown_task(None, message, exc)
  461. else:
  462. try:
  463. strategy(
  464. message, payload,
  465. promise(call_soon, (message.ack_log_error,)),
  466. promise(call_soon, (message.reject_log_error,)),
  467. callbacks,
  468. )
  469. except InvalidTaskError as exc:
  470. return on_invalid_task(payload, message, exc)
  471. return on_task_received
  472. def __repr__(self):
  473. """``repr(self)``."""
  474. return '<Consumer: {self.hostname} ({state})>'.format(
  475. self=self, state=self.blueprint.human_state(),
  476. )
  477. class Evloop(bootsteps.StartStopStep):
  478. """Event loop service.
  479. Note:
  480. This is always started last.
  481. """
  482. label = 'event loop'
  483. last = True
  484. def start(self, c):
  485. self.patch_all(c)
  486. c.loop(*c.loop_args())
  487. def patch_all(self, c):
  488. c.qos._mutex = DummyLock()