|
@@ -12,11 +12,11 @@
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
from datetime import timedelta
|
|
|
+from weakref import WeakValueDictionary
|
|
|
|
|
|
-from kombu import BrokerConnection, Exchange
|
|
|
-from kombu import compat as messaging
|
|
|
-from kombu import pools
|
|
|
-from kombu.common import maybe_declare
|
|
|
+from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
|
|
|
+from kombu.common import entry_to_queue, maybe_declare
|
|
|
+from kombu.pools import ProducerPool
|
|
|
|
|
|
from celery import signals
|
|
|
from celery.utils import cached_property, lpmerge, uuid
|
|
@@ -26,18 +26,14 @@ from . import routes as _routes
|
|
|
|
|
|
#: Human readable queue declaration.
|
|
|
QUEUE_FORMAT = """
|
|
|
-. %(name)s exchange:%(exchange)s (%(exchange_type)s) \
|
|
|
-binding:%(binding_key)s
|
|
|
+. %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
|
|
|
"""
|
|
|
|
|
|
|
|
|
class Queues(dict):
|
|
|
"""Queue name⇒ declaration mapping.
|
|
|
|
|
|
- Celery will consult this mapping to find the options
|
|
|
- for any queue by name.
|
|
|
-
|
|
|
- :param queues: Initial mapping.
|
|
|
+ :param queues: Initial list/tuple or dict of queues.
|
|
|
|
|
|
"""
|
|
|
#: If set, this is a subset of queues to consume from.
|
|
@@ -45,12 +41,20 @@ class Queues(dict):
|
|
|
_consume_from = None
|
|
|
|
|
|
def __init__(self, queues):
|
|
|
+ self.aliases = WeakValueDictionary()
|
|
|
+ if isinstance(queues, (tuple, list)):
|
|
|
+ queues = dict((q.name, q) for q in queues)
|
|
|
dict.__init__(self)
|
|
|
- for queue_name, options in (queues or {}).items():
|
|
|
- self.add(queue_name, **options)
|
|
|
+ for name, q in (queues or {}).items():
|
|
|
+ self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
|
|
|
|
|
|
- def add(self, queue, exchange=None, routing_key=None,
|
|
|
- exchange_type="direct", **options):
|
|
|
+ def __getitem__(self, name):
|
|
|
+ try:
|
|
|
+ return dict.__getitem__(self, key)
|
|
|
+ except KeyError:
|
|
|
+ return self.aliases[key]
|
|
|
+
|
|
|
+ def add(self, queue, **kwargs):
|
|
|
"""Add new queue.
|
|
|
|
|
|
:param queue: Name of the queue.
|
|
@@ -60,16 +64,24 @@ class Queues(dict):
|
|
|
:keyword \*\*options: Additional declaration options.
|
|
|
|
|
|
"""
|
|
|
- q = self[queue] = self.options(exchange, routing_key,
|
|
|
- exchange_type, **options)
|
|
|
- return q
|
|
|
+ if not isinstance(queue, Queue):
|
|
|
+ return self.add_compat(queue, **kwargs)
|
|
|
+ self[queue.name] = queue
|
|
|
+ if queue.alias:
|
|
|
+ self.aliases[queue.alias] = queue
|
|
|
+ return queue
|
|
|
+
|
|
|
+ def add_compat(self, name, **options):
|
|
|
+ # docs used to use binding_key as routing key
|
|
|
+ options.setdefault("routing_key", options.get("binding_key"))
|
|
|
+ self[name] = queue = entry_to_queue(name, **options)
|
|
|
+ return queue
|
|
|
|
|
|
def options(self, exchange, routing_key,
|
|
|
exchange_type="direct", **options):
|
|
|
"""Creates new option mapping for queue, with required
|
|
|
keys present."""
|
|
|
return dict(options, routing_key=routing_key,
|
|
|
- binding_key=routing_key,
|
|
|
exchange=exchange,
|
|
|
exchange_type=exchange_type)
|
|
|
|
|
@@ -78,18 +90,19 @@ class Queues(dict):
|
|
|
active = self.consume_from
|
|
|
if not active:
|
|
|
return ""
|
|
|
- info = [QUEUE_FORMAT.strip() % dict(
|
|
|
- name=(name + ":").ljust(12), **config)
|
|
|
- for name, config in sorted(active.iteritems())]
|
|
|
+ info = [QUEUE_FORMAT.strip() % {
|
|
|
+ "name": (name + ":").ljust(12),
|
|
|
+ "exchange": q.exchange.name,
|
|
|
+ "exchange_type": q.exchange.type,
|
|
|
+ "routing_key": q.routing_key}
|
|
|
+ for name, q in sorted(active.iteritems())]
|
|
|
if indent_first:
|
|
|
return text.indent("\n".join(info), indent)
|
|
|
return info[0] + "\n" + text.indent("\n".join(info[1:]), indent)
|
|
|
|
|
|
def select_subset(self, wanted, create_missing=True):
|
|
|
- """Select subset of the currently defined queues.
|
|
|
-
|
|
|
- Does not return anything: queues not in `wanted` will
|
|
|
- be discarded in-place.
|
|
|
+ """Sets :attr:`consume_from` by selecting a subset of the
|
|
|
+ currently defined queues.
|
|
|
|
|
|
:param wanted: List of wanted queue names.
|
|
|
:keyword create_missing: By default any unknown queues will be
|
|
@@ -102,15 +115,18 @@ class Queues(dict):
|
|
|
acc = {}
|
|
|
for queue in wanted:
|
|
|
try:
|
|
|
- options = self[queue]
|
|
|
+ Q = self[queue]
|
|
|
except KeyError:
|
|
|
if not create_missing:
|
|
|
raise
|
|
|
- options = self.options(queue, queue)
|
|
|
- acc[queue] = options
|
|
|
+ Q = self.new_missing(queue)
|
|
|
+ acc[queue] = Q
|
|
|
self._consume_from = acc
|
|
|
self.update(acc)
|
|
|
|
|
|
+ def new_missing(self, name):
|
|
|
+ return Queue(name, Exchange(name), name)
|
|
|
+
|
|
|
@property
|
|
|
def consume_from(self):
|
|
|
if self._consume_from is not None:
|
|
@@ -118,56 +134,33 @@ class Queues(dict):
|
|
|
return self
|
|
|
|
|
|
@classmethod
|
|
|
- def with_defaults(cls, queues, default_exchange, default_exchange_type):
|
|
|
+ def with_defaults(cls, queues, default_exchange):
|
|
|
"""Alternate constructor that adds default exchange and
|
|
|
exchange type information to queues that does not have any."""
|
|
|
- if queues is None:
|
|
|
- queues = {}
|
|
|
- for opts in queues.values():
|
|
|
- opts.setdefault("exchange", default_exchange),
|
|
|
- opts.setdefault("exchange_type", default_exchange_type)
|
|
|
- opts.setdefault("binding_key", default_exchange)
|
|
|
- opts.setdefault("routing_key", opts.get("binding_key"))
|
|
|
- return cls(queues)
|
|
|
+ queues = cls(queues if queues is not None else {})
|
|
|
+ for q in queues.itervalues():
|
|
|
+ if not q.exchange or not q.exchange.name:
|
|
|
+ q.exchange = default_exchange
|
|
|
+ if not q.routing_key:
|
|
|
+ q.routing_key = default_exchange.name
|
|
|
+ return queues
|
|
|
|
|
|
|
|
|
-class TaskPublisher(messaging.Publisher):
|
|
|
+class TaskProducer(Producer):
|
|
|
auto_declare = False
|
|
|
retry = False
|
|
|
retry_policy = None
|
|
|
- _queue_cache = {}
|
|
|
- _exchange_cache = {}
|
|
|
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- self.app = kwargs.pop("app")
|
|
|
+ def __init__(self, channel=None, exchange=None, *args, **kwargs):
|
|
|
+ self.app = kwargs.get("app") or self.app
|
|
|
self.retry = kwargs.pop("retry", self.retry)
|
|
|
self.retry_policy = kwargs.pop("retry_policy",
|
|
|
self.retry_policy or {})
|
|
|
- self.utc = kwargs.pop("enable_utc", False)
|
|
|
- super(TaskPublisher, self).__init__(*args, **kwargs)
|
|
|
-
|
|
|
- def _get_queue(self, name):
|
|
|
- if name not in self._queue_cache:
|
|
|
- options = self.app.amqp.queues[name]
|
|
|
- self._queue_cache[name] = messaging.entry_to_queue(name, **options)
|
|
|
- return self._queue_cache[name]
|
|
|
-
|
|
|
- def _get_exchange(self, name, type=None):
|
|
|
- if name not in self._exchange_cache:
|
|
|
- self._exchange_cache[name] = Exchange(name,
|
|
|
- type=type or self.exchange_type,
|
|
|
- durable=self.durable,
|
|
|
- auto_delete=self.auto_delete,
|
|
|
- )
|
|
|
- return self._exchange_cache[name]
|
|
|
-
|
|
|
- def _declare_queue(self, name, retry=False, retry_policy={}):
|
|
|
- maybe_declare(self._get_queue(name), self.channel,
|
|
|
- retry=retry, **retry_policy)
|
|
|
-
|
|
|
- def _declare_exchange(self, name, type=None, retry=False, retry_policy={}):
|
|
|
- maybe_declare(self._get_exchange(name, type), self.channel,
|
|
|
- retry=retry, **retry_policy)
|
|
|
+ exchange = exchange or self.exchange
|
|
|
+ if not isinstance(exchange, Exchange):
|
|
|
+ exchange = Exchange(exchange,
|
|
|
+ kwargs.get("exchange_type") or self.exchange_type)
|
|
|
+ super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
|
|
|
|
|
|
def delay_task(self, task_name, task_args=None, task_kwargs=None,
|
|
|
countdown=None, eta=None, task_id=None, taskset_id=None,
|
|
@@ -183,14 +176,6 @@ class TaskPublisher(messaging.Publisher):
|
|
|
_retry_policy = self.retry_policy
|
|
|
if retry_policy: # merge default and custom policy
|
|
|
_retry_policy = dict(_retry_policy, **retry_policy)
|
|
|
-
|
|
|
- # declare entities
|
|
|
- if queue:
|
|
|
- self._declare_queue(queue, retry, _retry_policy)
|
|
|
- else:
|
|
|
- self._declare_exchange(exchange, exchange_type,
|
|
|
- retry, _retry_policy)
|
|
|
-
|
|
|
task_id = task_id or uuid()
|
|
|
task_args = task_args or []
|
|
|
task_kwargs = task_kwargs or {}
|
|
@@ -222,15 +207,14 @@ class TaskPublisher(messaging.Publisher):
|
|
|
if chord:
|
|
|
body["chord"] = chord
|
|
|
|
|
|
- do_retry = retry if retry is not None else self.retry
|
|
|
- send = self.send
|
|
|
- if do_retry:
|
|
|
- send = connection.ensure(self, self.send, **_retry_policy)
|
|
|
- send(body, exchange=exchange, mandatory=mandatory,
|
|
|
+ self.publish(body, exchange=exchange, mandatory=mandatory,
|
|
|
immediate=immediate, routing_key=routing_key,
|
|
|
serializer=serializer or self.serializer,
|
|
|
delivery_mode=delivery_mode,
|
|
|
- compression=compression or self.compression)
|
|
|
+ compression=compression or self.compression,
|
|
|
+ retry=retry, retry_policy=retry_policy,
|
|
|
+ declare=[self.app.amqp.queues[queue]] if queue else [])
|
|
|
+
|
|
|
signals.task_sent.send(sender=task_name, **body)
|
|
|
if event_dispatcher:
|
|
|
event_dispatcher.send("task-sent", uuid=task_id,
|
|
@@ -241,33 +225,21 @@ class TaskPublisher(messaging.Publisher):
|
|
|
eta=eta,
|
|
|
expires=expires)
|
|
|
return task_id
|
|
|
+TaskPublisher = TaskProducer # compat
|
|
|
|
|
|
- def __exit__(self, *exc_info):
|
|
|
- try:
|
|
|
- self.release()
|
|
|
- except AttributeError:
|
|
|
- self.close()
|
|
|
|
|
|
+class TaskConsumer(Consumer):
|
|
|
+ app = None
|
|
|
|
|
|
-class PublisherPool(pools.ProducerPool):
|
|
|
-
|
|
|
- def __init__(self, app):
|
|
|
- self.app = app
|
|
|
- super(PublisherPool, self).__init__(self.app.pool,
|
|
|
- limit=self.app.pool.limit)
|
|
|
-
|
|
|
- def create_producer(self):
|
|
|
- conn = self.connections.acquire(block=True)
|
|
|
- pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
|
|
|
- conn._producer_chan = pub.channel
|
|
|
- return pub
|
|
|
+ def __init__(self, channel, queues=None, app=None, **kw):
|
|
|
+ self.app = app or self.app
|
|
|
+ super(TaskConsumer, self).__init__(channel,
|
|
|
+ queues or self.app.amqp.queues.consume_from.values(), **kw)
|
|
|
|
|
|
|
|
|
class AMQP(object):
|
|
|
BrokerConnection = BrokerConnection
|
|
|
- Publisher = messaging.Publisher
|
|
|
- Consumer = messaging.Consumer
|
|
|
- ConsumerSet = messaging.ConsumerSet
|
|
|
+ Consumer = Consumer
|
|
|
|
|
|
#: Cached and prepared routing table.
|
|
|
_rtable = None
|
|
@@ -283,12 +255,10 @@ class AMQP(object):
|
|
|
from the current configuration."""
|
|
|
conf = self.app.conf
|
|
|
if not queues and conf.CELERY_DEFAULT_QUEUE:
|
|
|
- queues = {conf.CELERY_DEFAULT_QUEUE: {
|
|
|
- "exchange": conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
- "exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
|
|
|
- "binding_key": conf.CELERY_DEFAULT_ROUTING_KEY}}
|
|
|
- return Queues.with_defaults(queues, conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
- conf.CELERY_DEFAULT_EXCHANGE_TYPE)
|
|
|
+ queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
|
|
|
+ exchange=self.default_exchange,
|
|
|
+ routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
|
|
|
+ return Queues.with_defaults(queues, self.default_exchange)
|
|
|
|
|
|
def Router(self, queues=None, create_missing=None):
|
|
|
"""Returns the current task router."""
|
|
@@ -296,51 +266,55 @@ class AMQP(object):
|
|
|
self.app.either("CELERY_CREATE_MISSING_QUEUES",
|
|
|
create_missing), app=self.app)
|
|
|
|
|
|
- def TaskConsumer(self, *args, **kwargs):
|
|
|
+ @cached_property
|
|
|
+ def TaskConsumer(self):
|
|
|
"""Returns consumer for a single task queue."""
|
|
|
- default_queue_name, default_queue = self.get_default_queue()
|
|
|
- defaults = dict({"queue": default_queue_name}, **default_queue)
|
|
|
- defaults["routing_key"] = defaults.pop("binding_key", None)
|
|
|
- return self.Consumer(*args, **lpmerge(defaults, kwargs))
|
|
|
+ return self.app.subclass_with_self(TaskConsumer,
|
|
|
+ reverse="amqp.TaskConsumer")
|
|
|
+
|
|
|
+ def queue_or_default(self, q):
|
|
|
+ if q:
|
|
|
+ return self.queues[q] if not isinstance(q, Queue) else q
|
|
|
+ return self.default_queue
|
|
|
|
|
|
- def TaskPublisher(self, *args, **kwargs):
|
|
|
+ @cached_property
|
|
|
+ def TaskProducer(self):
|
|
|
"""Returns publisher used to send tasks.
|
|
|
|
|
|
You should use `app.send_task` instead.
|
|
|
|
|
|
"""
|
|
|
conf = self.app.conf
|
|
|
- _, default_queue = self.get_default_queue()
|
|
|
- defaults = {"exchange": default_queue["exchange"],
|
|
|
- "exchange_type": default_queue["exchange_type"],
|
|
|
- "routing_key": conf.CELERY_DEFAULT_ROUTING_KEY,
|
|
|
- "serializer": conf.CELERY_TASK_SERIALIZER,
|
|
|
- "compression": conf.CELERY_MESSAGE_COMPRESSION,
|
|
|
- "retry": conf.CELERY_TASK_PUBLISH_RETRY,
|
|
|
- "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
|
|
|
- "enable_utc": conf.CELERY_ENABLE_UTC,
|
|
|
- "app": self.app}
|
|
|
- return TaskPublisher(*args, **lpmerge(defaults, kwargs))
|
|
|
-
|
|
|
- def get_task_consumer(self, connection, queues=None, **kwargs):
|
|
|
+ return self.app.subclass_with_self(TaskProducer,
|
|
|
+ reverse="amqp.TaskProducer",
|
|
|
+ exchange=self.default_exchange,
|
|
|
+ exchange_type=self.default_exchange.type,
|
|
|
+ routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
|
|
|
+ serializer=conf.CELERY_TASK_SERIALIZER,
|
|
|
+ compression=conf.CELERY_MESSAGE_COMPRESSION,
|
|
|
+ retry=conf.CELERY_TASK_PUBLISH_RETRY,
|
|
|
+ retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
|
|
|
+ utc=conf.CELERY_ENABLE_UTC)
|
|
|
+ TaskPublisher = TaskProducer # compat
|
|
|
+
|
|
|
+ def get_task_consumer(self, channel, *args, **kwargs):
|
|
|
"""Return consumer configured to consume from all known task
|
|
|
queues."""
|
|
|
- return self.ConsumerSet(connection,
|
|
|
- from_dict=queues or self.queues.consume_from,
|
|
|
- channel=connection.default_channel,
|
|
|
- **kwargs)
|
|
|
+ return self.TaskConsumer(channel, *args, **kwargs)
|
|
|
|
|
|
- def get_default_queue(self):
|
|
|
- """Returns `(queue_name, queue_options)` tuple for the queue
|
|
|
- configured to be default (:setting:`CELERY_DEFAULT_QUEUE`)."""
|
|
|
- q = self.app.conf.CELERY_DEFAULT_QUEUE
|
|
|
- return q, self.queues[q]
|
|
|
+ @cached_property
|
|
|
+ def default_queue(self):
|
|
|
+ return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
|
|
|
|
|
|
@cached_property
|
|
|
def queues(self):
|
|
|
"""Queue name⇒ declaration mapping."""
|
|
|
return self.Queues(self.app.conf.CELERY_QUEUES)
|
|
|
|
|
|
+ @queues.setter
|
|
|
+ def queues(self, queues): # noqa
|
|
|
+ return self.Queues(queues)
|
|
|
+
|
|
|
@property
|
|
|
def routes(self):
|
|
|
if self._rtable is None:
|
|
@@ -353,4 +327,10 @@ class AMQP(object):
|
|
|
|
|
|
@cached_property
|
|
|
def publisher_pool(self):
|
|
|
- return PublisherPool(self.app)
|
|
|
+ return ProducerPool(self.app.pool, limit=self.app.pool.limit,
|
|
|
+ Producer=self.TaskProducer)
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def default_exchange(self):
|
|
|
+ return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
+ self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
|