Browse Source

AMQP Backend: Now republishes result after polling it

Ask Solem 12 years ago
parent
commit
4d03b84c4f
2 changed files with 16 additions and 3 deletions
  1. 1 1
      celery/app/builtins.py
  2. 15 2
      celery/backends/amqp.py

+ 1 - 1
celery/app/builtins.py

@@ -81,7 +81,7 @@ def add_unlock_chord_task(app):
         if result.ready():
             subtask(callback).delay(j(propagate=propagate))
         else:
-            unlock_chord.retry(countdown=interval, max_retries=max_retries)
+            raise unlock_chord.retry(countdown=interval, max_retries=max_retries)
     return unlock_chord
 
 

+ 15 - 2
celery/backends/amqp.py

@@ -15,8 +15,7 @@ import socket
 import threading
 import time
 
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import Exchange, Queue, Producer, Consumer
 
 from celery import states
 from celery.exceptions import TimeoutError
@@ -103,6 +102,18 @@ class AMQPBackend(BaseDictBackend):
     def revive(self, channel):
         pass
 
+    def _republish(self, channel, task_id, body, content_type,
+            content_encoding):
+        return Producer(channel).publish(body,
+            exchange=self.exchange,
+            routing_key=task_id.replace('-', ''),
+            serializer=self.serializer,
+            content_type=content_type,
+            content_encoding=content_encoding,
+            retry=True, retry_policy=self.retry_policy,
+            declare=[self._create_binding(task_id)],
+        )
+
     def _store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
         with self.mutex:
@@ -155,6 +166,8 @@ class AMQPBackend(BaseDictBackend):
 
             if latest:
                 # new state to report
+                self._republish(channel, task_id, latest.body,
+                                latest.content_type, latest.content_encoding)
                 payload = self._cache[task_id] = latest.payload
                 return payload
             else: