|
@@ -1,4 +1,4 @@
|
|
|
-"""celery.backends.amqp"""
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
import socket
|
|
|
import time
|
|
|
import warnings
|
|
@@ -15,8 +15,9 @@ from celery.utils import timeutils
|
|
|
|
|
|
|
|
|
def repair_uuid(s):
|
|
|
- # Historically the dashes in uuids are removed for the amq entity names,
|
|
|
- # hopefully we'll be able to fix this in v3.0.
|
|
|
+ # Historically the dashes in UUIDS are removed from AMQ entity names,
|
|
|
+ # but there is no known reason to. Hopefully we'll be able to fix
|
|
|
+ # this in v3.0.
|
|
|
return "%s-%s-%s-%s-%s" % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
|
|
|
|
|
|
|
|
@@ -208,9 +209,6 @@ class AMQPBackend(BaseDictBackend):
|
|
|
conn = self.pool.acquire(block=True)
|
|
|
channel = conn.channel()
|
|
|
try:
|
|
|
- bindings = [self._create_binding(task_id) for task_id in task_ids]
|
|
|
- consumer = self._create_consumer(bindings, channel)
|
|
|
- consumer.consume()
|
|
|
ids = set(task_ids)
|
|
|
cached_ids = set()
|
|
|
for task_id in ids:
|
|
@@ -223,17 +221,27 @@ class AMQPBackend(BaseDictBackend):
|
|
|
yield task_id, cached
|
|
|
cached_ids.add(task_id)
|
|
|
ids ^= cached_ids
|
|
|
+
|
|
|
+ bindings = [self._create_binding(task_id) for task_id in task_ids]
|
|
|
+ consumer = self._create_consumer(bindings, channel)
|
|
|
+ consumer.consume()
|
|
|
try:
|
|
|
while ids:
|
|
|
r = self.drain_events(consumer, timeout=timeout)
|
|
|
ids ^= set(r.keys())
|
|
|
for ready_id, ready_meta in r.items():
|
|
|
yield ready_id, ready_meta
|
|
|
- finally:
|
|
|
+ except: # ☹ Py2.4 — Cannot yield inside try: finally: block
|
|
|
consumer.cancel()
|
|
|
- finally:
|
|
|
+ raise
|
|
|
+ consumer.cancel()
|
|
|
+
|
|
|
+ except: # … ☹
|
|
|
channel.close()
|
|
|
conn.release()
|
|
|
+ raise
|
|
|
+ channel.close()
|
|
|
+ conn.release()
|
|
|
|
|
|
def close(self):
|
|
|
if self._pool is not None:
|