Browse Source

AMQP result-store backend is now working (and it's fast!)

Ask Solem 15 years ago
parent
commit
89626c59ee
1 changed files with 42 additions and 21 deletions
  1. 42 21
      celery/backends/amqp.py

+ 42 - 21
celery/backends/amqp.py

@@ -3,15 +3,16 @@ from carrot.connection import DjangoBrokerConnection
 from carrot.messaging import Consumer, Publisher
 from celery.backends.base import BaseBackend
 
-RESULTSTORE_EXCHANGE = "celres"
+RESULTSTORE_EXCHANGE = "celeryresults"
 
 
 class Backend(BaseBackend):
     """AMQP backend. Publish results by sending messages to the broker
     using the task id as routing key.
 
-    Note that results published using this backend is read-once only.
-    After the result has been read, the result is deleted.
+    **NOTE:** Results published using this backend is read-once only.
+    After the result has been read, the result is deleted. (however, it's
+    still cached locally by the backend instance).
 
     """
 
@@ -19,40 +20,59 @@ class Backend(BaseBackend):
 
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
+        self.connection = DjangoBrokerConnection()
         self._cache = {}
 
+    def _declare_queue(self, task_id, connection):
+        routing_key = task_id.replace("-", "")
+        backend = connection.create_backend()
+        backend.queue_declare(queue=routing_key, durable=True,
+                                exclusive=False, auto_delete=True)
+        backend.exchange_declare(exchange=RESULTSTORE_EXCHANGE,
+                                 type="direct",
+                                 durable=True,
+                                 auto_delete=False)
+        backend.queue_bind(queue=routing_key, exchange=RESULTSTORE_EXCHANGE,
+                           routing_key=routing_key)
+        backend.close()
+
     def _publisher_for_task_id(self, task_id, connection):
         routing_key = task_id.replace("-", "")
-        return Publisher(connection, exchange=RESULTSTORE_EXCHANGE,
-                         exchange_type="direct",
-                         routing_key="%s" % routing_key)
+        self._declare_queue(task_id, connection)
+        p = Publisher(connection, exchange=RESULTSTORE_EXCHANGE,
+                      exchange_type="direct",
+                      routing_key=routing_key)
+        return p
 
     def _consumer_for_task_id(self, task_id, connection):
         routing_key = task_id.replace("-", "")
-        return Consumer(connection, queue=task_id,
+        self._declare_queue(task_id, connection)
+        return Consumer(connection, queue=routing_key,
                         exchange=RESULTSTORE_EXCHANGE,
                         exchange_type="direct",
-                        routing_key="%s" % routing_key)
+                        no_ack=False, auto_ack=False,
+                        auto_delete=True,
+                        routing_key=routing_key)
 
     def store_result(self, task_id, result, status):
-        """Mark task as done (executed)."""
+        """Send task return value and status."""
         if status == "DONE":
             result = self.prepare_result(result)
         elif status == "FAILURE":
             result = self.prepare_exception(result)
 
+
         meta = {"task_id": task_id,
                 "result": result,
                 "status": status}
 
-        connection = DjangoBrokerConnection()
+        connection = self.connection
         publisher = self._publisher_for_task_id(task_id, connection)
-        consumer = self._consumer_for_task_id(task_id, connection)
-        c.fetch()
-        publisher.send(meta, serializer="pickle", immediate=False)
+        publisher.send(meta, serializer="pickle")
         publisher.close()
-        connection.close()
-                        
+
+        print("SENT %s RESULT: %s TO %s" % (
+            status, result, task_id.replace("-", "")))
         return result
 
     def is_done(self, task_id):
@@ -64,6 +84,10 @@ class Backend(BaseBackend):
         return self._get_task_meta_for(task_id)["status"]
 
     def _get_task_meta_for(self, task_id):
+
+        if task_id in self._cache:
+            return self._cache[task_id]
+
         results = []
 
         def callback(message_data, message):
@@ -72,16 +96,17 @@ class Backend(BaseBackend):
 
         routing_key = task_id.replace("-", "")
 
-        connection = DjangoBrokerConnection()
+        connection = self.connection
         consumer = self._consumer_for_task_id(task_id, connection)
         consumer.register_callback(callback)
         
         try:
             consumer.iterconsume().next()
         finally:
+            consumer.backend.channel.queue_delete(routing_key)
             consumer.close()
-            connection.close()
 
+        self._cache[task_id] = results[0]
         return results[0]
 
     def get_result(self, task_id):
@@ -91,7 +116,3 @@ class Backend(BaseBackend):
             return self.exception_to_python(result["result"])
         else:
             return result["result"]
-
-    def cleanup(self):
-        """Delete expired metadata."""
-        pass