|
@@ -12,7 +12,7 @@ from datetime import timedelta
|
|
|
from weakref import WeakValueDictionary
|
|
|
|
|
|
from kombu import Connection, Consumer, Exchange, Producer, Queue
|
|
|
-from kombu.common import entry_to_queue
|
|
|
+from kombu.common import Broadcast
|
|
|
from kombu.pools import ProducerPool
|
|
|
from kombu.utils import cached_property, uuid
|
|
|
from kombu.utils.encoding import safe_repr
|
|
@@ -112,7 +112,7 @@ class Queues(dict):
|
|
|
options['routing_key'] = name
|
|
|
if self.ha_policy is not None:
|
|
|
self._set_ha_policy(options.setdefault('queue_arguments', {}))
|
|
|
- q = self[name] = entry_to_queue(name, **options)
|
|
|
+ q = self[name] = Queue.from_dict(name, **options)
|
|
|
return q
|
|
|
|
|
|
def _set_ha_policy(self, args):
|
|
@@ -209,7 +209,8 @@ class TaskProducer(Producer):
|
|
|
qname = queue.name
|
|
|
exchange = exchange or queue.exchange.name
|
|
|
routing_key = routing_key or queue.routing_key
|
|
|
- declare = declare or ([queue] if queue else [])
|
|
|
+ if declare is None and queue and not isinstance(queue, Broadcast):
|
|
|
+ declare = [queue]
|
|
|
|
|
|
# merge default and custom policy
|
|
|
retry = self.retry if retry is None else retry
|