Browse Source

AMQP Backend: Now supports .ready(), .successful(), .result, .status, and even respond to changes in task state.

Ask Solem 15 years ago
parent
commit
3040aaabed
1 changed files with 32 additions and 8 deletions
  1. 32 8
      celery/backends/amqp.py

+ 32 - 8
celery/backends/amqp.py

@@ -91,18 +91,39 @@ class AMQPBackend(BaseDictBackend):
 
         return result
 
-    def wait_for(self, task_id, timeout=None):
-        try:
-            meta = self._get_task_meta_for(task_id, timeout)
-        except socket.timeout:
-            raise TimeoutError("The operation timed out.")
+    def wait_for(self, task_id, timeout=None, cache=True):
+        if task_id in self._cache:
+            meta = self._cache[task_id]
+        else:
+            try:
+                meta = self.consume(task_id, timeout=timeout)
+            except socket.timeout:
+                raise TimeoutError("The operation timed out.")
 
         if meta["status"] == states.SUCCESS:
-            return self.get_result(task_id)
+            return meta["result"]
         elif meta["status"] in states.PROPAGATE_STATES:
-            raise self.get_result(task_id)
+            raise self.exception_to_python(meta["result"])
 
-    def _get_task_meta_for(self, task_id, timeout=None):
+    def poll(self, task_id):
+        routing_key = task_id.replace("-", "")
+        consumer = self._create_consumer(task_id, self.connection)
+        result = consumer.fetch()
+        payload = None
+        if result:
+            payload = self._cache[task_id] = result.payload
+            consumer.backend.queue_delete(routing_key)
+        else:
+            # Use previously received status if any.
+            if task_id in self._cache:
+                payload = self._cache[task_id]
+            else:
+                payload = {"status": states.PENDING, "result": None}
+
+        consumer.close()
+        return payload
+
+    def consume(self, task_id, timeout=None):
         results = []
 
         def callback(message_data, message):
@@ -124,6 +145,9 @@ class AMQPBackend(BaseDictBackend):
         self._cache[task_id] = results[0]
         return results[0]
 
+    def get_task_meta(self, task_id, cache=True):
+        return self.poll(task_id)
+
     def reload_task_result(self, task_id):
         raise NotImplementedError(
                 "reload_task_result is not supported by this backend.")