consumer.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. # -*- coding: utf-8 -*-
  2. """Worker Consumer Blueprint.
  3. This module contains the components responsible for consuming messages
  4. from the broker, processing the messages and keeping the broker connections
  5. up and running.
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import errno
  9. import logging
  10. import os
  11. from collections import defaultdict
  12. from time import sleep
  13. from billiard.common import restart_state
  14. from billiard.exceptions import RestartFreqExceeded
  15. from kombu.async.semaphore import DummyLock
  16. from kombu.syn import _detect_environment
  17. from kombu.utils.encoding import safe_repr, bytes_t
  18. from kombu.utils.limits import TokenBucket
  19. from vine import ppartial, promise
  20. from celery import bootsteps
  21. from celery import signals
  22. from celery.app.trace import build_tracer
  23. from celery.exceptions import InvalidTaskError, NotRegistered
  24. from celery.five import buffer_t, items, python_2_unicode_compatible, values
  25. from celery.utils.functional import noop
  26. from celery.utils.log import get_logger
  27. from celery.utils.nodenames import gethostname
  28. from celery.utils.objects import Bunch
  29. from celery.utils.text import truncate
  30. from celery.utils.timeutils import humanize_seconds, rate
  31. from celery.worker import loops
  32. from celery.worker.state import (
  33. task_reserved, maybe_shutdown, reserved_requests,
  34. )
#: Public names exported by this module.
__all__ = ['Consumer', 'Evloop', 'dump_body']

#: Blueprint state constant, compared against ``blueprint.state``
#: by ``Consumer.start``.
CLOSE = bootsteps.CLOSE

logger = get_logger(__name__)
# Short aliases for the module logger's methods.
debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
                                  logger.error, logger.critical)

#: Warning logged by ``Consumer.on_connection_error_after_connected``.
CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

#: Retry hint appended to ``CONNECTION_ERROR``; ``{when}`` is filled
#: in with the humanized retry interval.
CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""

#: Error logged when the broker cannot be reached.  The %-arguments are
#: the broker URI, the exception and a description of the next step.
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

#: Retry hint used instead of ``CONNECTION_RETRY_STEP`` when the
#: connection has alternates and the retry interval is zero.
CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""

#: Warning logged by ``Consumer.on_unknown_message``.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here: http://bit.ly/hYj41y
The full contents of the message body was:
%s
"""

#: Critical message logged by ``Consumer.on_decode_error``.
MESSAGE_DECODE_ERROR = """\
Can't decode message body: %r [type:%r encoding:%r headers:%s]
body: %s
"""

#: ``str.format`` template used by ``Consumer._message_report``.
MESSAGE_REPORT = """\
body: {0}
{{content_type:{1} content_encoding:{2}
delivery_info:{3} headers={4}}}
"""
  86. def dump_body(m, body):
  87. # v2 protocol does not deserialize body
  88. body = m.body if body is None else body
  89. if isinstance(body, buffer_t):
  90. body = bytes_t(body)
  91. return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
  92. len(m.body))
  93. @python_2_unicode_compatible
  94. class Consumer(object):
  95. Strategies = dict
  96. #: Optional callback called the first time the worker
  97. #: is ready to receive tasks.
  98. init_callback = None
  99. #: The current worker pool instance.
  100. pool = None
  101. #: A timer used for high-priority internal tasks, such
  102. #: as sending heartbeats.
  103. timer = None
  104. restart_count = -1 # first start is the same as a restart
  105. class Blueprint(bootsteps.Blueprint):
  106. name = 'Consumer'
  107. default_steps = [
  108. 'celery.worker.consumer.connection:Connection',
  109. 'celery.worker.consumer.mingle:Mingle',
  110. 'celery.worker.consumer.events:Events',
  111. 'celery.worker.consumer.gossip:Gossip',
  112. 'celery.worker.consumer.heart:Heart',
  113. 'celery.worker.consumer.control:Control',
  114. 'celery.worker.consumer.tasks:Tasks',
  115. 'celery.worker.consumer.consumer:Evloop',
  116. 'celery.worker.consumer.agent:Agent',
  117. ]
  118. def shutdown(self, parent):
  119. self.send_all(parent, 'shutdown')
  120. def __init__(self, on_task_request,
  121. init_callback=noop, hostname=None,
  122. pool=None, app=None,
  123. timer=None, controller=None, hub=None, amqheartbeat=None,
  124. worker_options=None, disable_rate_limits=False,
  125. initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
  126. self.app = app
  127. self.controller = controller
  128. self.init_callback = init_callback
  129. self.hostname = hostname or gethostname()
  130. self.pid = os.getpid()
  131. self.pool = pool
  132. self.timer = timer
  133. self.strategies = self.Strategies()
  134. self.conninfo = self.app.connection_for_read()
  135. self.connection_errors = self.conninfo.connection_errors
  136. self.channel_errors = self.conninfo.channel_errors
  137. self._restart_state = restart_state(maxR=5, maxT=1)
  138. self._does_info = logger.isEnabledFor(logging.INFO)
  139. self._limit_order = 0
  140. self.on_task_request = on_task_request
  141. self.on_task_message = set()
  142. self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
  143. self.disable_rate_limits = disable_rate_limits
  144. self.initial_prefetch_count = initial_prefetch_count
  145. self.prefetch_multiplier = prefetch_multiplier
  146. # this contains a tokenbucket for each task type by name, used for
  147. # rate limits, or None if rate limits are disabled for that task.
  148. self.task_buckets = defaultdict(lambda: None)
  149. self.reset_rate_limits()
  150. self.hub = hub
  151. if self.hub:
  152. self.amqheartbeat = amqheartbeat
  153. if self.amqheartbeat is None:
  154. self.amqheartbeat = self.app.conf.broker_heartbeat
  155. else:
  156. self.amqheartbeat = 0
  157. if not hasattr(self, 'loop'):
  158. self.loop = loops.asynloop if hub else loops.synloop
  159. if _detect_environment() == 'gevent':
  160. # there's a gevent bug that causes timeouts to not be reset,
  161. # so if the connection timeout is exceeded once, it can NEVER
  162. # connect again.
  163. self.app.conf.broker_connection_timeout = None
  164. self._pending_operations = []
  165. self.steps = []
  166. self.blueprint = self.Blueprint(
  167. app=self.app, on_close=self.on_close,
  168. )
  169. self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
  170. def call_soon(self, p, *args, **kwargs):
  171. p = ppartial(p, *args, **kwargs)
  172. if self.hub:
  173. return self.hub.call_soon(p)
  174. self._pending_operations.append(p)
  175. return p
  176. def perform_pending_operations(self):
  177. if not self.hub:
  178. while self._pending_operations:
  179. try:
  180. self._pending_operations.pop()()
  181. except Exception as exc:
  182. error('Pending callback raised: %r', exc, exc_info=1)
  183. def bucket_for_task(self, type):
  184. limit = rate(getattr(type, 'rate_limit', None))
  185. return TokenBucket(limit, capacity=1) if limit else None
  186. def reset_rate_limits(self):
  187. self.task_buckets.update(
  188. (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
  189. )
  190. def _update_prefetch_count(self, index=0):
  191. """Update prefetch count after pool/shrink grow operations.
  192. Index must be the change in number of processes as a positive
  193. (increasing) or negative (decreasing) number.
  194. Note:
  195. Currently pool grow operations will end up with an offset
  196. of +1 if the initial size of the pool was 0 (which could
  197. be the case with old deprecated autoscale option, may consider
  198. removing this now that it's no longer supported).
  199. """
  200. num_processes = self.pool.num_processes
  201. if not self.initial_prefetch_count or not num_processes:
  202. return # prefetch disabled
  203. self.initial_prefetch_count = (
  204. self.pool.num_processes * self.prefetch_multiplier
  205. )
  206. return self._update_qos_eventually(index)
  207. def _update_qos_eventually(self, index):
  208. return (self.qos.decrement_eventually if index < 0
  209. else self.qos.increment_eventually)(
  210. abs(index) * self.prefetch_multiplier)
  211. def _limit_move_to_pool(self, request):
  212. task_reserved(request)
  213. self.on_task_request(request)
  214. def _on_bucket_wakeup(self, bucket, tokens):
  215. try:
  216. request = bucket.pop()
  217. except IndexError:
  218. pass
  219. else:
  220. self._limit_move_to_pool(request)
  221. self._schedule_oldest_bucket_request(bucket, tokens)
  222. def _schedule_oldest_bucket_request(self, bucket, tokens):
  223. try:
  224. request = bucket.pop()
  225. except IndexError:
  226. pass
  227. else:
  228. return self._schedule_bucket_request(request, bucket, tokens)
  229. def _schedule_bucket_request(self, request, bucket, tokens):
  230. bucket.can_consume(tokens)
  231. bucket.add(request)
  232. pri = self._limit_order = (self._limit_order + 1) % 10
  233. hold = bucket.expected_time(tokens)
  234. self.timer.call_after(
  235. hold, self._on_bucket_wakeup, (bucket, tokens),
  236. priority=pri,
  237. )
  238. def _limit_task(self, request, bucket, tokens):
  239. if bucket.contents:
  240. return bucket.add(request)
  241. return self._schedule_bucket_request(request, bucket, tokens)
  242. def start(self):
  243. blueprint = self.blueprint
  244. while blueprint.state != CLOSE:
  245. maybe_shutdown()
  246. if self.restart_count:
  247. try:
  248. self._restart_state.step()
  249. except RestartFreqExceeded as exc:
  250. crit('Frequent restarts detected: %r', exc, exc_info=1)
  251. sleep(1)
  252. self.restart_count += 1
  253. try:
  254. blueprint.start(self)
  255. except self.connection_errors as exc:
  256. # If we're not retrying connections, no need to catch
  257. # connection errors
  258. if not self.app.conf.broker_connection_retry:
  259. raise
  260. if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
  261. raise # Too many open files
  262. maybe_shutdown()
  263. if blueprint.state != CLOSE:
  264. if self.connection:
  265. self.on_connection_error_after_connected(exc)
  266. else:
  267. self.on_connection_error_before_connected(exc)
  268. self.on_close()
  269. blueprint.restart(self)
  270. def on_connection_error_before_connected(self, exc):
  271. error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
  272. 'Trying to reconnect...')
  273. def on_connection_error_after_connected(self, exc):
  274. warn(CONNECTION_RETRY, exc_info=True)
  275. try:
  276. self.connection.collect()
  277. except Exception:
  278. pass
  279. def register_with_event_loop(self, hub):
  280. self.blueprint.send_all(
  281. self, 'register_with_event_loop', args=(hub,),
  282. description='Hub.register',
  283. )
  284. def shutdown(self):
  285. self.blueprint.shutdown(self)
  286. def stop(self):
  287. self.blueprint.stop(self)
  288. def on_ready(self):
  289. callback, self.init_callback = self.init_callback, None
  290. if callback:
  291. callback(self)
  292. def loop_args(self):
  293. return (self, self.connection, self.task_consumer,
  294. self.blueprint, self.hub, self.qos, self.amqheartbeat,
  295. self.app.clock, self.amqheartbeat_rate)
  296. def on_decode_error(self, message, exc):
  297. """Callback called if an error occurs while decoding
  298. a message received.
  299. Simply logs the error and acknowledges the message so it
  300. doesn't enter a loop.
  301. Arguments:
  302. message (Message): The message received.
  303. exc (Exception): The exception being handled.
  304. """
  305. crit(MESSAGE_DECODE_ERROR,
  306. exc, message.content_type, message.content_encoding,
  307. safe_repr(message.headers), dump_body(message, message.body),
  308. exc_info=1)
  309. message.ack()
  310. def on_close(self):
  311. # Clear internal queues to get rid of old messages.
  312. # They can't be acked anyway, as a delivery tag is specific
  313. # to the current channel.
  314. if self.controller and self.controller.semaphore:
  315. self.controller.semaphore.clear()
  316. if self.timer:
  317. self.timer.clear()
  318. for bucket in values(self.task_buckets):
  319. if bucket:
  320. bucket.clear_pending()
  321. reserved_requests.clear()
  322. if self.pool and self.pool.flush:
  323. self.pool.flush()
  324. def connect(self):
  325. """Establish the broker connection.
  326. Retries establishing the connection if the
  327. :setting:`broker_connection_retry` setting is enabled
  328. """
  329. conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)
  330. # Callback called for each retry while the connection
  331. # can't be established.
  332. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  333. if getattr(conn, 'alt', None) and interval == 0:
  334. next_step = CONNECTION_FAILOVER
  335. error(CONNECTION_ERROR, conn.as_uri(), exc,
  336. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  337. # remember that the connection is lazy, it won't establish
  338. # until needed.
  339. if not self.app.conf.broker_connection_retry:
  340. # retry disabled, just call connect directly.
  341. conn.connect()
  342. return conn
  343. conn = conn.ensure_connection(
  344. _error_handler, self.app.conf.broker_connection_max_retries,
  345. callback=maybe_shutdown,
  346. )
  347. if self.hub:
  348. conn.transport.register_with_event_loop(conn.connection, self.hub)
  349. return conn
  350. def _flush_events(self):
  351. if self.event_dispatcher:
  352. self.event_dispatcher.flush()
  353. def on_send_event_buffered(self):
  354. if self.hub:
  355. self.hub._ready.add(self._flush_events)
  356. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  357. routing_key=None, **options):
  358. cset = self.task_consumer
  359. queues = self.app.amqp.queues
  360. # Must use in' here, as __missing__ will automatically
  361. # create queues when :setting:`task_create_missing_queues` is enabled.
  362. # (Issue #1079)
  363. if queue in queues:
  364. q = queues[queue]
  365. else:
  366. exchange = queue if exchange is None else exchange
  367. exchange_type = ('direct' if exchange_type is None
  368. else exchange_type)
  369. q = queues.select_add(queue,
  370. exchange=exchange,
  371. exchange_type=exchange_type,
  372. routing_key=routing_key, **options)
  373. if not cset.consuming_from(queue):
  374. cset.add_queue(q)
  375. cset.consume()
  376. info('Started consuming from %s', queue)
  377. def cancel_task_queue(self, queue):
  378. info('Canceling queue %s', queue)
  379. self.app.amqp.queues.deselect(queue)
  380. self.task_consumer.cancel_by_queue(queue)
  381. def apply_eta_task(self, task):
  382. """Method called by the timer to apply a task with an
  383. ETA/countdown."""
  384. task_reserved(task)
  385. self.on_task_request(task)
  386. self.qos.decrement_eventually()
  387. def _message_report(self, body, message):
  388. return MESSAGE_REPORT.format(dump_body(message, body),
  389. safe_repr(message.content_type),
  390. safe_repr(message.content_encoding),
  391. safe_repr(message.delivery_info),
  392. safe_repr(message.headers))
  393. def on_unknown_message(self, body, message):
  394. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  395. message.reject_log_error(logger, self.connection_errors)
  396. signals.task_rejected.send(sender=self, message=message, exc=None)
  397. def on_unknown_task(self, body, message, exc):
  398. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  399. try:
  400. id_, name = message.headers['id'], message.headers['task']
  401. root_id = message.headers.get('root_id')
  402. except KeyError: # proto1
  403. id_, name = body['id'], body['task']
  404. root_id = None
  405. request = Bunch(
  406. name=name, chord=None, root_id=root_id,
  407. correlation_id=message.properties.get('correlation_id'),
  408. reply_to=message.properties.get('reply_to'),
  409. errbacks=None,
  410. )
  411. message.reject_log_error(logger, self.connection_errors)
  412. self.app.backend.mark_as_failure(
  413. id_, NotRegistered(name), request=request,
  414. )
  415. if self.event_dispatcher:
  416. self.event_dispatcher.send(
  417. 'task-failed', uuid=id_,
  418. exception='NotRegistered({0!r})'.format(name),
  419. )
  420. signals.task_unknown.send(
  421. sender=self, message=message, exc=exc, name=name, id=id_,
  422. )
  423. def on_invalid_task(self, body, message, exc):
  424. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  425. message.reject_log_error(logger, self.connection_errors)
  426. signals.task_rejected.send(sender=self, message=message, exc=exc)
  427. def update_strategies(self):
  428. loader = self.app.loader
  429. for name, task in items(self.app.tasks):
  430. self.strategies[name] = task.start_strategy(self.app, self)
  431. task.__trace__ = build_tracer(name, task, loader, self.hostname,
  432. app=self.app)
  433. def create_task_handler(self, promise=promise):
  434. strategies = self.strategies
  435. on_unknown_message = self.on_unknown_message
  436. on_unknown_task = self.on_unknown_task
  437. on_invalid_task = self.on_invalid_task
  438. callbacks = self.on_task_message
  439. call_soon = self.call_soon
  440. def on_task_received(message):
  441. # payload will only be set for v1 protocol, since v2
  442. # will defer deserializing the message body to the pool.
  443. payload = None
  444. try:
  445. type_ = message.headers['task'] # protocol v2
  446. except TypeError:
  447. return on_unknown_message(None, message)
  448. except KeyError:
  449. try:
  450. payload = message.decode()
  451. except Exception as exc:
  452. return self.on_decode_error(message, exc)
  453. try:
  454. type_, payload = payload['task'], payload # protocol v1
  455. except (TypeError, KeyError):
  456. return on_unknown_message(payload, message)
  457. try:
  458. strategy = strategies[type_]
  459. except KeyError as exc:
  460. return on_unknown_task(None, message, exc)
  461. else:
  462. try:
  463. strategy(
  464. message, payload,
  465. promise(call_soon, (message.ack_log_error,)),
  466. promise(call_soon, (message.reject_log_error,)),
  467. callbacks,
  468. )
  469. except InvalidTaskError as exc:
  470. return on_invalid_task(payload, message, exc)
  471. except MemoryError:
  472. raise
  473. except Exception as exc:
  474. # XXX handle as internal error?
  475. return on_invalid_task(payload, message, exc)
  476. return on_task_received
  477. def __repr__(self):
  478. return '<Consumer: {self.hostname} ({state})>'.format(
  479. self=self, state=self.blueprint.human_state(),
  480. )
  481. class Evloop(bootsteps.StartStopStep):
  482. label = 'event loop'
  483. last = True
  484. def start(self, c):
  485. self.patch_all(c)
  486. c.loop(*c.loop_args())
  487. def patch_all(self, c):
  488. c.qos._mutex = DummyLock()