# -*- coding: utf-8 -*-
"""
    celery.app.amqp
    ~~~~~~~~~~~~~~~

    Sending and receiving messages using Kombu.

"""
from __future__ import absolute_import

import numbers

from datetime import timedelta
from weakref import WeakValueDictionary

from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.common import Broadcast
from kombu.pools import ProducerPool
from kombu.utils import cached_property, uuid
from kombu.utils.encoding import safe_repr
from kombu.utils.functional import maybe_list

from celery import signals
from celery.five import items, string_t
from celery.utils.text import indent as textindent
from celery.utils.timeutils import to_utc

from . import app_or_default
from . import routes as _routes

__all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']

#: Human readable queue declaration.
QUEUE_FORMAT = """
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
key={0.routing_key}
"""


class Queues(dict):
    """Queue name⇒ declaration mapping.

    :param queues: Initial list/tuple or dict of queues.
    :keyword create_missing: By default any unknown queues will be
                             added automatically, but if disabled
                             the occurrence of unknown queues
                             in `wanted` will raise :exc:`KeyError`.
    :keyword ha_policy: Default HA policy for queues with none set.

    """
    #: If set, this is a subset of queues to consume from.
    #: The rest of the queues are then used for routing only.
    _consume_from = None

    def __init__(self, queues=None, default_exchange=None,
                 create_missing=True, ha_policy=None, autoexchange=None):
        dict.__init__(self)
        self.aliases = WeakValueDictionary()
        self.default_exchange = default_exchange
        self.create_missing = create_missing
        self.ha_policy = ha_policy
        self.autoexchange = Exchange if autoexchange is None else autoexchange
        if isinstance(queues, (tuple, list)):
            queues = dict((q.name, q) for q in queues)
        for name, q in items(queues or {}):
            self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)

    def __getitem__(self, name):
        try:
            return self.aliases[name]
        except KeyError:
            return dict.__getitem__(self, name)

    def __setitem__(self, name, queue):
        if self.default_exchange and (not queue.exchange or
                                      not queue.exchange.name):
            queue.exchange = self.default_exchange
        dict.__setitem__(self, name, queue)
        if queue.alias:
            self.aliases[queue.alias] = queue

    def __missing__(self, name):
        if self.create_missing:
            return self.add(self.new_missing(name))
        raise KeyError(name)

    def add(self, queue, **kwargs):
        """Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former the rest of the keyword
        arguments are ignored, and options are simply taken from the queue
        instance.

        :param queue: :class:`kombu.Queue` instance or name of the queue.
        :keyword exchange: (if named) specifies exchange name.
        :keyword routing_key: (if named) specifies binding key.
        :keyword exchange_type: (if named) specifies type of exchange.
        :keyword \*\*options: (if named) Additional declaration options.

        """
        if not isinstance(queue, Queue):
            return self.add_compat(queue, **kwargs)
        if self.ha_policy:
            if queue.queue_arguments is None:
                queue.queue_arguments = {}
            self._set_ha_policy(queue.queue_arguments)
        self[queue.name] = queue
        return queue

    def add_compat(self, name, **options):
        # docs used to use binding_key as routing key
        options.setdefault('routing_key', options.get('binding_key'))
        if options['routing_key'] is None:
            options['routing_key'] = name
        if self.ha_policy is not None:
            self._set_ha_policy(options.setdefault('queue_arguments', {}))
        q = self[name] = Queue.from_dict(name, **options)
        return q

    def _set_ha_policy(self, args):
        policy = self.ha_policy
        if isinstance(policy, (list, tuple)):
            return args.update({'x-ha-policy': 'nodes',
                                'x-ha-policy-params': list(policy)})
        args['x-ha-policy'] = policy

    def format(self, indent=0, indent_first=True):
        """Format routing table into string for log dumps."""
        active = self.consume_from
        if not active:
            return ''
        info = [QUEUE_FORMAT.strip().format(q)
                for _, q in sorted(items(active))]
        if indent_first:
            return textindent('\n'.join(info), indent)
        return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)

    def select_add(self, queue, **kwargs):
        """Add new task queue that will be consumed from even when
        a subset has been selected using the :option:`-Q` option."""
        q = self.add(queue, **kwargs)
        if self._consume_from is not None:
            self._consume_from[q.name] = q
        return q

    def select(self, include):
        """Sets :attr:`consume_from` by selecting a subset of the
        currently defined queues.

        :param include: Names of queues to consume from.
                        Can be iterable or string.
        """
        if include:
            self._consume_from = dict((name, self[name])
                                      for name in maybe_list(include))
    select_subset = select  # XXX compat

    def deselect(self, exclude):
        """Deselect queues so that they will not be consumed from.

        :param exclude: Names of queues to avoid consuming from.
                        Can be iterable or string.
        """
        if exclude:
            exclude = maybe_list(exclude)
            if self._consume_from is None:
                # using selection
                return self.select(k for k in self if k not in exclude)
            # using all queues
            for queue in exclude:
                self._consume_from.pop(queue, None)
    select_remove = deselect  # XXX compat

    def new_missing(self, name):
        return Queue(name, self.autoexchange(name), name)

    @property
    def consume_from(self):
        if self._consume_from is not None:
            return self._consume_from
        return self
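

# Illustrative sketch (comments only, not executed): building a ``Queues``
# mapping by hand and restricting consumption to a subset.  The queue and
# exchange names below are placeholders, not part of this module:
#
#     >>> queues = Queues([
#     ...     Queue('default', Exchange('default'), routing_key='default'),
#     ...     Queue('images', Exchange('media'), routing_key='media.image'),
#     ... ])
#     >>> queues.select(['images'])     # like the worker's -Q option
#     >>> sorted(queues.consume_from)
#     ['images']
#     >>> queues['video']   # unknown names are auto-added (create_missing=True)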


class TaskProducer(Producer):
    app = None
    auto_declare = False
    retry = False
    retry_policy = None
    utc = True
    event_dispatcher = None
    send_sent_event = False

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        self.retry = kwargs.pop('retry', self.retry)
        self.retry_policy = kwargs.pop('retry_policy',
                                       self.retry_policy or {})
        self.send_sent_event = kwargs.pop('send_sent_event',
                                          self.send_sent_event)
        exchange = exchange or self.exchange
        self.queues = self.app.amqp.queues  # shortcut
        self.default_queue = self.app.amqp.default_queue
        self._default_mode = self.app.conf.CELERY_DEFAULT_DELIVERY_MODE
        super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)

    def publish_task(self, task_name, task_args=None, task_kwargs=None,
                     countdown=None, eta=None, task_id=None, group_id=None,
                     taskset_id=None,  # compat alias to group_id
                     expires=None, exchange=None, exchange_type=None,
                     event_dispatcher=None, retry=None, retry_policy=None,
                     queue=None, now=None, retries=0, chord=None,
                     callbacks=None, errbacks=None, routing_key=None,
                     serializer=None, delivery_mode=None, compression=None,
                     reply_to=None, time_limit=None, soft_time_limit=None,
                     declare=None, headers=None,
                     send_before_publish=signals.before_task_publish.send,
                     before_receivers=signals.before_task_publish.receivers,
                     send_after_publish=signals.after_task_publish.send,
                     after_receivers=signals.after_task_publish.receivers,
                     send_task_sent=signals.task_sent.send,  # XXX deprecated
                     sent_receivers=signals.task_sent.receivers,
                     **kwargs):
        """Send task message."""
        retry = self.retry if retry is None else retry
        headers = {} if headers is None else headers

        qname = queue
        if queue is None and exchange is None:
            queue = self.default_queue
        if queue is not None:
            if isinstance(queue, string_t):
                qname, queue = queue, self.queues[queue]
            else:
                qname = queue.name
            exchange = exchange or queue.exchange.name
            routing_key = routing_key or queue.routing_key
        if declare is None and queue and not isinstance(queue, Broadcast):
            declare = [queue]
        if delivery_mode is None:
            delivery_mode = self._default_mode

        # merge default and custom policy
        retry = self.retry if retry is None else retry
        _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
               else self.retry_policy)
        task_id = task_id or uuid()
        task_args = task_args or []
        task_kwargs = task_kwargs or {}
        if not isinstance(task_args, (list, tuple)):
            raise ValueError('task args must be a list or tuple')
        if not isinstance(task_kwargs, dict):
            raise ValueError('task kwargs must be a dictionary')
        if countdown:  # Convert countdown to ETA.
            now = now or self.app.now()
            eta = now + timedelta(seconds=countdown)
            if self.utc:
                eta = to_utc(eta).astimezone(self.app.timezone)
        if isinstance(expires, numbers.Real):
            now = now or self.app.now()
            expires = now + timedelta(seconds=expires)
            if self.utc:
                expires = to_utc(expires).astimezone(self.app.timezone)
        eta = eta and eta.isoformat()
        expires = expires and expires.isoformat()

        body = {
            'task': task_name,
            'id': task_id,
            'args': task_args,
            'kwargs': task_kwargs,
            'retries': retries or 0,
            'eta': eta,
            'expires': expires,
            'utc': self.utc,
            'callbacks': callbacks,
            'errbacks': errbacks,
            'timelimit': (time_limit, soft_time_limit),
            'taskset': group_id or taskset_id,
            'chord': chord,
        }

        if before_receivers:
            send_before_publish(
                sender=task_name, body=body,
                exchange=exchange,
                routing_key=routing_key,
                declare=declare,
                headers=headers,
                properties=kwargs,
                retry_policy=retry_policy,
            )

        self.publish(
            body,
            exchange=exchange, routing_key=routing_key,
            serializer=serializer or self.serializer,
            compression=compression or self.compression,
            headers=headers,
            retry=retry, retry_policy=_rp,
            reply_to=reply_to,
            correlation_id=task_id,
            delivery_mode=delivery_mode, declare=declare,
            **kwargs
        )

        if after_receivers:
            send_after_publish(sender=task_name, body=body,
                               exchange=exchange, routing_key=routing_key)

        if sent_receivers:  # XXX deprecated
            send_task_sent(sender=task_name, task_id=task_id,
                           task=task_name, args=task_args,
                           kwargs=task_kwargs, eta=eta,
                           taskset=group_id or taskset_id)

        if self.send_sent_event:
            evd = event_dispatcher or self.event_dispatcher
            exname = exchange or self.exchange
            if isinstance(exname, Exchange):
                exname = exname.name
            evd.publish(
                'task-sent',
                {
                    'uuid': task_id,
                    'name': task_name,
                    'args': safe_repr(task_args),
                    'kwargs': safe_repr(task_kwargs),
                    'retries': retries,
                    'eta': eta,
                    'expires': expires,
                    'queue': qname,
                    'exchange': exname,
                    'routing_key': routing_key,
                },
                self, retry=retry, retry_policy=retry_policy,
            )
        return task_id
    delay_task = publish_task   # XXX Compat

    @cached_property
    def event_dispatcher(self):
        # We call Dispatcher.publish with a custom producer
        # so don't need the dispatcher to be "enabled".
        return self.app.events.Dispatcher(enabled=False)
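

# Illustrative sketch (comments only, not executed): publishing a task message
# through a pooled producer.  ``app`` is assumed to be a configured Celery
# application and 'tasks.add' a registered task name; the supported
# high-level entry points for this are ``app.send_task`` / ``Task.apply_async``:
#
#     >>> with app.producer_or_acquire() as producer:
#     ...     producer.publish_task('tasks.add', (2, 2), {}, countdown=10)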


class TaskPublisher(TaskProducer):
    """Deprecated version of :class:`TaskProducer`."""

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        self.app = app_or_default(kwargs.pop('app', self.app))
        self.retry = kwargs.pop('retry', self.retry)
        self.retry_policy = kwargs.pop('retry_policy',
                                       self.retry_policy or {})
        exchange = exchange or self.exchange
        if not isinstance(exchange, Exchange):
            exchange = Exchange(exchange,
                                kwargs.pop('exchange_type', 'direct'))
        self.queues = self.app.amqp.queues  # shortcut
        super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)


class TaskConsumer(Consumer):
    app = None

    def __init__(self, channel, queues=None, app=None, accept=None, **kw):
        self.app = app or self.app
        if accept is None:
            accept = self.app.conf.CELERY_ACCEPT_CONTENT
        super(TaskConsumer, self).__init__(
            channel,
            queues or list(self.app.amqp.queues.consume_from.values()),
            accept=accept,
            **kw
        )
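

# Illustrative sketch (comments only, not executed): consuming task messages
# from the active queues (``app.amqp.queues.consume_from``).  ``app`` is
# assumed to be a configured Celery application and ``on_message`` a
# user-supplied callback taking ``(body, message)``:
#
#     >>> with app.connection() as connection:
#     ...     with app.amqp.TaskConsumer(connection.channel(),
#     ...                                callbacks=[on_message]):
#     ...         connection.drain_events(timeout=1)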


class AMQP(object):
    Connection = Connection
    Consumer = Consumer

    #: compat alias to Connection
    BrokerConnection = Connection

    producer_cls = TaskProducer
    consumer_cls = TaskConsumer
    queues_cls = Queues

    #: Cached and prepared routing table.
    _rtable = None

    #: Underlying producer pool instance automatically
    #: set by the :attr:`producer_pool`.
    _producer_pool = None

    # Exchange class/function used when defining automatic queues.
    # E.g. you can use ``autoexchange = lambda n: None`` to use the
    # amqp default exchange, which is a shortcut to bypass routing
    # and instead send directly to the queue named in the routing key.
    autoexchange = None

    def __init__(self, app):
        self.app = app

    def flush_routes(self):
        self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)

    def Queues(self, queues, create_missing=None, ha_policy=None,
               autoexchange=None):
        """Create new :class:`Queues` instance, using queue defaults
        from the current configuration."""
        conf = self.app.conf
        if create_missing is None:
            create_missing = conf.CELERY_CREATE_MISSING_QUEUES
        if ha_policy is None:
            ha_policy = conf.CELERY_QUEUE_HA_POLICY
        if not queues and conf.CELERY_DEFAULT_QUEUE:
            queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                            exchange=self.default_exchange,
                            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
        autoexchange = (self.autoexchange if autoexchange is None
                        else autoexchange)
        return self.queues_cls(
            queues, self.default_exchange, create_missing,
            ha_policy, autoexchange,
        )

    def Router(self, queues=None, create_missing=None):
        """Return the current task router."""
        return _routes.Router(self.routes, queues or self.queues,
                              self.app.either('CELERY_CREATE_MISSING_QUEUES',
                                              create_missing), app=self.app)

    @cached_property
    def TaskConsumer(self):
        """Return consumer configured to consume from the queues
        we are configured for (``app.amqp.queues.consume_from``)."""
        return self.app.subclass_with_self(self.consumer_cls,
                                           reverse='amqp.TaskConsumer')
    get_task_consumer = TaskConsumer  # XXX compat

    @cached_property
    def TaskProducer(self):
        """Return publisher used to send tasks.

        You should use `app.send_task` instead.

        """
        conf = self.app.conf
        return self.app.subclass_with_self(
            self.producer_cls,
            reverse='amqp.TaskProducer',
            exchange=self.default_exchange,
            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
            serializer=conf.CELERY_TASK_SERIALIZER,
            compression=conf.CELERY_MESSAGE_COMPRESSION,
            retry=conf.CELERY_TASK_PUBLISH_RETRY,
            retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
            send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
            utc=conf.CELERY_ENABLE_UTC,
        )
    TaskPublisher = TaskProducer  # compat

    @cached_property
    def default_queue(self):
        return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]

    @cached_property
    def queues(self):
        """Queue name⇒ declaration mapping."""
        return self.Queues(self.app.conf.CELERY_QUEUES)

    @queues.setter  # noqa
    def queues(self, queues):
        return self.Queues(queues)

    @property
    def routes(self):
        if self._rtable is None:
            self.flush_routes()
        return self._rtable

    @cached_property
    def router(self):
        return self.Router()

    @property
    def producer_pool(self):
        if self._producer_pool is None:
            self._producer_pool = ProducerPool(
                self.app.pool,
                limit=self.app.pool.limit,
                Producer=self.TaskProducer,
            )
        return self._producer_pool
    publisher_pool = producer_pool  # compat alias

    @cached_property
    def default_exchange(self):
        return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
                        self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
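

# Illustrative sketch (comments only, not executed): the ``app.amqp`` instance
# ties the pieces above together.  ``app`` is assumed to be a configured
# Celery application and 'tasks.add' a registered task name:
#
#     >>> app.amqp.queues                 # Queues built from CELERY_QUEUES
#     >>> app.amqp.router.route({}, 'tasks.add')   # routing options for a task
#     >>> with app.amqp.producer_pool.acquire(block=True) as producer:
#     ...     producer.publish_task('tasks.add', (2, 2), {})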