Browse Source

Be compatible with previous queue configuration scheme, but raise DeprecationWarnings if any of the old settings is set.

Ask Solem 15 years ago
parent
commit
447ed8e1cc
2 changed files with 72 additions and 15 deletions
  1. 71 15
      celery/conf.py
  2. 1 0
      celery/execute/__init__.py

+ 71 - 15
celery/conf.py

@@ -1,4 +1,5 @@
 import logging
+import warnings
 from datetime import timedelta
 
 from celery.registry import tasks
@@ -45,48 +46,103 @@ def _get(name, default=None):
         default = _DEFAULTS.get(name)
     return getattr(settings, name, default)
 
-SEND_EVENTS = _get("CELERY_SEND_EVENTS")
+# <--- Task options                                <-   --   --- - ----- -- #
 ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
 CELERY_BACKEND = _get("CELERY_BACKEND")
 CELERY_CACHE_BACKEND = _get("CELERY_CACHE_BACKEND")
-DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
-DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
-STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
 TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
 TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
 # Make sure TASK_RESULT_EXPIRES is a timedelta.
 if isinstance(TASK_RESULT_EXPIRES, int):
     TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)
+
+# <--- Worker                                      <-   --   --- - ----- -- #
+
+SEND_EVENTS = _get("CELERY_SEND_EVENTS")
+DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
+DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
+STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
 SEND_CELERY_TASK_ERROR_EMAILS = _get("SEND_CELERY_TASK_ERROR_EMAILS",
                                      not settings.DEBUG)
+LOG_FORMAT = _get("CELERYD_DAEMON_LOG_FORMAT")
+DAEMON_LOG_FILE = _get("CELERYD_LOG_FILE")
+DAEMON_LOG_LEVEL = _get("CELERYD_DAEMON_LOG_LEVEL")
+DAEMON_LOG_LEVEL = LOG_LEVELS[DAEMON_LOG_LEVEL.upper()]
+DAEMON_PID_FILE = _get("CELERYD_PID_FILE")
+DAEMON_CONCURRENCY = _get("CELERYD_CONCURRENCY")
 
-DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
+# <--- Message routing                             <-   --   --- - ----- -- #
+QUEUES = _get("CELERY_QUEUES")
 DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
+DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
 DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
 DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
 
-QUEUES = _get("CELERY_QUEUES", {DEFAULT_QUEUE: {
-                                    "exchange": DEFAULT_EXCHANGE,
-                                    "exchange_type": DEFAULT_EXCHANGE_TYPE,
-                                    "binding_key": DEFAULT_ROUTING_KEY}})
+_DEPRECATIONS = {"CELERY_AMQP_CONSUMER_QUEUES": "CELERY_QUEUES",
+                 "CELERY_AMQP_EXCHANGE": "CELERY_DEFAULT_EXCHANGE",
+                 "CELERY_AMQP_EXCHANGE_TYPE": "CELERY_DEFAULT_EXCHANGE_TYPE",
+                 "CELERY_AMQP_CONSUMER_ROUTING_KEY": "CELERY_QUEUES",
+                 "CELERY_AMQP_PUBLISHER_ROUTING_KEY":
+                    "CELERY_DEFAULT_ROUTING_KEY"}
+
+
+_DEPRECATED_QUEUE_SETTING_FMT = """
+%s is deprecated in favor of %s and scheduled for removal in celery v1.0.
+Please visit http://bit.ly/5DsSuX for more information.
+
+We're sorry for the inconvenience.
+""".strip()
+
+
+def _find_deprecated_queue_settings():
+    global DEFAULT_QUEUE, DEFAULT_ROUTING_KEY
+    global DEFAULT_EXCHANGE, DEFAULT_EXCHANGE_TYPE
+    binding_key = None
+
+    multi = _get("CELERY_AMQP_CONSUMER_QUEUES")
+    if multi:
+        return multi
+
+    single = _get("CELERY_AMQP_CONSUMER_QUEUE")
+    if single:
+        DEFAULT_QUEUE = single
+        DEFAULT_EXCHANGE = _get("CELERY_AMQP_EXCHANGE", DEFAULT_EXCHANGE)
+        DEFAULT_EXCHANGE_TYPE = _get("CELERY_AMQP_EXCHANGE_TYPE",
+                                     DEFAULT_EXCHANGE_TYPE)
+        binding_key = _get("CELERY_AMQP_CONSUMER_ROUTING_KEY",
+                            DEFAULT_ROUTING_KEY)
+        DEFAULT_ROUTING_KEY = _get("CELERY_AMQP_PUBLISHER_ROUTING_KEY",
+                                   DEFAULT_ROUTING_KEY)
+    binding_key = binding_key or DEFAULT_ROUTING_KEY
+    return {DEFAULT_QUEUE: {"exchange": DEFAULT_EXCHANGE,
+                            "exchange_type": DEFAULT_EXCHANGE_TYPE,
+                            "binding_key": binding_key}}
 
+def _warn_if_deprecated_queue_settings():
+    for setting, new_setting in _DEPRECATIONS.items():
+        if _get(setting):
+            warnings.warn(DeprecationWarning(_DEPRECATED_QUEUE_SETTING_FMT % (
+                setting, _DEPRECATIONS[setting])))
+            break
+
+_warn_if_deprecated_queue_settings()
+if not QUEUES:
+    QUEUES = _find_deprecated_queue_settings()
+
+# :--- Broker connections                           <-   --   --- - ----- -- #
 AMQP_CONNECTION_TIMEOUT = _get("CELERY_AMQP_CONNECTION_TIMEOUT")
 AMQP_CONNECTION_RETRY = _get("CELERY_AMQP_CONNECTION_RETRY")
 AMQP_CONNECTION_MAX_RETRIES = _get("CELERY_AMQP_CONNECTION_MAX_RETRIES")
 
-LOG_FORMAT = _get("CELERYD_DAEMON_LOG_FORMAT")
-DAEMON_LOG_FILE = _get("CELERYD_LOG_FILE")
-DAEMON_LOG_LEVEL = _get("CELERYD_DAEMON_LOG_LEVEL")
-DAEMON_LOG_LEVEL = LOG_LEVELS[DAEMON_LOG_LEVEL.upper()]
-DAEMON_PID_FILE = _get("CELERYD_PID_FILE")
-DAEMON_CONCURRENCY = _get("CELERYD_CONCURRENCY")
 
+# :--- Celery Beat                                  <-   --   --- - ----- -- #
 CELERYBEAT_PID_FILE = _get("CELERYBEAT_PID_FILE")
 CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
 CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
 CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
 CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
 
+# :--- Celery Monitor                               <-   --   --- - ----- -- #
 CELERYMON_PID_FILE = _get("CELERYMON_PID_FILE")
 CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
 CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")

+ 1 - 0
celery/execute/__init__.py

@@ -71,6 +71,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         return apply(task, args, kwargs)
 
     task = tasks[task.name] # Get instance.
+    exchange = options.get("exchange")
     options = dict(extract_exec_options(task), **options)
 
     if countdown: # Convert countdown to ETA.