|
@@ -145,6 +145,7 @@ class TaskProducer(Producer):
|
|
|
if not isinstance(exchange, Exchange):
|
|
|
exchange = Exchange(exchange,
|
|
|
kwargs.get("exchange_type") or self.exchange_type)
|
|
|
+ self.queues = self.app.amqp.queues # shortcut
|
|
|
super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
|
|
|
|
|
|
def delay_task(self, task_name, task_args=None, task_kwargs=None,
|
|
@@ -190,14 +191,12 @@ class TaskProducer(Producer):
|
|
|
if chord:
|
|
|
body["chord"] = chord
|
|
|
|
|
|
- print("KWARGS: %r" % (kwargs, ))
|
|
|
-
|
|
|
self.publish(body, exchange=exchange, mandatory=mandatory,
|
|
|
immediate=immediate, routing_key=routing_key,
|
|
|
serializer=serializer or self.serializer,
|
|
|
compression=compression or self.compression,
|
|
|
retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
|
|
|
- declare=[self.app.amqp.queues[queue]] if queue else [],
|
|
|
+ declare=[self.queues[queue]] if queue else [],
|
|
|
**kwargs)
|
|
|
|
|
|
signals.task_sent.send(sender=task_name, **body)
|