|
@@ -43,10 +43,14 @@ def repair_uuid(s):
|
|
|
return '%s-%s-%s-%s-%s' % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
|
|
|
|
|
|
|
|
|
+class NoCacheQueue(Queue):
|
|
|
+ can_cache_declaration = False
|
|
|
+
|
|
|
+
|
|
|
class AMQPBackend(BaseBackend):
|
|
|
"""Publishes results by sending messages."""
|
|
|
Exchange = Exchange
|
|
|
- Queue = Queue
|
|
|
+ Queue = NoCacheQueue
|
|
|
Consumer = Consumer
|
|
|
Producer = Producer
|
|
|
|
|
@@ -108,16 +112,16 @@ class AMQPBackend(BaseBackend):
|
|
|
|
|
|
def _store_result(self, task_id, result, status, traceback=None):
|
|
|
"""Send task return value and status."""
|
|
|
- with self.app.amqp.producer_pool.acquire(block=True) as pub:
|
|
|
- pub.publish({'task_id': task_id, 'status': status,
|
|
|
- 'result': self.encode_result(result, status),
|
|
|
- 'traceback': traceback,
|
|
|
- 'children': self.current_task_children()},
|
|
|
- exchange=self.exchange,
|
|
|
- routing_key=self._routing_key(task_id),
|
|
|
- serializer=self.serializer,
|
|
|
- retry=True, retry_policy=self.retry_policy,
|
|
|
- declare=self.on_reply_declare(task_id))
|
|
|
+ with self.app.amqp.producer_pool.acquire(block=True) as producer:
|
|
|
+ producer.publish({'task_id': task_id, 'status': status,
|
|
|
+ 'result': self.encode_result(result, status),
|
|
|
+ 'traceback': traceback,
|
|
|
+ 'children': self.current_task_children()},
|
|
|
+ exchange=self.exchange,
|
|
|
+ routing_key=self._routing_key(task_id),
|
|
|
+ serializer=self.serializer,
|
|
|
+ retry=True, retry_policy=self.retry_policy,
|
|
|
+ declare=self.on_reply_declare(task_id))
|
|
|
return result
|
|
|
|
|
|
def on_reply_declare(self, task_id):
|