amqp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.amqp
  4. ===============
  5. AMQ related functionality.
  6. :copyright: (c) 2009 - 2010 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from datetime import datetime, timedelta
  10. from kombu import BrokerConnection
  11. from kombu import compat as messaging
  12. from celery import routes
  13. from celery import signals
  14. from celery.utils import gen_unique_id, textindent, cached_property
  15. from celery.utils.compat import UserDict
  16. #: List of known options to a Kombu producers send method.
  17. #: Used to extract the message related options out of any `dict`.
  18. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  19. "routing_key", "serializer", "delivery_mode",
  20. "compression")
  21. #: Human readable queue declaration.
  22. QUEUE_FORMAT = """
  23. . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
  24. binding:%(binding_key)s
  25. """
  26. #: Broker connection info -> URI
  27. BROKER_FORMAT = """\
  28. %(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
  29. """
  30. #: Set of exchange names that has already been declared.
  31. _exchanges_declared = set()
  32. def extract_msg_options(options, keep=MSG_OPTIONS):
  33. """Extracts known options to `basic_publish` from a dict,
  34. and returns a new dict."""
  35. return dict((name, options.get(name)) for name in keep)
  36. class Queues(UserDict):
  37. """Queue name⇒ declaration mapping.
  38. Celery will consult this mapping to find the options
  39. for any queue by name.
  40. :param queues: Initial mapping.
  41. """
  42. def __init__(self, queues):
  43. self.data = {}
  44. for queue_name, options in (queues or {}).items():
  45. self.add(queue_name, **options)
  46. def add(self, queue, exchange=None, routing_key=None,
  47. exchange_type="direct", **options):
  48. """Add new queue.
  49. :param queue: Name of the queue.
  50. :keyword exchange: Name of the exchange.
  51. :keyword routing_key: Binding key.
  52. :keyword exchange_type: Type of exchange.
  53. :keyword \*\*options: Additional declaration options.
  54. """
  55. q = self[queue] = self.options(exchange, routing_key,
  56. exchange_type, **options)
  57. return q
  58. def options(self, exchange, routing_key,
  59. exchange_type="direct", **options):
  60. """Creates new option mapping for queue, with required
  61. keys present."""
  62. return dict(options, routing_key=routing_key,
  63. binding_key=routing_key,
  64. exchange=exchange,
  65. exchange_type=exchange_type)
  66. def format(self, indent=0):
  67. """Format routing table into string for log dumps."""
  68. info = "\n".join(QUEUE_FORMAT.strip() % dict(name=name, **config)
  69. for name, config in self.items())
  70. return textindent(info, indent=indent)
  71. def select_subset(self, wanted, create_missing=True):
  72. """Select subset of the currently defined queues.
  73. Does not return anything: queues not in `wanted` will
  74. be discarded in-place.
  75. :param wanted: List of wanted queue names.
  76. :keyword create_missing: By default any unknown queues will be
  77. added automatically, but if disabled
  78. the occurrence of unknown queues
  79. in `wanted` will raise :exc:`KeyError`.
  80. """
  81. acc = {}
  82. for queue in wanted:
  83. try:
  84. options = self[queue]
  85. except KeyError:
  86. if not create_missing:
  87. raise
  88. options = self.options(queue, queue)
  89. acc[queue] = options
  90. self.data.clear()
  91. self.data.update(acc)
  92. @classmethod
  93. def with_defaults(cls, queues, default_exchange, default_exchange_type):
  94. """Alternate constructor that adds default exchange and
  95. exchange type information to queues that does not have any."""
  96. for opts in queues.values():
  97. opts.setdefault("exchange", default_exchange),
  98. opts.setdefault("exchange_type", default_exchange_type)
  99. opts.setdefault("binding_key", default_exchange)
  100. opts.setdefault("routing_key", opts.get("binding_key"))
  101. return cls(queues)
  102. class TaskPublisher(messaging.Publisher):
  103. auto_declare = False
  104. def declare(self):
  105. if self.exchange.name not in _exchanges_declared:
  106. super(TaskPublisher, self).declare()
  107. _exchanges_declared.add(self.exchange.name)
  108. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  109. countdown=None, eta=None, task_id=None, taskset_id=None,
  110. expires=None, exchange=None, exchange_type=None,
  111. event_dispatcher=None, **kwargs):
  112. """Send task message."""
  113. task_id = task_id or gen_unique_id()
  114. task_args = task_args or []
  115. task_kwargs = task_kwargs or {}
  116. now = None
  117. if countdown: # Convert countdown to ETA.
  118. now = datetime.now()
  119. eta = now + timedelta(seconds=countdown)
  120. if not isinstance(task_args, (list, tuple)):
  121. raise ValueError("task args must be a list or tuple")
  122. if not isinstance(task_kwargs, dict):
  123. raise ValueError("task kwargs must be a dictionary")
  124. if isinstance(expires, int):
  125. now = now or datetime.now()
  126. expires = now + timedelta(seconds=expires)
  127. retries = kwargs.get("retries", 0)
  128. eta = eta and eta.isoformat()
  129. expires = expires and expires.isoformat()
  130. message_data = {
  131. "task": task_name,
  132. "id": task_id,
  133. "args": task_args or [],
  134. "kwargs": task_kwargs or {},
  135. "retries": retries,
  136. "eta": eta,
  137. "expires": expires,
  138. }
  139. if taskset_id:
  140. message_data["taskset"] = taskset_id
  141. # custom exchange passed, need to declare it.
  142. if exchange and exchange not in _exchanges_declared:
  143. exchange_type = exchange_type or self.exchange_type
  144. self.backend.exchange_declare(exchange=exchange,
  145. type=exchange_type,
  146. durable=self.durable,
  147. auto_delete=self.auto_delete)
  148. _exchanges_declared.add(exchange)
  149. self.send(message_data, exchange=exchange,
  150. **extract_msg_options(kwargs))
  151. signals.task_sent.send(sender=task_name, **message_data)
  152. if event_dispatcher:
  153. event_dispatcher.send("task-sent", uuid=task_id,
  154. name=task_name,
  155. args=repr(task_args),
  156. kwargs=repr(task_kwargs),
  157. retries=retries,
  158. eta=eta,
  159. expires=expires)
  160. return task_id
  161. class AMQP(object):
  162. BrokerConnection = BrokerConnection
  163. Publisher = messaging.Publisher
  164. Consumer = messaging.Consumer
  165. ConsumerSet = messaging.ConsumerSet
  166. #: Set to :const:`True` when the configured queues has been declared.
  167. _queues_declared = False
  168. def __init__(self, app):
  169. self.app = app
  170. def Queues(self, queues):
  171. """Create new :class:`Queues` instance, using queue defaults
  172. from the current configuration."""
  173. return Queues.with_defaults(queues,
  174. self.app.conf.CELERY_DEFAULT_EXCHANGE,
  175. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
  176. def Router(self, queues=None, create_missing=None):
  177. """Returns the current task router."""
  178. return routes.Router(self.app.conf.CELERY_ROUTES,
  179. queues or self.app.conf.CELERY_QUEUES,
  180. self.app.either("CELERY_CREATE_MISSING_QUEUES",
  181. create_missing),
  182. app=self.app)
  183. def TaskConsumer(self, *args, **kwargs):
  184. """Returns consumer for a single task queue."""
  185. default_queue_name, default_queue = self.get_default_queue()
  186. defaults = dict({"queue": default_queue_name}, **default_queue)
  187. defaults["routing_key"] = defaults.pop("binding_key", None)
  188. return self.Consumer(*args,
  189. **self.app.merge(defaults, kwargs))
  190. def TaskPublisher(self, *args, **kwargs):
  191. """Returns publisher used to send tasks.
  192. You should use `app.send_task` instead.
  193. """
  194. _, default_queue = self.get_default_queue()
  195. defaults = {"exchange": default_queue["exchange"],
  196. "exchange_type": default_queue["exchange_type"],
  197. "routing_key": self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
  198. "serializer": self.app.conf.CELERY_TASK_SERIALIZER}
  199. publisher = TaskPublisher(*args,
  200. **self.app.merge(defaults, kwargs))
  201. # Make sure all queues are declared.
  202. if not self._queues_declared:
  203. self.get_task_consumer(publisher.connection).close()
  204. self._queues_declared = True
  205. publisher.declare()
  206. return publisher
  207. def get_task_consumer(self, connection, queues=None, **kwargs):
  208. """Return consumer configured to consume from all known task
  209. queues."""
  210. return self.ConsumerSet(connection, from_dict=queues or self.queues,
  211. **kwargs)
  212. def get_default_queue(self):
  213. """Returns `(queue_name, queue_options)` tuple for the queue
  214. configured to be default (:setting:`CELERY_DEFAULT_QUEUE`)."""
  215. q = self.app.conf.CELERY_DEFAULT_QUEUE
  216. return q, self.queues[q]
  217. def get_broker_info(self, broker_connection=None):
  218. """Returns information about the current broker connection
  219. as a `dict`."""
  220. if broker_connection is None:
  221. broker_connection = self.app.broker_connection()
  222. info = broker_connection.info()
  223. port = info["port"]
  224. if port:
  225. info["port"] = ":%s" % (port, )
  226. vhost = info["virtual_host"]
  227. if not vhost.startswith("/"):
  228. info["virtual_host"] = "/" + vhost
  229. return info
  230. def format_broker_info(self, info=None):
  231. """Get message broker connection info string for log dumps."""
  232. return BROKER_FORMAT % self.get_broker_info()
  233. @cached_property
  234. def queues(self):
  235. """Queue name⇒ declaration mapping."""
  236. return self.Queues(self.app.conf.CELERY_QUEUES)
  237. @queues.setter
  238. def queues(self, value):
  239. return self.Queues(value)