|
@@ -6,11 +6,16 @@ from carrot import messaging
|
|
|
|
|
|
from celery import routes
|
|
|
from celery import signals
|
|
|
-from celery.utils import gen_unique_id, mitemgetter
|
|
|
+from celery.utils import gen_unique_id, mitemgetter, textindent
|
|
|
|
|
|
|
|
|
MSG_OPTIONS = ("mandatory", "priority", "immediate",
|
|
|
"routing_key", "serializer", "delivery_mode")
|
|
|
+QUEUE_FORMAT = """
|
|
|
+. %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
|
|
|
+binding:%(binding_key)s
|
|
|
+"""
|
|
|
+BROKER_FORMAT = "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s"
|
|
|
|
|
|
get_msg_options = mitemgetter(*MSG_OPTIONS)
|
|
|
extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
|
|
@@ -20,6 +25,8 @@ _queues_declared = False
|
|
|
_exchanges_declared = set()
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
class TaskPublisher(messaging.Publisher):
|
|
|
auto_declare = False
|
|
|
|
|
@@ -63,7 +70,7 @@ class TaskPublisher(messaging.Publisher):
|
|
|
if taskset_id:
|
|
|
message_data["taskset"] = taskset_id
|
|
|
|
|
|
- # FIXME (carrot Publisher.send needs to accept exchange argument)
|
|
|
+ # custom exchange passed, need to declare it.
|
|
|
if exchange and exchange not in _exchanges_declared:
|
|
|
exchange_type = exchange_type or self.exchange_type
|
|
|
self.backend.exchange_declare(exchange=exchange,
|
|
@@ -177,3 +184,35 @@ class AMQP(object):
|
|
|
backend=cset.backend, **queue_options)
|
|
|
cset.consumers.append(consumer)
|
|
|
return cset
|
|
|
+
|
|
|
+ def format_queues(self, queues, indent=0):
|
|
|
+ """Format routing table into string for log dumps."""
|
|
|
+ format = lambda **queue: QUEUE_FORMAT.strip() % queue
|
|
|
+ info = "\n".join(format(name=name, **config)
|
|
|
+ for name, config in queues.items())
|
|
|
+ return textindent(info, indent=indent)
|
|
|
+
|
|
|
+ def get_broker_info(self):
|
|
|
+ broker_connection = self.app.broker_connection()
|
|
|
+ carrot_backend = broker_connection.backend_cls
|
|
|
+ if carrot_backend and not isinstance(carrot_backend, str):
|
|
|
+ carrot_backend = carrot_backend.__name__
|
|
|
+ carrot_backend = carrot_backend or "amqp"
|
|
|
+
|
|
|
+ port = broker_connection.port or \
|
|
|
+ broker_connection.get_backend_cls().default_port
|
|
|
+ port = port and ":%s" % port or ""
|
|
|
+
|
|
|
+ vhost = broker_connection.virtual_host
|
|
|
+ if not vhost.startswith("/"):
|
|
|
+ vhost = "/" + vhost
|
|
|
+
|
|
|
+ return {"carrot_backend": carrot_backend,
|
|
|
+ "userid": broker_connection.userid,
|
|
|
+ "host": broker_connection.hostname,
|
|
|
+ "port": port,
|
|
|
+ "vhost": vhost}
|
|
|
+
|
|
|
+ def format_broker_info(self, info=None):
|
|
|
+ """Get message broker connection info string for log dumps."""
|
|
|
+ return BROKER_FORMAT % self.get_broker_info()
|