|
@@ -236,15 +236,18 @@ def shutdown(panel, **kwargs):
|
|
|
def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
|
|
|
routing_key=None, **options):
|
|
|
cset = panel.consumer.task_consumer
|
|
|
- declaration = dict(queue=queue,
|
|
|
- exchange=exchange,
|
|
|
- exchange_type=exchange_type,
|
|
|
- routing_key=routing_key,
|
|
|
- **options)
|
|
|
- cset.add_consumer_from_dict(**declaration)
|
|
|
- cset.consume()
|
|
|
- panel.logger.info("Started consuming from %r" % (declaration, ))
|
|
|
- return {"ok": "started consuming from %s" % (queue, )}
|
|
|
+ if not cset.consuming_from(queue):
|
|
|
+ declaration = dict(queue=queue,
|
|
|
+ exchange=exchange,
|
|
|
+ exchange_type=exchange_type,
|
|
|
+ routing_key=routing_key,
|
|
|
+ **options)
|
|
|
+ cset.add_consumer_from_dict(**declaration)
|
|
|
+ cset.consume()
|
|
|
+ panel.logger.info("Started consuming from %r" % (declaration, ))
|
|
|
+ return {"ok": "started consuming from %s" % (queue, )}
|
|
|
+ else:
|
|
|
+ return {"ok": "already consuming from %s" % (queue, )}
|
|
|
|
|
|
|
|
|
@Panel.register
|