Ver Fonte

Merge branch 'screeley/master' into consumerset

Conflicts:
	celery/task.py
Ask Solem há 16 anos atrás
pai
commit
c4e58d5d09
6 ficheiros alterados com 5249 adições e 4113 exclusões
  1. 18 0
      celery/conf.py
  2. 7 2
      celery/messaging.py
  3. 17 9
      celery/task.py
  4. 3 3
      celery/worker.py
  5. 2488 1940
      docs/graffles/Celery-Overview.graffle
  6. 2716 2159
      docs/graffles/InsideTheWorker.graffle

+ 18 - 0
celery/conf.py

@@ -133,6 +133,24 @@ AMQP_CONSUMER_ROUTING_KEY = getattr(settings,
 AMQP_CONSUMER_QUEUE = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUE",
                               DEFAULT_AMQP_CONSUMER_QUEUE)
 
+
+"""
+.. data:: AMQP_CONSUMER_QUEUES
+
+    Dictionary defining multiple AMQP queues.
+
+"""
+DEFAULT_AMQP_CONSUMER_QUEUES = {
+        AMQP_CONSUMER_QUEUE : {
+            "exchange" : AMQP_EXCHANGE,
+            "routing_key" : AMQP_CONSUMER_ROUTING_KEY,
+            "exchange_type" : AMQP_EXCHANGE_TYPE
+        }
+}
+
+AMQP_CONSUMER_QUEUES = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUES",
+                              DEFAULT_AMQP_CONSUMER_QUEUES)
+
 """
 .. data:: AMQP_CONNECTION_TIMEOUT
 

+ 7 - 2
celery/messaging.py

@@ -3,7 +3,7 @@
 Sending and Receiving Messages
 
 """
-from carrot.messaging import Publisher, Consumer
+from carrot.messaging import Publisher, Consumer, ConsumerSet
 from celery import conf
 import uuid
 
@@ -64,6 +64,11 @@ class TaskPublisher(Publisher):
         return task_id
 
 
+class TaskConsumerSet(ConsumerSet):
+    
+    def __init__(self, connection, queues=conf.AMQP_CONSUMER_QUEUES, consumers=[], **options):
+        super(TaskConsumerSet, self).__init__(connection, queues=queues, consumers=consumers, **options)
+
 class TaskConsumer(Consumer):
     """The AMQP Task Consumer class."""
     queue = conf.AMQP_CONSUMER_QUEUE
@@ -90,4 +95,4 @@ class StatsConsumer(Consumer):
     no_ack=True
 
     def receive(self, message_data, message):
-        pass
+        pass

+ 17 - 9
celery/task.py

@@ -19,7 +19,8 @@ import pickle
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
         immediate=None, mandatory=None, connection=None,
-        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None, **opts):
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None, 
+        exchange=None, **opts):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -50,6 +51,7 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
     """
     args = args or []
     kwargs = kwargs or {}
+    exchange = exchange or getattr(task, "exchange", None)
     routing_key = routing_key or getattr(task, "routing_key", None)
     immediate = immediate or getattr(task, "immediate", None)
     mandatory = mandatory or getattr(task, "mandatory", None)
@@ -62,7 +64,7 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
         if not connection:
             connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
             need_to_close_connection = True
-        publisher = TaskPublisher(connection=connection)
+        publisher = TaskPublisher(connection=connection, exchange=exchange)
 
     delay_task = publisher.delay_task
     if taskset_id:
@@ -231,6 +233,7 @@ class Task(object):
     """
     name = None
     type = "regular"
+    exchange = None
     routing_key = None
     immediate = False
     mandatory = False
@@ -264,7 +267,7 @@ class Task(object):
         """
         return setup_logger(**kwargs)
 
-    def get_publisher(self):
+    def get_publisher(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
         """Get a celery task message publisher.
 
         :rtype: :class:`celery.messaging.TaskPublisher`.
@@ -277,10 +280,13 @@ class Task(object):
             >>> publisher.connection.close()
 
         """
-        return TaskPublisher(connection=DjangoAMQPConnection(
-                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
+       
+        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        return TaskPublisher(connection=connection,
+                             exchange=self.exchange,
+                             routing_key=self.routing_key)
 
-    def get_consumer(self):
+    def get_consumer(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
         """Get a celery task message consumer.
 
         :rtype: :class:`celery.messaging.TaskConsumer`.
@@ -293,8 +299,9 @@ class Task(object):
             >>> consumer.connection.close()
 
         """
-        return TaskConsumer(connection=DjangoAMQPConnection(
-                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
+        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        return TaskConsumer(connection=connection, exchange=self.exchange,
+                            routing_key=self.routing_key)
 
     @classmethod
     def delay(cls, *args, **kwargs):
@@ -409,7 +416,8 @@ class TaskSet(object):
         """
         taskset_id = str(uuid.uuid4())
         conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
-        publisher = TaskPublisher(connection=conn)
+        publisher = TaskPublisher(connection=conn,
+                                  exchange=self.task.exchange)
         subtasks = [apply_async(self.task, args, kwargs,
                                 taskset_id=taskset_id, publisher=publisher)
                         for args, kwargs in self.arguments]

+ 3 - 3
celery/worker.py

@@ -1,6 +1,6 @@
 """celery.worker"""
 from carrot.connection import DjangoAMQPConnection
-from celery.messaging import TaskConsumer
+from celery.messaging import TaskConsumerSet
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 from celery.log import setup_logger
@@ -347,7 +347,7 @@ class WorkController(object):
 
     .. attribute:: task_consumer
 
-        The :class:`celery.messaging.TaskConsumer` instance used.
+        The :class:`celery.messaging.TaskConsumerSet` instance used.
 
     """
     loglevel = logging.ERROR
@@ -383,7 +383,7 @@ class WorkController(object):
         """
         self.close_connection()
         self.amqp_connection = DjangoAMQPConnection()
-        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer = TaskConsumerSet(connection=self.amqp_connection)
         self.task_consumer.register_callback(self._message_callback)
         return self.task_consumer
 

Diff do ficheiro suprimidas por serem muito extensas
+ 2488 - 1940
docs/graffles/Celery-Overview.graffle


Diff do ficheiro suprimidas por serem muito extensas
+ 2716 - 2159
docs/graffles/InsideTheWorker.graffle


Alguns ficheiros não foram mostrados porque muitos ficheiros mudaram neste diff