# -*- coding: utf-8 -*-
"""
celery.app.amqp
~~~~~~~~~~~~~~~

Sending and receiving messages using Kombu.

"""
from __future__ import absolute_import

from datetime import timedelta
from weakref import WeakValueDictionary

from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.common import entry_to_queue
from kombu.pools import ProducerPool
from kombu.utils import cached_property, uuid
from kombu.utils.encoding import safe_repr

from celery import signals
from celery.five import items, string_t
from celery.utils.text import indent as textindent

from . import app_or_default
from . import routes as _routes

#: Human readable queue declaration.
QUEUE_FORMAT = """
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
key={0.routing_key}
"""


class Queues(dict):
    """Queue name⇒ declaration mapping.

    :param queues: Initial list/tuple or dict of queues.
    :keyword create_missing: By default any unknown queues will be
                             added automatically, but if disabled
                             the occurrence of unknown queues
                             in `wanted` will raise :exc:`KeyError`.
    :keyword ha_policy: Default HA policy for queues with none set.

    """
    #: If set, this is a subset of queues to consume from.
    #: The rest of the queues are then used for routing only.
    _consume_from = None

    def __init__(self, queues=None, default_exchange=None,
                 create_missing=True, ha_policy=None):
        dict.__init__(self)
        self.aliases = WeakValueDictionary()
        self.default_exchange = default_exchange
        self.create_missing = create_missing
        self.ha_policy = ha_policy
        if isinstance(queues, (tuple, list)):
            queues = dict((q.name, q) for q in queues)
        for name, q in items(queues or {}):
            self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
    def __getitem__(self, name):
        try:
            return self.aliases[name]
        except KeyError:
            return dict.__getitem__(self, name)

    def __setitem__(self, name, queue):
        if self.default_exchange and (not queue.exchange or
                                      not queue.exchange.name):
            queue.exchange = self.default_exchange
        dict.__setitem__(self, name, queue)
        if queue.alias:
            self.aliases[queue.alias] = queue

    def __missing__(self, name):
        if self.create_missing:
            return self.add(self.new_missing(name))
        raise KeyError(name)
    def add(self, queue, **kwargs):
        """Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former, the rest of the keyword
        arguments are ignored, and the options are taken from the queue
        instance instead.

        :param queue: :class:`kombu.Queue` instance or name of the queue.
        :keyword exchange: (if named) specifies exchange name.
        :keyword routing_key: (if named) specifies binding key.
        :keyword exchange_type: (if named) specifies type of exchange.
        :keyword \*\*options: (if named) Additional declaration options.

        """
        if not isinstance(queue, Queue):
            return self.add_compat(queue, **kwargs)
        if self.ha_policy:
            if queue.queue_arguments is None:
                queue.queue_arguments = {}
            self._set_ha_policy(queue.queue_arguments)
        self[queue.name] = queue
        return queue
    def add_compat(self, name, **options):
        # docs used to use binding_key as routing key
        options.setdefault('routing_key', options.get('binding_key'))
        if options['routing_key'] is None:
            options['routing_key'] = name
        if self.ha_policy is not None:
            self._set_ha_policy(options.setdefault('queue_arguments', {}))
        q = self[name] = entry_to_queue(name, **options)
        return q

    def _set_ha_policy(self, args):
        policy = self.ha_policy
        if isinstance(policy, (list, tuple)):
            return args.update({'x-ha-policy': 'nodes',
                                'x-ha-policy-params': list(policy)})
        args['x-ha-policy'] = policy
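
    # Usage sketch (illustrative, with made-up queue names): adding queues by
    # instance and by name, with a default HA policy applied to both entries.
    #
    #     queues = Queues(default_exchange=Exchange('celery'),
    #                     ha_policy='all')
    #     queues.add(Queue('images', Exchange('media'), 'media.images'))
    #     queues.add('default', exchange='celery', routing_key='default')
    #     # both entries now carry {'x-ha-policy': 'all'} in queue_arguments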
    def format(self, indent=0, indent_first=True):
        """Format routing table into string for log dumps."""
        active = self.consume_from
        if not active:
            return ''
        info = [QUEUE_FORMAT.strip().format(q)
                for _, q in sorted(items(active))]
        if indent_first:
            return textindent('\n'.join(info), indent)
        return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)

    def select_add(self, queue, **kwargs):
        """Add new task queue that will be consumed from even when
        a subset has been selected using the :option:`-Q` option."""
        q = self.add(queue, **kwargs)
        if self._consume_from is not None:
            self._consume_from[q.name] = q
        return q

    def select_subset(self, wanted):
        """Sets :attr:`consume_from` by selecting a subset of the
        currently defined queues.

        :param wanted: List of wanted queue names.
        """
        if wanted:
            self._consume_from = dict((name, self[name]) for name in wanted)

    def select_remove(self, queue):
        if self._consume_from is None:
            self.select_subset(k for k in self if k != queue)
        else:
            self._consume_from.pop(queue, None)

    def new_missing(self, name):
        return Queue(name, Exchange(name), name)

    @property
    def consume_from(self):
        if self._consume_from is not None:
            return self._consume_from
        return self
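

# Consumption-subset sketch (illustrative, with made-up queue names):
# restricting which queues are consumed from, roughly what a worker does for
# the -Q option, then dumping the resulting routing table.
#
#     queues = Queues([Queue('default', Exchange('default'), 'default'),
#                      Queue('images', Exchange('media'), 'media.images')])
#     queues.select_subset(['images'])
#     print(queues.format())   # only the 'images' entry is listed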


class TaskProducer(Producer):
    app = None
    auto_declare = False
    retry = False
    retry_policy = None
    utc = True
    event_dispatcher = None
    send_sent_event = False

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        self.retry = kwargs.pop('retry', self.retry)
        self.retry_policy = kwargs.pop('retry_policy',
                                       self.retry_policy or {})
        self.send_sent_event = kwargs.pop('send_sent_event',
                                          self.send_sent_event)
        exchange = exchange or self.exchange
        self.queues = self.app.amqp.queues  # shortcut
        self.default_queue = self.app.amqp.default_queue
        super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
    def publish_task(self, task_name, task_args=None, task_kwargs=None,
                     countdown=None, eta=None, task_id=None, group_id=None,
                     taskset_id=None,  # compat alias to group_id
                     expires=None, exchange=None, exchange_type=None,
                     event_dispatcher=None, retry=None, retry_policy=None,
                     queue=None, now=None, retries=0, chord=None,
                     callbacks=None, errbacks=None, routing_key=None,
                     serializer=None, delivery_mode=None, compression=None,
                     reply_to=None, timeout=None, soft_timeout=None,
                     timeouts=None, declare=None, **kwargs):
        """Send task message."""
        retry = self.retry if retry is None else retry

        qname = queue
        if queue is None and exchange is None:
            queue = self.default_queue
        if queue is not None:
            if isinstance(queue, string_t):
                qname, queue = queue, self.queues[queue]
            else:
                qname = queue.name
            exchange = exchange or queue.exchange.name
            routing_key = routing_key or queue.routing_key
        declare = declare or ([queue] if queue else [])

        # merge default and custom retry policy
        _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
               else self.retry_policy)
        task_id = task_id or uuid()
        task_args = task_args or []
        task_kwargs = task_kwargs or {}
        if not isinstance(task_args, (list, tuple)):
            raise ValueError('task args must be a list or tuple')
        if not isinstance(task_kwargs, dict):
            raise ValueError('task kwargs must be a dictionary')
        if countdown:  # Convert countdown to ETA.
            now = now or self.app.now()
            eta = now + timedelta(seconds=countdown)
        if isinstance(expires, (int, float)):
            now = now or self.app.now()
            expires = now + timedelta(seconds=expires)
        eta = eta and eta.isoformat()
        expires = expires and expires.isoformat()
        body = {
            'task': task_name,
            'id': task_id,
            'args': task_args,
            'kwargs': task_kwargs,
            'retries': retries or 0,
            'eta': eta,
            'expires': expires,
            'utc': self.utc,
            'callbacks': callbacks,
            'errbacks': errbacks,
            'reply_to': reply_to,
            'timeouts': timeouts or (timeout, soft_timeout),
            'taskset': group_id or taskset_id,
            'chord': chord,
        }

        self.publish(
            body,
            exchange=exchange, routing_key=routing_key,
            serializer=serializer or self.serializer,
            compression=compression or self.compression,
            retry=retry, retry_policy=_rp,
            delivery_mode=delivery_mode, declare=declare,
            **kwargs
        )

        signals.task_sent.send(sender=task_name, **body)
        if self.send_sent_event:
            evd = event_dispatcher or self.event_dispatcher
            exname = exchange or self.exchange
            if isinstance(exname, Exchange):
                exname = exname.name
            evd.publish(
                'task-sent',
                {
                    'uuid': task_id,
                    'name': task_name,
                    'args': safe_repr(task_args),
                    'kwargs': safe_repr(task_kwargs),
                    'retries': retries,
                    'eta': eta,
                    'expires': expires,
                    'queue': qname,
                    'exchange': exname,
                    'routing_key': routing_key,
                },
                self, retry=retry, retry_policy=retry_policy,
            )
        return task_id
    delay_task = publish_task   # XXX Compat

    @cached_property
    def event_dispatcher(self):
        # We call Dispatcher.publish with a custom producer
        # so don't need the dispatcher to be "enabled".
        return self.app.events.Dispatcher(enabled=False)
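

# Sending sketch (illustrative): publishing a task message with a producer
# acquired from the app's producer pool.  The task name 'proj.tasks.add' is
# made up, and `app` is assumed to be a configured Celery app.
#
#     with app.amqp.producer_pool.acquire(block=True) as producer:
#         producer.publish_task('proj.tasks.add',
#                               task_args=(2, 2), task_kwargs={},
#                               countdown=10)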


class TaskPublisher(TaskProducer):
    """Deprecated version of :class:`TaskProducer`."""

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        self.app = app_or_default(kwargs.pop('app', self.app))
        self.retry = kwargs.pop('retry', self.retry)
        self.retry_policy = kwargs.pop('retry_policy',
                                       self.retry_policy or {})
        exchange = exchange or self.exchange
        if not isinstance(exchange, Exchange):
            exchange = Exchange(exchange,
                                kwargs.pop('exchange_type', 'direct'))
        self.queues = self.app.amqp.queues  # shortcut
        super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)


class TaskConsumer(Consumer):
    app = None

    def __init__(self, channel, queues=None, app=None, accept=None, **kw):
        self.app = app or self.app
        if accept is None:
            accept = self.app.conf.CELERY_ACCEPT_CONTENT
        super(TaskConsumer, self).__init__(
            channel,
            queues or list(self.app.amqp.queues.consume_from.values()),
            accept=accept,
            **kw
        )
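

# Consuming sketch (illustrative): a consumer bound to the queues the app is
# configured to consume from.  `app` and the `on_task` callback are assumed
# to be defined elsewhere.
#
#     with app.connection() as conn:
#         consumer = app.amqp.TaskConsumer(conn.default_channel,
#                                          callbacks=[on_task])
#         consumer.consume()
#         conn.drain_events(timeout=1)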


class AMQP(object):
    Connection = Connection
    Consumer = Consumer

    #: compat alias to Connection
    BrokerConnection = Connection

    producer_cls = TaskProducer
    consumer_cls = TaskConsumer

    #: Cached and prepared routing table.
    _rtable = None

    #: Underlying producer pool instance automatically
    #: set by the :attr:`producer_pool`.
    _producer_pool = None

    def __init__(self, app):
        self.app = app

    def flush_routes(self):
        self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)

    def Queues(self, queues, create_missing=None, ha_policy=None):
        """Create new :class:`Queues` instance, using queue defaults
        from the current configuration."""
        conf = self.app.conf
        if create_missing is None:
            create_missing = conf.CELERY_CREATE_MISSING_QUEUES
        if ha_policy is None:
            ha_policy = conf.CELERY_QUEUE_HA_POLICY
        if not queues and conf.CELERY_DEFAULT_QUEUE:
            queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                            exchange=self.default_exchange,
                            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
        return Queues(queues, self.default_exchange, create_missing, ha_policy)
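
    # Configuration sketch (illustrative, with made-up queue names):
    # CELERY_QUEUES may be given as a list/tuple of kombu Queue objects or as
    # a dict of name -> options; either form is passed through Queues() above.
    #
    #     app.conf.CELERY_QUEUES = (
    #         Queue('default', Exchange('default'), routing_key='default'),
    #         Queue('images', Exchange('media'), routing_key='media.images'),
    #     )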
    def Router(self, queues=None, create_missing=None):
        """Returns the current task router."""
        return _routes.Router(self.routes, queues or self.queues,
                              self.app.either('CELERY_CREATE_MISSING_QUEUES',
                                              create_missing), app=self.app)
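
    # Routing sketch (illustrative): asking the router where a task would be
    # sent.  The task name is made up, and the call signature and return
    # value of Router.route are assumed here rather than documented above.
    #
    #     options = app.amqp.router.route({}, 'proj.tasks.add',
    #                                     args=(2, 2), kwargs={})
    #     # options['queue'] is the destination queue for the task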
    @cached_property
    def TaskConsumer(self):
        """Return consumer configured to consume from the queues
        we are configured for (``app.amqp.queues.consume_from``)."""
        return self.app.subclass_with_self(self.consumer_cls,
                                           reverse='amqp.TaskConsumer')
    get_task_consumer = TaskConsumer  # XXX compat

    @cached_property
    def TaskProducer(self):
        """Returns publisher used to send tasks.

        You should use `app.send_task` instead.

        """
        conf = self.app.conf
        return self.app.subclass_with_self(
            self.producer_cls,
            reverse='amqp.TaskProducer',
            exchange=self.default_exchange,
            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
            serializer=conf.CELERY_TASK_SERIALIZER,
            compression=conf.CELERY_MESSAGE_COMPRESSION,
            retry=conf.CELERY_TASK_PUBLISH_RETRY,
            retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
            send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
            utc=conf.CELERY_ENABLE_UTC,
        )
    TaskPublisher = TaskProducer  # compat

    @cached_property
    def default_queue(self):
        return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]

    @cached_property
    def queues(self):
        """Queue name⇒ declaration mapping."""
        return self.Queues(self.app.conf.CELERY_QUEUES)

    @queues.setter  # noqa
    def queues(self, queues):
        return self.Queues(queues)
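
    # Override sketch (illustrative): assigning to ``app.amqp.queues`` runs
    # the value through Queues() above, so configuration defaults such as the
    # default exchange and HA policy still apply.  The queue name is made up.
    #
    #     app.amqp.queues = [Queue('images', Exchange('media'), 'media.images')]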

    @property
    def routes(self):
        if self._rtable is None:
            self.flush_routes()
        return self._rtable

    @cached_property
    def router(self):
        return self.Router()

    @property
    def producer_pool(self):
        if self._producer_pool is None:
            self._producer_pool = ProducerPool(
                self.app.pool,
                limit=self.app.pool.limit,
                Producer=self.TaskProducer,
            )
        return self._producer_pool
    publisher_pool = producer_pool  # compat alias

    @cached_property
    def default_exchange(self):
        return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
                        self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)