فهرست منبع

amqp backend now supports pika

Ask Solem 15 سال پیش
والد
کامیت
0ac58c5e83
1فایلهای تغییر یافته به همراه28 افزوده شده و 6 حذف شده
  1. 28 6
      celery/backends/amqp.py

+ 28 - 6
celery/backends/amqp.py

@@ -1,13 +1,17 @@
 """celery.backends.amqp"""
 import socket
+import time
+
+from datetime import timedelta
 
 from carrot.messaging import Consumer, Publisher
 
 from celery import conf
 from celery import states
-from celery.exceptions import TimeoutError
 from celery.backends.base import BaseDictBackend
+from celery.exceptions import TimeoutError
 from celery.messaging import establish_connection
+from celery.utils import timeutils
 
 
 class ResultPublisher(Publisher):
@@ -31,8 +35,10 @@ class ResultConsumer(Consumer):
     no_ack = True
     auto_delete = True
 
-    def __init__(self, connection, task_id, **kwargs):
+    def __init__(self, connection, task_id, expires=None, **kwargs):
         routing_key = task_id.replace("-", "")
+        if expires is not None:
+            self.queue_arguments = {"x-expires": expires}
         super(ResultConsumer, self).__init__(connection,
                 queue=routing_key, routing_key=routing_key, **kwargs)
 
@@ -50,13 +56,20 @@ class AMQPBackend(BaseDictBackend):
     _connection = None
 
     def __init__(self, connection=None, exchange=None, exchange_type=None,
-            persistent=None, serializer=None, auto_delete=None, **kwargs):
+            persistent=None, serializer=None, auto_delete=None, expires=None, **kwargs):
         self._connection = connection
         self.exchange = exchange
         self.exchange_type = exchange_type
         self.persistent = persistent
         self.serializer = serializer
         self.auto_delete = auto_delete
+        self.expires = expires
+        if self.expires is None:
+            self.expires = conf.TASK_RESULT_EXPIRES
+        if isinstance(self.expires, timedelta):
+            self.expires = timeutils.timedelta_seconds(self.expires)
+        if self.expires is not None:
+            self.expires = int(self.expires)
         super(AMQPBackend, self).__init__(**kwargs)
 
     def _create_publisher(self, task_id, connection):
@@ -77,7 +90,8 @@ class AMQPBackend(BaseDictBackend):
                               exchange=self.exchange,
                               exchange_type=self.exchange_type,
                               durable=self.persistent,
-                              auto_delete=self.auto_delete)
+                              auto_delete=self.auto_delete,
+                              expires=self.expires)
 
     def store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
@@ -136,13 +150,21 @@ class AMQPBackend(BaseDictBackend):
         def callback(message_data, message):
             results.append(message_data)
 
-        wait = self.connection.connection.wait_multi
+        wait = self.connection.drain_events
         consumer = self._create_consumer(task_id, self.connection)
         consumer.register_callback(callback)
 
         consumer.consume()
         try:
-            wait([consumer.backend.channel], timeout=timeout)
+            time_start = time.time()
+            while True:
+                # Total time spent may exceed a single call to wait()
+                if timeout and time.time() - time_start >= timeout:
+                    raise socket.timeout()
+                wait(timeout=timeout)
+                if results:
+                    # Got event on the wanted channel.
+                    break
         finally:
             consumer.close()