|
@@ -83,9 +83,9 @@ class AMQPBackend(BaseDictBackend):
|
|
|
auto_delete=self.auto_delete,
|
|
|
queue_arguments=self.queue_arguments)
|
|
|
|
|
|
- def _create_producer(self, task_id, channel):
|
|
|
- self._create_binding(task_id)(channel).declare()
|
|
|
- return self.Producer(channel, exchange=self.exchange,
|
|
|
+ def _create_producer(self, task_id, connection):
|
|
|
+ self._create_binding(task_id)(connection.default_channel).declare()
|
|
|
+ return self.Producer(connection, exchange=self.exchange,
|
|
|
routing_key=task_id.replace("-", ""),
|
|
|
serializer=self.serializer)
|
|
|
|
|
@@ -94,12 +94,7 @@ class AMQPBackend(BaseDictBackend):
|
|
|
|
|
|
def _publish_result(self, connection, task_id, meta):
|
|
|
# cache single channel
|
|
|
- if connection._default_channel is not None and \
|
|
|
- connection._default_channel.connection is None:
|
|
|
- connection.maybe_close_channel(connection._default_channel)
|
|
|
- channel = connection.default_channel
|
|
|
-
|
|
|
- self._create_producer(task_id, channel).publish(meta)
|
|
|
+ self._create_producer(task_id, connection).publish(meta)
|
|
|
|
|
|
def revive(self, channel):
|
|
|
pass
|