# -*- coding: utf-8 -*-
"""
celery.app.amqp
===============

AMQP related functionality.

:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from datetime import datetime, timedelta

from kombu import BrokerConnection, Exchange
from kombu.connection import Resource
from kombu import compat as messaging
from kombu.utils import cached_property

from celery import routes as _routes
from celery import signals
from celery.utils import gen_unique_id, textindent
from celery.utils import promise, maybe_promise
from celery.utils.compat import UserDict

#: List of known options to a Kombu producer's send method.
#: Used to extract the message related options out of any `dict`.
MSG_OPTIONS = ("mandatory", "priority", "immediate", "routing_key",
               "serializer", "delivery_mode", "compression")

#: Human readable queue declaration.
QUEUE_FORMAT = """
. %(name)s exchange:%(exchange)s (%(exchange_type)s) \
binding:%(binding_key)s
"""

#: Set of exchange names that have already been declared.
_exchanges_declared = set()

#: Set of queue names that have already been declared.
_queues_declared = set()
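
# NOTE: These module-level sets act as a per-process cache: each exchange and
# queue is declared at most once per process; `TaskPublisher.declare` and
# `delay_task` consult them before declaring anything.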


def extract_msg_options(options, keep=MSG_OPTIONS):
    """Extracts known options to `basic_publish` from a dict,
    and returns a new dict."""
    return dict((name, options.get(name)) for name in keep)
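
# For illustration (assumed values): extract_msg_options({"routing_key": "video",
# "countdown": 10}) returns a dict of just the known message options, e.g.
# {"routing_key": "video", "mandatory": None, ...}; unknown keys such as
# "countdown" are dropped and missing options default to None.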


class Queues(UserDict):
    """Queue name ⇒ declaration mapping.

    Celery will consult this mapping to find the options
    for any queue by name.

    :param queues: Initial mapping.

    """

    #: If set, this is a subset of queues to consume from.
    #: The rest of the queues are then used for routing only.
    _consume_from = None

    def __init__(self, queues):
        self.data = {}
        for queue_name, options in (queues or {}).items():
            self.add(queue_name, **options)

    def add(self, queue, exchange=None, routing_key=None,
            exchange_type="direct", **options):
        """Add new queue.

        :param queue: Name of the queue.
        :keyword exchange: Name of the exchange.
        :keyword routing_key: Binding key.
        :keyword exchange_type: Type of exchange.
        :keyword \*\*options: Additional declaration options.

        """
        q = self[queue] = self.options(exchange, routing_key,
                                       exchange_type, **options)
        return q

    def options(self, exchange, routing_key,
                exchange_type="direct", **options):
        """Creates new option mapping for queue, with required
        keys present."""
        return dict(options, routing_key=routing_key,
                    binding_key=routing_key,
                    exchange=exchange,
                    exchange_type=exchange_type)

    def format(self, indent=0, indent_first=True):
        """Format routing table into string for log dumps."""
        queues = self
        if self._consume_from is not None:
            queues = self._consume_from
        info = [QUEUE_FORMAT.strip() % dict(
                    name=(name + ":").ljust(12), **config)
                        for name, config in sorted(queues.items())]
        if indent_first:
            return textindent("\n".join(info), indent)
        return info[0] + "\n" + textindent("\n".join(info[1:]), indent)

    def select_subset(self, wanted, create_missing=True):
        """Select subset of the currently defined queues to consume from.

        Does not return anything: the queues in `wanted` become the
        subset consumed from, while the remaining queues are kept for
        routing only.

        :param wanted: List of wanted queue names.
        :keyword create_missing: By default any unknown queues will be
                                 added automatically, but if disabled
                                 the occurrence of unknown queues
                                 in `wanted` will raise :exc:`KeyError`.

        """
        acc = {}
        for queue in wanted:
            try:
                options = self[queue]
            except KeyError:
                if not create_missing:
                    raise
                options = self.options(queue, queue)
            acc[queue] = options
        self._consume_from = acc
        self.update(acc)

    @classmethod
    def with_defaults(cls, queues, default_exchange, default_exchange_type):
        """Alternate constructor that adds default exchange and
        exchange type information to queues that do not have any."""
        for opts in queues.values():
            opts.setdefault("exchange", default_exchange)
            opts.setdefault("exchange_type", default_exchange_type)
            opts.setdefault("binding_key", default_exchange)
            opts.setdefault("routing_key", opts.get("binding_key"))
        return cls(queues)

    @property
    def consume_from(self):
        if self._consume_from is not None:
            return self._consume_from
        return self
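

# A minimal usage sketch for `Queues` (queue names and options made up for
# illustration):
#
#   queues = Queues({"video": {"exchange": "media",
#                              "exchange_type": "topic",
#                              "routing_key": "media.video"}})
#   queues.add("default", exchange="celery", routing_key="celery")
#   queues.select_subset(["video"])   # consume only from "video"
#   print(queues.format())            # human readable routing table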


class TaskPublisher(messaging.Publisher):
    auto_declare = True
    retry = False
    retry_policy = None

    def __init__(self, *args, **kwargs):
        self.app = kwargs.pop("app")
        self.retry = kwargs.pop("retry", self.retry)
        self.retry_policy = kwargs.pop("retry_policy",
                                       self.retry_policy or {})
        super(TaskPublisher, self).__init__(*args, **kwargs)

    def declare(self):
        if self.exchange.name and \
                self.exchange.name not in _exchanges_declared:
            super(TaskPublisher, self).declare()
            _exchanges_declared.add(self.exchange.name)

    def _declare_queue(self, name, retry=False, retry_policy={}):
        options = self.app.queues[name]
        queue = messaging.entry_to_queue(name, **options)(self.channel)
        if retry:
            self.connection.ensure(queue, queue.declare, **retry_policy)()
        else:
            queue.declare()
        return queue

    def _declare_exchange(self, name, type, retry=False, retry_policy={}):
        ex = Exchange(name, type=type, durable=self.durable,
                      auto_delete=self.auto_delete)(self.channel)
        if retry:
            return self.connection.ensure(ex, ex.declare, **retry_policy)
        return ex.declare()

    def delay_task(self, task_name, task_args=None, task_kwargs=None,
            countdown=None, eta=None, task_id=None, taskset_id=None,
            expires=None, exchange=None, exchange_type=None,
            event_dispatcher=None, retry=None, retry_policy=None,
            queue=None, now=None, retries=0, chord=None, **kwargs):
        """Send task message."""
        connection = self.connection
        _retry_policy = self.retry_policy
        if retry_policy:    # merge default and custom policy
            _retry_policy = dict(_retry_policy, **retry_policy)

        # declare entities
        if queue and queue not in _queues_declared:
            entity = self._declare_queue(queue, retry, _retry_policy)
            _exchanges_declared.add(entity.exchange.name)
            _queues_declared.add(entity.name)
        if exchange and exchange not in _exchanges_declared:
            self._declare_exchange(exchange,
                    exchange_type or self.exchange_type, retry, _retry_policy)
            _exchanges_declared.add(exchange)

        task_id = task_id or gen_unique_id()
        task_args = task_args or []
        task_kwargs = task_kwargs or {}
        if not isinstance(task_args, (list, tuple)):
            raise ValueError("task args must be a list or tuple")
        if not isinstance(task_kwargs, dict):
            raise ValueError("task kwargs must be a dictionary")
        if countdown:                       # Convert countdown to ETA.
            now = now or datetime.now()
            eta = now + timedelta(seconds=countdown)
        if isinstance(expires, int):
            now = now or datetime.now()
            expires = now + timedelta(seconds=expires)
        eta = eta and eta.isoformat()
        expires = expires and expires.isoformat()
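
        # eta/expires travel in the message body as ISO-8601 strings (or None).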
        body = {"task": task_name,
                "id": task_id,
                "args": task_args or [],
                "kwargs": task_kwargs or {},
                "retries": retries or 0,
                "eta": eta,
                "expires": expires}
        if taskset_id:
            body["taskset"] = taskset_id
        if chord:
            body["chord"] = chord

        send = self.send
        if (retry is None and self.retry) or retry:
            send = connection.ensure(self, self.send, **_retry_policy)
        send(body, exchange=exchange, **extract_msg_options(kwargs))
        signals.task_sent.send(sender=task_name, **body)
        if event_dispatcher:
            event_dispatcher.send("task-sent", uuid=task_id,
                                  name=task_name,
                                  args=repr(task_args),
                                  kwargs=repr(task_kwargs),
                                  retries=retries,
                                  eta=eta,
                                  expires=expires)
        return task_id

    def __exit__(self, *exc_info):
        try:
            self.release()
        except AttributeError:
            self.close()
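

# A minimal sketch of sending a task with TaskPublisher (illustrative only;
# in practice `app.send_task` should be used instead, see
# `AMQP.TaskPublisher` below):
#
#   publisher = app.amqp.TaskPublisher(connection)
#   task_id = publisher.delay_task("tasks.add", task_args=(2, 2), countdown=10)
#   publisher.close()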


class PublisherPool(Resource):

    def __init__(self, app=None):
        self.app = app
        super(PublisherPool, self).__init__(limit=self.app.pool.limit)

    def create_publisher(self):
        conn = self.app.pool.acquire(block=True)
        pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
        conn._publisher_chan = pub.channel
        return pub

    def new(self):
        return promise(self.create_publisher)

    def setup(self):
        if self.limit:
            for _ in xrange(self.limit):
                self._resource.put_nowait(self.new())
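
    # `prepare` is called when a publisher is acquired from the pool: it
    # evaluates the lazy promise created by `new`, makes sure the publisher
    # has a connection, and revives it on a channel cached on that connection
    # so the channel is reused.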
    def prepare(self, publisher):
        pub = maybe_promise(publisher)
        if not pub.connection:
            pub.connection = self.app.pool.acquire(block=True)
            if not getattr(pub.connection, "_publisher_chan", None):
                pub.connection._publisher_chan = pub.connection.channel()
            pub.revive(pub.connection._publisher_chan)
        return pub

    def release(self, resource):
        resource.connection.release()
        resource.connection = None
        super(PublisherPool, self).release(resource)


class AMQP(object):
    BrokerConnection = BrokerConnection
    Publisher = messaging.Publisher
    Consumer = messaging.Consumer
    ConsumerSet = messaging.ConsumerSet

    #: Cached and prepared routing table.
    _rtable = None

    def __init__(self, app):
        self.app = app

    def flush_routes(self):
        self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)

    def Queues(self, queues):
        """Create new :class:`Queues` instance, using queue defaults
        from the current configuration."""
        conf = self.app.conf
        if not queues:
            queues = {conf.CELERY_DEFAULT_QUEUE: {
                        "exchange": conf.CELERY_DEFAULT_EXCHANGE,
                        "exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
                        "binding_key": conf.CELERY_DEFAULT_ROUTING_KEY}}
        return Queues.with_defaults(queues, conf.CELERY_DEFAULT_EXCHANGE,
                                    conf.CELERY_DEFAULT_EXCHANGE_TYPE)

    def Router(self, queues=None, create_missing=None):
        """Returns the current task router."""
        return _routes.Router(self.routes, queues or self.queues,
                              self.app.either("CELERY_CREATE_MISSING_QUEUES",
                                              create_missing), app=self.app)

    def TaskConsumer(self, *args, **kwargs):
        """Returns consumer for a single task queue."""
        default_queue_name, default_queue = self.get_default_queue()
        defaults = dict({"queue": default_queue_name}, **default_queue)
        defaults["routing_key"] = defaults.pop("binding_key", None)
        return self.Consumer(*args,
                             **self.app.merge(defaults, kwargs))

    def TaskPublisher(self, *args, **kwargs):
        """Returns publisher used to send tasks.

        You should use `app.send_task` instead.

        """
        conf = self.app.conf
        _, default_queue = self.get_default_queue()
        defaults = {"exchange": default_queue["exchange"],
                    "exchange_type": default_queue["exchange_type"],
                    "routing_key": conf.CELERY_DEFAULT_ROUTING_KEY,
                    "serializer": conf.CELERY_TASK_SERIALIZER,
                    "retry": conf.CELERY_TASK_PUBLISH_RETRY,
                    "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
                    "app": self}
        return TaskPublisher(*args, **self.app.merge(defaults, kwargs))

    def get_task_consumer(self, connection, queues=None, **kwargs):
        """Return consumer configured to consume from all known task
        queues."""
        return self.ConsumerSet(connection,
                                from_dict=queues or self.queues.consume_from,
                                **kwargs)

    def get_default_queue(self):
        """Returns `(queue_name, queue_options)` tuple for the queue
        configured to be default (:setting:`CELERY_DEFAULT_QUEUE`)."""
        q = self.app.conf.CELERY_DEFAULT_QUEUE
        return q, self.queues[q]

    @cached_property
    def queues(self):
        """Queue name ⇒ declaration mapping."""
        return self.Queues(self.app.conf.CELERY_QUEUES)

    @property
    def routes(self):
        if self._rtable is None:
            self.flush_routes()
        return self._rtable

    @cached_property
    def publisher_pool(self):
        return PublisherPool(app=self.app)
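

# Rough end-to-end sketch (illustrative; assumes a configured Celery app):
#
#   amqp = app.amqp
#   print(amqp.queues.format())                  # dump the routing table
#   publisher = amqp.publisher_pool.acquire(block=True)
#   try:
#       publisher.delay_task("tasks.add", task_args=(2, 2))
#   finally:
#       amqp.publisher_pool.release(publisher)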