|
@@ -12,14 +12,13 @@ AMQ related functionality.
|
|
from datetime import datetime, timedelta
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
from kombu import BrokerConnection, Exchange
|
|
from kombu import BrokerConnection, Exchange
|
|
-from kombu.connection import Resource
|
|
|
|
from kombu import compat as messaging
|
|
from kombu import compat as messaging
|
|
|
|
+from kombu.pools import ProducerPool
|
|
from kombu.utils import cached_property
|
|
from kombu.utils import cached_property
|
|
|
|
|
|
from celery import routes as _routes
|
|
from celery import routes as _routes
|
|
from celery import signals
|
|
from celery import signals
|
|
from celery.utils import gen_unique_id, textindent
|
|
from celery.utils import gen_unique_id, textindent
|
|
-from celery.utils import promise, maybe_promise
|
|
|
|
|
|
|
|
#: List of known options to a Kombu producers send method.
|
|
#: List of known options to a Kombu producers send method.
|
|
#: Used to extract the message related options out of any `dict`.
|
|
#: Used to extract the message related options out of any `dict`.
|
|
@@ -90,6 +89,8 @@ class Queues(dict):
|
|
def format(self, indent=0, indent_first=True):
|
|
def format(self, indent=0, indent_first=True):
|
|
"""Format routing table into string for log dumps."""
|
|
"""Format routing table into string for log dumps."""
|
|
active = self.consume_from
|
|
active = self.consume_from
|
|
|
|
+ if not active:
|
|
|
|
+ return ""
|
|
info = [QUEUE_FORMAT.strip() % dict(
|
|
info = [QUEUE_FORMAT.strip() % dict(
|
|
name=(name + ":").ljust(12), **config)
|
|
name=(name + ":").ljust(12), **config)
|
|
for name, config in sorted(active.iteritems())]
|
|
for name, config in sorted(active.iteritems())]
|
|
@@ -132,6 +133,8 @@ class Queues(dict):
|
|
def with_defaults(cls, queues, default_exchange, default_exchange_type):
|
|
def with_defaults(cls, queues, default_exchange, default_exchange_type):
|
|
"""Alternate constructor that adds default exchange and
|
|
"""Alternate constructor that adds default exchange and
|
|
exchange type information to queues that does not have any."""
|
|
exchange type information to queues that does not have any."""
|
|
|
|
+ if queues is None:
|
|
|
|
+ queues = {}
|
|
for opts in queues.values():
|
|
for opts in queues.values():
|
|
opts.setdefault("exchange", default_exchange),
|
|
opts.setdefault("exchange", default_exchange),
|
|
opts.setdefault("exchange_type", default_exchange_type)
|
|
opts.setdefault("exchange_type", default_exchange_type)
|
|
@@ -248,40 +251,19 @@ class TaskPublisher(messaging.Publisher):
|
|
self.close()
|
|
self.close()
|
|
|
|
|
|
|
|
|
|
-class PublisherPool(Resource):
|
|
|
|
|
|
+class PublisherPool(ProducerPool):
|
|
|
|
|
|
- def __init__(self, app=None):
|
|
|
|
|
|
+ def __init__(self, app):
|
|
self.app = app
|
|
self.app = app
|
|
- super(PublisherPool, self).__init__(limit=self.app.pool.limit)
|
|
|
|
|
|
+ super(PublisherPool, self).__init__(self.app.pool,
|
|
|
|
+ limit=self.app.pool.limit)
|
|
|
|
|
|
- def create_publisher(self):
|
|
|
|
- conn = self.app.pool.acquire(block=True)
|
|
|
|
|
|
+ def create_producer(self):
|
|
|
|
+ conn = self.connections.acquire(block=True)
|
|
pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
|
|
pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
|
|
- conn._publisher_chan = pub.channel
|
|
|
|
- return pub
|
|
|
|
-
|
|
|
|
- def new(self):
|
|
|
|
- return promise(self.create_publisher)
|
|
|
|
-
|
|
|
|
- def setup(self):
|
|
|
|
- if self.limit:
|
|
|
|
- for _ in xrange(self.limit):
|
|
|
|
- self._resource.put_nowait(self.new())
|
|
|
|
-
|
|
|
|
- def prepare(self, publisher):
|
|
|
|
- pub = maybe_promise(publisher)
|
|
|
|
- if not pub.connection:
|
|
|
|
- pub.connection = self.app.pool.acquire(block=True)
|
|
|
|
- if not getattr(pub.connection, "_publisher_chan", None):
|
|
|
|
- pub.connection._publisher_chan = pub.connection.channel()
|
|
|
|
- pub.revive(pub.connection._publisher_chan)
|
|
|
|
|
|
+ conn._producer_chan = pub.channel
|
|
return pub
|
|
return pub
|
|
|
|
|
|
- def release(self, resource):
|
|
|
|
- resource.connection.release()
|
|
|
|
- resource.connection = None
|
|
|
|
- super(PublisherPool, self).release(resource)
|
|
|
|
-
|
|
|
|
|
|
|
|
class AMQP(object):
|
|
class AMQP(object):
|
|
BrokerConnection = BrokerConnection
|
|
BrokerConnection = BrokerConnection
|
|
@@ -302,7 +284,7 @@ class AMQP(object):
|
|
"""Create new :class:`Queues` instance, using queue defaults
|
|
"""Create new :class:`Queues` instance, using queue defaults
|
|
from the current configuration."""
|
|
from the current configuration."""
|
|
conf = self.app.conf
|
|
conf = self.app.conf
|
|
- if not queues:
|
|
|
|
|
|
+ if not queues and conf.CELERY_DEFAULT_QUEUE:
|
|
queues = {conf.CELERY_DEFAULT_QUEUE: {
|
|
queues = {conf.CELERY_DEFAULT_QUEUE: {
|
|
"exchange": conf.CELERY_DEFAULT_EXCHANGE,
|
|
"exchange": conf.CELERY_DEFAULT_EXCHANGE,
|
|
"exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
|
|
"exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
|
|
@@ -367,4 +349,4 @@ class AMQP(object):
|
|
|
|
|
|
@cached_property
|
|
@cached_property
|
|
def publisher_pool(self):
|
|
def publisher_pool(self):
|
|
- return PublisherPool(app=self.app)
|
|
|
|
|
|
+ return PublisherPool(self.app)
|