Browse Source

Now uses BrokerConnection instead of DjangoBrokerConnection

Ask Solem 15 years ago
parent
commit
b7a3082fc3
2 changed files with 33 additions and 12 deletions
  1. 15 8
      celery/conf.py
  2. 18 4
      celery/messaging.py

+ 15 - 8
celery/conf.py

@@ -159,6 +159,21 @@ CELERYD_LISTENER = _get("CELERYD_LISTENER")
 CELERYD_MEDIATOR = _get("CELERYD_MEDIATOR")
 CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
 
+# :--- Broker connections                           <-   --   --- - ----- -- #
+BROKER_HOST = _get("BROKER_HOST")
+BROKER_PORT = _get("BROKER_PORT")
+BROKER_USER = _get("BROKER_USER")
+BROKER_PASSWORD = _get("BROKER_PASSWORD")
+BROKER_VHOST = _get("BROKER_VHOST")
+BROKER_USE_SSL = _get("BROKER_USE_SSL")
+BROKER_INSIST = _get("BROKER_INSIST")
+BROKER_CONNECTION_TIMEOUT = _get("BROKER_CONNECTION_TIMEOUT",
+                                compat=["CELERY_BROKER_CONNECTION_TIMEOUT"])
+BROKER_CONNECTION_RETRY = _get("BROKER_CONNECTION_RETRY",
+                                compat=["CELERY_BROKER_CONNECTION_RETRY"])
+BROKER_CONNECTION_MAX_RETRIES = _get("BROKER_CONNECTION_MAX_RETRIES",
+                            compat=["CELERY_BROKER_CONNECTION_MAX_RETRIES"])
+
 # <--- Message routing                             <-   --   --- - ----- -- #
 DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
 DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
@@ -184,14 +199,6 @@ EVENT_EXCHANGE_TYPE = _get("CELERY_EVENT_EXCHANGE_TYPE")
 EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
 EVENT_SERIALIZER = _get("CELERY_EVENT_SERIALIZER")
 
-# :--- Broker connections                           <-   --   --- - ----- -- #
-BROKER_CONNECTION_TIMEOUT = _get("CELERY_BROKER_CONNECTION_TIMEOUT",
-                                compat=["CELERY_AMQP_CONNECTION_TIMEOUT"])
-BROKER_CONNECTION_RETRY = _get("CELERY_BROKER_CONNECTION_RETRY",
-                                compat=["CELERY_AMQP_CONNECTION_RETRY"])
-BROKER_CONNECTION_MAX_RETRIES = _get("CELERY_BROKER_CONNECTION_MAX_RETRIES",
-                                compat=["CELERY_AMQP_CONNECTION_MAX_RETRIES"])
-
 # :--- AMQP Backend settings                        <-   --   --- - ----- -- #
 
 RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")

+ 18 - 4
celery/messaging.py

@@ -7,7 +7,7 @@ import socket
 from datetime import datetime, timedelta
 from itertools import count
 
-from carrot.connection import DjangoBrokerConnection
+from carrot.connection import BrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
 
 from celery import conf
@@ -219,10 +219,24 @@ class BroadcastConsumer(Consumer):
         super(BroadcastConsumer, self).__init__(*args, **kwargs)
 
 
-def establish_connection(connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+def establish_connection(hostname=None, userid=None, password=None,
+        virtual_host=None, port=None, ssl=None, insist=None,
+        connect_timeout=None):
     """Establish a connection to the message broker."""
-    return DjangoBrokerConnection(connect_timeout=connect_timeout,
-                                  settings=load_settings())
+    if insist is None:
+        insist = conf.BROKER_INSIST
+    if ssl is None:
+        ssl = conf.BROKER_USE_SSL
+    if connect_timeout is None:
+        connect_timeout = conf.BROKER_CONNECTION_TIMEOUT
+
+    return BrokerConnection(hostname or conf.BROKER_HOST,
+                            userid or conf.BROKER_USER,
+                            password or conf.BROKER_PASSWORD,
+                            virtual_host or conf.BROKER_VHOST,
+                            port or conf.BROKER_PORT,
+                            insist=insist, ssl=ssl,
+                            connect_timeout=connect_timeout)
 
 
 def with_connection(fun):