amqp.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.amqp
  4. ~~~~~~~~~~~~~~~
  5. Sending and receiving messages using Kombu.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import timedelta
  9. from weakref import WeakValueDictionary
  10. from kombu import Connection, Consumer, Exchange, Producer, Queue
  11. from kombu.common import entry_to_queue
  12. from kombu.pools import ProducerPool
  13. from kombu.utils.encoding import safe_repr
  14. from celery import signals
  15. from celery.utils import cached_property, uuid
  16. from celery.utils.text import indent as textindent
  17. from . import app_or_default
  18. from . import routes as _routes
  19. #: Human readable queue declaration.
  20. QUEUE_FORMAT = """
  21. . %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
  22. """
  23. class Queues(dict):
  24. """Queue name⇒ declaration mapping.
  25. :param queues: Initial list/tuple or dict of queues.
  26. :keyword create_missing: By default any unknown queues will be
  27. added automatically, but if disabled
  28. the occurrence of unknown queues
  29. in `wanted` will raise :exc:`KeyError`.
  30. """
  31. #: If set, this is a subset of queues to consume from.
  32. #: The rest of the queues are then used for routing only.
  33. _consume_from = None
  34. def __init__(self, queues=None, default_exchange=None,
  35. create_missing=True):
  36. dict.__init__(self)
  37. self.aliases = WeakValueDictionary()
  38. self.default_exchange = default_exchange
  39. self.create_missing = create_missing
  40. if isinstance(queues, (tuple, list)):
  41. queues = dict((q.name, q) for q in queues)
  42. for name, q in (queues or {}).iteritems():
  43. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  44. def __getitem__(self, name):
  45. try:
  46. return self.aliases[name]
  47. except KeyError:
  48. return dict.__getitem__(self, name)
  49. def __setitem__(self, name, queue):
  50. if self.default_exchange and (not queue.exchange or
  51. not queue.exchange.name):
  52. queue.exchange = self.default_exchange
  53. dict.__setitem__(self, name, queue)
  54. if queue.alias:
  55. self.aliases[queue.alias] = queue
  56. def __missing__(self, name):
  57. if self.create_missing:
  58. return self.add(self.new_missing(name))
  59. raise KeyError(name)
  60. def add(self, queue, **kwargs):
  61. """Add new queue.
  62. :param queue: Name of the queue.
  63. :keyword exchange: Name of the exchange.
  64. :keyword routing_key: Binding key.
  65. :keyword exchange_type: Type of exchange.
  66. :keyword \*\*options: Additional declaration options.
  67. """
  68. if not isinstance(queue, Queue):
  69. return self.add_compat(queue, **kwargs)
  70. self[queue.name] = queue
  71. return queue
  72. def add_compat(self, name, **options):
  73. # docs used to use binding_key as routing key
  74. options.setdefault('routing_key', options.get('binding_key'))
  75. if options['routing_key'] is None:
  76. options['routing_key'] = name
  77. q = self[name] = entry_to_queue(name, **options)
  78. return q
  79. def format(self, indent=0, indent_first=True):
  80. """Format routing table into string for log dumps."""
  81. active = self.consume_from
  82. if not active:
  83. return ''
  84. info = [QUEUE_FORMAT.strip() % {
  85. 'name': (name + ':').ljust(12),
  86. 'exchange': q.exchange.name,
  87. 'exchange_type': q.exchange.type,
  88. 'routing_key': q.routing_key}
  89. for name, q in sorted(active.iteritems())]
  90. if indent_first:
  91. return textindent('\n'.join(info), indent)
  92. return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
  93. def select_add(self, queue, **kwargs):
  94. """Add new task queue that will be consumed from even when
  95. a subset has been selected using the :option:`-Q` option."""
  96. q = self.add(queue, **kwargs)
  97. if self._consume_from is not None:
  98. self._consume_from[q.name] = q
  99. return q
  100. def select_subset(self, wanted):
  101. """Sets :attr:`consume_from` by selecting a subset of the
  102. currently defined queues.
  103. :param wanted: List of wanted queue names.
  104. """
  105. if wanted:
  106. self._consume_from = dict((name, self[name]) for name in wanted)
  107. def select_remove(self, queue):
  108. if self._consume_from is None:
  109. self.select_subset(k for k in self.keys() if k != queue)
  110. else:
  111. self._consume_from.pop(queue, None)
  112. def new_missing(self, name):
  113. return Queue(name, Exchange(name), name)
  114. @property
  115. def consume_from(self):
  116. if self._consume_from is not None:
  117. return self._consume_from
  118. return self
  119. class TaskProducer(Producer):
  120. app = None
  121. auto_declare = False
  122. retry = False
  123. retry_policy = None
  124. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  125. self.retry = kwargs.pop('retry', self.retry)
  126. self.retry_policy = kwargs.pop('retry_policy',
  127. self.retry_policy or {})
  128. exchange = exchange or self.exchange
  129. self.queues = self.app.amqp.queues # shortcut
  130. super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  131. def publish_task(self, task_name, task_args=None, task_kwargs=None,
  132. countdown=None, eta=None, task_id=None, group_id=None,
  133. taskset_id=None, # compat alias to group_id
  134. expires=None, exchange=None, exchange_type=None,
  135. event_dispatcher=None, retry=None, retry_policy=None,
  136. queue=None, now=None, retries=0, chord=None, callbacks=None,
  137. errbacks=None, mandatory=None, priority=None, immediate=None,
  138. routing_key=None, serializer=None, delivery_mode=None,
  139. compression=None, timeout=None, soft_timeout=None,
  140. **kwargs):
  141. """Send task message."""
  142. # merge default and custom policy
  143. retry = self.retry if retry is None else retry
  144. _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
  145. else self.retry_policy)
  146. task_id = task_id or uuid()
  147. task_args = task_args or []
  148. task_kwargs = task_kwargs or {}
  149. if not isinstance(task_args, (list, tuple)):
  150. raise ValueError('task args must be a list or tuple')
  151. if not isinstance(task_kwargs, dict):
  152. raise ValueError('task kwargs must be a dictionary')
  153. if countdown: # Convert countdown to ETA.
  154. now = now or self.app.now()
  155. eta = now + timedelta(seconds=countdown)
  156. if isinstance(expires, (int, float)):
  157. now = now or self.app.now()
  158. expires = now + timedelta(seconds=expires)
  159. eta = eta and eta.isoformat()
  160. expires = expires and expires.isoformat()
  161. timeouts = (timeout, soft_timeout)
  162. body = {'task': task_name,
  163. 'id': task_id,
  164. 'args': task_args,
  165. 'kwargs': task_kwargs,
  166. 'retries': retries or 0,
  167. 'eta': eta,
  168. 'expires': expires,
  169. 'utc': self.utc,
  170. 'callbacks': callbacks,
  171. 'errbacks': errbacks,
  172. 'timeouts': timeouts}
  173. group_id = group_id or taskset_id
  174. if group_id:
  175. body['taskset'] = group_id
  176. if chord:
  177. body['chord'] = chord
  178. self.publish(body, exchange=exchange, mandatory=mandatory,
  179. immediate=immediate, routing_key=routing_key,
  180. serializer=serializer or self.serializer,
  181. compression=compression or self.compression,
  182. retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
  183. priority=priority, declare=[self.queues[queue]] if queue else [],
  184. **kwargs)
  185. signals.task_sent.send(sender=task_name, **body)
  186. if event_dispatcher:
  187. exname = exchange or self.exchange
  188. if isinstance(exname, Exchange):
  189. exname = exname.name
  190. event_dispatcher.send('task-sent', uuid=task_id,
  191. name=task_name,
  192. args=safe_repr(task_args),
  193. kwargs=safe_repr(task_kwargs),
  194. retries=retries,
  195. eta=eta,
  196. expires=expires,
  197. queue=queue,
  198. exchange=exname,
  199. routing_key=routing_key)
  200. return task_id
  201. delay_task = publish_task # XXX Compat
  202. class TaskPublisher(TaskProducer):
  203. """Deprecated version of :class:`TaskProducer`."""
  204. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  205. self.app = app_or_default(kwargs.pop('app', self.app))
  206. self.retry = kwargs.pop('retry', self.retry)
  207. self.retry_policy = kwargs.pop('retry_policy',
  208. self.retry_policy or {})
  209. exchange = exchange or self.exchange
  210. if not isinstance(exchange, Exchange):
  211. exchange = Exchange(exchange,
  212. kwargs.pop('exchange_type', 'direct'))
  213. self.queues = self.app.amqp.queues # shortcut
  214. super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
  215. class TaskConsumer(Consumer):
  216. app = None
  217. def __init__(self, channel, queues=None, app=None, **kw):
  218. self.app = app or self.app
  219. super(TaskConsumer, self).__init__(channel,
  220. queues or self.app.amqp.queues.consume_from.values(), **kw)
  221. class AMQP(object):
  222. Connection = Connection
  223. Consumer = Consumer
  224. #: compat alias to Connection
  225. BrokerConnection = Connection
  226. #: Cached and prepared routing table.
  227. _rtable = None
  228. def __init__(self, app):
  229. self.app = app
  230. def flush_routes(self):
  231. self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
  232. def Queues(self, queues, create_missing=None):
  233. """Create new :class:`Queues` instance, using queue defaults
  234. from the current configuration."""
  235. conf = self.app.conf
  236. if create_missing is None:
  237. create_missing = conf.CELERY_CREATE_MISSING_QUEUES
  238. if not queues and conf.CELERY_DEFAULT_QUEUE:
  239. queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
  240. exchange=self.default_exchange,
  241. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
  242. return Queues(queues, self.default_exchange, create_missing)
  243. def Router(self, queues=None, create_missing=None):
  244. """Returns the current task router."""
  245. return _routes.Router(self.routes, queues or self.queues,
  246. self.app.either('CELERY_CREATE_MISSING_QUEUES',
  247. create_missing), app=self.app)
  248. @cached_property
  249. def TaskConsumer(self):
  250. """Return consumer configured to consume from the queues
  251. we are configured for (``app.amqp.queues.consume_from``)."""
  252. return self.app.subclass_with_self(TaskConsumer,
  253. reverse='amqp.TaskConsumer')
  254. get_task_consumer = TaskConsumer # XXX compat
  255. @cached_property
  256. def TaskProducer(self):
  257. """Returns publisher used to send tasks.
  258. You should use `app.send_task` instead.
  259. """
  260. conf = self.app.conf
  261. return self.app.subclass_with_self(TaskProducer,
  262. reverse='amqp.TaskProducer',
  263. exchange=self.default_exchange,
  264. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
  265. serializer=conf.CELERY_TASK_SERIALIZER,
  266. compression=conf.CELERY_MESSAGE_COMPRESSION,
  267. retry=conf.CELERY_TASK_PUBLISH_RETRY,
  268. retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
  269. utc=conf.CELERY_ENABLE_UTC)
  270. TaskPublisher = TaskProducer # compat
  271. @cached_property
  272. def default_queue(self):
  273. return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
  274. @cached_property
  275. def queues(self):
  276. """Queue name⇒ declaration mapping."""
  277. return self.Queues(self.app.conf.CELERY_QUEUES)
  278. @queues.setter # noqa
  279. def queues(self, queues):
  280. return self.Queues(queues)
  281. @property
  282. def routes(self):
  283. if self._rtable is None:
  284. self.flush_routes()
  285. return self._rtable
  286. @cached_property
  287. def router(self):
  288. return self.Router()
  289. @cached_property
  290. def producer_pool(self):
  291. return ProducerPool(self.app.pool, limit=self.app.pool.limit,
  292. Producer=self.TaskProducer)
  293. publisher_pool = producer_pool # compat alias
  294. @cached_property
  295. def default_exchange(self):
  296. return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
  297. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)