|
@@ -79,7 +79,8 @@ def migrate_tasks(source, dest, limit=None, timeout=1.0, ack_messages=False,
|
|
|
if isinstance(queues, basestring):
|
|
|
queues = queues.split(',')
|
|
|
if isinstance(queues, list):
|
|
|
- queues = dict([tuple(islice(cycle(q.split(':')), None, 2)) for q in queues])
|
|
|
+ queues = dict(tuple(islice(cycle(q.split(':')), None, 2))
|
|
|
+ for q in queues)
|
|
|
if queues is None:
|
|
|
queues = {}
|
|
|
|
|
@@ -121,7 +122,8 @@ def migrate_tasks(source, dest, limit=None, timeout=1.0, ack_messages=False,
|
|
|
new_queue = queue(producer.channel)
|
|
|
new_queue.name = queues.get(queue.name, queue.name)
|
|
|
if new_queue.routing_key == queue.name:
|
|
|
- new_queue.routing_key = queues.get(queue.name, new_queue.routing_key)
|
|
|
+ new_queue.routing_key = queues.get(queue.name,
|
|
|
+ new_queue.routing_key)
|
|
|
if new_queue.exchange.name == queue.name:
|
|
|
new_queue.exchange.name = queues.get(queue.name, queue.name)
|
|
|
new_queue.declare()
|
|
@@ -136,7 +138,8 @@ def migrate_tasks(source, dest, limit=None, timeout=1.0, ack_messages=False,
|
|
|
# start migrating messages.
|
|
|
with consumer:
|
|
|
try:
|
|
|
- for _ in eventloop(source, limit=limit, timeout=timeout, ignore_timeouts=forever): # pragma: no cover
|
|
|
+ for _ in eventloop(source, limit=limit, # pragma: no cover
|
|
|
+ timeout=timeout, ignore_timeouts=forever):
|
|
|
pass
|
|
|
except socket.timeout:
|
|
|
return
|