amqp.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.amqp
  4. ~~~~~~~~~~~~~~~
  5. Sending and receiving messages using Kombu.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import timedelta
  9. from weakref import WeakValueDictionary
  10. from kombu import Connection, Consumer, Exchange, Producer, Queue
  11. from kombu.common import entry_to_queue
  12. from kombu.pools import ProducerPool
  13. from kombu.utils import cached_property, uuid
  14. from kombu.utils.encoding import safe_repr
  15. from celery import signals
  16. from celery.five import items
  17. from celery.utils.text import indent as textindent
  18. from . import app_or_default
  19. from . import routes as _routes
  20. #: Human readable queue declaration.
  21. QUEUE_FORMAT = """
  22. . {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
  23. key={0.routing_key}
  24. """
  25. class Queues(dict):
  26. """Queue name⇒ declaration mapping.
  27. :param queues: Initial list/tuple or dict of queues.
  28. :keyword create_missing: By default any unknown queues will be
  29. added automatically, but if disabled
  30. the occurrence of unknown queues
  31. in `wanted` will raise :exc:`KeyError`.
  32. :keyword ha_policy: Default HA policy for queues with none set.
  33. """
  34. #: If set, this is a subset of queues to consume from.
  35. #: The rest of the queues are then used for routing only.
  36. _consume_from = None
  37. def __init__(self, queues=None, default_exchange=None,
  38. create_missing=True, ha_policy=None):
  39. dict.__init__(self)
  40. self.aliases = WeakValueDictionary()
  41. self.default_exchange = default_exchange
  42. self.create_missing = create_missing
  43. self.ha_policy = ha_policy
  44. if isinstance(queues, (tuple, list)):
  45. queues = dict((q.name, q) for q in queues)
  46. for name, q in items(queues or {}):
  47. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  48. def __getitem__(self, name):
  49. try:
  50. return self.aliases[name]
  51. except KeyError:
  52. return dict.__getitem__(self, name)
  53. def __setitem__(self, name, queue):
  54. if self.default_exchange and (not queue.exchange or
  55. not queue.exchange.name):
  56. queue.exchange = self.default_exchange
  57. dict.__setitem__(self, name, queue)
  58. if queue.alias:
  59. self.aliases[queue.alias] = queue
  60. def __missing__(self, name):
  61. if self.create_missing:
  62. return self.add(self.new_missing(name))
  63. raise KeyError(name)
  64. def add(self, queue, **kwargs):
  65. """Add new queue.
  66. :param queue: Name of the queue.
  67. :keyword exchange: Name of the exchange.
  68. :keyword routing_key: Binding key.
  69. :keyword exchange_type: Type of exchange.
  70. :keyword \*\*options: Additional declaration options.
  71. """
  72. if not isinstance(queue, Queue):
  73. return self.add_compat(queue, **kwargs)
  74. if self.ha_policy:
  75. if queue.queue_arguments is None:
  76. queue.queue_arguments = {}
  77. self._set_ha_policy(queue.queue_arguments)
  78. self[queue.name] = queue
  79. return queue
  80. def add_compat(self, name, **options):
  81. # docs used to use binding_key as routing key
  82. options.setdefault('routing_key', options.get('binding_key'))
  83. if options['routing_key'] is None:
  84. options['routing_key'] = name
  85. if self.ha_policy is not None:
  86. self._set_ha_policy(options.setdefault('queue_arguments', {}))
  87. q = self[name] = entry_to_queue(name, **options)
  88. return q
  89. def _set_ha_policy(self, args):
  90. policy = self.ha_policy
  91. if isinstance(policy, (list, tuple)):
  92. return args.update({'x-ha-policy': 'nodes',
  93. 'x-ha-policy-params': list(policy)})
  94. args['x-ha-policy'] = policy
  95. def format(self, indent=0, indent_first=True):
  96. """Format routing table into string for log dumps."""
  97. active = self.consume_from
  98. if not active:
  99. return ''
  100. info = [QUEUE_FORMAT.strip().format(q)
  101. for _, q in sorted(items(active))]
  102. if indent_first:
  103. return textindent('\n'.join(info), indent)
  104. return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
  105. def select_add(self, queue, **kwargs):
  106. """Add new task queue that will be consumed from even when
  107. a subset has been selected using the :option:`-Q` option."""
  108. q = self.add(queue, **kwargs)
  109. if self._consume_from is not None:
  110. self._consume_from[q.name] = q
  111. return q
  112. def select_subset(self, wanted):
  113. """Sets :attr:`consume_from` by selecting a subset of the
  114. currently defined queues.
  115. :param wanted: List of wanted queue names.
  116. """
  117. if wanted:
  118. self._consume_from = dict((name, self[name]) for name in wanted)
  119. def select_remove(self, queue):
  120. if self._consume_from is None:
  121. self.select_subset(k for k in self if k != queue)
  122. else:
  123. self._consume_from.pop(queue, None)
  124. def new_missing(self, name):
  125. return Queue(name, Exchange(name), name)
  126. @property
  127. def consume_from(self):
  128. if self._consume_from is not None:
  129. return self._consume_from
  130. return self
  131. class TaskProducer(Producer):
  132. app = None
  133. auto_declare = False
  134. retry = False
  135. retry_policy = None
  136. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  137. self.retry = kwargs.pop('retry', self.retry)
  138. self.retry_policy = kwargs.pop('retry_policy',
  139. self.retry_policy or {})
  140. exchange = exchange or self.exchange
  141. self.queues = self.app.amqp.queues # shortcut
  142. super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  143. def publish_task(self, task_name, task_args=None, task_kwargs=None,
  144. countdown=None, eta=None, task_id=None, group_id=None,
  145. taskset_id=None, # compat alias to group_id
  146. expires=None, exchange=None, exchange_type=None,
  147. event_dispatcher=None, retry=None, retry_policy=None,
  148. queue=None, now=None, retries=0, chord=None, callbacks=None,
  149. errbacks=None, routing_key=None, serializer=None,
  150. delivery_mode=None, compression=None, reply_to=None,
  151. timeout=None, soft_timeout=None, timeouts=None,
  152. declare=None, **kwargs):
  153. """Send task message."""
  154. retry = self.retry if retry is None else retry
  155. declare = declare or []
  156. qname = queue
  157. if queue is not None:
  158. if isinstance(queue, basestring):
  159. qname, queue = queue, self.queues[queue]
  160. else:
  161. qname = queue.name
  162. exchange = exchange or queue.exchange.name
  163. routing_key = routing_key or queue.routing_key
  164. # merge default and custom policy
  165. retry = self.retry if retry is None else retry
  166. _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
  167. else self.retry_policy)
  168. task_id = task_id or uuid()
  169. task_args = task_args or []
  170. task_kwargs = task_kwargs or {}
  171. if not isinstance(task_args, (list, tuple)):
  172. raise ValueError('task args must be a list or tuple')
  173. if not isinstance(task_kwargs, dict):
  174. raise ValueError('task kwargs must be a dictionary')
  175. if countdown: # Convert countdown to ETA.
  176. now = now or self.app.now()
  177. eta = now + timedelta(seconds=countdown)
  178. if isinstance(expires, (int, float)):
  179. now = now or self.app.now()
  180. expires = now + timedelta(seconds=expires)
  181. eta = eta and eta.isoformat()
  182. expires = expires and expires.isoformat()
  183. body = {
  184. 'task': task_name,
  185. 'id': task_id,
  186. 'args': task_args,
  187. 'kwargs': task_kwargs,
  188. 'retries': retries or 0,
  189. 'eta': eta,
  190. 'expires': expires,
  191. 'utc': self.utc,
  192. 'callbacks': callbacks,
  193. 'errbacks': errbacks,
  194. 'reply_to': reply_to,
  195. 'timeouts': timeouts or (timeout, soft_timeout),
  196. 'taskset': group_id or taskset_id,
  197. 'chord': chord,
  198. }
  199. self.publish(body,
  200. exchange=exchange, routing_key=routing_key,
  201. serializer=serializer or self.serializer,
  202. compression=compression or self.compression,
  203. retry=retry, retry_policy=_rp,
  204. delivery_mode=delivery_mode, declare=declare,
  205. **kwargs)
  206. signals.task_sent.send(sender=task_name, **body)
  207. if event_dispatcher:
  208. exname = exchange or self.exchange
  209. if isinstance(exname, Exchange):
  210. exname = exname.name
  211. event_dispatcher.send('task-sent', uuid=task_id,
  212. name=task_name,
  213. args=safe_repr(task_args),
  214. kwargs=safe_repr(task_kwargs),
  215. retries=retries,
  216. eta=eta,
  217. expires=expires,
  218. queue=qname,
  219. exchange=exname,
  220. routing_key=routing_key)
  221. return task_id
  222. delay_task = publish_task # XXX Compat
  223. class TaskPublisher(TaskProducer):
  224. """Deprecated version of :class:`TaskProducer`."""
  225. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  226. self.app = app_or_default(kwargs.pop('app', self.app))
  227. self.retry = kwargs.pop('retry', self.retry)
  228. self.retry_policy = kwargs.pop('retry_policy',
  229. self.retry_policy or {})
  230. exchange = exchange or self.exchange
  231. if not isinstance(exchange, Exchange):
  232. exchange = Exchange(exchange,
  233. kwargs.pop('exchange_type', 'direct'))
  234. self.queues = self.app.amqp.queues # shortcut
  235. super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
  236. class TaskConsumer(Consumer):
  237. app = None
  238. def __init__(self, channel, queues=None, app=None, **kw):
  239. self.app = app or self.app
  240. super(TaskConsumer, self).__init__(channel,
  241. queues or list(self.app.amqp.queues.consume_from.values()),
  242. **kw)
  243. class AMQP(object):
  244. Connection = Connection
  245. Consumer = Consumer
  246. #: compat alias to Connection
  247. BrokerConnection = Connection
  248. #: Cached and prepared routing table.
  249. _rtable = None
  250. #: Underlying producer pool instance automatically
  251. #: set by the :attr:`producer_pool`.
  252. _producer_pool = None
  253. def __init__(self, app):
  254. self.app = app
  255. def flush_routes(self):
  256. self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
  257. def Queues(self, queues, create_missing=None, ha_policy=None):
  258. """Create new :class:`Queues` instance, using queue defaults
  259. from the current configuration."""
  260. conf = self.app.conf
  261. if create_missing is None:
  262. create_missing = conf.CELERY_CREATE_MISSING_QUEUES
  263. if ha_policy is None:
  264. ha_policy = conf.CELERY_QUEUE_HA_POLICY
  265. if not queues and conf.CELERY_DEFAULT_QUEUE:
  266. queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
  267. exchange=self.default_exchange,
  268. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
  269. return Queues(queues, self.default_exchange, create_missing, ha_policy)
  270. def Router(self, queues=None, create_missing=None):
  271. """Returns the current task router."""
  272. return _routes.Router(self.routes, queues or self.queues,
  273. self.app.either('CELERY_CREATE_MISSING_QUEUES',
  274. create_missing), app=self.app)
  275. @cached_property
  276. def TaskConsumer(self):
  277. """Return consumer configured to consume from the queues
  278. we are configured for (``app.amqp.queues.consume_from``)."""
  279. return self.app.subclass_with_self(TaskConsumer,
  280. reverse='amqp.TaskConsumer')
  281. get_task_consumer = TaskConsumer # XXX compat
  282. @cached_property
  283. def TaskProducer(self):
  284. """Returns publisher used to send tasks.
  285. You should use `app.send_task` instead.
  286. """
  287. conf = self.app.conf
  288. return self.app.subclass_with_self(TaskProducer,
  289. reverse='amqp.TaskProducer',
  290. exchange=self.default_exchange,
  291. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
  292. serializer=conf.CELERY_TASK_SERIALIZER,
  293. compression=conf.CELERY_MESSAGE_COMPRESSION,
  294. retry=conf.CELERY_TASK_PUBLISH_RETRY,
  295. retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
  296. utc=conf.CELERY_ENABLE_UTC)
  297. TaskPublisher = TaskProducer # compat
  298. @cached_property
  299. def default_queue(self):
  300. return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
  301. @cached_property
  302. def queues(self):
  303. """Queue name⇒ declaration mapping."""
  304. return self.Queues(self.app.conf.CELERY_QUEUES)
  305. @queues.setter # noqa
  306. def queues(self, queues):
  307. return self.Queues(queues)
  308. @property
  309. def routes(self):
  310. if self._rtable is None:
  311. self.flush_routes()
  312. return self._rtable
  313. @cached_property
  314. def router(self):
  315. return self.Router()
  316. @property
  317. def producer_pool(self):
  318. if self._producer_pool is None:
  319. self._producer_pool = ProducerPool(self.app.pool,
  320. limit=self.app.pool.limit,
  321. Producer=self.TaskProducer,
  322. )
  323. return self._producer_pool
  324. publisher_pool = producer_pool # compat alias
  325. @cached_property
  326. def default_exchange(self):
  327. return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
  328. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)