amqp.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. """
  2. celery.app.amqp
  3. ===============
  4. AMQ related functionality.
  5. :copyright: (c) 2009 - 2010 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. from datetime import datetime, timedelta
  9. from celery import routes
  10. from celery import signals
  11. from celery.utils import gen_unique_id, textindent
  12. from celery.utils.compat import UserDict
  13. from kombu import compat as messaging
  14. from kombu import BrokerConnection
  15. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  16. "routing_key", "serializer", "delivery_mode",
  17. "compression")
  18. QUEUE_FORMAT = """
  19. . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
  20. binding:%(binding_key)s
  21. """
  22. BROKER_FORMAT = """\
  23. %(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
  24. """
  25. #: Set to :const:`True` when the configured queues has been declared.
  26. _queues_declared = False
  27. #: Set of exchange names that has already been declared.
  28. _exchanges_declared = set()
  29. def extract_msg_options(options, keep=MSG_OPTIONS):
  30. """Extracts known options to `basic_publish` from a dict,
  31. and returns a new dict."""
  32. return dict((name, options.get(name)) for name in keep)
  33. class Queues(UserDict):
  34. def __init__(self, queues):
  35. self.data = {}
  36. for queue_name, options in (queues or {}).items():
  37. self.add(queue_name, **options)
  38. def add(self, queue, exchange=None, routing_key=None,
  39. exchange_type="direct", **options):
  40. q = self[queue] = self.options(exchange, routing_key,
  41. exchange_type, **options)
  42. return q
  43. def options(self, exchange, routing_key,
  44. exchange_type="direct", **options):
  45. return dict(options, routing_key=routing_key,
  46. binding_key=routing_key,
  47. exchange=exchange,
  48. exchange_type=exchange_type)
  49. def format(self, indent=0):
  50. """Format routing table into string for log dumps."""
  51. format = lambda **queue: QUEUE_FORMAT.strip() % queue
  52. info = "\n".join(format(name=name, **config)
  53. for name, config in self.items())
  54. return textindent(info, indent=indent)
  55. def select_subset(self, wanted, create_missing=True):
  56. acc = {}
  57. for queue in wanted:
  58. try:
  59. options = self[queue]
  60. except KeyError:
  61. if not create_missing:
  62. raise
  63. options = self.options(queue, queue)
  64. acc[queue] = options
  65. self.data.clear()
  66. self.data.update(acc)
  67. @classmethod
  68. def with_defaults(cls, queues, default_exchange, default_exchange_type):
  69. for opts in queues.values():
  70. opts.setdefault("exchange", default_exchange),
  71. opts.setdefault("exchange_type", default_exchange_type)
  72. opts.setdefault("binding_key", default_exchange)
  73. opts.setdefault("routing_key", opts.get("binding_key"))
  74. return cls(queues)
  75. class TaskPublisher(messaging.Publisher):
  76. auto_declare = False
  77. def declare(self):
  78. if self.exchange.name not in _exchanges_declared:
  79. super(TaskPublisher, self).declare()
  80. _exchanges_declared.add(self.exchange.name)
  81. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  82. countdown=None, eta=None, task_id=None, taskset_id=None,
  83. expires=None, exchange=None, exchange_type=None,
  84. event_dispatcher=None, **kwargs):
  85. """Delay task for execution by the celery nodes."""
  86. task_id = task_id or gen_unique_id()
  87. task_args = task_args or []
  88. task_kwargs = task_kwargs or {}
  89. now = None
  90. if countdown: # Convert countdown to ETA.
  91. now = datetime.now()
  92. eta = now + timedelta(seconds=countdown)
  93. if not isinstance(task_args, (list, tuple)):
  94. raise ValueError("task args must be a list or tuple")
  95. if not isinstance(task_kwargs, dict):
  96. raise ValueError("task kwargs must be a dictionary")
  97. if isinstance(expires, int):
  98. now = now or datetime.now()
  99. expires = now + timedelta(seconds=expires)
  100. retries = kwargs.get("retries", 0)
  101. eta = eta and eta.isoformat()
  102. expires = expires and expires.isoformat()
  103. message_data = {
  104. "task": task_name,
  105. "id": task_id,
  106. "args": task_args or [],
  107. "kwargs": task_kwargs or {},
  108. "retries": retries,
  109. "eta": eta,
  110. "expires": expires,
  111. }
  112. if taskset_id:
  113. message_data["taskset"] = taskset_id
  114. # custom exchange passed, need to declare it.
  115. if exchange and exchange not in _exchanges_declared:
  116. exchange_type = exchange_type or self.exchange_type
  117. self.backend.exchange_declare(exchange=exchange,
  118. type=exchange_type,
  119. durable=self.durable,
  120. auto_delete=self.auto_delete)
  121. self.send(message_data, exchange=exchange,
  122. **extract_msg_options(kwargs))
  123. signals.task_sent.send(sender=task_name, **message_data)
  124. if event_dispatcher:
  125. event_dispatcher.send("task-sent", uuid=task_id,
  126. name=task_name,
  127. args=repr(task_args),
  128. kwargs=repr(task_kwargs),
  129. retries=retries,
  130. eta=eta,
  131. expires=expires)
  132. return task_id
  133. class AMQP(object):
  134. BrokerConnection = BrokerConnection
  135. Publisher = messaging.Publisher
  136. Consumer = messaging.Consumer
  137. _queues = None
  138. def __init__(self, app):
  139. self.app = app
  140. def ConsumerSet(self, *args, **kwargs):
  141. return messaging.ConsumerSet(*args, **kwargs)
  142. def Queues(self, queues):
  143. return Queues.with_defaults(queues,
  144. self.app.conf.CELERY_DEFAULT_EXCHANGE,
  145. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
  146. def Router(self, queues=None, create_missing=None):
  147. return routes.Router(self.app.conf.CELERY_ROUTES,
  148. queues or self.app.conf.CELERY_QUEUES,
  149. self.app.either("CELERY_CREATE_MISSING_QUEUES",
  150. create_missing),
  151. app=self.app)
  152. def TaskConsumer(self, *args, **kwargs):
  153. default_queue_name, default_queue = self.get_default_queue()
  154. defaults = dict({"queue": default_queue_name}, **default_queue)
  155. defaults["routing_key"] = defaults.pop("binding_key", None)
  156. return self.Consumer(*args,
  157. **self.app.merge(defaults, kwargs))
  158. def TaskPublisher(self, *args, **kwargs):
  159. _, default_queue = self.get_default_queue()
  160. defaults = {"exchange": default_queue["exchange"],
  161. "exchange_type": default_queue["exchange_type"],
  162. "routing_key": self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
  163. "serializer": self.app.conf.CELERY_TASK_SERIALIZER}
  164. publisher = TaskPublisher(*args,
  165. **self.app.merge(defaults, kwargs))
  166. # Make sure all queues are declared.
  167. global _queues_declared
  168. if not _queues_declared:
  169. self.get_task_consumer(publisher.connection).close()
  170. _queues_declared = True
  171. publisher.declare()
  172. return publisher
  173. def get_task_consumer(self, connection, queues=None, **kwargs):
  174. return self.ConsumerSet(connection, from_dict=queues or self.queues,
  175. **kwargs)
  176. def get_default_queue(self):
  177. q = self.app.conf.CELERY_DEFAULT_QUEUE
  178. return q, self.queues[q]
  179. def get_broker_info(self, broker_connection=None):
  180. if broker_connection is None:
  181. broker_connection = self.app.broker_connection()
  182. info = broker_connection.info()
  183. port = info["port"]
  184. if port:
  185. info["port"] = ":%s" % (port, )
  186. vhost = info["virtual_host"]
  187. if not vhost.startswith("/"):
  188. info["virtual_host"] = "/" + vhost
  189. return info
  190. def format_broker_info(self, info=None):
  191. """Get message broker connection info string for log dumps."""
  192. return BROKER_FORMAT % self.get_broker_info()
  193. def _get_queues(self):
  194. if self._queues is None:
  195. c = self.app.conf
  196. self._queues = self.Queues(c.CELERY_QUEUES)
  197. return self._queues
  198. def _set_queues(self, queues):
  199. self._queues = self.Queues(queues)
  200. queues = property(_get_queues, _set_queues)