Prechádzať zdrojové kódy

AMQP result backend: Needs to strip '-'s from the task id for backward compatibility

Ask Solem 15 rokov pred
rodič
commit
98f82ed794
1 zmenil súbory, kde vykonal 12 pridanie a 4 odobranie
  1. 12 4
      celery/backends/amqp.py

+ 12 - 4
celery/backends/amqp.py

@@ -14,6 +14,12 @@ from celery.exceptions import TimeoutError
 from celery.utils import timeutils
 
 
+def repair_uuid(s):
+    # Historically the dashes in uuids are removed for the amq entity names,
+    # hopefully we'll be able to fix this in v3.0.
+    return "%s-%s-%s-%s-%s" % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
+
+
 class AMQResultWarning(UserWarning):
     pass
 
@@ -64,9 +70,10 @@ class AMQPBackend(BaseDictBackend):
             self.queue_arguments["x-expires"] = int(self.expires * 1000.0)
 
     def _create_binding(self, task_id):
-        return Queue(name=task_id,
+        name = task_id.replace("-", "")
+        return Queue(name=name,
                      exchange=self.exchange,
-                     routing_key=task_id,
+                     routing_key=name,
                      durable=self.persistent,
                      auto_delete=self.auto_delete)
 
@@ -75,7 +82,7 @@ class AMQPBackend(BaseDictBackend):
         binding(self.channel).declare()
 
         return Producer(self.channel, exchange=self.exchange,
-                        routing_key=task_id,
+                        routing_key=task_id.replace("-", ""),
                         serializer=self.serializer)
 
     def _create_consumer(self, bindings):
@@ -147,7 +154,8 @@ class AMQPBackend(BaseDictBackend):
 
         def callback(meta, message):
             if meta["status"] in states.READY_STATES:
-                results[message.delivery_info["routing_key"]] = meta
+                uuid = repair_uuid(message.delivery_info["routing_key"])
+                results[uuid] = meta
 
         consumer.register_callback(callback)