|
@@ -172,6 +172,7 @@ class TaskProducer(Producer):
|
|
|
self.retry_policy or {})
|
|
|
exchange = exchange or self.exchange
|
|
|
self.queues = self.app.amqp.queues # shortcut
|
|
|
+ self.default_queue = self.app.amqp.default_queue
|
|
|
super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
|
|
|
|
|
|
def publish_task(self, task_name, task_args=None, task_kwargs=None,
|
|
@@ -184,8 +185,9 @@ class TaskProducer(Producer):
|
|
|
delivery_mode=None, compression=None, declare=None, **kwargs):
|
|
|
"""Send task message."""
|
|
|
|
|
|
- declare = declare or []
|
|
|
qname = queue
|
|
|
+ if queue is None and exchange is None:
|
|
|
+ queue = self.default_queue
|
|
|
if queue is not None:
|
|
|
if isinstance(queue, basestring):
|
|
|
qname, queue = queue, self.queues[queue]
|
|
@@ -193,6 +195,7 @@ class TaskProducer(Producer):
|
|
|
qname = queue.name
|
|
|
exchange = exchange or queue.exchange.name
|
|
|
routing_key = routing_key or queue.routing_key
|
|
|
+ declare = declare or ([queue] if queue else [])
|
|
|
|
|
|
# merge default and custom policy
|
|
|
retry = self.retry if retry is None else retry
|