Kaynağa Gözat

Changing Task Consumer to Task Consumer Set in Worker.

screeley 16 yıl önce
ebeveyn
işleme
e2d0a56c91
4 değiştirilmiş dosya ile 44 ekleme ve 8 silme
  1. 18 0
      celery/conf.py
  2. 7 2
      celery/messaging.py
  3. 16 3
      celery/task.py
  4. 3 3
      celery/worker.py

+ 18 - 0
celery/conf.py

@@ -133,6 +133,24 @@ AMQP_CONSUMER_ROUTING_KEY = getattr(settings,
 AMQP_CONSUMER_QUEUE = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUE",
                               DEFAULT_AMQP_CONSUMER_QUEUE)
 
+
+"""
+.. data:: AMQP_CONSUMER_QUEUES
+
+    Dictionary defining multiple AMQP queues.
+
+"""
+DEFAULT_AMQP_CONSUMER_QUEUES = {
+        AMQP_CONSUMER_QUEUE : {
+            "exchange" : AMQP_EXCHANGE,
+            "routing_key" : AMQP_CONSUMER_ROUTING_KEY,
+            "exchange_type" : AMQP_EXCHANGE_TYPE
+        }
+}
+
+AMQP_CONSUMER_QUEUES = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUES",
+                              DEFAULT_AMQP_CONSUMER_QUEUES)
+
 """
 .. data:: AMQP_CONNECTION_TIMEOUT
 

+ 7 - 2
celery/messaging.py

@@ -3,7 +3,7 @@
 Sending and Receiving Messages
 
 """
-from carrot.messaging import Publisher, Consumer
+from carrot.messaging import Publisher, Consumer, ConsumerSet
 from celery import conf
 import uuid
 
@@ -64,6 +64,11 @@ class TaskPublisher(Publisher):
         return task_id
 
 
+class TaskConsumerSet(ConsumerSet):
+    
+    def __init__(self, connection, queues=conf.AMQP_CONSUMER_QUEUES, consumers=[], **options):
+        super(TaskConsumerSet, self).__init__(connection, queues=queues, consumers=consumers, **options)
+
 class TaskConsumer(Consumer):
     """The AMQP Task Consumer class."""
     queue = conf.AMQP_CONSUMER_QUEUE
@@ -90,4 +95,4 @@ class StatsConsumer(Consumer):
     no_ack=True
 
     def receive(self, message_data, message):
-        pass
+        pass

+ 16 - 3
celery/task.py

@@ -18,7 +18,8 @@ import pickle
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
         immediate=None, mandatory=None, connection=None,
-        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None):
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None, 
+        exchange=None):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -63,7 +64,12 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
         connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
         need_to_close_connection = True
 
-    publisher = TaskPublisher(connection=connection)
+    publisher_opts = {'routing_key' : routing_key,
+                      'exchange' : exchange }
+    for option_name, option_value in publisher_opts.items():
+        publisher_opts[option_name] = getattr(task, option_name, option_value)
+
+    publisher = TaskPublisher(connection=connection, **publisher_opts)
     task_id = publisher.delay_task(task.name, args, kwargs, **message_opts)
     publisher.close()
     if need_to_close_connection:
@@ -193,6 +199,7 @@ class Task(object):
     routing_key = None
     immediate = False
     mandatory = False
+    exchange = None
 
     def __init__(self):
         if not self.name:
@@ -233,8 +240,14 @@ class Task(object):
             >>> publisher.connection.close()
 
         """
+        
+        kwargs = {}
+        for key in ['exchange', 'routing_key']:
+            kwargs[key] = getattr(self, exchange, None)
+
         return TaskPublisher(connection=DjangoAMQPConnection(
-                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT),
+                                **kwargs)
 
     def get_consumer(self):
         """Get a celery task message consumer.

+ 3 - 3
celery/worker.py

@@ -1,6 +1,6 @@
 """celery.worker"""
 from carrot.connection import DjangoAMQPConnection
-from celery.messaging import TaskConsumer
+from celery.messaging import TaskConsumerSet
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 from celery.log import setup_logger
@@ -342,7 +342,7 @@ class WorkController(object):
 
     .. attribute:: task_consumer
 
-        The :class:`celery.messaging.TaskConsumer` instance used.
+        The :class:`celery.messaging.TaskConsumerSet` instance used.
 
     """
     loglevel = logging.ERROR
@@ -378,7 +378,7 @@ class WorkController(object):
         """
         self.close_connection()
         self.amqp_connection = DjangoAMQPConnection()
-        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer = TaskConsumerSet(connection=self.amqp_connection)
         self.task_consumer.register_callback(self._message_callback)
         return self.task_consumer