Browse Source

Use new kombu BrokerConnection.default_channel

Ask Solem 13 years ago
parent
commit
32b062d840
2 changed files with 5 additions and 10 deletions
  1. 4 7
      celery/backends/amqp.py
  2. 1 3
      celery/task/control.py

+ 4 - 7
celery/backends/amqp.py

@@ -92,12 +92,10 @@ class AMQPBackend(BaseDictBackend):
 
     def _publish_result(self, connection, task_id, meta):
         # cache single channel
-        if hasattr(connection, "_result_producer_chan") and \
-                connection._result_producer_chan is not None and \
-                connection._result_producer_chan.connection is not None:
-            channel = connection._result_producer_chan
-        else:
-            channel = connection._result_producer_chan = connection.channel()
+        if connection._default_channel is not None and \
+                connection._default_channel.connection is None:
+            connection.maybe_close_channel(connection._default_channel)
+        channel = connection.default_channel
 
         self._create_producer(task_id, channel).publish(meta)
 
@@ -112,7 +110,6 @@ class AMQPBackend(BaseDictBackend):
             with self.app.pool.acquire(block=True) as conn:
 
                 def errback(error, delay):
-                    conn._result_producer_chan = None
                     print("Couldn't send result for %r: %r. Retry in %rs." % (
                             task_id, error, delay))
 

+ 1 - 3
celery/task/control.py

@@ -209,9 +209,7 @@ class Control(object):
         """
         with self.app.default_connection(connection, connect_timeout) as conn:
             if channel is None:
-                if not getattr(conn, "_producer_chan", None):
-                    conn._producer_chan = conn.channel()
-                channel = conn._producer_chan
+                channel = conn.default_channel
             return self.mailbox(conn)._broadcast(command, arguments,
                                                  destination, reply, timeout,
                                                  limit, callback,