# amqp.py (18 KB)
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.amqp
  4. ~~~~~~~~~~~~~~~
  5. Sending and receiving messages using Kombu.
  6. """
  7. from __future__ import absolute_import
  8. import numbers
  9. from datetime import timedelta
  10. from weakref import WeakValueDictionary
  11. from kombu import Connection, Consumer, Exchange, Producer, Queue
  12. from kombu.common import Broadcast
  13. from kombu.pools import ProducerPool
  14. from kombu.utils import cached_property, uuid
  15. from kombu.utils.encoding import safe_repr
  16. from kombu.utils.functional import maybe_list
  17. from celery import signals
  18. from celery.five import items, string_t
  19. from celery.utils.text import indent as textindent
  20. from celery.utils.timeutils import to_utc
  21. from . import app_or_default
  22. from . import routes as _routes
  23. __all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
  24. #: Human readable queue declaration.
  25. QUEUE_FORMAT = """
  26. .> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
  27. key={0.routing_key}
  28. """
  29. class Queues(dict):
  30. """Queue name⇒ declaration mapping.
  31. :param queues: Initial list/tuple or dict of queues.
  32. :keyword create_missing: By default any unknown queues will be
  33. added automatically, but if disabled
  34. the occurrence of unknown queues
  35. in `wanted` will raise :exc:`KeyError`.
  36. :keyword ha_policy: Default HA policy for queues with none set.
  37. """
  38. #: If set, this is a subset of queues to consume from.
  39. #: The rest of the queues are then used for routing only.
  40. _consume_from = None
  41. def __init__(self, queues=None, default_exchange=None,
  42. create_missing=True, ha_policy=None, autoexchange=None):
  43. dict.__init__(self)
  44. self.aliases = WeakValueDictionary()
  45. self.default_exchange = default_exchange
  46. self.create_missing = create_missing
  47. self.ha_policy = ha_policy
  48. self.autoexchange = Exchange if autoexchange is None else autoexchange
  49. if isinstance(queues, (tuple, list)):
  50. queues = dict((q.name, q) for q in queues)
  51. for name, q in items(queues or {}):
  52. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  53. def __getitem__(self, name):
  54. try:
  55. return self.aliases[name]
  56. except KeyError:
  57. return dict.__getitem__(self, name)
  58. def __setitem__(self, name, queue):
  59. if self.default_exchange and (not queue.exchange or
  60. not queue.exchange.name):
  61. queue.exchange = self.default_exchange
  62. dict.__setitem__(self, name, queue)
  63. if queue.alias:
  64. self.aliases[queue.alias] = queue
  65. def __missing__(self, name):
  66. if self.create_missing:
  67. return self.add(self.new_missing(name))
  68. raise KeyError(name)
  69. def add(self, queue, **kwargs):
  70. """Add new queue.
  71. The first argument can either be a :class:`kombu.Queue` instance,
  72. or the name of a queue. If the former the rest of the keyword
  73. arguments are ignored, and options are simply taken from the queue
  74. instance.
  75. :param queue: :class:`kombu.Queue` instance or name of the queue.
  76. :keyword exchange: (if named) specifies exchange name.
  77. :keyword routing_key: (if named) specifies binding key.
  78. :keyword exchange_type: (if named) specifies type of exchange.
  79. :keyword \*\*options: (if named) Additional declaration options.
  80. """
  81. if not isinstance(queue, Queue):
  82. return self.add_compat(queue, **kwargs)
  83. if self.ha_policy:
  84. if queue.queue_arguments is None:
  85. queue.queue_arguments = {}
  86. self._set_ha_policy(queue.queue_arguments)
  87. self[queue.name] = queue
  88. return queue
  89. def add_compat(self, name, **options):
  90. # docs used to use binding_key as routing key
  91. options.setdefault('routing_key', options.get('binding_key'))
  92. if options['routing_key'] is None:
  93. options['routing_key'] = name
  94. if self.ha_policy is not None:
  95. self._set_ha_policy(options.setdefault('queue_arguments', {}))
  96. q = self[name] = Queue.from_dict(name, **options)
  97. return q
  98. def _set_ha_policy(self, args):
  99. policy = self.ha_policy
  100. if isinstance(policy, (list, tuple)):
  101. return args.update({'x-ha-policy': 'nodes',
  102. 'x-ha-policy-params': list(policy)})
  103. args['x-ha-policy'] = policy
  104. def format(self, indent=0, indent_first=True):
  105. """Format routing table into string for log dumps."""
  106. active = self.consume_from
  107. if not active:
  108. return ''
  109. info = [QUEUE_FORMAT.strip().format(q)
  110. for _, q in sorted(items(active))]
  111. if indent_first:
  112. return textindent('\n'.join(info), indent)
  113. return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
  114. def select_add(self, queue, **kwargs):
  115. """Add new task queue that will be consumed from even when
  116. a subset has been selected using the :option:`-Q` option."""
  117. q = self.add(queue, **kwargs)
  118. if self._consume_from is not None:
  119. self._consume_from[q.name] = q
  120. return q
  121. def select(self, include):
  122. """Sets :attr:`consume_from` by selecting a subset of the
  123. currently defined queues.
  124. :param include: Names of queues to consume from.
  125. Can be iterable or string.
  126. """
  127. if include:
  128. self._consume_from = dict((name, self[name])
  129. for name in maybe_list(include))
  130. select_subset = select # XXX compat
  131. def deselect(self, exclude):
  132. """Deselect queues so that they will not be consumed from.
  133. :param exclude: Names of queues to avoid consuming from.
  134. Can be iterable or string.
  135. """
  136. if exclude:
  137. exclude = maybe_list(exclude)
  138. if self._consume_from is None:
  139. # using selection
  140. return self.select(k for k in self if k not in exclude)
  141. # using all queues
  142. for queue in exclude:
  143. self._consume_from.pop(queue, None)
  144. select_remove = deselect # XXX compat
  145. def new_missing(self, name):
  146. return Queue(name, self.autoexchange(name), name)
  147. @property
  148. def consume_from(self):
  149. if self._consume_from is not None:
  150. return self._consume_from
  151. return self
  152. class TaskProducer(Producer):
  153. app = None
  154. auto_declare = False
  155. retry = False
  156. retry_policy = None
  157. utc = True
  158. event_dispatcher = None
  159. send_sent_event = False
  160. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  161. self.retry = kwargs.pop('retry', self.retry)
  162. self.retry_policy = kwargs.pop('retry_policy',
  163. self.retry_policy or {})
  164. self.send_sent_event = kwargs.pop('send_sent_event',
  165. self.send_sent_event)
  166. exchange = exchange or self.exchange
  167. self.queues = self.app.amqp.queues # shortcut
  168. self.default_queue = self.app.amqp.default_queue
  169. super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  170. def publish_task(self, task_name, task_args=None, task_kwargs=None,
  171. countdown=None, eta=None, task_id=None, group_id=None,
  172. taskset_id=None, # compat alias to group_id
  173. expires=None, exchange=None, exchange_type=None,
  174. event_dispatcher=None, retry=None, retry_policy=None,
  175. queue=None, now=None, retries=0, chord=None,
  176. callbacks=None, errbacks=None, routing_key=None,
  177. serializer=None, delivery_mode=None, compression=None,
  178. reply_to=None, time_limit=None, soft_time_limit=None,
  179. declare=None, headers=None,
  180. send_before_publish=signals.before_task_publish.send,
  181. before_receivers=signals.before_task_publish.receivers,
  182. send_after_publish=signals.after_task_publish.send,
  183. after_receivers=signals.after_task_publish.receivers,
  184. send_task_sent=signals.task_sent.send, # XXX deprecated
  185. sent_receivers=signals.task_sent.receivers,
  186. **kwargs):
  187. """Send task message."""
  188. retry = self.retry if retry is None else retry
  189. headers = {} if headers is None else headers
  190. qname = queue
  191. if queue is None and exchange is None:
  192. queue = self.default_queue
  193. if queue is not None:
  194. if isinstance(queue, string_t):
  195. qname, queue = queue, self.queues[queue]
  196. else:
  197. qname = queue.name
  198. exchange = exchange or queue.exchange.name
  199. routing_key = routing_key or queue.routing_key
  200. if declare is None and queue and not isinstance(queue, Broadcast):
  201. declare = [queue]
  202. # merge default and custom policy
  203. retry = self.retry if retry is None else retry
  204. _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
  205. else self.retry_policy)
  206. task_id = task_id or uuid()
  207. task_args = task_args or []
  208. task_kwargs = task_kwargs or {}
  209. if not isinstance(task_args, (list, tuple)):
  210. raise ValueError('task args must be a list or tuple')
  211. if not isinstance(task_kwargs, dict):
  212. raise ValueError('task kwargs must be a dictionary')
  213. if countdown: # Convert countdown to ETA.
  214. now = now or self.app.now()
  215. eta = now + timedelta(seconds=countdown)
  216. if self.utc:
  217. eta = to_utc(eta).astimezone(self.app.timezone)
  218. if isinstance(expires, numbers.Real):
  219. now = now or self.app.now()
  220. expires = now + timedelta(seconds=expires)
  221. if self.utc:
  222. expires = to_utc(expires).astimezone(self.app.timezone)
  223. eta = eta and eta.isoformat()
  224. expires = expires and expires.isoformat()
  225. body = {
  226. 'task': task_name,
  227. 'id': task_id,
  228. 'args': task_args,
  229. 'kwargs': task_kwargs,
  230. 'retries': retries or 0,
  231. 'eta': eta,
  232. 'expires': expires,
  233. 'utc': self.utc,
  234. 'callbacks': callbacks,
  235. 'errbacks': errbacks,
  236. 'timelimit': (time_limit, soft_time_limit),
  237. 'taskset': group_id or taskset_id,
  238. 'chord': chord,
  239. }
  240. if before_receivers:
  241. send_before_publish(
  242. sender=task_name, body=body,
  243. exchange=exchange,
  244. routing_key=routing_key,
  245. declare=declare,
  246. headers=headers,
  247. properties=kwargs,
  248. retry_policy=retry_policy,
  249. )
  250. self.publish(
  251. body,
  252. exchange=exchange, routing_key=routing_key,
  253. serializer=serializer or self.serializer,
  254. compression=compression or self.compression,
  255. headers=headers,
  256. retry=retry, retry_policy=_rp,
  257. reply_to=reply_to,
  258. correlation_id=task_id,
  259. delivery_mode=delivery_mode, declare=declare,
  260. **kwargs
  261. )
  262. if after_receivers:
  263. send_after_publish(sender=task_name, body=body,
  264. exchange=exchange, routing_key=routing_key)
  265. if sent_receivers: # XXX deprecated
  266. send_task_sent(sender=task_name, task_id=task_id,
  267. task=task_name, args=task_args,
  268. kwargs=task_kwargs, eta=eta,
  269. taskset=group_id or taskset_id)
  270. if self.send_sent_event:
  271. evd = event_dispatcher or self.event_dispatcher
  272. exname = exchange or self.exchange
  273. if isinstance(exname, Exchange):
  274. exname = exname.name
  275. evd.publish(
  276. 'task-sent',
  277. {
  278. 'uuid': task_id,
  279. 'name': task_name,
  280. 'args': safe_repr(task_args),
  281. 'kwargs': safe_repr(task_kwargs),
  282. 'retries': retries,
  283. 'eta': eta,
  284. 'expires': expires,
  285. 'queue': qname,
  286. 'exchange': exname,
  287. 'routing_key': routing_key,
  288. },
  289. self, retry=retry, retry_policy=retry_policy,
  290. )
  291. return task_id
  292. delay_task = publish_task # XXX Compat
  293. @cached_property
  294. def event_dispatcher(self):
  295. # We call Dispatcher.publish with a custom producer
  296. # so don't need the dispatcher to be "enabled".
  297. return self.app.events.Dispatcher(enabled=False)
  298. class TaskPublisher(TaskProducer):
  299. """Deprecated version of :class:`TaskProducer`."""
  300. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  301. self.app = app_or_default(kwargs.pop('app', self.app))
  302. self.retry = kwargs.pop('retry', self.retry)
  303. self.retry_policy = kwargs.pop('retry_policy',
  304. self.retry_policy or {})
  305. exchange = exchange or self.exchange
  306. if not isinstance(exchange, Exchange):
  307. exchange = Exchange(exchange,
  308. kwargs.pop('exchange_type', 'direct'))
  309. self.queues = self.app.amqp.queues # shortcut
  310. super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
  311. class TaskConsumer(Consumer):
  312. app = None
  313. def __init__(self, channel, queues=None, app=None, accept=None, **kw):
  314. self.app = app or self.app
  315. if accept is None:
  316. accept = self.app.conf.CELERY_ACCEPT_CONTENT
  317. super(TaskConsumer, self).__init__(
  318. channel,
  319. queues or list(self.app.amqp.queues.consume_from.values()),
  320. accept=accept,
  321. **kw
  322. )
  323. class AMQP(object):
  324. Connection = Connection
  325. Consumer = Consumer
  326. #: compat alias to Connection
  327. BrokerConnection = Connection
  328. producer_cls = TaskProducer
  329. consumer_cls = TaskConsumer
  330. queues_cls = Queues
  331. #: Cached and prepared routing table.
  332. _rtable = None
  333. #: Underlying producer pool instance automatically
  334. #: set by the :attr:`producer_pool`.
  335. _producer_pool = None
  336. # Exchange class/function used when defining automatic queues.
  337. # E.g. you can use ``autoexchange = lambda n: None`` to use the
  338. # amqp default exchange, which is a shortcut to bypass routing
  339. # and instead send directly to the queue named in the routing key.
  340. autoexchange = None
  341. def __init__(self, app):
  342. self.app = app
  343. def flush_routes(self):
  344. self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
  345. def Queues(self, queues, create_missing=None, ha_policy=None,
  346. autoexchange=None):
  347. """Create new :class:`Queues` instance, using queue defaults
  348. from the current configuration."""
  349. conf = self.app.conf
  350. if create_missing is None:
  351. create_missing = conf.CELERY_CREATE_MISSING_QUEUES
  352. if ha_policy is None:
  353. ha_policy = conf.CELERY_QUEUE_HA_POLICY
  354. if not queues and conf.CELERY_DEFAULT_QUEUE:
  355. queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
  356. exchange=self.default_exchange,
  357. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
  358. autoexchange = (self.autoexchange if autoexchange is None
  359. else autoexchange)
  360. return self.queues_cls(
  361. queues, self.default_exchange, create_missing,
  362. ha_policy, autoexchange,
  363. )
  364. def Router(self, queues=None, create_missing=None):
  365. """Return the current task router."""
  366. return _routes.Router(self.routes, queues or self.queues,
  367. self.app.either('CELERY_CREATE_MISSING_QUEUES',
  368. create_missing), app=self.app)
  369. @cached_property
  370. def TaskConsumer(self):
  371. """Return consumer configured to consume from the queues
  372. we are configured for (``app.amqp.queues.consume_from``)."""
  373. return self.app.subclass_with_self(self.consumer_cls,
  374. reverse='amqp.TaskConsumer')
  375. get_task_consumer = TaskConsumer # XXX compat
  376. @cached_property
  377. def TaskProducer(self):
  378. """Return publisher used to send tasks.
  379. You should use `app.send_task` instead.
  380. """
  381. conf = self.app.conf
  382. return self.app.subclass_with_self(
  383. self.producer_cls,
  384. reverse='amqp.TaskProducer',
  385. exchange=self.default_exchange,
  386. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
  387. serializer=conf.CELERY_TASK_SERIALIZER,
  388. compression=conf.CELERY_MESSAGE_COMPRESSION,
  389. retry=conf.CELERY_TASK_PUBLISH_RETRY,
  390. retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
  391. send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
  392. utc=conf.CELERY_ENABLE_UTC,
  393. )
  394. TaskPublisher = TaskProducer # compat
  395. @cached_property
  396. def default_queue(self):
  397. return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
  398. @cached_property
  399. def queues(self):
  400. """Queue name⇒ declaration mapping."""
  401. return self.Queues(self.app.conf.CELERY_QUEUES)
  402. @queues.setter # noqa
  403. def queues(self, queues):
  404. return self.Queues(queues)
  405. @property
  406. def routes(self):
  407. if self._rtable is None:
  408. self.flush_routes()
  409. return self._rtable
  410. @cached_property
  411. def router(self):
  412. return self.Router()
  413. @property
  414. def producer_pool(self):
  415. if self._producer_pool is None:
  416. self._producer_pool = ProducerPool(
  417. self.app.pool,
  418. limit=self.app.pool.limit,
  419. Producer=self.TaskProducer,
  420. )
  421. return self._producer_pool
  422. publisher_pool = producer_pool # compat alias
  423. @cached_property
  424. def default_exchange(self):
  425. return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
  426. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)