amqp.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.amqp
  4. ~~~~~~~~~~~~~~~
  5. Sending and receiving messages using Kombu.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import timedelta
  9. from weakref import WeakValueDictionary
  10. from kombu import Connection, Consumer, Exchange, Producer, Queue
  11. from kombu.common import Broadcast
  12. from kombu.pools import ProducerPool
  13. from kombu.utils import cached_property, uuid
  14. from kombu.utils.encoding import safe_repr
  15. from kombu.utils.functional import maybe_list
  16. from celery import signals
  17. from celery.five import items, string_t
  18. from celery.utils.text import indent as textindent
  19. from celery.utils.timeutils import to_utc
  20. from . import app_or_default
  21. from . import routes as _routes
  22. __all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
  23. #: Human readable queue declaration.
  24. QUEUE_FORMAT = """
  25. .> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
  26. key={0.routing_key}
  27. """
  28. class Queues(dict):
  29. """Queue name⇒ declaration mapping.
  30. :param queues: Initial list/tuple or dict of queues.
  31. :keyword create_missing: By default any unknown queues will be
  32. added automatically, but if disabled
  33. the occurrence of unknown queues
  34. in `wanted` will raise :exc:`KeyError`.
  35. :keyword ha_policy: Default HA policy for queues with none set.
  36. """
  37. #: If set, this is a subset of queues to consume from.
  38. #: The rest of the queues are then used for routing only.
  39. _consume_from = None
  40. def __init__(self, queues=None, default_exchange=None,
  41. create_missing=True, ha_policy=None, autoexchange=None):
  42. dict.__init__(self)
  43. self.aliases = WeakValueDictionary()
  44. self.default_exchange = default_exchange
  45. self.create_missing = create_missing
  46. self.ha_policy = ha_policy
  47. self.autoexchange = Exchange if autoexchange is None else autoexchange
  48. if isinstance(queues, (tuple, list)):
  49. queues = dict((q.name, q) for q in queues)
  50. for name, q in items(queues or {}):
  51. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  52. def __getitem__(self, name):
  53. try:
  54. return self.aliases[name]
  55. except KeyError:
  56. return dict.__getitem__(self, name)
  57. def __setitem__(self, name, queue):
  58. if self.default_exchange and (not queue.exchange or
  59. not queue.exchange.name):
  60. queue.exchange = self.default_exchange
  61. dict.__setitem__(self, name, queue)
  62. if queue.alias:
  63. self.aliases[queue.alias] = queue
  64. def __missing__(self, name):
  65. if self.create_missing:
  66. return self.add(self.new_missing(name))
  67. raise KeyError(name)
  68. def add(self, queue, **kwargs):
  69. """Add new queue.
  70. The first argument can either be a :class:`kombu.Queue` instance,
  71. or the name of a queue. If the former the rest of the keyword
  72. arguments are ignored, and options are simply taken from the queue
  73. instance.
  74. :param queue: :class:`kombu.Queue` instance or name of the queue.
  75. :keyword exchange: (if named) specifies exchange name.
  76. :keyword routing_key: (if named) specifies binding key.
  77. :keyword exchange_type: (if named) specifies type of exchange.
  78. :keyword \*\*options: (if named) Additional declaration options.
  79. """
  80. if not isinstance(queue, Queue):
  81. return self.add_compat(queue, **kwargs)
  82. if self.ha_policy:
  83. if queue.queue_arguments is None:
  84. queue.queue_arguments = {}
  85. self._set_ha_policy(queue.queue_arguments)
  86. self[queue.name] = queue
  87. return queue
  88. def add_compat(self, name, **options):
  89. # docs used to use binding_key as routing key
  90. options.setdefault('routing_key', options.get('binding_key'))
  91. if options['routing_key'] is None:
  92. options['routing_key'] = name
  93. if self.ha_policy is not None:
  94. self._set_ha_policy(options.setdefault('queue_arguments', {}))
  95. q = self[name] = Queue.from_dict(name, **options)
  96. return q
  97. def _set_ha_policy(self, args):
  98. policy = self.ha_policy
  99. if isinstance(policy, (list, tuple)):
  100. return args.update({'x-ha-policy': 'nodes',
  101. 'x-ha-policy-params': list(policy)})
  102. args['x-ha-policy'] = policy
  103. def format(self, indent=0, indent_first=True):
  104. """Format routing table into string for log dumps."""
  105. active = self.consume_from
  106. if not active:
  107. return ''
  108. info = [QUEUE_FORMAT.strip().format(q)
  109. for _, q in sorted(items(active))]
  110. if indent_first:
  111. return textindent('\n'.join(info), indent)
  112. return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
  113. def select_add(self, queue, **kwargs):
  114. """Add new task queue that will be consumed from even when
  115. a subset has been selected using the :option:`-Q` option."""
  116. q = self.add(queue, **kwargs)
  117. if self._consume_from is not None:
  118. self._consume_from[q.name] = q
  119. return q
  120. def select(self, include):
  121. """Sets :attr:`consume_from` by selecting a subset of the
  122. currently defined queues.
  123. :param include: Names of queues to consume from.
  124. Can be iterable or string.
  125. """
  126. if include:
  127. self._consume_from = dict((name, self[name])
  128. for name in maybe_list(include))
  129. select_subset = select # XXX compat
  130. def deselect(self, exclude):
  131. """Deselect queues so that they will not be consumed from.
  132. :param exclude: Names of queues to avoid consuming from.
  133. Can be iterable or string.
  134. """
  135. if exclude:
  136. exclude = maybe_list(exclude)
  137. if self._consume_from is None:
  138. # using selection
  139. return self.select(k for k in self if k not in exclude)
  140. # using all queues
  141. for queue in exclude:
  142. self._consume_from.pop(queue, None)
  143. select_remove = deselect # XXX compat
  144. def new_missing(self, name):
  145. return Queue(name, self.autoexchange(name), name)
  146. @property
  147. def consume_from(self):
  148. if self._consume_from is not None:
  149. return self._consume_from
  150. return self
  151. class TaskProducer(Producer):
  152. app = None
  153. auto_declare = False
  154. retry = False
  155. retry_policy = None
  156. utc = True
  157. event_dispatcher = None
  158. send_sent_event = False
  159. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  160. self.retry = kwargs.pop('retry', self.retry)
  161. self.retry_policy = kwargs.pop('retry_policy',
  162. self.retry_policy or {})
  163. self.send_sent_event = kwargs.pop('send_sent_event',
  164. self.send_sent_event)
  165. exchange = exchange or self.exchange
  166. self.queues = self.app.amqp.queues # shortcut
  167. self.default_queue = self.app.amqp.default_queue
  168. super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  169. def publish_task(self, task_name, task_args=None, task_kwargs=None,
  170. countdown=None, eta=None, task_id=None, group_id=None,
  171. taskset_id=None, # compat alias to group_id
  172. expires=None, exchange=None, exchange_type=None,
  173. event_dispatcher=None, retry=None, retry_policy=None,
  174. queue=None, now=None, retries=0, chord=None,
  175. callbacks=None, errbacks=None, routing_key=None,
  176. serializer=None, delivery_mode=None, compression=None,
  177. reply_to=None, time_limit=None, soft_time_limit=None,
  178. declare=None, headers=None,
  179. send_before_publish=signals.before_task_publish.send,
  180. before_receivers=signals.before_task_publish.receivers,
  181. send_after_publish=signals.after_task_publish.send,
  182. after_receivers=signals.after_task_publish.receivers,
  183. send_task_sent=signals.task_sent.send, # XXX deprecated
  184. sent_receivers=signals.task_sent.receivers,
  185. **kwargs):
  186. """Send task message."""
  187. retry = self.retry if retry is None else retry
  188. qname = queue
  189. if queue is None and exchange is None:
  190. queue = self.default_queue
  191. if queue is not None:
  192. if isinstance(queue, string_t):
  193. qname, queue = queue, self.queues[queue]
  194. else:
  195. qname = queue.name
  196. exchange = exchange or queue.exchange.name
  197. routing_key = routing_key or queue.routing_key
  198. if declare is None and queue and not isinstance(queue, Broadcast):
  199. declare = [queue]
  200. # merge default and custom policy
  201. retry = self.retry if retry is None else retry
  202. _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
  203. else self.retry_policy)
  204. task_id = task_id or uuid()
  205. task_args = task_args or []
  206. task_kwargs = task_kwargs or {}
  207. if not isinstance(task_args, (list, tuple)):
  208. raise ValueError('task args must be a list or tuple')
  209. if not isinstance(task_kwargs, dict):
  210. raise ValueError('task kwargs must be a dictionary')
  211. if countdown: # Convert countdown to ETA.
  212. now = now or self.app.now()
  213. eta = now + timedelta(seconds=countdown)
  214. if self.utc:
  215. eta = to_utc(eta).astimezone(self.app.timezone)
  216. if isinstance(expires, (int, float)):
  217. now = now or self.app.now()
  218. expires = now + timedelta(seconds=expires)
  219. if self.utc:
  220. expires = to_utc(expires).astimezone(self.app.timezone)
  221. eta = eta and eta.isoformat()
  222. expires = expires and expires.isoformat()
  223. body = {
  224. 'task': task_name,
  225. 'id': task_id,
  226. 'args': task_args,
  227. 'kwargs': task_kwargs,
  228. 'retries': retries or 0,
  229. 'eta': eta,
  230. 'expires': expires,
  231. 'utc': self.utc,
  232. 'callbacks': callbacks,
  233. 'errbacks': errbacks,
  234. 'timelimit': (time_limit, soft_time_limit),
  235. 'taskset': group_id or taskset_id,
  236. 'chord': chord,
  237. }
  238. if before_receivers:
  239. send_before_publish(
  240. sender=task_name, body=body,
  241. exchange=exchange,
  242. routing_key=routing_key,
  243. declare=declare,
  244. headers=headers,
  245. properties=kwargs,
  246. retry_policy=retry_policy,
  247. )
  248. self.publish(
  249. body,
  250. exchange=exchange, routing_key=routing_key,
  251. serializer=serializer or self.serializer,
  252. compression=compression or self.compression,
  253. headers=headers,
  254. retry=retry, retry_policy=_rp,
  255. reply_to=reply_to,
  256. correlation_id=task_id,
  257. delivery_mode=delivery_mode, declare=declare,
  258. **kwargs
  259. )
  260. if after_receivers:
  261. send_after_publish(sender=task_name, body=body,
  262. exchange=exchange, routing_key=routing_key)
  263. if sent_receivers: # XXX deprecated
  264. send_task_sent(sender=task_name, task_id=task_id,
  265. task=task_name, args=task_args,
  266. kwargs=task_kwargs, eta=eta,
  267. taskset=group_id or taskset_id)
  268. if self.send_sent_event:
  269. evd = event_dispatcher or self.event_dispatcher
  270. exname = exchange or self.exchange
  271. if isinstance(exname, Exchange):
  272. exname = exname.name
  273. evd.publish(
  274. 'task-sent',
  275. {
  276. 'uuid': task_id,
  277. 'name': task_name,
  278. 'args': safe_repr(task_args),
  279. 'kwargs': safe_repr(task_kwargs),
  280. 'retries': retries,
  281. 'eta': eta,
  282. 'expires': expires,
  283. 'queue': qname,
  284. 'exchange': exname,
  285. 'routing_key': routing_key,
  286. },
  287. self, retry=retry, retry_policy=retry_policy,
  288. )
  289. return task_id
  290. delay_task = publish_task # XXX Compat
  291. @cached_property
  292. def event_dispatcher(self):
  293. # We call Dispatcher.publish with a custom producer
  294. # so don't need the dispatcher to be "enabled".
  295. return self.app.events.Dispatcher(enabled=False)
  296. class TaskPublisher(TaskProducer):
  297. """Deprecated version of :class:`TaskProducer`."""
  298. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  299. self.app = app_or_default(kwargs.pop('app', self.app))
  300. self.retry = kwargs.pop('retry', self.retry)
  301. self.retry_policy = kwargs.pop('retry_policy',
  302. self.retry_policy or {})
  303. exchange = exchange or self.exchange
  304. if not isinstance(exchange, Exchange):
  305. exchange = Exchange(exchange,
  306. kwargs.pop('exchange_type', 'direct'))
  307. self.queues = self.app.amqp.queues # shortcut
  308. super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
  309. class TaskConsumer(Consumer):
  310. app = None
  311. def __init__(self, channel, queues=None, app=None, accept=None, **kw):
  312. self.app = app or self.app
  313. if accept is None:
  314. accept = self.app.conf.CELERY_ACCEPT_CONTENT
  315. super(TaskConsumer, self).__init__(
  316. channel,
  317. queues or list(self.app.amqp.queues.consume_from.values()),
  318. accept=accept,
  319. **kw
  320. )
  321. class AMQP(object):
  322. Connection = Connection
  323. Consumer = Consumer
  324. #: compat alias to Connection
  325. BrokerConnection = Connection
  326. producer_cls = TaskProducer
  327. consumer_cls = TaskConsumer
  328. #: Cached and prepared routing table.
  329. _rtable = None
  330. #: Underlying producer pool instance automatically
  331. #: set by the :attr:`producer_pool`.
  332. _producer_pool = None
  333. # Exchange class/function used when defining automatic queues.
  334. # E.g. you can use ``autoexchange = lambda n: None`` to use the
  335. # amqp default exchange, which is a shortcut to bypass routing
  336. # and instead send directly to the queue named in the routing key.
  337. autoexchange = None
  338. def __init__(self, app):
  339. self.app = app
  340. def flush_routes(self):
  341. self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
  342. def Queues(self, queues, create_missing=None, ha_policy=None,
  343. autoexchange=None):
  344. """Create new :class:`Queues` instance, using queue defaults
  345. from the current configuration."""
  346. conf = self.app.conf
  347. if create_missing is None:
  348. create_missing = conf.CELERY_CREATE_MISSING_QUEUES
  349. if ha_policy is None:
  350. ha_policy = conf.CELERY_QUEUE_HA_POLICY
  351. if not queues and conf.CELERY_DEFAULT_QUEUE:
  352. queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
  353. exchange=self.default_exchange,
  354. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
  355. autoexchange = (self.autoexchange if autoexchange is None
  356. else autoexchange)
  357. return Queues(
  358. queues, self.default_exchange, create_missing,
  359. ha_policy, autoexchange,
  360. )
  361. def Router(self, queues=None, create_missing=None):
  362. """Return the current task router."""
  363. return _routes.Router(self.routes, queues or self.queues,
  364. self.app.either('CELERY_CREATE_MISSING_QUEUES',
  365. create_missing), app=self.app)
  366. @cached_property
  367. def TaskConsumer(self):
  368. """Return consumer configured to consume from the queues
  369. we are configured for (``app.amqp.queues.consume_from``)."""
  370. return self.app.subclass_with_self(self.consumer_cls,
  371. reverse='amqp.TaskConsumer')
  372. get_task_consumer = TaskConsumer # XXX compat
  373. @cached_property
  374. def TaskProducer(self):
  375. """Return publisher used to send tasks.
  376. You should use `app.send_task` instead.
  377. """
  378. conf = self.app.conf
  379. return self.app.subclass_with_self(
  380. self.producer_cls,
  381. reverse='amqp.TaskProducer',
  382. exchange=self.default_exchange,
  383. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
  384. serializer=conf.CELERY_TASK_SERIALIZER,
  385. compression=conf.CELERY_MESSAGE_COMPRESSION,
  386. retry=conf.CELERY_TASK_PUBLISH_RETRY,
  387. retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
  388. send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
  389. utc=conf.CELERY_ENABLE_UTC,
  390. )
  391. TaskPublisher = TaskProducer # compat
  392. @cached_property
  393. def default_queue(self):
  394. return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
  395. @cached_property
  396. def queues(self):
  397. """Queue name⇒ declaration mapping."""
  398. return self.Queues(self.app.conf.CELERY_QUEUES)
  399. @queues.setter # noqa
  400. def queues(self, queues):
  401. return self.Queues(queues)
  402. @property
  403. def routes(self):
  404. if self._rtable is None:
  405. self.flush_routes()
  406. return self._rtable
  407. @cached_property
  408. def router(self):
  409. return self.Router()
  410. @property
  411. def producer_pool(self):
  412. if self._producer_pool is None:
  413. self._producer_pool = ProducerPool(
  414. self.app.pool,
  415. limit=self.app.pool.limit,
  416. Producer=self.TaskProducer,
  417. )
  418. return self._producer_pool
  419. publisher_pool = producer_pool # compat alias
  420. @cached_property
  421. def default_exchange(self):
  422. return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
  423. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)