consumer.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.consumer
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. This module contains the components responsible for consuming messages
  6. from the broker, processing the messages and keeping the broker connections
  7. up and running.
  8. """
  9. from __future__ import absolute_import, unicode_literals
  10. import errno
  11. import logging
  12. import os
  13. from collections import defaultdict
  14. from time import sleep
  15. from billiard.common import restart_state
  16. from billiard.exceptions import RestartFreqExceeded
  17. from kombu.async.semaphore import DummyLock
  18. from kombu.syn import _detect_environment
  19. from kombu.utils.encoding import safe_repr, bytes_t
  20. from kombu.utils.limits import TokenBucket
  21. from vine import ppartial, promise
  22. from celery import bootsteps
  23. from celery import signals
  24. from celery.app.trace import build_tracer
  25. from celery.exceptions import InvalidTaskError, NotRegistered
  26. from celery.five import buffer_t, items, python_2_unicode_compatible, values
  27. from celery.utils import gethostname
  28. from celery.utils.functional import noop
  29. from celery.utils.log import get_logger
  30. from celery.utils.objects import Bunch
  31. from celery.utils.text import truncate
  32. from celery.utils.timeutils import humanize_seconds, rate
  33. from celery.worker import loops
  34. from celery.worker.state import (
  35. task_reserved, maybe_shutdown, reserved_requests,
  36. )
  37. __all__ = ['Consumer', 'Evloop', 'dump_body']
  38. CLOSE = bootsteps.CLOSE
  39. logger = get_logger(__name__)
  40. debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
  41. logger.error, logger.critical)
  42. CONNECTION_RETRY = """\
  43. consumer: Connection to broker lost. \
  44. Trying to re-establish the connection...\
  45. """
  46. CONNECTION_RETRY_STEP = """\
  47. Trying again {when}...\
  48. """
  49. CONNECTION_ERROR = """\
  50. consumer: Cannot connect to %s: %s.
  51. %s
  52. """
  53. CONNECTION_FAILOVER = """\
  54. Will retry using next failover.\
  55. """
  56. UNKNOWN_FORMAT = """\
  57. Received and deleted unknown message. Wrong destination?!?
  58. The full contents of the message body was: %s
  59. """
  60. #: Error message for when an unregistered task is received.
  61. UNKNOWN_TASK_ERROR = """\
  62. Received unregistered task of type %s.
  63. The message has been ignored and discarded.
  64. Did you remember to import the module containing this task?
  65. Or maybe you are using relative imports?
  66. Please see http://bit.ly/gLye1c for more information.
  67. The full contents of the message body was:
  68. %s
  69. """
  70. #: Error message for when an invalid task message is received.
  71. INVALID_TASK_ERROR = """\
  72. Received invalid task message: %s
  73. The message has been ignored and discarded.
  74. Please ensure your message conforms to the task
  75. message protocol as described here: http://bit.ly/hYj41y
  76. The full contents of the message body was:
  77. %s
  78. """
  79. MESSAGE_DECODE_ERROR = """\
  80. Can't decode message body: %r [type:%r encoding:%r headers:%s]
  81. body: %s
  82. """
  83. MESSAGE_REPORT = """\
  84. body: {0}
  85. {{content_type:{1} content_encoding:{2}
  86. delivery_info:{3} headers={4}}}
  87. """
  88. def dump_body(m, body):
  89. # v2 protocol does not deserialize body
  90. body = m.body if body is None else body
  91. if isinstance(body, buffer_t):
  92. body = bytes_t(body)
  93. return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
  94. len(m.body))
  95. @python_2_unicode_compatible
  96. class Consumer(object):
  97. Strategies = dict
  98. #: Optional callback called the first time the worker
  99. #: is ready to receive tasks.
  100. init_callback = None
  101. #: The current worker pool instance.
  102. pool = None
  103. #: A timer used for high-priority internal tasks, such
  104. #: as sending heartbeats.
  105. timer = None
  106. restart_count = -1 # first start is the same as a restart
  107. class Blueprint(bootsteps.Blueprint):
  108. name = 'Consumer'
  109. default_steps = [
  110. 'celery.worker.consumer.connection:Connection',
  111. 'celery.worker.consumer.mingle:Mingle',
  112. 'celery.worker.consumer.events:Events',
  113. 'celery.worker.consumer.gossip:Gossip',
  114. 'celery.worker.consumer.heart:Heart',
  115. 'celery.worker.consumer.control:Control',
  116. 'celery.worker.consumer.tasks:Tasks',
  117. 'celery.worker.consumer.consumer:Evloop',
  118. 'celery.worker.consumer.agent:Agent',
  119. ]
  120. def shutdown(self, parent):
  121. self.send_all(parent, 'shutdown')
  122. def __init__(self, on_task_request,
  123. init_callback=noop, hostname=None,
  124. pool=None, app=None,
  125. timer=None, controller=None, hub=None, amqheartbeat=None,
  126. worker_options=None, disable_rate_limits=False,
  127. initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
  128. self.app = app
  129. self.controller = controller
  130. self.init_callback = init_callback
  131. self.hostname = hostname or gethostname()
  132. self.pid = os.getpid()
  133. self.pool = pool
  134. self.timer = timer
  135. self.strategies = self.Strategies()
  136. self.conninfo = self.app.connection_for_read()
  137. self.connection_errors = self.conninfo.connection_errors
  138. self.channel_errors = self.conninfo.channel_errors
  139. self._restart_state = restart_state(maxR=5, maxT=1)
  140. self._does_info = logger.isEnabledFor(logging.INFO)
  141. self._limit_order = 0
  142. self.on_task_request = on_task_request
  143. self.on_task_message = set()
  144. self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
  145. self.disable_rate_limits = disable_rate_limits
  146. self.initial_prefetch_count = initial_prefetch_count
  147. self.prefetch_multiplier = prefetch_multiplier
  148. # this contains a tokenbucket for each task type by name, used for
  149. # rate limits, or None if rate limits are disabled for that task.
  150. self.task_buckets = defaultdict(lambda: None)
  151. self.reset_rate_limits()
  152. self.hub = hub
  153. if self.hub:
  154. self.amqheartbeat = amqheartbeat
  155. if self.amqheartbeat is None:
  156. self.amqheartbeat = self.app.conf.broker_heartbeat
  157. else:
  158. self.amqheartbeat = 0
  159. if not hasattr(self, 'loop'):
  160. self.loop = loops.asynloop if hub else loops.synloop
  161. if _detect_environment() == 'gevent':
  162. # there's a gevent bug that causes timeouts to not be reset,
  163. # so if the connection timeout is exceeded once, it can NEVER
  164. # connect again.
  165. self.app.conf.broker_connection_timeout = None
  166. self._pending_operations = []
  167. self.steps = []
  168. self.blueprint = self.Blueprint(
  169. app=self.app, on_close=self.on_close,
  170. )
  171. self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
  172. def call_soon(self, p, *args, **kwargs):
  173. p = ppartial(p, *args, **kwargs)
  174. if self.hub:
  175. return self.hub.call_soon(p)
  176. self._pending_operations.append(p)
  177. return p
  178. def perform_pending_operations(self):
  179. if not self.hub:
  180. while self._pending_operations:
  181. try:
  182. self._pending_operations.pop()()
  183. except Exception as exc:
  184. error('Pending callback raised: %r', exc, exc_info=1)
  185. def bucket_for_task(self, type):
  186. limit = rate(getattr(type, 'rate_limit', None))
  187. return TokenBucket(limit, capacity=1) if limit else None
  188. def reset_rate_limits(self):
  189. self.task_buckets.update(
  190. (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
  191. )
  192. def _update_prefetch_count(self, index=0):
  193. """Update prefetch count after pool/shrink grow operations.
  194. Index must be the change in number of processes as a positive
  195. (increasing) or negative (decreasing) number.
  196. .. note::
  197. Currently pool grow operations will end up with an offset
  198. of +1 if the initial size of the pool was 0 (e.g.
  199. :option:`--autoscale=1,0 <celery worker --autoscale>`).
  200. """
  201. num_processes = self.pool.num_processes
  202. if not self.initial_prefetch_count or not num_processes:
  203. return # prefetch disabled
  204. self.initial_prefetch_count = (
  205. self.pool.num_processes * self.prefetch_multiplier
  206. )
  207. return self._update_qos_eventually(index)
  208. def _update_qos_eventually(self, index):
  209. return (self.qos.decrement_eventually if index < 0
  210. else self.qos.increment_eventually)(
  211. abs(index) * self.prefetch_multiplier)
  212. def _limit_move_to_pool(self, request):
  213. task_reserved(request)
  214. self.on_task_request(request)
  215. def _on_bucket_wakeup(self, bucket, tokens):
  216. try:
  217. request = bucket.pop()
  218. except IndexError:
  219. pass
  220. else:
  221. self._limit_move_to_pool(request)
  222. self._schedule_oldest_bucket_request(bucket, tokens)
  223. def _schedule_oldest_bucket_request(self, bucket, tokens):
  224. try:
  225. request = bucket.pop()
  226. except IndexError:
  227. pass
  228. else:
  229. return self._schedule_bucket_request(request, bucket, tokens)
  230. def _schedule_bucket_request(self, request, bucket, tokens):
  231. bucket.can_consume(tokens)
  232. bucket.add(request)
  233. pri = self._limit_order = (self._limit_order + 1) % 10
  234. hold = bucket.expected_time(tokens)
  235. self.timer.call_after(
  236. hold, self._on_bucket_wakeup, (bucket, tokens),
  237. priority=pri,
  238. )
  239. def _limit_task(self, request, bucket, tokens):
  240. if bucket.contents:
  241. return bucket.add(request)
  242. return self._schedule_bucket_request(request, bucket, tokens)
  243. def start(self):
  244. blueprint = self.blueprint
  245. while blueprint.state != CLOSE:
  246. self.restart_count += 1
  247. maybe_shutdown()
  248. try:
  249. blueprint.start(self)
  250. except self.connection_errors as exc:
  251. # If we're not retrying connections, no need to catch
  252. # connection errors
  253. if not self.app.conf.broker_connection_retry:
  254. raise
  255. if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
  256. raise # Too many open files
  257. maybe_shutdown()
  258. try:
  259. self._restart_state.step()
  260. except RestartFreqExceeded as exc:
  261. crit('Frequent restarts detected: %r', exc, exc_info=1)
  262. sleep(1)
  263. if blueprint.state != CLOSE:
  264. if self.connection:
  265. self.on_connection_error_after_connected(exc)
  266. else:
  267. self.on_connection_error_before_connected(exc)
  268. self.on_close()
  269. blueprint.restart(self)
  270. def on_connection_error_before_connected(self, exc):
  271. error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
  272. 'Trying to reconnect...')
  273. def on_connection_error_after_connected(self, exc):
  274. warn(CONNECTION_RETRY, exc_info=True)
  275. try:
  276. self.connection.collect()
  277. except Exception:
  278. pass
  279. def register_with_event_loop(self, hub):
  280. self.blueprint.send_all(
  281. self, 'register_with_event_loop', args=(hub,),
  282. description='Hub.register',
  283. )
  284. def shutdown(self):
  285. self.blueprint.shutdown(self)
  286. def stop(self):
  287. self.blueprint.stop(self)
  288. def on_ready(self):
  289. callback, self.init_callback = self.init_callback, None
  290. if callback:
  291. callback(self)
  292. def loop_args(self):
  293. return (self, self.connection, self.task_consumer,
  294. self.blueprint, self.hub, self.qos, self.amqheartbeat,
  295. self.app.clock, self.amqheartbeat_rate)
  296. def on_decode_error(self, message, exc):
  297. """Callback called if an error occurs while decoding
  298. a message received.
  299. Simply logs the error and acknowledges the message so it
  300. doesn't enter a loop.
  301. :param message: The message with errors.
  302. :param exc: The original exception instance.
  303. """
  304. crit(MESSAGE_DECODE_ERROR,
  305. exc, message.content_type, message.content_encoding,
  306. safe_repr(message.headers), dump_body(message, message.body),
  307. exc_info=1)
  308. message.ack()
  309. def on_close(self):
  310. # Clear internal queues to get rid of old messages.
  311. # They can't be acked anyway, as a delivery tag is specific
  312. # to the current channel.
  313. if self.controller and self.controller.semaphore:
  314. self.controller.semaphore.clear()
  315. if self.timer:
  316. self.timer.clear()
  317. for bucket in values(self.task_buckets):
  318. if bucket:
  319. bucket.clear_pending()
  320. reserved_requests.clear()
  321. if self.pool and self.pool.flush:
  322. self.pool.flush()
  323. def connect(self):
  324. """Establish the broker connection.
  325. Will retry establishing the connection if the
  326. :setting:`broker_connection_retry` setting is enabled
  327. """
  328. conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)
  329. # Callback called for each retry while the connection
  330. # can't be established.
  331. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  332. if getattr(conn, 'alt', None) and interval == 0:
  333. next_step = CONNECTION_FAILOVER
  334. error(CONNECTION_ERROR, conn.as_uri(), exc,
  335. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  336. # remember that the connection is lazy, it won't establish
  337. # until needed.
  338. if not self.app.conf.broker_connection_retry:
  339. # retry disabled, just call connect directly.
  340. conn.connect()
  341. return conn
  342. conn = conn.ensure_connection(
  343. _error_handler, self.app.conf.broker_connection_max_retries,
  344. callback=maybe_shutdown,
  345. )
  346. if self.hub:
  347. conn.transport.register_with_event_loop(conn.connection, self.hub)
  348. return conn
  349. def _flush_events(self):
  350. if self.event_dispatcher:
  351. self.event_dispatcher.flush()
  352. def on_send_event_buffered(self):
  353. if self.hub:
  354. self.hub._ready.add(self._flush_events)
  355. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  356. routing_key=None, **options):
  357. cset = self.task_consumer
  358. queues = self.app.amqp.queues
  359. # Must use in' here, as __missing__ will automatically
  360. # create queues when :setting:`task_create_missing_queues` is enabled.
  361. # (Issue #1079)
  362. if queue in queues:
  363. q = queues[queue]
  364. else:
  365. exchange = queue if exchange is None else exchange
  366. exchange_type = ('direct' if exchange_type is None
  367. else exchange_type)
  368. q = queues.select_add(queue,
  369. exchange=exchange,
  370. exchange_type=exchange_type,
  371. routing_key=routing_key, **options)
  372. if not cset.consuming_from(queue):
  373. cset.add_queue(q)
  374. cset.consume()
  375. info('Started consuming from %s', queue)
  376. def cancel_task_queue(self, queue):
  377. info('Canceling queue %s', queue)
  378. self.app.amqp.queues.deselect(queue)
  379. self.task_consumer.cancel_by_queue(queue)
  380. def apply_eta_task(self, task):
  381. """Method called by the timer to apply a task with an
  382. ETA/countdown."""
  383. task_reserved(task)
  384. self.on_task_request(task)
  385. self.qos.decrement_eventually()
  386. def _message_report(self, body, message):
  387. return MESSAGE_REPORT.format(dump_body(message, body),
  388. safe_repr(message.content_type),
  389. safe_repr(message.content_encoding),
  390. safe_repr(message.delivery_info),
  391. safe_repr(message.headers))
  392. def on_unknown_message(self, body, message):
  393. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  394. message.reject_log_error(logger, self.connection_errors)
  395. signals.task_rejected.send(sender=self, message=message, exc=None)
  396. def on_unknown_task(self, body, message, exc):
  397. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  398. try:
  399. id_, name = message.headers['id'], message.headers['task']
  400. root_id = message.headers.get('root_id')
  401. except KeyError: # proto1
  402. id_, name = body['id'], body['task']
  403. root_id = None
  404. request = Bunch(
  405. name=name, chord=None, root_id=root_id,
  406. correlation_id=message.properties.get('correlation_id'),
  407. reply_to=message.properties.get('reply_to'),
  408. errbacks=None,
  409. )
  410. message.reject_log_error(logger, self.connection_errors)
  411. self.app.backend.mark_as_failure(
  412. id_, NotRegistered(name), request=request,
  413. )
  414. if self.event_dispatcher:
  415. self.event_dispatcher.send(
  416. 'task-failed', uuid=id_,
  417. exception='NotRegistered({0!r})'.format(name),
  418. )
  419. signals.task_unknown.send(
  420. sender=self, message=message, exc=exc, name=name, id=id_,
  421. )
  422. def on_invalid_task(self, body, message, exc):
  423. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  424. message.reject_log_error(logger, self.connection_errors)
  425. signals.task_rejected.send(sender=self, message=message, exc=exc)
  426. def update_strategies(self):
  427. loader = self.app.loader
  428. for name, task in items(self.app.tasks):
  429. self.strategies[name] = task.start_strategy(self.app, self)
  430. task.__trace__ = build_tracer(name, task, loader, self.hostname,
  431. app=self.app)
  432. def create_task_handler(self, promise=promise):
  433. strategies = self.strategies
  434. on_unknown_message = self.on_unknown_message
  435. on_unknown_task = self.on_unknown_task
  436. on_invalid_task = self.on_invalid_task
  437. callbacks = self.on_task_message
  438. call_soon = self.call_soon
  439. def on_task_received(message):
  440. # payload will only be set for v1 protocol, since v2
  441. # will defer deserializing the message body to the pool.
  442. payload = None
  443. try:
  444. type_ = message.headers['task'] # protocol v2
  445. except TypeError:
  446. return on_unknown_message(None, message)
  447. except KeyError:
  448. try:
  449. payload = message.decode()
  450. except Exception as exc:
  451. return self.on_decode_error(message, exc)
  452. try:
  453. type_, payload = payload['task'], payload # protocol v1
  454. except (TypeError, KeyError):
  455. return on_unknown_message(payload, message)
  456. try:
  457. strategy = strategies[type_]
  458. except KeyError as exc:
  459. return on_unknown_task(None, message, exc)
  460. else:
  461. try:
  462. strategy(
  463. message, payload,
  464. promise(call_soon, (message.ack_log_error,)),
  465. promise(call_soon, (message.reject_log_error,)),
  466. callbacks,
  467. )
  468. except InvalidTaskError as exc:
  469. return on_invalid_task(payload, message, exc)
  470. except MemoryError:
  471. raise
  472. except Exception as exc:
  473. # XXX handle as internal error?
  474. return on_invalid_task(payload, message, exc)
  475. return on_task_received
  476. def __repr__(self):
  477. return '<Consumer: {self.hostname} ({state})>'.format(
  478. self=self, state=self.blueprint.human_state(),
  479. )
  480. class Evloop(bootsteps.StartStopStep):
  481. label = 'event loop'
  482. last = True
  483. def start(self, c):
  484. self.patch_all(c)
  485. c.loop(*c.loop_args())
  486. def patch_all(self, c):
  487. c.qos._mutex = DummyLock()