|
@@ -1,3 +1,4 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
"""
|
|
|
celery.app.amqp
|
|
|
===============
|
|
@@ -10,21 +11,27 @@ AMQ related functionality.
|
|
|
"""
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
+from kombu import BrokerConnection
|
|
|
+from kombu import compat as messaging
|
|
|
+
|
|
|
from celery import routes
|
|
|
from celery import signals
|
|
|
-from celery.utils import gen_unique_id, textindent
|
|
|
+from celery.utils import gen_unique_id, textindent, cached_property
|
|
|
from celery.utils.compat import UserDict
|
|
|
|
|
|
-from kombu import compat as messaging
|
|
|
-from kombu import BrokerConnection
|
|
|
-
|
|
|
+#: List of known options to a Kombu producers send method.
|
|
|
+#: Used to extract the message related options out of any `dict`.
|
|
|
MSG_OPTIONS = ("mandatory", "priority", "immediate",
|
|
|
"routing_key", "serializer", "delivery_mode",
|
|
|
"compression")
|
|
|
+
|
|
|
+#: Human readable queue declaration.
|
|
|
QUEUE_FORMAT = """
|
|
|
. %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
|
|
|
binding:%(binding_key)s
|
|
|
"""
|
|
|
+
|
|
|
+#: Broker connection info -> URI
|
|
|
BROKER_FORMAT = """\
|
|
|
%(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
|
|
|
"""
|
|
@@ -43,6 +50,14 @@ def extract_msg_options(options, keep=MSG_OPTIONS):
|
|
|
|
|
|
|
|
|
class Queues(UserDict):
|
|
|
+ """Queue name⇒ declaration mapping.
|
|
|
+
|
|
|
+ Celery will consult this mapping to find the options
|
|
|
+ for any queue by name.
|
|
|
+
|
|
|
+ :param queues: Initial mapping.
|
|
|
+
|
|
|
+ """
|
|
|
|
|
|
def __init__(self, queues):
|
|
|
self.data = {}
|
|
@@ -51,12 +66,23 @@ class Queues(UserDict):
|
|
|
|
|
|
def add(self, queue, exchange=None, routing_key=None,
|
|
|
exchange_type="direct", **options):
|
|
|
+ """Add new queue.
|
|
|
+
|
|
|
+ :param queue: Name of the queue.
|
|
|
+ :keyword exchange: Name of the exchange.
|
|
|
+ :keyword routing_key: Binding key.
|
|
|
+ :keyword exchange_type: Type of exchange.
|
|
|
+ :keyword \*\*options: Additional declaration options.
|
|
|
+
|
|
|
+ """
|
|
|
q = self[queue] = self.options(exchange, routing_key,
|
|
|
exchange_type, **options)
|
|
|
return q
|
|
|
|
|
|
def options(self, exchange, routing_key,
|
|
|
exchange_type="direct", **options):
|
|
|
+ """Creates new option mapping for queue, with required
|
|
|
+ keys present."""
|
|
|
return dict(options, routing_key=routing_key,
|
|
|
binding_key=routing_key,
|
|
|
exchange=exchange,
|
|
@@ -64,12 +90,23 @@ class Queues(UserDict):
|
|
|
|
|
|
def format(self, indent=0):
|
|
|
"""Format routing table into string for log dumps."""
|
|
|
- format = lambda **queue: QUEUE_FORMAT.strip() % queue
|
|
|
- info = "\n".join(format(name=name, **config)
|
|
|
+ info = "\n".join(QUEUE_FORMAT.strip() % dict(name=name, **config)
|
|
|
for name, config in self.items())
|
|
|
return textindent(info, indent=indent)
|
|
|
|
|
|
def select_subset(self, wanted, create_missing=True):
|
|
|
+ """Select subset of the currently defined queues.
|
|
|
+
|
|
|
+ Does not return anything: queues not in `wanted` will
|
|
|
+ be discarded in-place.
|
|
|
+
|
|
|
+ :param wanted: List of wanted queue names.
|
|
|
+ :keyword create_missing: By default any unknown queues will be
|
|
|
+ added automatically, but if disabled
|
|
|
+ the occurrence of unknown queues
|
|
|
+ in `wanted` will raise :exc:`KeyError`.
|
|
|
+
|
|
|
+ """
|
|
|
acc = {}
|
|
|
for queue in wanted:
|
|
|
try:
|
|
@@ -84,7 +121,8 @@ class Queues(UserDict):
|
|
|
|
|
|
@classmethod
|
|
|
def with_defaults(cls, queues, default_exchange, default_exchange_type):
|
|
|
-
|
|
|
+ """Alternate constructor that adds default exchange and
|
|
|
+ exchange type information to queues that does not have any."""
|
|
|
for opts in queues.values():
|
|
|
opts.setdefault("exchange", default_exchange),
|
|
|
opts.setdefault("exchange_type", default_exchange_type)
|
|
@@ -105,7 +143,7 @@ class TaskPublisher(messaging.Publisher):
|
|
|
countdown=None, eta=None, task_id=None, taskset_id=None,
|
|
|
expires=None, exchange=None, exchange_type=None,
|
|
|
event_dispatcher=None, **kwargs):
|
|
|
- """Delay task for execution by the celery nodes."""
|
|
|
+ """Send task message."""
|
|
|
|
|
|
task_id = task_id or gen_unique_id()
|
|
|
task_args = task_args or []
|
|
@@ -167,20 +205,20 @@ class AMQP(object):
|
|
|
BrokerConnection = BrokerConnection
|
|
|
Publisher = messaging.Publisher
|
|
|
Consumer = messaging.Consumer
|
|
|
- _queues = None
|
|
|
+ ConsumerSet = messaging.ConsumerSet
|
|
|
|
|
|
def __init__(self, app):
|
|
|
self.app = app
|
|
|
|
|
|
- def ConsumerSet(self, *args, **kwargs):
|
|
|
- return messaging.ConsumerSet(*args, **kwargs)
|
|
|
-
|
|
|
def Queues(self, queues):
|
|
|
+ """Create new :class:`Queues` instance, using queue defaults
|
|
|
+ from the current configuration."""
|
|
|
return Queues.with_defaults(queues,
|
|
|
self.app.conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
|
|
|
|
|
|
def Router(self, queues=None, create_missing=None):
|
|
|
+ """Returns the current task router."""
|
|
|
return routes.Router(self.app.conf.CELERY_ROUTES,
|
|
|
queues or self.app.conf.CELERY_QUEUES,
|
|
|
self.app.either("CELERY_CREATE_MISSING_QUEUES",
|
|
@@ -188,6 +226,7 @@ class AMQP(object):
|
|
|
app=self.app)
|
|
|
|
|
|
def TaskConsumer(self, *args, **kwargs):
|
|
|
+ """Returns consumer for a single task queue."""
|
|
|
default_queue_name, default_queue = self.get_default_queue()
|
|
|
defaults = dict({"queue": default_queue_name}, **default_queue)
|
|
|
defaults["routing_key"] = defaults.pop("binding_key", None)
|
|
@@ -195,6 +234,11 @@ class AMQP(object):
|
|
|
**self.app.merge(defaults, kwargs))
|
|
|
|
|
|
def TaskPublisher(self, *args, **kwargs):
|
|
|
+ """Returns publisher used to send tasks.
|
|
|
+
|
|
|
+ You should use `app.send_task` instead.
|
|
|
+
|
|
|
+ """
|
|
|
_, default_queue = self.get_default_queue()
|
|
|
defaults = {"exchange": default_queue["exchange"],
|
|
|
"exchange_type": default_queue["exchange_type"],
|
|
@@ -213,14 +257,20 @@ class AMQP(object):
|
|
|
return publisher
|
|
|
|
|
|
def get_task_consumer(self, connection, queues=None, **kwargs):
|
|
|
+ """Return consumer configured to consume from all known task
|
|
|
+ queues."""
|
|
|
return self.ConsumerSet(connection, from_dict=queues or self.queues,
|
|
|
**kwargs)
|
|
|
|
|
|
def get_default_queue(self):
|
|
|
+ """Returns `(queue_name, queue_options)` tuple for the queue
|
|
|
+ configured to be default (:setting:`CELERY_DEFAULT_QUEUE`)."""
|
|
|
q = self.app.conf.CELERY_DEFAULT_QUEUE
|
|
|
return q, self.queues[q]
|
|
|
|
|
|
def get_broker_info(self, broker_connection=None):
|
|
|
+ """Returns information about the current broker connection
|
|
|
+ as a `dict`."""
|
|
|
if broker_connection is None:
|
|
|
broker_connection = self.app.broker_connection()
|
|
|
info = broker_connection.info()
|
|
@@ -236,13 +286,10 @@ class AMQP(object):
|
|
|
"""Get message broker connection info string for log dumps."""
|
|
|
return BROKER_FORMAT % self.get_broker_info()
|
|
|
|
|
|
- def _get_queues(self):
|
|
|
- if self._queues is None:
|
|
|
- c = self.app.conf
|
|
|
- self._queues = self.Queues(c.CELERY_QUEUES)
|
|
|
- return self._queues
|
|
|
-
|
|
|
- def _set_queues(self, queues):
|
|
|
- self._queues = self.Queues(queues)
|
|
|
+ @cached_property
|
|
|
+ def queues(self):
|
|
|
+ return self.Queues(self.app.conf.CELERY_QUEUES)
|
|
|
|
|
|
- queues = property(_get_queues, _set_queues)
|
|
|
+ @queues.setter
|
|
|
+ def queues(self, value):
|
|
|
+ return self.Queues(value)
|