__init__.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.consumer
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. This module contains the component responsible for consuming messages
  6. from the broker, processing the messages and keeping the broker connections
  7. up and running.
  8. """
  9. from __future__ import absolute_import
  10. import logging
  11. import socket
  12. from kombu.syn import _detect_environment
  13. from kombu.utils.encoding import safe_repr
  14. from celery.app import app_or_default
  15. from celery.task.trace import build_tracer
  16. from celery.utils.timer2 import default_timer, to_timestamp
  17. from celery.utils.functional import noop
  18. from celery.utils.imports import qualname
  19. from celery.utils.log import get_logger
  20. from celery.utils.text import dump_body
  21. from celery.utils.timeutils import humanize_seconds, timezone
  22. from celery.worker import state
  23. from celery.worker.state import maybe_shutdown
  24. from celery.worker.bootsteps import Namespace as _NS, StartStopComponent, CLOSE
  25. from . import loops
  26. logger = get_logger(__name__)
  27. info, warn, error, crit = (logger.info, logger.warn,
  28. logger.error, logger.critical)
  29. task_reserved = state.task_reserved
#: Logged by ``Consumer.start`` when a connection or channel error
#: interrupts the consumer and it is about to restart.
CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""

#: Default "next step" text for the connection error handler; formatted
#: with ``when`` (a humanized retry interval).
CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""

#: %-style template logged for each failed connection attempt.
#: Arguments: broker URI, the exception, and the "next step" text.
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

#: "Next step" text used instead of :data:`CONNECTION_RETRY_STEP` when the
#: connection has an ``alt`` attribute set and the retry interval is 0.
CONNECTION_FAILOVER = """\
Will retry using next failover.\
"""

#: Warning logged by ``Consumer.handle_unknown_message``; the single
#: argument is the message report string.
UNKNOWN_FORMAT = """\
Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: %s
"""

#: Error message for when an unregistered task is received.
#: Arguments: the exception and the dumped message body.
UNKNOWN_TASK_ERROR = """\
Received unregistered task of type %s.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
%s
"""

#: Error message for when an invalid task message is received.
#: Arguments: the exception and the dumped message body.
INVALID_TASK_ERROR = """\
Received invalid task message: %s
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here: http://bit.ly/hYj41y
The full contents of the message body was:
%s
"""

#: ``str.format`` template used by ``Consumer._message_report``.
#: Positional fields: dumped body, content type, content encoding and
#: delivery info.  The doubled braces render as literal ``{`` / ``}``.
MESSAGE_REPORT = """\
body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
"""
  70. def debug(msg, *args, **kwargs):
  71. logger.debug('consumer: {0}'.format(msg), *args, **kwargs)
  72. class Component(StartStopComponent):
  73. name = 'worker.consumer'
  74. last = True
  75. def create(self, w):
  76. prefetch_count = w.concurrency * w.prefetch_multiplier
  77. c = w.consumer = self.instantiate(w.consumer_cls,
  78. w.ready_queue,
  79. hostname=w.hostname,
  80. send_events=w.send_events,
  81. init_callback=w.ready_callback,
  82. initial_prefetch_count=prefetch_count,
  83. pool=w.pool,
  84. timer=w.timer,
  85. app=w.app,
  86. controller=w,
  87. hub=w.hub)
  88. return c
  89. class Namespace(_NS):
  90. name = 'consumer'
  91. builtin_boot_steps = ('celery.worker.consumer.components', )
  92. def shutdown(self, parent):
  93. delayed = self._shutdown_step(parent, parent.components, force=False)
  94. self._shutdown_step(parent, delayed, force=True)
  95. def _shutdown_step(self, parent, components, force=False):
  96. delayed = []
  97. for component in components:
  98. if component:
  99. logger.debug('Shutdown %s...', qualname(component))
  100. if not force and getattr(component, 'delay_shutdown', False):
  101. delayed.append(component)
  102. else:
  103. component.shutdown(parent)
  104. return delayed
  105. def modules(self):
  106. return (self.builtin_boot_steps +
  107. self.app.conf.CELERYD_CONSUMER_BOOT_STEPS)
  108. class Consumer(object):
  109. """Listen for messages received from the broker and
  110. move them to the ready queue for task processing.
  111. :param ready_queue: See :attr:`ready_queue`.
  112. :param timer: See :attr:`timer`.
  113. """
  114. #: The queue that holds tasks ready for immediate processing.
  115. ready_queue = None
  116. #: Optional callback to be called when the connection is established.
  117. #: Will only be called once, even if the connection is lost and
  118. #: re-established.
  119. init_callback = None
  120. #: The current hostname. Defaults to the system hostname.
  121. hostname = None
  122. #: The current worker pool instance.
  123. pool = None
  124. #: A timer used for high-priority internal tasks, such
  125. #: as sending heartbeats.
  126. timer = None
  127. def __init__(self, ready_queue,
  128. init_callback=noop, hostname=None,
  129. pool=None, app=None,
  130. timer=None, controller=None, hub=None, amqheartbeat=None,
  131. **kwargs):
  132. self.app = app_or_default(app)
  133. self.controller = controller
  134. self.ready_queue = ready_queue
  135. self.init_callback = init_callback
  136. self.hostname = hostname or socket.gethostname()
  137. self.pool = pool
  138. self.timer = timer or default_timer
  139. self.strategies = {}
  140. conninfo = self.app.connection()
  141. self.connection_errors = conninfo.connection_errors
  142. self.channel_errors = conninfo.channel_errors
  143. self._does_info = logger.isEnabledFor(logging.INFO)
  144. if not hasattr(self, 'loop'):
  145. self.loop = loops.asynloop if hub else loops.synloop
  146. if hub:
  147. hub.on_init.append(self.on_poll_init)
  148. self.hub = hub
  149. self._quick_put = self.ready_queue.put
  150. self.amqheartbeat = amqheartbeat
  151. if self.amqheartbeat is None:
  152. self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
  153. if not hub:
  154. self.amqheartbeat = 0
  155. if _detect_environment() == 'gevent':
  156. # there's a gevent bug that causes timeouts to not be reset,
  157. # so if the connection timeout is exceeded once, it can NEVER
  158. # connect again.
  159. self.app.conf.BROKER_CONNECTION_TIMEOUT = None
  160. self.components = []
  161. self.namespace = Namespace(app=self.app,
  162. on_start=self.on_start,
  163. on_close=self.on_close)
  164. self.namespace.apply(self, **kwargs)
  165. def on_start(self):
  166. # reload all task's execution strategies.
  167. self.update_strategies()
  168. self.init_callback(self)
  169. def start(self):
  170. """Start the consumer.
  171. Automatically survives intermittent connection failure,
  172. and will retry establishing the connection and restart
  173. consuming messages.
  174. """
  175. ns, loop, loop_args = self.namespace, self.loop, self.loop_args()
  176. while ns.state != CLOSE:
  177. maybe_shutdown()
  178. try:
  179. ns.start(self)
  180. loop(*loop_args)
  181. except self.connection_errors + self.channel_errors:
  182. error(CONNECTION_RETRY, exc_info=True)
  183. self.restart()
  184. def loop_args(self):
  185. return (self, self.connection, self.task_consumer,
  186. self.strategies, self.namespace, self.hub, self.qos,
  187. self.amqheartbeat, self.handle_unknown_message,
  188. self.handle_unknown_task, self.handle_invalid_task)
  189. def restart(self):
  190. return self.namespace.restart(self)
  191. def on_poll_init(self, hub):
  192. hub.update_readers(self.connection.eventmap)
  193. self.connection.transport.on_poll_init(hub.poller)
  194. def maybe_conn_error(self, fun):
  195. """Applies function but ignores any connection or channel
  196. errors raised."""
  197. try:
  198. fun()
  199. except (AttributeError, ) + \
  200. self.connection_errors + \
  201. self.channel_errors:
  202. pass
  203. def shutdown(self):
  204. self.namespace.shutdown(self)
  205. def on_decode_error(self, message, exc):
  206. """Callback called if an error occurs while decoding
  207. a message received.
  208. Simply logs the error and acknowledges the message so it
  209. doesn't enter a loop.
  210. :param message: The message with errors.
  211. :param exc: The original exception instance.
  212. """
  213. crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
  214. exc, message.content_type, message.content_encoding,
  215. dump_body(message, message.body))
  216. message.ack()
  217. def on_close(self):
  218. # Clear internal queues to get rid of old messages.
  219. # They can't be acked anyway, as a delivery tag is specific
  220. # to the current channel.
  221. self.ready_queue.clear()
  222. self.timer.clear()
  223. def _open_connection(self):
  224. """Establish the broker connection.
  225. Will retry establishing the connection if the
  226. :setting:`BROKER_CONNECTION_RETRY` setting is enabled
  227. """
  228. conn = self.app.connection(heartbeat=self.amqheartbeat)
  229. # Callback called for each retry while the connection
  230. # can't be established.
  231. def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
  232. if getattr(conn, 'alt', None) and interval == 0:
  233. next_step = CONNECTION_FAILOVER
  234. error(CONNECTION_ERROR, conn.as_uri(), exc,
  235. next_step.format(when=humanize_seconds(interval, 'in', ' ')))
  236. # remember that the connection is lazy, it won't establish
  237. # until it's needed.
  238. if not self.app.conf.BROKER_CONNECTION_RETRY:
  239. # retry disabled, just call connect directly.
  240. conn.connect()
  241. return conn
  242. return conn.ensure_connection(_error_handler,
  243. self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
  244. callback=maybe_shutdown)
  245. def stop(self):
  246. """Stop consuming.
  247. Does not close the broker connection, so be sure to call
  248. :meth:`close_connection` when you are finished with it.
  249. """
  250. self.namespace.stop(self)
  251. def add_task_queue(self, queue, exchange=None, exchange_type=None,
  252. routing_key=None, **options):
  253. cset = self.task_consumer
  254. try:
  255. q = self.app.amqp.queues[queue]
  256. except KeyError:
  257. exchange = queue if exchange is None else exchange
  258. exchange_type = 'direct' if exchange_type is None \
  259. else exchange_type
  260. q = self.app.amqp.queues.select_add(queue,
  261. exchange=exchange,
  262. exchange_type=exchange_type,
  263. routing_key=routing_key, **options)
  264. if not cset.consuming_from(queue):
  265. cset.add_queue(q)
  266. cset.consume()
  267. info('Started consuming from %r', queue)
  268. def cancel_task_queue(self, queue):
  269. self.app.amqp.queues.select_remove(queue)
  270. self.task_consumer.cancel_by_queue(queue)
  271. @property
  272. def info(self):
  273. """Returns information about this consumer instance
  274. as a dict.
  275. This is also the consumer related info returned by
  276. ``celeryctl stats``.
  277. """
  278. conninfo = {}
  279. if self.connection:
  280. conninfo = self.connection.info()
  281. conninfo.pop('password', None) # don't send password.
  282. return {'broker': conninfo, 'prefetch_count': self.qos.value}
  283. def on_task(self, task, task_reserved=task_reserved):
  284. """Handle received task.
  285. If the task has an `eta` we enter it into the ETA schedule,
  286. otherwise we move it the ready queue for immediate processing.
  287. """
  288. if task.revoked():
  289. return
  290. if self._does_info:
  291. info('Got task from broker: %s', task)
  292. if self.event_dispatcher.enabled:
  293. self.event_dispatcher.send('task-received', uuid=task.id,
  294. name=task.name, args=safe_repr(task.args),
  295. kwargs=safe_repr(task.kwargs),
  296. retries=task.request_dict.get('retries', 0),
  297. eta=task.eta and task.eta.isoformat(),
  298. expires=task.expires and task.expires.isoformat())
  299. if task.eta:
  300. eta = timezone.to_system(task.eta) if task.utc else task.eta
  301. try:
  302. eta = to_timestamp(eta)
  303. except OverflowError as exc:
  304. error("Couldn't convert eta %s to timestamp: %r. Task: %r",
  305. task.eta, exc, task.info(safe=True), exc_info=True)
  306. task.acknowledge()
  307. else:
  308. self.qos.increment_eventually()
  309. self.timer.apply_at(
  310. eta, self.apply_eta_task, (task, ), priority=6,
  311. )
  312. else:
  313. task_reserved(task)
  314. self._quick_put(task)
  315. def apply_eta_task(self, task):
  316. """Method called by the timer to apply a task with an
  317. ETA/countdown."""
  318. task_reserved(task)
  319. self._quick_put(task)
  320. self.qos.decrement_eventually()
  321. def _message_report(self, body, message):
  322. return MESSAGE_REPORT.format(dump_body(message, body),
  323. safe_repr(message.content_type),
  324. safe_repr(message.content_encoding),
  325. safe_repr(message.delivery_info))
  326. def handle_unknown_message(self, body, message):
  327. warn(UNKNOWN_FORMAT, self._message_report(body, message))
  328. message.reject_log_error(logger, self.connection_errors)
  329. def handle_unknown_task(self, body, message, exc):
  330. error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  331. message.reject_log_error(logger, self.connection_errors)
  332. def handle_invalid_task(self, body, message, exc):
  333. error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
  334. message.reject_log_error(logger, self.connection_errors)
  335. def update_strategies(self):
  336. S = self.strategies
  337. app = self.app
  338. loader = app.loader
  339. hostname = self.hostname
  340. for name, task in self.app.tasks.iteritems():
  341. S[name] = task.start_strategy(app, self)
  342. task.__trace__ = build_tracer(name, task, loader, hostname)