amqp.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
# -*- coding: utf-8 -*-
"""
    celery.app.amqp
    ~~~~~~~~~~~~~~~

    AMQ related functionality.

    :copyright: (c) 2009 - 2012 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
  9. from __future__ import absolute_import
  10. from datetime import timedelta
  11. from weakref import WeakValueDictionary
  12. from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
  13. from kombu.common import entry_to_queue
  14. from kombu.pools import ProducerPool
  15. from celery import signals
  16. from celery.utils import cached_property, uuid
  17. from celery.utils.text import indent as textindent
  18. from . import routes as _routes
#: Human readable queue declaration.
#: %-format template; ``Queues.format`` strips it and fills it from a
#: mapping with the keys ``name``, ``exchange``, ``exchange_type`` and
#: ``routing_key`` (one rendered line per queue).
QUEUE_FORMAT = """
. %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
"""
  23. class Queues(dict):
  24. """Queue name⇒ declaration mapping.
  25. :param queues: Initial list/tuple or dict of queues.
  26. :keyword create_missing: By default any unknown queues will be
  27. added automatically, but if disabled
  28. the occurrence of unknown queues
  29. in `wanted` will raise :exc:`KeyError`.
  30. """
  31. #: If set, this is a subset of queues to consume from.
  32. #: The rest of the queues are then used for routing only.
  33. _consume_from = None
  34. def __init__(self, queues, default_exchange=None, create_missing=True):
  35. dict.__init__(self)
  36. self.aliases = WeakValueDictionary()
  37. self.default_exchange = default_exchange
  38. self.create_missing = create_missing
  39. if isinstance(queues, (tuple, list)):
  40. queues = dict((q.name, q) for q in queues)
  41. for name, q in (queues or {}).iteritems():
  42. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  43. def __getitem__(self, name):
  44. try:
  45. return self.aliases[name]
  46. except KeyError:
  47. return dict.__getitem__(self, name)
  48. def __setitem__(self, name, queue):
  49. if self.default_exchange:
  50. if not queue.exchange or not queue.exchange.name:
  51. queue.exchange = self.default_exchange
  52. if queue.exchange.type == 'direct' and not queue.routing_key:
  53. queue.routing_key = name
  54. dict.__setitem__(self, name, queue)
  55. if queue.alias:
  56. self.aliases[queue.alias] = queue
  57. def __missing__(self, name):
  58. if self.create_missing:
  59. return self.add(self.new_missing(name))
  60. raise KeyError(name)
  61. def add(self, queue, **kwargs):
  62. """Add new queue.
  63. :param queue: Name of the queue.
  64. :keyword exchange: Name of the exchange.
  65. :keyword routing_key: Binding key.
  66. :keyword exchange_type: Type of exchange.
  67. :keyword \*\*options: Additional declaration options.
  68. """
  69. if not isinstance(queue, Queue):
  70. return self.add_compat(queue, **kwargs)
  71. self[queue.name] = queue
  72. return queue
  73. def add_compat(self, name, **options):
  74. # docs used to use binding_key as routing key
  75. options.setdefault("routing_key", options.get("binding_key"))
  76. q = self[name] = entry_to_queue(name, **options)
  77. return q
  78. def format(self, indent=0, indent_first=True):
  79. """Format routing table into string for log dumps."""
  80. active = self.consume_from
  81. if not active:
  82. return ""
  83. info = [QUEUE_FORMAT.strip() % {
  84. "name": (name + ":").ljust(12),
  85. "exchange": q.exchange.name,
  86. "exchange_type": q.exchange.type,
  87. "routing_key": q.routing_key}
  88. for name, q in sorted(active.iteritems())]
  89. if indent_first:
  90. return textindent("\n".join(info), indent)
  91. return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
  92. def select_subset(self, wanted):
  93. """Sets :attr:`consume_from` by selecting a subset of the
  94. currently defined queues.
  95. :param wanted: List of wanted queue names.
  96. """
  97. if wanted:
  98. self._consume_from = dict((name, self[name]) for name in wanted)
  99. def new_missing(self, name):
  100. return Queue(name, Exchange(name), name)
  101. @property
  102. def consume_from(self):
  103. if self._consume_from is not None:
  104. return self._consume_from
  105. return self
  106. class TaskProducer(Producer):
  107. auto_declare = False
  108. retry = False
  109. retry_policy = None
  110. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  111. self.app = kwargs.get("app") or self.app
  112. self.retry = kwargs.pop("retry", self.retry)
  113. self.retry_policy = kwargs.pop("retry_policy",
  114. self.retry_policy or {})
  115. exchange = exchange or self.exchange
  116. if not isinstance(exchange, Exchange):
  117. exchange = Exchange(exchange,
  118. kwargs.get("exchange_type") or self.exchange_type)
  119. self.queues = self.app.amqp.queues # shortcut
  120. super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  121. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  122. countdown=None, eta=None, task_id=None, taskset_id=None,
  123. expires=None, exchange=None, exchange_type=None,
  124. event_dispatcher=None, retry=None, retry_policy=None,
  125. queue=None, now=None, retries=0, chord=None, callbacks=None,
  126. errbacks=None, mandatory=None, priority=None, immediate=None,
  127. routing_key=None, serializer=None, delivery_mode=None,
  128. compression=None, **kwargs):
  129. """Send task message."""
  130. # merge default and custom policy
  131. _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
  132. else self.retry_policy)
  133. task_id = task_id or uuid()
  134. task_args = task_args or []
  135. task_kwargs = task_kwargs or {}
  136. if not isinstance(task_args, (list, tuple)):
  137. raise ValueError("task args must be a list or tuple")
  138. if not isinstance(task_kwargs, dict):
  139. raise ValueError("task kwargs must be a dictionary")
  140. if countdown: # Convert countdown to ETA.
  141. now = now or self.app.now()
  142. eta = now + timedelta(seconds=countdown)
  143. if isinstance(expires, (int, float)):
  144. now = now or self.app.now()
  145. expires = now + timedelta(seconds=expires)
  146. eta = eta and eta.isoformat()
  147. expires = expires and expires.isoformat()
  148. body = {"task": task_name,
  149. "id": task_id,
  150. "args": task_args,
  151. "kwargs": task_kwargs,
  152. "retries": retries or 0,
  153. "eta": eta,
  154. "expires": expires,
  155. "utc": self.utc,
  156. "callbacks": callbacks,
  157. "errbacks": errbacks}
  158. if taskset_id:
  159. body["taskset"] = taskset_id
  160. if chord:
  161. body["chord"] = chord
  162. self.publish(body, exchange=exchange, mandatory=mandatory,
  163. immediate=immediate, routing_key=routing_key,
  164. serializer=serializer or self.serializer,
  165. compression=compression or self.compression,
  166. retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
  167. declare=[self.queues[queue]] if queue else [],
  168. **kwargs)
  169. signals.task_sent.send(sender=task_name, **body)
  170. if event_dispatcher:
  171. event_dispatcher.send("task-sent", uuid=task_id,
  172. name=task_name,
  173. args=repr(task_args),
  174. kwargs=repr(task_kwargs),
  175. retries=retries,
  176. eta=eta,
  177. expires=expires,
  178. queue=queue)
  179. return task_id
  180. TaskPublisher = TaskProducer # compat
  181. class TaskConsumer(Consumer):
  182. app = None
  183. def __init__(self, channel, queues=None, app=None, **kw):
  184. self.app = app or self.app
  185. super(TaskConsumer, self).__init__(channel,
  186. queues or self.app.amqp.queues.consume_from.values(), **kw)
  187. class AMQP(object):
  188. BrokerConnection = BrokerConnection
  189. Consumer = Consumer
  190. #: Cached and prepared routing table.
  191. _rtable = None
  192. def __init__(self, app):
  193. self.app = app
  194. def flush_routes(self):
  195. self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
  196. def Queues(self, queues, create_missing=None):
  197. """Create new :class:`Queues` instance, using queue defaults
  198. from the current configuration."""
  199. conf = self.app.conf
  200. if create_missing is None:
  201. create_missing = conf.CELERY_CREATE_MISSING_QUEUES
  202. if not queues and conf.CELERY_DEFAULT_QUEUE:
  203. queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
  204. exchange=self.default_exchange,
  205. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
  206. return Queues(queues, self.default_exchange, create_missing)
  207. def Router(self, queues=None, create_missing=None):
  208. """Returns the current task router."""
  209. return _routes.Router(self.routes, queues or self.queues,
  210. self.app.either("CELERY_CREATE_MISSING_QUEUES",
  211. create_missing), app=self.app)
  212. @cached_property
  213. def TaskConsumer(self):
  214. """Returns consumer for a single task queue."""
  215. return self.app.subclass_with_self(TaskConsumer,
  216. reverse="amqp.TaskConsumer")
  217. def queue_or_default(self, q):
  218. if q:
  219. return self.queues[q] if not isinstance(q, Queue) else q
  220. return self.default_queue
  221. @cached_property
  222. def TaskProducer(self):
  223. """Returns publisher used to send tasks.
  224. You should use `app.send_task` instead.
  225. """
  226. conf = self.app.conf
  227. return self.app.subclass_with_self(TaskProducer,
  228. reverse="amqp.TaskProducer",
  229. exchange=self.default_exchange,
  230. exchange_type=self.default_exchange.type,
  231. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
  232. serializer=conf.CELERY_TASK_SERIALIZER,
  233. compression=conf.CELERY_MESSAGE_COMPRESSION,
  234. retry=conf.CELERY_TASK_PUBLISH_RETRY,
  235. retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
  236. utc=conf.CELERY_ENABLE_UTC)
  237. TaskPublisher = TaskProducer # compat
  238. def get_task_consumer(self, channel, *args, **kwargs):
  239. """Return consumer configured to consume from all known task
  240. queues."""
  241. return self.TaskConsumer(channel, *args, **kwargs)
  242. @cached_property
  243. def default_queue(self):
  244. return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
  245. @cached_property
  246. def queues(self):
  247. """Queue name⇒ declaration mapping."""
  248. return self.Queues(self.app.conf.CELERY_QUEUES)
  249. @queues.setter # noqa
  250. def queues(self, queues):
  251. return self.Queues(queues)
  252. @property
  253. def routes(self):
  254. if self._rtable is None:
  255. self.flush_routes()
  256. return self._rtable
  257. @cached_property
  258. def router(self):
  259. return self.Router()
  260. @cached_property
  261. def publisher_pool(self):
  262. return ProducerPool(self.app.pool, limit=self.app.pool.limit,
  263. Producer=self.TaskProducer)
  264. @cached_property
  265. def default_exchange(self):
  266. return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
  267. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)