# -*- coding: utf-8 -*-
"""
    celery.worker.consumer
    ~~~~~~~~~~~~~~~~~~~~~~

    This module contains the components responsible for consuming messages
    from the broker, processing the messages and keeping the broker
    connections up and running.

"""
from __future__ import absolute_import

import errno
import kombu
import logging
import socket

from collections import defaultdict
from functools import partial
from heapq import heappush
from operator import itemgetter
from time import sleep

from billiard.common import restart_state
from billiard.exceptions import RestartFreqExceeded
from kombu.common import QoS, ignore_errors
from kombu.syn import _detect_environment
from kombu.utils.compat import get_errno
from kombu.utils.encoding import safe_repr, bytes_t
from kombu.utils.limits import TokenBucket

from celery import bootsteps
from celery.app import app_or_default
from celery.canvas import subtask
from celery.five import items, values
from celery.task.trace import build_tracer
from celery.utils.functional import noop
from celery.utils.log import get_logger
from celery.utils.text import truncate
from celery.utils.timer2 import default_timer
from celery.utils.timeutils import humanize_seconds, rate

from . import heartbeat, loops, pidbox
from .state import task_reserved, maybe_shutdown, revoked, reserved_requests
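
# NOTE (added commentary): Python 3 removed the builtin ``buffer`` type, so
# the fallback class below only exists to keep the isinstance() check in
# dump_body() valid; on Python 3 it will simply never match.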
try:
    buffer_t = buffer
except NameError:  # pragma: no cover
    class buffer_t(object):  # noqa
        pass

CLOSE = bootsteps.CLOSE

logger = get_logger(__name__)
debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
                                  logger.error, logger.critical)

CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""

CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""

UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here: http://bit.ly/hYj41y
The full contents of the message body was:
%s
"""

MESSAGE_REPORT = """\
body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
"""

MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')


def dump_body(m, body):
    if isinstance(body, buffer_t):
        # convert memory buffers to bytes before formatting.
        body = bytes_t(body)
    return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
                               len(m.body))


class Consumer(object):

    #: set when consumer is shutting down.
    in_shutdown = False

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    restart_count = -1  # first start is the same as a restart

    class Blueprint(bootsteps.Blueprint):
        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer:Connection',
            'celery.worker.consumer:Mingle',
            'celery.worker.consumer:Events',
            'celery.worker.consumer:Gossip',
            'celery.worker.consumer:Heart',
            'celery.worker.consumer:Control',
            'celery.worker.consumer:Tasks',
            'celery.worker.consumer:Evloop',
            'celery.worker.consumer:Agent',
        ]
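
        # Note (added commentary): the actual start order of these steps is
        # decided by the bootsteps dependency graph (each step's ``requires``
        # attribute), not by the order of this list.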

        def shutdown(self, parent):
            self.send_all(parent, 'shutdown')

    def __init__(self, handle_task,
                 init_callback=noop, hostname=None,
                 pool=None, app=None,
                 timer=None, controller=None, hub=None, amqheartbeat=None,
                 worker_options=None, disable_rate_limits=False, **kwargs):
        self.app = app_or_default(app)
        self.controller = controller
        self.init_callback = init_callback
        self.hostname = hostname or socket.gethostname()
        self.pool = pool
        self.timer = timer or default_timer
        self.strategies = {}
        conninfo = self.app.connection()
        self.connection_errors = conninfo.connection_errors
        self.channel_errors = conninfo.channel_errors
        self._restart_state = restart_state(maxR=5, maxT=1)
        self._does_info = logger.isEnabledFor(logging.INFO)
        self.handle_task = handle_task
        self.amqheartbeat_rate = self.app.conf.BROKER_HEARTBEAT_CHECKRATE
        self.disable_rate_limits = disable_rate_limits

        # this contains a tokenbucket for each task type by name, used for
        # rate limits, or None if rate limits are disabled for that task.
        self.task_buckets = defaultdict(lambda: None)
        self.reset_rate_limits()
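
        # For illustration (added comment, numbers are assumptions): a task
        # declared with rate_limit='10/m' maps here to a TokenBucket refilling
        # at roughly 0.17 tokens per second, while tasks without a rate limit
        # map to None and skip _limit_task() entirely.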

        if hub:
            self.amqheartbeat = amqheartbeat
            if self.amqheartbeat is None:
                self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
            self.hub = hub
            self.hub.on_init.append(self.on_poll_init)
        else:
            self.hub = None
            self.amqheartbeat = 0

        if not hasattr(self, 'loop'):
            self.loop = loops.asynloop if hub else loops.synloop

        if _detect_environment() == 'gevent':
            # there's a gevent bug that causes timeouts to not be reset,
            # so if the connection timeout is exceeded once, it can NEVER
            # connect again.
            self.app.conf.BROKER_CONNECTION_TIMEOUT = None

        self.steps = []
        self.blueprint = self.Blueprint(
            app=self.app, on_close=self.on_close,
        )
        self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))

    def bucket_for_task(self, type):
        limit = rate(getattr(type, 'rate_limit', None))
        return TokenBucket(limit, capacity=1) if limit else None

    def reset_rate_limits(self):
        self.task_buckets.update(
            (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
        )

    def _limit_task(self, request, bucket, tokens):
        if not bucket.can_consume(tokens):
            hold = bucket.expected_time(tokens)
            self.timer.apply_after(
                hold * 1000.0, self._limit_task, (request, bucket, tokens),
            )
        else:
            task_reserved(request)
            self.handle_task(request)
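
    # Worked example (added commentary, assumed numbers): with a '2/s' rate
    # limit the bucket refills at 2 tokens/second.  If a request arrives
    # while the bucket is empty, expected_time(1) is about 0.5, so the
    # request is re-scheduled ~500ms later via apply_after(hold * 1000.0, ...)
    # instead of being handed to the pool immediately.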

    def start(self):
        blueprint, loop = self.blueprint, self.loop
        while blueprint.state != CLOSE:
            self.restart_count += 1
            maybe_shutdown()
            try:
                blueprint.start(self)
            except self.connection_errors as exc:
                if isinstance(exc, OSError) and get_errno(exc) == errno.EMFILE:
                    raise  # Too many open files
                maybe_shutdown()
                try:
                    self._restart_state.step()
                except RestartFreqExceeded as exc:
                    crit('Frequent restarts detected: %r', exc, exc_info=1)
                    sleep(1)
                if blueprint.state != CLOSE and self.connection:
                    warn(CONNECTION_RETRY, exc_info=True)
                    try:
                        self.connection.collect()
                    except Exception:
                        pass
                    self.on_close()
                    blueprint.restart(self)

    def shutdown(self):
        self.in_shutdown = True
        self.blueprint.shutdown(self)

    def stop(self):
        self.blueprint.stop(self)

    def on_ready(self):
        callback, self.init_callback = self.init_callback, None
        if callback:
            callback(self)

    def loop_args(self):
        return (self, self.connection, self.task_consumer,
                self.strategies, self.blueprint, self.hub, self.qos,
                self.amqheartbeat, self.handle_unknown_message,
                self.handle_unknown_task, self.handle_invalid_task,
                self.app.clock, self.amqheartbeat_rate)

    def on_poll_init(self, hub):
        hub.update_readers(self.connection.eventmap)
        self.connection.transport.on_poll_init(hub.poller)

    def on_decode_error(self, message, exc):
        """Callback called if an error occurs while decoding
        a received message.

        Simply logs the error and acknowledges the message so it
        doesn't enter a loop.

        :param message: The message with errors.
        :param exc: The original exception instance.

        """
        crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
             exc, message.content_type, message.content_encoding,
             dump_body(message, message.body), exc_info=1)
        message.ack()

    def on_close(self):
        # Clear internal queues to get rid of old messages.
        # They can't be acked anyway, as a delivery tag is specific
        # to the current channel.
        if self.controller and self.controller.semaphore:
            self.controller.semaphore.clear()
        if self.timer:
            self.timer.clear()
        reserved_requests.clear()
        if self.pool:
            self.pool.flush()

    def connect(self):
        """Establish the broker connection.

        Will retry establishing the connection if the
        :setting:`BROKER_CONNECTION_RETRY` setting is enabled.

        """
        conn = self.app.connection(heartbeat=self.amqheartbeat)

        # Callback called for each retry while the connection
        # can't be established.
        def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
            if getattr(conn, 'alt', None) and interval == 0:
                next_step = CONNECTION_FAILOVER
            error(CONNECTION_ERROR, conn.as_uri(), exc,
                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))

        # remember that the connection is lazy, it won't establish
        # until it's needed.
        if not self.app.conf.BROKER_CONNECTION_RETRY:
            # retry disabled, just call connect directly.
            conn.connect()
            return conn

        return conn.ensure_connection(
            _error_handler, self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
            callback=maybe_shutdown,
        )
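
    # Example (added commentary, values assume the usual defaults): with
    # BROKER_CONNECTION_RETRY enabled and BROKER_CONNECTION_MAX_RETRIES=100,
    # _error_handler is invoked on every failed attempt with the next retry
    # interval, and maybe_shutdown() is checked between attempts so that a
    # shutdown request still takes effect while the worker is retrying.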

    def add_task_queue(self, queue, exchange=None, exchange_type=None,
                       routing_key=None, **options):
        cset = self.task_consumer
        queues = self.app.amqp.queues
        # Must use `in` here, as __missing__ will automatically
        # create queues when CELERY_CREATE_MISSING_QUEUES is enabled.
        # (Issue #1079)
        if queue in queues:
            q = queues[queue]
        else:
            exchange = queue if exchange is None else exchange
            exchange_type = ('direct' if exchange_type is None
                             else exchange_type)
            q = queues.select_add(queue,
                                  exchange=exchange,
                                  exchange_type=exchange_type,
                                  routing_key=routing_key, **options)
        if not cset.consuming_from(queue):
            cset.add_queue(q)
            cset.consume()
            info('Started consuming from %r', queue)

    def cancel_task_queue(self, queue):
        self.app.amqp.queues.select_remove(queue)
        self.task_consumer.cancel_by_queue(queue)
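
    # Illustrative usage (added commentary, hypothetical queue name): these
    # two methods let a running worker add or drop queues at runtime, e.g.
    # consumer.add_task_queue('priority.high') starts consuming from that
    # queue and consumer.cancel_task_queue('priority.high') stops again.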

    def apply_eta_task(self, task):
        """Method called by the timer to apply a task with an
        ETA/countdown."""
        task_reserved(task)
        self.handle_task(task)
        self.qos.decrement_eventually()

    def _message_report(self, body, message):
        return MESSAGE_REPORT.format(dump_body(message, body),
                                     safe_repr(message.content_type),
                                     safe_repr(message.content_encoding),
                                     safe_repr(message.delivery_info))

    def handle_unknown_message(self, body, message):
        warn(UNKNOWN_FORMAT, self._message_report(body, message))
        message.reject_log_error(logger, self.connection_errors)

    def handle_unknown_task(self, body, message, exc):
        error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
        message.reject_log_error(logger, self.connection_errors)

    def handle_invalid_task(self, body, message, exc):
        error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
        message.reject_log_error(logger, self.connection_errors)

    def update_strategies(self):
        loader = self.app.loader
        for name, task in items(self.app.tasks):
            self.strategies[name] = task.start_strategy(self.app, self)
            task.__trace__ = build_tracer(name, task, loader, self.hostname)


class Connection(bootsteps.StartStopStep):

    def __init__(self, c, **kwargs):
        c.connection = None

    def start(self, c):
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())

    def shutdown(self, c):
        # We must set self.connection to None here, so
        # that the green pidbox thread exits.
        connection, c.connection = c.connection, None
        if connection:
            ignore_errors(connection, connection.close)

    def info(self, c):
        info = c.connection.info()
        info.pop('password', None)  # don't send password.
        return {'broker': info}


class Events(bootsteps.StartStopStep):
    requires = (Connection, )

    def __init__(self, c, send_events=None, **kwargs):
        self.send_events = True
        self.groups = None if send_events else ['worker']
        c.event_dispatcher = None

    def start(self, c):
        # Flush events sent while connection was down.
        prev = c.event_dispatcher
        dis = c.event_dispatcher = c.app.events.Dispatcher(
            c.connection, hostname=c.hostname,
            enabled=self.send_events, groups=self.groups,
        )
        if prev:
            dis.copy_buffer(prev)
            dis.flush()

    def stop(self, c):
        if c.event_dispatcher:
            ignore_errors(c, c.event_dispatcher.close)
            c.event_dispatcher = None
    shutdown = stop


class Heart(bootsteps.StartStopStep):
    requires = (Events, )

    def __init__(self, c, enable_heartbeat=True, **kwargs):
        self.enabled = enable_heartbeat
        c.heart = None

    def start(self, c):
        c.heart = heartbeat.Heart(c.timer, c.event_dispatcher)
        c.heart.start()

    def stop(self, c):
        c.heart = c.heart and c.heart.stop()
    shutdown = stop


class Control(bootsteps.StartStopStep):
    requires = (Events, )

    def __init__(self, c, **kwargs):
        self.is_green = c.pool is not None and c.pool.is_green
        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
        self.start = self.box.start
        self.stop = self.box.stop
        self.shutdown = self.box.shutdown


class Tasks(bootsteps.StartStopStep):
    requires = (Control, )

    def __init__(self, c, initial_prefetch_count=2, **kwargs):
        c.task_consumer = c.qos = None
        self.initial_prefetch_count = initial_prefetch_count

    def start(self, c):
        c.update_strategies()
        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )
        c.qos = QoS(c.task_consumer.qos, self.initial_prefetch_count)
        c.qos.update()  # set initial prefetch count

    def stop(self, c):
        if c.task_consumer:
            debug('Cancelling task consumer...')
            ignore_errors(c, c.task_consumer.cancel)

    def shutdown(self, c):
        if c.task_consumer:
            self.stop(c)
            debug('Closing consumer channel...')
            ignore_errors(c, c.task_consumer.close)
            c.task_consumer = None

    def info(self, c):
        return {'prefetch_count': c.qos.value}
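
    # Note (added commentary, not original code): the worker normally derives
    # initial_prefetch_count from the pool size and
    # CELERYD_PREFETCH_MULTIPLIER, so e.g. 4 processes with the default
    # multiplier of 4 start with a prefetch window of 16 messages.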


class Agent(bootsteps.StartStopStep):
    conditional = True
    requires = (Connection, )

    def __init__(self, c, **kwargs):
        self.agent_cls = self.enabled = c.app.conf.CELERYD_AGENT

    def create(self, c):
        agent = c.agent = self.instantiate(self.agent_cls, c.connection)
        return agent


class Mingle(bootsteps.StartStopStep):
    label = 'Mingle'
    requires = (Connection, )

    def __init__(self, c, enable_mingle=True, **kwargs):
        self.enabled = enable_mingle

    def start(self, c):
        info('mingle: searching for neighbors')
        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
        replies = I.hello()
        if replies:
            for reply in values(replies):
                try:
                    other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
                except KeyError:  # reply from pre-3.1 worker
                    pass
                else:
                    c.app.clock.adjust(other_clock)
                    revoked.update(other_revoked)
            info('mingle: synced with %s', ', '.join(replies))
        else:
            info('mingle: no one here')
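
    # For illustration (added commentary): each reply is expected to look
    # roughly like {'clock': 312, 'revoked': [...task ids...]}, which is what
    # MINGLE_GET_FIELDS extracts above to sync the logical clock and the set
    # of revoked tasks with the rest of the cluster.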


class Gossip(bootsteps.ConsumerStep):
    label = 'Gossip'
    requires = (Events, )
    _cons_stamp_fields = itemgetter(
        'clock', 'hostname', 'pid', 'topic', 'action',
    )

    def __init__(self, c, enable_gossip=True, interval=5.0, **kwargs):
        self.enabled = enable_gossip
        self.app = c.app
        c.gossip = self
        self.Receiver = c.app.events.Receiver
        self.hostname = c.hostname
        self.timer = c.timer
        self.state = c.app.events.State()
        self.interval = interval
        self._tref = None
        self.consensus_requests = defaultdict(list)
        self.consensus_replies = {}
        self.update_state = self.state.worker_event
        self.event_handlers = {
            'worker.elect': self.on_elect,
            'worker.elect.ack': self.on_elect_ack,
        }
        self.clock = c.app.clock
        self.election_handlers = {
            'task': self.call_task
        }

    def election(self, id, topic, action=None):
        self.consensus_replies[id] = []
        self.dispatcher.send('worker-elect', id=id, topic=topic, action=action)

    def call_task(self, task):
        try:
            X = subtask(task)
            X.apply_async()
        except Exception as exc:
            error('Could not call task: %r', exc, exc_info=1)

    def on_elect(self, event):
        id = event['id']
        self.dispatcher.send('worker-elect-ack', id=id)
        clock, hostname, pid, topic, action = self._cons_stamp_fields(event)
        heappush(
            self.consensus_requests[id],
            (clock, '%s.%s' % (hostname, pid), topic, action),
        )

    def start(self, c):
        super(Gossip, self).start(c)
        self.dispatcher = c.event_dispatcher

    def on_elect_ack(self, event):
        id = event['id']
        try:
            replies = self.consensus_replies[id]
        except KeyError:
            return
        alive_workers = self.state.alive_workers()
        replies.append(event['hostname'])
        if len(replies) >= len(alive_workers):
            _, leader, topic, action = self.clock.sort_heap(
                self.consensus_requests[id],
            )
            if leader == self.hostname:
                info('I won the election %r', id)
                try:
                    handler = self.election_handlers[topic]
                except KeyError:
                    error('Unknown election topic %r', topic, exc_info=1)
                else:
                    handler(action)
            else:
                info('node %s elected for %r', leader, id)
            self.consensus_requests.pop(id, None)
            self.consensus_replies.pop(id, None)
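
    # Election protocol sketch (added commentary, not part of the original
    # code): every worker that sees a 'worker-elect' event replies with
    # 'worker-elect-ack' and records (clock, 'hostname.pid', topic, action)
    # on a heap.  Once acks from all alive workers have arrived, the entry
    # with the lowest (clock, 'hostname.pid') pair is treated as the leader,
    # and only the leader runs the registered handler (here: call_task).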

    def on_node_join(self, worker):
        info('%s joined the party', worker.hostname)

    def on_node_leave(self, worker):
        info('%s left', worker.hostname)

    def on_node_lost(self, worker):
        warn('%s went missing!', worker.hostname)

    def register_timer(self):
        if self._tref is not None:
            self._tref.cancel()
        self._tref = self.timer.apply_interval(self.interval * 1000.0,
                                               self.periodic)

    def periodic(self):
        for worker in values(self.state.workers):
            if not worker.alive:
                try:
                    self.on_node_lost(worker)
                finally:
                    self.state.workers.pop(worker.hostname, None)

    def get_consumers(self, channel):
        self.register_timer()
        ev = self.Receiver(channel, routing_key='worker.#')
        return [kombu.Consumer(
            channel,
            queues=[ev.queue],
            on_message=partial(self.on_message, ev.event_from_message),
            no_ack=True
        )]

    def on_message(self, prepare, message):
        _type = message.delivery_info['routing_key']
        try:
            handler = self.event_handlers[_type]
        except KeyError:
            pass
        else:
            return handler(message.payload)

        hostname = (message.headers.get('hostname') or
                    message.payload['hostname'])
        if hostname != self.hostname:
            type, event = prepare(message.payload)
            group, _, subject = type.partition('-')
            worker, created = self.update_state(subject, event)
            if subject == 'offline':
                try:
                    self.on_node_leave(worker)
                finally:
                    self.state.workers.pop(worker.hostname, None)
            elif created or subject == 'online':
                self.on_node_join(worker)
        else:
            self.clock.forward()


class Evloop(bootsteps.StartStopStep):
    label = 'event loop'
    last = True

    def start(self, c):
        c.loop(*c.loop_args())
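
# Note (added commentary): which loop runs here was decided in
# Consumer.__init__ above: loops.asynloop when an event-loop Hub was
# supplied, loops.synloop when it was not.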