Browse Source

Adds Consumer.add_task_queue + cancel_task_queue

Ask Solem 12 years ago
parent
commit
24071a88a5
2 changed files with 23 additions and 18 deletions
  1. 19 0
      celery/worker/consumer.py
  2. 4 18
      celery/worker/control.py

+ 19 - 0
celery/worker/consumer.py

@@ -764,6 +764,25 @@ class Consumer(object):
         elif state.should_terminate:
             raise SystemTerminate()
 
+    def add_task_queue(self, queue, exchange=None, exchange_type=None,
+            routing_key=None, **options):
+        cset = self.task_consumer
+        exchange = queue if exchange is None else exchange
+        routing_key = queue if routing_key is None else routing_key
+        exchange_type = 'direct' if exchange_type is None else exchange_type
+        if not cset.consuming_from(queue):
+            q = self.app.amqp.queues.add(queue,
+                    exchange=exchange,
+                    exchange_type=exchange_type,
+                    routing_key=routing_key, **options)
+            cset.add_queue(q)
+            cset.consume()
+            logger.info('Started consuming from %r', queue)
+
+    def cancel_task_queue(self, queue):
+        self.app.amqp.queues.select_remove(queue)
+        self.task_consumer.cancel_by_queue(queue)
+
     @property
     def info(self):
         """Returns information about this consumer instance

+ 4 - 18
celery/worker/control.py

@@ -267,28 +267,14 @@ def shutdown(panel, msg='Got shutdown from remote', **kwargs):
 @Panel.register
 def add_consumer(panel, queue, exchange=None, exchange_type=None,
         routing_key=None, **options):
-    cset = panel.consumer.task_consumer
-    exchange = queue if exchange is None else exchange
-    routing_key = queue if routing_key is None else routing_key
-    exchange_type = 'direct' if exchange_type is None else exchange_type
-    if not cset.consuming_from(queue):
-        q = panel.app.amqp.queues.add(queue,
-                exchange=exchange,
-                exchange_type=exchange_type,
-                routing_key=routing_key, **options)
-        cset.add_queue(q)
-        cset.consume()
-        logger.info('Started consuming from %r', queue)
-        return {'ok': 'started consuming from %r' % (queue, )}
-    else:
-        return {'ok': 'already consuming from %r' % (queue, )}
+    panel.consumer.add_task_queue(queue, exchange, exchange_type,
+                                  routing_key, **options)
+    return {'ok': 'add consumer %r' % (queue, )}
 
 
 @Panel.register
 def cancel_consumer(panel, queue=None, **_):
-    panel.app.amqp.queues.select_remove(queue)
-    cset = panel.consumer.task_consumer
-    cset.cancel_by_queue(queue)
+    panel.consumer.cancel_task_queue(queue)
     return {'ok': 'no longer consuming from %s' % (queue, )}