# -*- coding: utf-8 -*-
"""
celery.app.amqp
~~~~~~~~~~~~~~~

Sending and receiving messages using Kombu.
"""
from __future__ import absolute_import, unicode_literals

import numbers
import sys

from collections import Mapping, namedtuple
from datetime import timedelta
from weakref import WeakValueDictionary

from kombu import pools
from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.common import Broadcast
from kombu.utils import cached_property
from kombu.utils.functional import maybe_list

from celery import signals
from celery.five import items, string_t
from celery.local import try_import
from celery.utils import anon_nodename
from celery.utils.saferepr import saferepr
from celery.utils.text import indent as textindent
from celery.utils.timeutils import maybe_make_aware, to_utc

from . import routes as _routes
  26. __all__ = ['AMQP', 'Queues', 'task_message']
  27. PY3 = sys.version_info[0] == 3
  28. # json in Python 2.7 borks if dict contains byte keys.
  29. JSON_NEEDS_UNICODE_KEYS = not PY3 and not try_import('simplejson')
  30. #: Human readable queue declaration.
  31. QUEUE_FORMAT = """
  32. .> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
  33. key={0.routing_key}
  34. """
  35. task_message = namedtuple('task_message',
  36. ('headers', 'properties', 'body', 'sent_event'))
  37. def utf8dict(d, encoding='utf-8'):
  38. return {k.decode(encoding) if isinstance(k, bytes) else k: v
  39. for k, v in items(d)}
  40. class Queues(dict):
  41. """Queue name⇒ declaration mapping.
  42. :param queues: Initial list/tuple or dict of queues.
  43. :keyword create_missing: By default any unknown queues will be
  44. added automatically, but if disabled
  45. the occurrence of unknown queues
  46. in `wanted` will raise :exc:`KeyError`.
  47. :keyword ha_policy: Default HA policy for queues with none set.
  48. :keyword max_priority: Default x-max-priority for queues with none set.
  49. """
  50. #: If set, this is a subset of queues to consume from.
  51. #: The rest of the queues are then used for routing only.
  52. _consume_from = None
  53. def __init__(self, queues=None, default_exchange=None,
  54. create_missing=True, ha_policy=None, autoexchange=None,
  55. max_priority=None):
  56. dict.__init__(self)
  57. self.aliases = WeakValueDictionary()
  58. self.default_exchange = default_exchange
  59. self.create_missing = create_missing
  60. self.ha_policy = ha_policy
  61. self.autoexchange = Exchange if autoexchange is None else autoexchange
  62. self.max_priority = max_priority
  63. if isinstance(queues, (tuple, list)):
  64. queues = {q.name: q for q in queues}
  65. for name, q in items(queues or {}):
  66. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  67. def __getitem__(self, name):
  68. try:
  69. return self.aliases[name]
  70. except KeyError:
  71. return dict.__getitem__(self, name)
  72. def __setitem__(self, name, queue):
  73. if self.default_exchange and not queue.exchange:
  74. queue.exchange = self.default_exchange
  75. dict.__setitem__(self, name, queue)
  76. if queue.alias:
  77. self.aliases[queue.alias] = queue
  78. def __missing__(self, name):
  79. if self.create_missing:
  80. return self.add(self.new_missing(name))
  81. raise KeyError(name)
  82. def add(self, queue, **kwargs):
  83. """Add new queue.
  84. The first argument can either be a :class:`kombu.Queue` instance,
  85. or the name of a queue. If the former the rest of the keyword
  86. arguments are ignored, and options are simply taken from the queue
  87. instance.
  88. :param queue: :class:`kombu.Queue` instance or name of the queue.
  89. :keyword exchange: (if named) specifies exchange name.
  90. :keyword routing_key: (if named) specifies binding key.
  91. :keyword exchange_type: (if named) specifies type of exchange.
  92. :keyword \*\*options: (if named) Additional declaration options.
  93. """
  94. if not isinstance(queue, Queue):
  95. return self.add_compat(queue, **kwargs)
  96. if self.ha_policy:
  97. if queue.queue_arguments is None:
  98. queue.queue_arguments = {}
  99. self._set_ha_policy(queue.queue_arguments)
  100. if self.max_priority is not None:
  101. if queue.queue_arguments is None:
  102. queue.queue_arguments = {}
  103. self._set_max_priority(queue.queue_arguments)
  104. self[queue.name] = queue
  105. return queue
  106. def add_compat(self, name, **options):
  107. # docs used to use binding_key as routing key
  108. options.setdefault('routing_key', options.get('binding_key'))
  109. if options['routing_key'] is None:
  110. options['routing_key'] = name
  111. if self.ha_policy is not None:
  112. self._set_ha_policy(options.setdefault('queue_arguments', {}))
  113. if self.max_priority is not None:
  114. self._set_max_priority(options.setdefault('queue_arguments', {}))
  115. q = self[name] = Queue.from_dict(name, **options)
  116. return q
  117. def _set_ha_policy(self, args):
  118. policy = self.ha_policy
  119. if isinstance(policy, (list, tuple)):
  120. return args.update({'x-ha-policy': 'nodes',
  121. 'x-ha-policy-params': list(policy)})
  122. args['x-ha-policy'] = policy
  123. def _set_max_priority(self, args):
  124. if 'x-max-priority' not in args and self.max_priority is not None:
  125. return args.update({'x-max-priority': self.max_priority})
  126. def format(self, indent=0, indent_first=True):
  127. """Format routing table into string for log dumps."""
  128. active = self.consume_from
  129. if not active:
  130. return ''
  131. info = [QUEUE_FORMAT.strip().format(q)
  132. for _, q in sorted(items(active))]
  133. if indent_first:
  134. return textindent('\n'.join(info), indent)
  135. return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
  136. def select_add(self, queue, **kwargs):
  137. """Add new task queue that will be consumed from even when
  138. a subset has been selected using the :option:`-Q` option."""
  139. q = self.add(queue, **kwargs)
  140. if self._consume_from is not None:
  141. self._consume_from[q.name] = q
  142. return q
  143. def select(self, include):
  144. """Sets :attr:`consume_from` by selecting a subset of the
  145. currently defined queues.
  146. :param include: Names of queues to consume from.
  147. Can be iterable or string.
  148. """
  149. if include:
  150. self._consume_from = {
  151. name: self[name] for name in maybe_list(include)
  152. }
  153. select_subset = select # XXX compat
  154. def deselect(self, exclude):
  155. """Deselect queues so that they will not be consumed from.
  156. :param exclude: Names of queues to avoid consuming from.
  157. Can be iterable or string.
  158. """
  159. if exclude:
  160. exclude = maybe_list(exclude)
  161. if self._consume_from is None:
  162. # using selection
  163. return self.select(k for k in self if k not in exclude)
  164. # using all queues
  165. for queue in exclude:
  166. self._consume_from.pop(queue, None)
  167. select_remove = deselect # XXX compat
  168. def new_missing(self, name):
  169. return Queue(name, self.autoexchange(name), name)
  170. @property
  171. def consume_from(self):
  172. if self._consume_from is not None:
  173. return self._consume_from
  174. return self
  175. class AMQP(object):
  176. Connection = Connection
  177. Consumer = Consumer
  178. Producer = Producer
  179. #: compat alias to Connection
  180. BrokerConnection = Connection
  181. queues_cls = Queues
  182. #: Cached and prepared routing table.
  183. _rtable = None
  184. #: Underlying producer pool instance automatically
  185. #: set by the :attr:`producer_pool`.
  186. _producer_pool = None
  187. # Exchange class/function used when defining automatic queues.
  188. # E.g. you can use ``autoexchange = lambda n: None`` to use the
  189. # amqp default exchange, which is a shortcut to bypass routing
  190. # and instead send directly to the queue named in the routing key.
  191. autoexchange = None
  192. #: Max size of positional argument representation used for
  193. #: logging purposes.
  194. argsrepr_maxsize = 1024
  195. #: Max size of keyword argument representation used for logging purposes.
  196. kwargsrepr_maxsize = 1024
  197. def __init__(self, app):
  198. self.app = app
  199. self.task_protocols = {
  200. 1: self.as_task_v1,
  201. 2: self.as_task_v2,
  202. }
  203. @cached_property
  204. def create_task_message(self):
  205. return self.task_protocols[self.app.conf.task_protocol]
  206. @cached_property
  207. def send_task_message(self):
  208. return self._create_task_sender()
  209. def Queues(self, queues, create_missing=None, ha_policy=None,
  210. autoexchange=None, max_priority=None):
  211. """Create new :class:`Queues` instance, using queue defaults
  212. from the current configuration."""
  213. conf = self.app.conf
  214. if create_missing is None:
  215. create_missing = conf.task_create_missing_queues
  216. if ha_policy is None:
  217. ha_policy = conf.task_queue_ha_policy
  218. if max_priority is None:
  219. max_priority = conf.task_queue_max_priority
  220. if not queues and conf.task_default_queue:
  221. queues = (Queue(conf.task_default_queue,
  222. exchange=self.default_exchange,
  223. routing_key=conf.task_default_routing_key),)
  224. autoexchange = (self.autoexchange if autoexchange is None
  225. else autoexchange)
  226. return self.queues_cls(
  227. queues, self.default_exchange, create_missing,
  228. ha_policy, autoexchange, max_priority,
  229. )
  230. def Router(self, queues=None, create_missing=None):
  231. """Return the current task router."""
  232. return _routes.Router(self.routes, queues or self.queues,
  233. self.app.either('task_create_missing_queues',
  234. create_missing), app=self.app)
  235. def flush_routes(self):
  236. self._rtable = _routes.prepare(self.app.conf.task_routes)
  237. def TaskConsumer(self, channel, queues=None, accept=None, **kw):
  238. if accept is None:
  239. accept = self.app.conf.accept_content
  240. return self.Consumer(
  241. channel, accept=accept,
  242. queues=queues or list(self.queues.consume_from.values()),
  243. **kw
  244. )
  245. def as_task_v2(self, task_id, name, args=None, kwargs=None,
  246. countdown=None, eta=None, group_id=None,
  247. expires=None, retries=0, chord=None,
  248. callbacks=None, errbacks=None, reply_to=None,
  249. time_limit=None, soft_time_limit=None,
  250. create_sent_event=False, root_id=None, parent_id=None,
  251. shadow=None, chain=None, now=None, timezone=None,
  252. origin=None):
  253. args = args or ()
  254. kwargs = kwargs or {}
  255. if not isinstance(args, (list, tuple)):
  256. raise TypeError('task args must be a list or tuple')
  257. if not isinstance(kwargs, Mapping):
  258. raise TypeError('task keyword arguments must be a mapping')
  259. if countdown: # convert countdown to ETA
  260. now = now or self.app.now()
  261. timezone = timezone or self.app.timezone
  262. eta = maybe_make_aware(
  263. now + timedelta(seconds=countdown), tz=timezone,
  264. )
  265. if isinstance(expires, numbers.Real):
  266. now = now or self.app.now()
  267. timezone = timezone or self.app.timezone
  268. expires = maybe_make_aware(
  269. now + timedelta(seconds=expires), tz=timezone,
  270. )
  271. eta = eta and eta.isoformat()
  272. expires = expires and expires.isoformat()
  273. argsrepr = saferepr(args, self.argsrepr_maxsize)
  274. kwargsrepr = saferepr(kwargs, self.kwargsrepr_maxsize)
  275. if JSON_NEEDS_UNICODE_KEYS: # pragma: no cover
  276. if callbacks:
  277. callbacks = [utf8dict(callback) for callback in callbacks]
  278. if errbacks:
  279. errbacks = [utf8dict(errback) for errback in errbacks]
  280. if chord:
  281. chord = utf8dict(chord)
  282. return task_message(
  283. headers={
  284. 'lang': 'py',
  285. 'task': name,
  286. 'id': task_id,
  287. 'eta': eta,
  288. 'expires': expires,
  289. 'group': group_id,
  290. 'retries': retries,
  291. 'timelimit': [time_limit, soft_time_limit],
  292. 'root_id': root_id,
  293. 'parent_id': parent_id,
  294. 'argsrepr': argsrepr,
  295. 'kwargsrepr': kwargsrepr,
  296. 'origin': origin or anon_nodename()
  297. },
  298. properties={
  299. 'correlation_id': task_id,
  300. 'reply_to': reply_to or '',
  301. },
  302. body=(
  303. args, kwargs, {
  304. 'callbacks': callbacks,
  305. 'errbacks': errbacks,
  306. 'chain': chain,
  307. 'chord': chord,
  308. },
  309. ),
  310. sent_event={
  311. 'uuid': task_id,
  312. 'root_id': root_id,
  313. 'parent_id': parent_id,
  314. 'name': name,
  315. 'args': argsrepr,
  316. 'kwargs': kwargsrepr,
  317. 'retries': retries,
  318. 'eta': eta,
  319. 'expires': expires,
  320. } if create_sent_event else None,
  321. )
  322. def as_task_v1(self, task_id, name, args=None, kwargs=None,
  323. countdown=None, eta=None, group_id=None,
  324. expires=None, retries=0,
  325. chord=None, callbacks=None, errbacks=None, reply_to=None,
  326. time_limit=None, soft_time_limit=None,
  327. create_sent_event=False, root_id=None, parent_id=None,
  328. shadow=None, now=None, timezone=None):
  329. args = args or ()
  330. kwargs = kwargs or {}
  331. utc = self.utc
  332. if not isinstance(args, (list, tuple)):
  333. raise ValueError('task args must be a list or tuple')
  334. if not isinstance(kwargs, Mapping):
  335. raise ValueError('task keyword arguments must be a mapping')
  336. if countdown: # convert countdown to ETA
  337. now = now or self.app.now()
  338. timezone = timezone or self.app.timezone
  339. eta = now + timedelta(seconds=countdown)
  340. if utc:
  341. eta = to_utc(eta).astimezone(timezone)
  342. if isinstance(expires, numbers.Real):
  343. now = now or self.app.now()
  344. timezone = timezone or self.app.timezone
  345. expires = now + timedelta(seconds=expires)
  346. if utc:
  347. expires = to_utc(expires).astimezone(timezone)
  348. eta = eta and eta.isoformat()
  349. expires = expires and expires.isoformat()
  350. if JSON_NEEDS_UNICODE_KEYS: # pragma: no cover
  351. if callbacks:
  352. callbacks = [utf8dict(callback) for callback in callbacks]
  353. if errbacks:
  354. errbacks = [utf8dict(errback) for errback in errbacks]
  355. if chord:
  356. chord = utf8dict(chord)
  357. return task_message(
  358. headers={},
  359. properties={
  360. 'correlation_id': task_id,
  361. 'reply_to': reply_to or '',
  362. },
  363. body={
  364. 'task': name,
  365. 'id': task_id,
  366. 'args': args,
  367. 'kwargs': kwargs,
  368. 'group': group_id,
  369. 'retries': retries,
  370. 'eta': eta,
  371. 'expires': expires,
  372. 'utc': utc,
  373. 'callbacks': callbacks,
  374. 'errbacks': errbacks,
  375. 'timelimit': (time_limit, soft_time_limit),
  376. 'taskset': group_id,
  377. 'chord': chord,
  378. },
  379. sent_event={
  380. 'uuid': task_id,
  381. 'name': name,
  382. 'args': saferepr(args),
  383. 'kwargs': saferepr(kwargs),
  384. 'retries': retries,
  385. 'eta': eta,
  386. 'expires': expires,
  387. } if create_sent_event else None,
  388. )
  389. def _create_task_sender(self):
  390. default_retry = self.app.conf.task_publish_retry
  391. default_policy = self.app.conf.task_publish_retry_policy
  392. default_delivery_mode = self.app.conf.task_default_delivery_mode
  393. default_queue = self.default_queue
  394. queues = self.queues
  395. send_before_publish = signals.before_task_publish.send
  396. before_receivers = signals.before_task_publish.receivers
  397. send_after_publish = signals.after_task_publish.send
  398. after_receivers = signals.after_task_publish.receivers
  399. send_task_sent = signals.task_sent.send # XXX compat
  400. sent_receivers = signals.task_sent.receivers
  401. default_evd = self._event_dispatcher
  402. default_exchange = self.default_exchange
  403. default_rkey = self.app.conf.task_default_routing_key
  404. default_serializer = self.app.conf.task_serializer
  405. default_compressor = self.app.conf.result_compression
  406. def send_task_message(producer, name, message,
  407. exchange=None, routing_key=None, queue=None,
  408. event_dispatcher=None,
  409. retry=None, retry_policy=None,
  410. serializer=None, delivery_mode=None,
  411. compression=None, declare=None,
  412. headers=None, exchange_type=None, **kwargs):
  413. retry = default_retry if retry is None else retry
  414. headers2, properties, body, sent_event = message
  415. if headers:
  416. headers2.update(headers)
  417. if kwargs:
  418. properties.update(kwargs)
  419. qname = queue
  420. if queue is None and exchange is None:
  421. queue = default_queue
  422. if queue is not None:
  423. if isinstance(queue, string_t):
  424. qname, queue = queue, queues[queue]
  425. else:
  426. qname = queue.name
  427. if delivery_mode is None:
  428. try:
  429. delivery_mode = queue.exchange.delivery_mode
  430. except AttributeError:
  431. pass
  432. delivery_mode = delivery_mode or default_delivery_mode
  433. if exchange_type is None:
  434. try:
  435. exchange_type = queue.exchange.type
  436. except AttributeError:
  437. exchange_type = 'direct'
  438. if not exchange and not routing_key and exchange_type == 'direct':
  439. exchange, routing_key = '', qname
  440. else:
  441. exchange = exchange or queue.exchange.name or default_exchange
  442. routing_key = routing_key or queue.routing_key or default_rkey
  443. if declare is None and queue and not isinstance(queue, Broadcast):
  444. declare = [queue]
  445. # merge default and custom policy
  446. retry = default_retry if retry is None else retry
  447. _rp = (dict(default_policy, **retry_policy) if retry_policy
  448. else default_policy)
  449. if before_receivers:
  450. send_before_publish(
  451. sender=name, body=body,
  452. exchange=exchange, routing_key=routing_key,
  453. declare=declare, headers=headers2,
  454. properties=kwargs, retry_policy=retry_policy,
  455. )
  456. ret = producer.publish(
  457. body,
  458. exchange=exchange,
  459. routing_key=routing_key,
  460. serializer=serializer or default_serializer,
  461. compression=compression or default_compressor,
  462. retry=retry, retry_policy=_rp,
  463. delivery_mode=delivery_mode, declare=declare,
  464. headers=headers2,
  465. **properties
  466. )
  467. if after_receivers:
  468. send_after_publish(sender=name, body=body, headers=headers2,
  469. exchange=exchange, routing_key=routing_key)
  470. if sent_receivers: # XXX deprecated
  471. send_task_sent(sender=name, task_id=body['id'], task=name,
  472. args=body['args'], kwargs=body['kwargs'],
  473. eta=body['eta'], taskset=body['taskset'])
  474. if sent_event:
  475. evd = event_dispatcher or default_evd
  476. exname = exchange
  477. if isinstance(exname, Exchange):
  478. exname = exname.name
  479. sent_event.update({
  480. 'queue': qname,
  481. 'exchange': exname,
  482. 'routing_key': routing_key,
  483. })
  484. evd.publish('task-sent', sent_event,
  485. self, retry=retry, retry_policy=retry_policy)
  486. return ret
  487. return send_task_message
  488. @cached_property
  489. def default_queue(self):
  490. return self.queues[self.app.conf.task_default_queue]
  491. @cached_property
  492. def queues(self):
  493. """Queue name⇒ declaration mapping."""
  494. return self.Queues(self.app.conf.task_queues)
  495. @queues.setter # noqa
  496. def queues(self, queues):
  497. return self.Queues(queues)
  498. @property
  499. def routes(self):
  500. if self._rtable is None:
  501. self.flush_routes()
  502. return self._rtable
  503. @cached_property
  504. def router(self):
  505. return self.Router()
  506. @property
  507. def producer_pool(self):
  508. if self._producer_pool is None:
  509. self._producer_pool = pools.producers[
  510. self.app.connection_for_write()]
  511. self._producer_pool.limit = self.app.pool.limit
  512. return self._producer_pool
  513. publisher_pool = producer_pool # compat alias
  514. @cached_property
  515. def default_exchange(self):
  516. return Exchange(self.app.conf.task_default_exchange,
  517. self.app.conf.task_default_exchange_type)
  518. @cached_property
  519. def utc(self):
  520. return self.app.conf.enable_utc
  521. @cached_property
  522. def _event_dispatcher(self):
  523. # We call Dispatcher.publish with a custom producer
  524. # so don't need the diuspatcher to be enabled.
  525. return self.app.events.Dispatcher(enabled=False)