consumer.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.consumer
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. This module contains the components responsible for consuming messages
  6. from the broker, processing the messages and keeping the broker connections
  7. up and running.
  8. """
  9. from __future__ import absolute_import
  10. import kombu
  11. import logging
  12. import socket
  13. from collections import defaultdict
  14. from functools import partial
  15. from heapq import heappush
  16. from operator import itemgetter
  17. from time import sleep
  18. from billiard.common import restart_state
  19. from billiard.exceptions import RestartFreqExceeded
  20. from kombu.common import QoS, ignore_errors
  21. from kombu.syn import _detect_environment
  22. from kombu.utils.encoding import safe_repr
  23. from celery import bootsteps
  24. from celery.app import app_or_default
  25. from celery.canvas import subtask
  26. from celery.five import items, values
  27. from celery.task.trace import build_tracer
  28. from celery.utils.functional import noop
  29. from celery.utils.log import get_logger
  30. from celery.utils.text import truncate
  31. from celery.utils.timer2 import default_timer, to_timestamp
  32. from celery.utils.timeutils import humanize_seconds, timezone
  33. from . import heartbeat, loops, pidbox
  34. from .state import task_reserved, maybe_shutdown, revoked
  35. CLOSE = bootsteps.CLOSE
  36. logger = get_logger(__name__)
  37. debug, info, warn, error, crit = (logger.debug, logger.info, logger.warn,
  38. logger.error, logger.critical)
  39. CONNECTION_RETRY = """\
  40. consumer: Connection to broker lost. \
  41. Trying to re-establish the connection...\
  42. """
  43. CONNECTION_RETRY_STEP = """\
  44. Trying again {when}...\
  45. """
  46. CONNECTION_ERROR = """\
  47. consumer: Cannot connect to %s: %s.
  48. %s
  49. """
  50. CONNECTION_FAILOVER = """\
  51. Will retry using next failover.\
  52. """
  53. UNKNOWN_FORMAT = """\
  54. Received and deleted unknown message. Wrong destination?!?
  55. The full contents of the message body was: %s
  56. """
  57. #: Error message for when an unregistered task is received.
  58. UNKNOWN_TASK_ERROR = """\
  59. Received unregistered task of type %s.
  60. The message has been ignored and discarded.
  61. Did you remember to import the module containing this task?
  62. Or maybe you are using relative imports?
  63. Please see http://bit.ly/gLye1c for more information.
  64. The full contents of the message body was:
  65. %s
  66. """
  67. #: Error message for when an invalid task message is received.
  68. INVALID_TASK_ERROR = """\
  69. Received invalid task message: %s
  70. The message has been ignored and discarded.
  71. Please ensure your message conforms to the task
  72. message protocol as described here: http://bit.ly/hYj41y
  73. The full contents of the message body was:
  74. %s
  75. """
  76. MESSAGE_REPORT = """\
  77. body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
  78. """
  79. def dump_body(m, body):
  80. return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
  81. len(m.body))
  82. class Consumer(object):
  83. #: Intra-queue for tasks ready to be handled
  84. ready_queue = None
  85. #: Optional callback called the first time the worker
  86. #: is ready to receive tasks.
  87. init_callback = None
  88. #: The current worker pool instance.
  89. pool = None
  90. #: A timer used for high-priority internal tasks, such
  91. #: as sending heartbeats.
  92. timer = None
  93. class Namespace(bootsteps.Namespace):
  94. name = 'Consumer'
  95. default_steps = [
  96. 'celery.worker.consumer:Connection',
  97. 'celery.worker.consumer:Mingle',
  98. 'celery.worker.consumer:Events',
  99. 'celery.worker.consumer:Gossip',
  100. 'celery.worker.consumer:Heart',
  101. 'celery.worker.consumer:Control',
  102. 'celery.worker.consumer:Tasks',
  103. 'celery.worker.consumer:Evloop',
  104. 'celery.worker.consumer:Agent',
  105. ]
  106. def shutdown(self, parent):
  107. self.restart(parent, 'Shutdown', 'shutdown')
  108. def __init__(self, ready_queue,
  109. init_callback=noop, hostname=None,
  110. pool=None, app=None,
  111. timer=None, controller=None, hub=None, amqheartbeat=None,
  112. worker_options=None, **kwargs):
  113. self.app = app_or_default(app)
  114. self.controller = controller
  115. self.ready_queue = ready_queue
  116. self.init_callback = init_callback
  117. self.hostname = hostname or socket.gethostname()
  118. self.pool = pool
  119. self.timer = timer or default_timer
  120. self.strategies = {}
  121. conninfo = self.app.connection()
  122. self.connection_errors = conninfo.connection_errors
  123. self.channel_errors = conninfo.channel_errors
  124. self._restart_state = restart_state(maxR=5, maxT=1)
  125. self._does_info = logger.isEnabledFor(logging.INFO)
  126. self._quick_put = self.ready_queue.put
  127. if hub:
  128. self.amqheartbeat = amqheartbeat
  129. if self.amqheartbeat is None:
  130. self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
  131. self.hub = hub
  132. self.hub.on_init.append(self.on_poll_init)
  133. else:
  134. self.hub = None
  135. self.amqheartbeat = 0
  136. if not hasattr(self, 'loop'):
  137. self.loop = loops.asynloop if hub else loops.synloop
  138. if _detect_environment() == 'gevent':
  139. # there's a gevent bug that causes timeouts to not be reset,
  140. # so if the connection timeout is exceeded once, it can NEVER
  141. # connect again.
  142. self.app.conf.BROKER_CONNECTION_TIMEOUT = None
  143. self.steps = []
  144. self.namespace = self.Namespace(
  145. app=self.app, on_close=self.on_close,
  146. )
  147. self.namespace.apply(self, **dict(worker_options or {}, **kwargs))
  148. def start(self):
  149. ns, loop = self.namespace, self.loop
  150. while ns.state != CLOSE:
  151. maybe_shutdown()
  152. try:
  153. ns.start(self)
  154. except self.connection_errors:
  155. maybe_shutdown()
  156. try:
  157. self._restart_state.step()
  158. except RestartFreqExceeded as exc:
  159. crit('Frequent restarts detected: %r', exc, exc_info=1)
  160. sleep(1)
  161. if ns.state != CLOSE and self.connection:
  162. warn(CONNECTION_RETRY, exc_info=True)
  163. ns.restart(self)
  164. def shutdown(self):
  165. self.namespace.shutdown(self)
  166. def stop(self):
  167. self.namespace.stop(self)
  168. def on_ready(self):
  169. callback, self.init_callback = self.init_callback, None
  170. if callback:
  171. callback(self)
  172. def loop_args(self):
  173. return (self, self.connection, self.task_consumer,
  174. self.strategies, self.namespace, self.hub, self.qos,
  175. self.amqheartbeat, self.handle_unknown_message,
  176. self.handle_unknown_task, self.handle_invalid_task,
  177. self.app.clock)
  178. def on_poll_init(self, hub):
  179. hub.update_readers(self.connection.eventmap)
  180. self.connection.transport.on_poll_init(hub.poller)
  181. def on_decode_error(self, message, exc):
  182. """Callback called if an error occurs while decoding
  183. a message received.
  184. Simply logs the error and acknowledges the message so it
  185. doesn't enter a loop.
  186. :param message: The message with errors.
  187. :param exc: The original exception instance.
  188. """
  189. crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
  190. exc, message.content_type, message.content_encoding,
  191. dump_body(message, message.body))
  192. message.ack()
  193. def on_close(self):
  194. # Clear internal queues to get rid of old messages.
  195. # They can't be acked anyway, as a delivery tag is specific
  196. # to the current channel.
  197. self.ready_queue.clear()
  198. self.timer.clear()
  199. def connect(self):
  200. """Establish the broker connection.
  201. Will retry establishing the connection if the
  202. :setting:`BROKER_CONNECTION_RETRY` setting is enabled
  203. """
  204. conn = self.app.connection(heartbeat=self.amqheartbeat)
  205. # Callback called for each retry while the connection
  206. # can't be established.
  207. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  208. if getattr(conn, 'alt', None) and interval == 0:
  209. next_step = CONNECTION_FAILOVER
  210. error(CONNECTION_ERROR, conn.as_uri(), exc,
  211. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  212. # remember that the connection is lazy, it won't establish
  213. # until it's needed.
  214. if not self.app.conf.BROKER_CONNECTION_RETRY:
  215. # retry disabled, just call connect directly.
  216. conn.connect()
  217. return conn
  218. return conn.ensure_connection(_error_handler,
  219. self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
  220. callback=maybe_shutdown)
  221. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  222. routing_key=None, **options):
  223. cset = self.task_consumer
  224. queues = self.app.amqp.queues
  225. # Must use in' here, as __missing__ will automatically
  226. # create queues when CELERY_CREATE_MISSING_QUEUES is enabled.
  227. # (Issue #1079)
  228. if queue in queues:
  229. q = queues[queue]
  230. else:
  231. exchange = queue if exchange is None else exchange
  232. exchange_type = ('direct' if exchange_type is None
  233. else exchange_type)
  234. q = queues.select_add(queue,
  235. exchange=exchange,
  236. exchange_type=exchange_type,
  237. routing_key=routing_key, **options)
  238. if not cset.consuming_from(queue):
  239. cset.add_queue(q)
  240. cset.consume()
  241. info('Started consuming from %r', queue)
  242. def cancel_task_queue(self, queue):
  243. self.app.amqp.queues.select_remove(queue)
  244. self.task_consumer.cancel_by_queue(queue)
  245. def on_task(self, task, task_reserved=task_reserved,
  246. to_system_tz=timezone.to_system):
  247. """Handle received task.
  248. If the task has an `eta` we enter it into the ETA schedule,
  249. otherwise we move it the ready queue for immediate processing.
  250. """
  251. if task.revoked():
  252. return
  253. if self._does_info:
  254. info('Got task from broker: %s', task)
  255. if self.event_dispatcher.enabled:
  256. self.event_dispatcher.send('task-received', uuid=task.id,
  257. name=task.name, args=safe_repr(task.args),
  258. kwargs=safe_repr(task.kwargs),
  259. retries=task.request_dict.get('retries', 0),
  260. eta=task.eta and task.eta.isoformat(),
  261. expires=task.expires and task.expires.isoformat())
  262. if task.eta:
  263. try:
  264. if task.utc:
  265. eta = to_timestamp(to_system_tz(task.eta))
  266. else:
  267. eta = to_timestamp(task.eta, timezone.local)
  268. except OverflowError as exc:
  269. error("Couldn't convert eta %s to timestamp: %r. Task: %r",
  270. task.eta, exc, task.info(safe=True), exc_info=True)
  271. task.acknowledge()
  272. else:
  273. self.qos.increment_eventually()
  274. self.timer.apply_at(
  275. eta, self.apply_eta_task, (task, ), priority=6,
  276. )
  277. else:
  278. task_reserved(task)
  279. self._quick_put(task)
  280. def apply_eta_task(self, task):
  281. """Method called by the timer to apply a task with an
  282. ETA/countdown."""
  283. task_reserved(task)
  284. self._quick_put(task)
  285. self.qos.decrement_eventually()
  286. def _message_report(self, body, message):
  287. return MESSAGE_REPORT.format(dump_body(message, body),
  288. safe_repr(message.content_type),
  289. safe_repr(message.content_encoding),
  290. safe_repr(message.delivery_info))
  291. def handle_unknown_message(self, body, message):
  292. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  293. message.reject_log_error(logger, self.connection_errors)
  294. def handle_unknown_task(self, body, message, exc):
  295. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  296. message.reject_log_error(logger, self.connection_errors)
  297. def handle_invalid_task(self, body, message, exc):
  298. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  299. message.reject_log_error(logger, self.connection_errors)
  300. def update_strategies(self):
  301. loader = self.app.loader
  302. for name, task in items(self.app.tasks):
  303. self.strategies[name] = task.start_strategy(self.app, self)
  304. task.__trace__ = build_tracer(name, task, loader, self.hostname)
  305. class Connection(bootsteps.StartStopStep):
  306. def __init__(self, c, **kwargs):
  307. c.connection = None
  308. def start(self, c):
  309. c.connection = c.connect()
  310. info('Connected to %s', c.connection.as_uri())
  311. def shutdown(self, c):
  312. # We must set self.connection to None here, so
  313. # that the green pidbox thread exits.
  314. connection, c.connection = c.connection, None
  315. if connection:
  316. ignore_errors(connection, connection.close)
  317. def info(self, c):
  318. info = c.connection.info()
  319. info.pop('password', None) # don't send password.
  320. return {'broker': info}
  321. class Events(bootsteps.StartStopStep):
  322. requires = (Connection, )
  323. def __init__(self, c, send_events=None, **kwargs):
  324. self.send_events = True
  325. self.groups = None if send_events else ['worker']
  326. c.event_dispatcher = None
  327. def start(self, c):
  328. # Flush events sent while connection was down.
  329. prev = c.event_dispatcher
  330. dis = c.event_dispatcher = c.app.events.Dispatcher(
  331. c.connection, hostname=c.hostname,
  332. enabled=self.send_events, groups=self.groups,
  333. )
  334. if prev:
  335. dis.copy_buffer(prev)
  336. dis.flush()
  337. def stop(self, c):
  338. if c.event_dispatcher:
  339. ignore_errors(c, c.event_dispatcher.close)
  340. c.event_dispatcher = None
  341. shutdown = stop
  342. class Heart(bootsteps.StartStopStep):
  343. requires = (Events, )
  344. def __init__(self, c, enable_heartbeat=True, **kwargs):
  345. self.enabled = enable_heartbeat
  346. c.heart = None
  347. def start(self, c):
  348. c.heart = heartbeat.Heart(c.timer, c.event_dispatcher)
  349. c.heart.start()
  350. def stop(self, c):
  351. c.heart = c.heart and c.heart.stop()
  352. shutdown = stop
  353. class Control(bootsteps.StartStopStep):
  354. requires = (Events, )
  355. def __init__(self, c, **kwargs):
  356. self.is_green = c.pool is not None and c.pool.is_green
  357. self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
  358. self.start = self.box.start
  359. self.stop = self.box.stop
  360. self.shutdown = self.box.shutdown
  361. class Tasks(bootsteps.StartStopStep):
  362. requires = (Control, )
  363. def __init__(self, c, initial_prefetch_count=2, **kwargs):
  364. c.task_consumer = c.qos = None
  365. self.initial_prefetch_count = initial_prefetch_count
  366. def start(self, c):
  367. c.update_strategies()
  368. c.task_consumer = c.app.amqp.TaskConsumer(
  369. c.connection, on_decode_error=c.on_decode_error,
  370. )
  371. c.qos = QoS(c.task_consumer.qos, self.initial_prefetch_count)
  372. c.qos.update() # set initial prefetch count
  373. def stop(self, c):
  374. if c.task_consumer:
  375. debug('Cancelling task consumer...')
  376. ignore_errors(c, c.task_consumer.cancel)
  377. def shutdown(self, c):
  378. if c.task_consumer:
  379. self.stop(c)
  380. debug('Closing consumer channel...')
  381. ignore_errors(c, c.task_consumer.close)
  382. c.task_consumer = None
  383. def info(self, c):
  384. return {'prefetch_count': c.qos.value}
  385. class Agent(bootsteps.StartStopStep):
  386. conditional = True
  387. requires = (Connection, )
  388. def __init__(self, c, **kwargs):
  389. self.agent_cls = self.enabled = c.app.conf.CELERYD_AGENT
  390. def create(self, c):
  391. agent = c.agent = self.instantiate(self.agent_cls, c.connection)
  392. return agent
  393. class Mingle(bootsteps.StartStopStep):
  394. label = 'Mingle'
  395. requires = (Connection, )
  396. def __init__(self, c, enable_mingle=True, **kwargs):
  397. self.enabled = enable_mingle
  398. def start(self, c):
  399. info('mingle: searching for neighbors')
  400. I = c.app.control.inspect(timeout=1.0, connection=c.connection)
  401. replies = I.hello()
  402. if replies:
  403. for reply in values(replies):
  404. c.app.clock.adjust(reply['clock'])
  405. revoked.update(reply['revoked'])
  406. info('mingle: synced with %s', ', '.join(replies))
  407. else:
  408. info('mingle: no one here')
  409. class Gossip(bootsteps.ConsumerStep):
  410. label = 'Gossip'
  411. requires = (Events, )
  412. _cons_stamp_fields = itemgetter(
  413. 'clock', 'hostname', 'pid', 'topic', 'action',
  414. )
  415. def __init__(self, c, enable_gossip=True, interval=5.0, **kwargs):
  416. self.enabled = enable_gossip
  417. self.app = c.app
  418. c.gossip = self
  419. self.Receiver = c.app.events.Receiver
  420. self.hostname = c.hostname
  421. self.timer = c.timer
  422. self.state = c.app.events.State()
  423. self.interval = interval
  424. self._tref = None
  425. self.consensus_requests = defaultdict(list)
  426. self.consensus_replies = {}
  427. self.update_state = self.state.worker_event
  428. self.event_handlers = {
  429. 'worker.elect': self.on_elect,
  430. 'worker.elect.ack': self.on_elect_ack,
  431. }
  432. self.clock = c.app.clock
  433. self.election_handlers = {
  434. 'task': self.call_task
  435. }
  436. def election(self, id, topic, action=None):
  437. self.consensus_replies[id] = []
  438. self.dispatcher.send('worker-elect', id=id, topic=topic, action=action)
  439. def call_task(self, task):
  440. try:
  441. X = subtask(task)
  442. X.apply_async()
  443. except Exception as exc:
  444. error('Could not call task: %r', exc, exc_info=1)
  445. def on_elect(self, event):
  446. id = event['id']
  447. self.dispatcher.send('worker-elect-ack', id=id)
  448. clock, hostname, pid, topic, action = self._cons_stamp_fields(event)
  449. heappush(self.consensus_requests[id],
  450. (clock, '%s.%s' % (hostname, pid), topic, action),
  451. )
  452. def start(self, c):
  453. super(Gossip, self).start(c)
  454. self.dispatcher = c.event_dispatcher
  455. def on_elect_ack(self, event):
  456. id = event['id']
  457. try:
  458. replies = self.consensus_replies[id]
  459. except KeyError:
  460. return
  461. alive_workers = self.state.alive_workers()
  462. replies.append(event['hostname'])
  463. if len(replies) >= len(alive_workers):
  464. _, leader, topic, action = self.lock.sort_heap(
  465. self.consensus_requests[id],
  466. )
  467. if leader == self.hostname:
  468. info('I won the election %r', id)
  469. try:
  470. handler = self.election_handlers[topic]
  471. except KeyError:
  472. error('Unknown election topic %r', topic, exc_info=1)
  473. else:
  474. handler(action)
  475. else:
  476. info('node %s elected for %r', leader, id)
  477. self.consensus_requests.pop(id, None)
  478. self.consensus_replies.pop(id, None)
  479. def on_node_join(self, worker):
  480. info('%s joined the party', worker.hostname)
  481. def on_node_leave(self, worker):
  482. info('%s left', worker.hostname)
  483. def on_node_lost(self, worker):
  484. warn('%s went missing!', worker.hostname)
  485. def register_timer(self):
  486. if self._tref is not None:
  487. self._tref.cancel()
  488. self.timer.apply_interval(self.interval * 1000.0, self.periodic)
  489. def periodic(self):
  490. for worker in values(self.state.workers):
  491. if not worker.alive:
  492. try:
  493. self.on_node_lost(worker)
  494. finally:
  495. self.state.workers.pop(worker.hostname, None)
  496. def get_consumers(self, channel):
  497. self.register_timer()
  498. ev = self.Receiver(channel, routing_key='worker.#')
  499. return [kombu.Consumer(channel,
  500. queues=[ev.queue],
  501. on_message=partial(self.on_message, ev.event_from_message),
  502. no_ack=True)]
  503. def on_message(self, prepare, message):
  504. _type = message.delivery_info['routing_key']
  505. try:
  506. handler = self.event_handlers[_type]
  507. except KeyError:
  508. pass
  509. else:
  510. return handler(message.payload)
  511. hostname = (message.headers.get('hostname') or
  512. message.payload['hostname'])
  513. if hostname != self.hostname:
  514. type, event = prepare(message.payload)
  515. group, _, subject = type.partition('-')
  516. worker, created = self.update_state(subject, event)
  517. if subject == 'offline':
  518. try:
  519. self.on_node_leave(worker)
  520. finally:
  521. self.state.workers.pop(worker.hostname, None)
  522. elif created or subject == 'online':
  523. self.on_node_join(worker)
  524. else:
  525. self.clock.forward()
  526. class Evloop(bootsteps.StartStopStep):
  527. label = 'event loop'
  528. last = True
  529. def start(self, c):
  530. c.loop(*c.loop_args())