@@ -10,12 +10,7 @@ from billiard.utils.functional import wraps
from celery import conf
from celery import signals
-from celery.utils import gen_unique_id, mitemgetter, noop, textindent
-. %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
+from celery.utils import gen_unique_id, mitemgetter, noop
MSG_OPTIONS = ("mandatory", "priority",
@@ -24,36 +19,19 @@ MSG_OPTIONS = ("mandatory", "priority",
get_msg_options = mitemgetter(*MSG_OPTIONS)
extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
-def routing_table():
- def _defaults(opts):
- opts.setdefault("exchange", conf.DEFAULT_EXCHANGE),
- opts.setdefault("exchange_type", conf.DEFAULT_EXCHANGE_TYPE)
- opts.setdefault("binding_key", "")
- return opts
- return dict((queue, _defaults(opts))
- for queue, opts in conf.QUEUES.items())
-_default_queue = routing_table()[conf.DEFAULT_QUEUE]
+default_queue = conf.routing_table[conf.DEFAULT_QUEUE]
class TaskPublisher(Publisher):
- """The AMQP Task Publisher class."""
- exchange = _default_queue["exchange"]
- exchange_type = _default_queue["exchange_type"]
+ """Publish tasks."""
+ exchange = default_queue["exchange"]
+ exchange_type = default_queue["exchange_type"]
routing_key = conf.DEFAULT_ROUTING_KEY
serializer = conf.TASK_SERIALIZER
- def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
+ def delay_task(self, task_name, task_args=None, task_kwargs=None,
+ task_id=None, taskset_id=None, **kwargs):
"""Delay task for execution by the celery nodes."""
- return self._delay_task(task_name=task_name, task_args=task_args,
- task_kwargs=task_kwargs, **kwargs)
- def _delay_task(self, task_name, task_id=None, taskset_id=None,
- task_args=None, task_kwargs=None, **kwargs):
- """INTERNAL"""
task_id = task_id or gen_unique_id()
eta = kwargs.get("eta")
@@ -77,27 +55,22 @@ class TaskPublisher(Publisher):
return task_id
-def get_consumer_set(connection, queues=None, **options):
- queues = queues or routing_table()
- return ConsumerSet(connection, from_dict=queues, **options)
class TaskConsumer(Consumer):
- """The AMQP Task Consumer class."""
+ """Consume tasks"""
queue = conf.DEFAULT_QUEUE
- exchange = _default_queue["exchange"]
- routing_key = _default_queue["binding_key"]
- exchange_type = _default_queue["exchange_type"]
- auto_ack = False
- no_ack = False
+ exchange = default_queue["exchange"]
+ routing_key = default_queue["binding_key"]
+ exchange_type = default_queue["exchange_type"]
class EventPublisher(Publisher):
+ """Publish events"""
exchange = "celeryevent"
routing_key = "event"
class EventConsumer(Consumer):
+ """Consume events"""
queue = "celeryevent"
exchange = "celeryevent"
routing_key = "event"
@@ -106,17 +79,20 @@ class EventConsumer(Consumer):
class BroadcastPublisher(Publisher):
+ """Publish broadcast commands"""
exchange = "celeryctl"
exchange_type = "fanout"
routing_key = ""
def send(self, type, arguments, destination=None):
+ """Send broadcast command."""
arguments["command"] = type
arguments["destination"] = destination
super(BroadcastPublisher, self).send({"control": arguments})
class BroadcastConsumer(Consumer):
+ """Consume broadcast commands"""
queue = "celeryctl"
exchange = "celeryctl"
routing_key = ""
@@ -125,16 +101,19 @@ class BroadcastConsumer(Consumer):
def establish_connection(connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+ """Establish a connection to the message broker."""
return DjangoBrokerConnection(connect_timeout=connect_timeout)
def with_connection(fun):
+ """Decorator for providing default message broker connection for functions
+ supporting the ``connection`` and ``connect_timeout`` keyword
+ arguments."""
def _inner(*args, **kwargs):
connection = kwargs.get("connection")
- timeout = kwargs.get("connect_timeout",
+ timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
kwargs["connection"] = conn = connection or \
close_connection = not connection and conn.close or noop
@@ -143,42 +122,16 @@ def with_connection(fun):
return fun(*args, **kwargs)
return _inner
-def with_connection_inline(fun, connection=None,
- connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
- conn = connection or establish_connection()
- close_connection = not connection and conn.close or noop
- try:
- return fun(conn)
- finally:
- close_connection()
-def get_connection_info():
- broker_connection = DjangoBrokerConnection()
- carrot_backend = broker_connection.backend_cls
- if carrot_backend and not isinstance(carrot_backend, str):
- carrot_backend = carrot_backend.__name__
- port = broker_connection.port or \
- broker_connection.get_backend_cls().default_port
- port = port and ":%s" % port or ""
- vhost = broker_connection.virtual_host
- if not vhost.startswith("/"):
- vhost = "/" + vhost
- return "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s" % {
- "carrot_backend": carrot_backend,
- "userid": broker_connection.userid,
- "host": broker_connection.hostname,
- "port": port,
- "vhost": vhost}
-def format_routing_table(table=None, indent=0):
- table = table or routing_table()
- format = lambda **route: ROUTE_INFO_FORMAT.strip() % route
- routes = "\n".join(format(name=name, **route)
- for name, route in table.items())
- return textindent(routes, indent=indent)
+def get_consumer_set(connection, queues=None, **options):
+ """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
+ configuration.
+ Defaults to the queues in ``CELERY_QUEUES``.
+ """
+ queues = queues or conf.routing_table
+ return ConsumerSet(connection, from_dict=queues, **options)