Browse Source

Can now configure broadcast/event queue/exchange settings, and the amqp result backend exchange.

Ask Solem 15 years ago
parent
commit
26611b783f
3 changed files with 42 additions and 19 deletions
  1. 6 6
      celery/backends/amqp.py
  2. 24 0
      celery/conf.py
  3. 12 13
      celery/messaging.py

+ 6 - 6
celery/backends/amqp.py

@@ -1,11 +1,10 @@
 """celery.backends.amqp"""
 from carrot.messaging import Consumer, Publisher
 
+from celery import conf
 from celery.messaging import establish_connection
 from celery.backends.base import BaseBackend
 
-RESULTSTORE_EXCHANGE = "celeryresults"
-
 
 class AMQPBackend(BaseBackend):
     """AMQP backend. Publish results by sending messages to the broker
@@ -17,6 +16,7 @@ class AMQPBackend(BaseBackend):
 
     """
 
+    exchange = conf.RESULT_EXCHANGE
     capabilities = ["ResultStore"]
 
     def __init__(self, *args, **kwargs):
@@ -29,18 +29,18 @@ class AMQPBackend(BaseBackend):
         backend = connection.create_backend()
         backend.queue_declare(queue=routing_key, durable=True,
                                 exclusive=False, auto_delete=True)
-        backend.exchange_declare(exchange=RESULTSTORE_EXCHANGE,
+        backend.exchange_declare(exchange=self.exchange,
                                  type="direct",
                                  durable=True,
                                  auto_delete=False)
-        backend.queue_bind(queue=routing_key, exchange=RESULTSTORE_EXCHANGE,
+        backend.queue_bind(queue=routing_key, exchange=self.exchange,
                            routing_key=routing_key)
         backend.close()
 
     def _publisher_for_task_id(self, task_id, connection):
         routing_key = task_id.replace("-", "")
         self._declare_queue(task_id, connection)
-        p = Publisher(connection, exchange=RESULTSTORE_EXCHANGE,
+        p = Publisher(connection, exchange=self.exchange,
                       exchange_type="direct",
                       routing_key=routing_key)
         return p
@@ -49,7 +49,7 @@ class AMQPBackend(BaseBackend):
         routing_key = task_id.replace("-", "")
         self._declare_queue(task_id, connection)
         return Consumer(connection, queue=routing_key,
-                        exchange=RESULTSTORE_EXCHANGE,
+                        exchange=self.exchange,
                         exchange_type="direct",
                         no_ack=False, auto_ack=False,
                         auto_delete=True,

+ 24 - 0
celery/conf.py

@@ -39,6 +39,14 @@ _DEFAULTS = {
     "CELERYMON_LOG_LEVEL": "INFO",
     "CELERYMON_LOG_FILE": "celerymon.log",
     "CELERYMON_PID_FILE": "celerymon.pid",
+    "CELERY_BROADCAST_QUEUE": "celeryctl",
+    "CELERY_BROADCAST_EXCHANGE": "celeryctl",
+    "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
+    "CELERY_EVENT_QUEUE": "celeryevent",
+    "CELERY_EVENT_EXCHANGE": "celeryevent",
+    "CELERY_EVENT_EXCHANGE_TYPE": "direct",
+    "CELERY_EVENT_ROUTING_KEY": "celeryevent",
+    "CELERY_RESULT_EXCHANGE": "celeryresults",
 }
 
 _DEPRECATION_FMT = """
@@ -149,6 +157,19 @@ _warn_if_deprecated_queue_settings()
 if not QUEUES:
     QUEUES = _find_deprecated_queue_settings()
 
+# :--- Broadcast queue settings                     <-   --   --- - ----- -- #
+
+BROADCAST_QUEUE = _get("CELERY_BROADCAST_QUEUE")
+BROADCAST_EXCHANGE = _get("CELERY_BROADCAST_EXCHANGE")
+BROADCAST_EXCHANGE_TYPE = _get("CELERY_BROADCAST_EXCHANGE_TYPE")
+
+# :--- Event queue settings                         <-   --   --- - ----- -- #
+
+EVENT_QUEUE = _get("CELERY_EVENT_QUEUE")
+EVENT_EXCHANGE = _get("CELERY_EVENT_EXCHANGE")
+EVENT_EXCHANGE_TYPE = _get("CELERY_EVENT_EXCHANGE_TYPE")
+EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
+
 # :--- Broker connections                           <-   --   --- - ----- -- #
 BROKER_CONNECTION_TIMEOUT = _get("CELERY_BROKER_CONNECTION_TIMEOUT",
                                 compat=["CELERY_AMQP_CONNECTION_TIMEOUT"])
@@ -157,6 +178,9 @@ BROKER_CONNECTION_RETRY = _get("CELERY_BROKER_CONNECTION_RETRY",
 BROKER_CONNECTION_MAX_RETRIES = _get("CELERY_BROKER_CONNECTION_MAX_RETRIES",
                                 compat=["CELERY_AMQP_CONNECTION_MAX_RETRIES"])
 
+# :--- Backend settings                             <-   --   --- - ----- -- #
+
+RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")
 
 # :--- Celery Beat                                  <-   --   --- - ----- -- #
 CELERYBEAT_PID_FILE = _get("CELERYBEAT_PID_FILE")

+ 12 - 13
celery/messaging.py

@@ -65,24 +65,24 @@ class TaskConsumer(Consumer):
 
 class EventPublisher(Publisher):
     """Publish events"""
-    exchange = "celeryevent"
-    routing_key = "event"
+    exchange = conf.EVENT_EXCHANGE
+    exchange_type = conf.EVENT_EXCHANGE_TYPE
+    routing_key = conf.EVENT_ROUTING_KEY
 
 
 class EventConsumer(Consumer):
     """Consume events"""
-    queue = "celeryevent"
-    exchange = "celeryevent"
-    routing_key = "event"
-    exchange_type = "direct"
+    queue = conf.EVENT_QUEUE
+    exchange = conf.EVENT_EXCHANGE
+    exchange_type = conf.EVENT_EXCHANGE_TYPE
+    routing_key = conf.EVENT_ROUTING_KEY
     no_ack = True
 
 
 class BroadcastPublisher(Publisher):
     """Publish broadcast commands"""
-    exchange = "celeryctl"
-    exchange_type = "fanout"
-    routing_key = ""
+    exchange = conf.BROADCAST_EXCHANGE
+    exchange_type = conf.BROADCAST_EXCHANGE_TYPE
 
     def send(self, type, arguments, destination=None):
         """Send broadcast command."""
@@ -93,10 +93,9 @@ class BroadcastPublisher(Publisher):
 
 class BroadcastConsumer(Consumer):
     """Consume broadcast commands"""
-    queue = "celeryctl"
-    exchange = "celeryctl"
-    routing_key = ""
-    exchange_type = "fanout"
+    queue = conf.BROADCAST_QUEUE
+    exchange = conf.BROADCAST_EXCHANGE
+    exchange_type = conf.BROADCAST_EXCHANGE_TYPE
     no_ack = True