ソースを参照

CELERY_ROUTES is now prepared by app.amqp, instead of post_config_merge

Ask Solem 14 年 前
コミット
6645fa464f
2 ファイル変更15 行追加5 行削除
  1. 15 2
      celery/app/amqp.py
  2. 0 3
      celery/app/base.py

+ 15 - 2
celery/app/amqp.py

@@ -252,9 +252,16 @@ class AMQP(object):
     #: Set to :const:`True` when the configured queues has been declared.
     _queues_declared = False
 
+    #: Cached and prepared routing table.
+    _routes = None
+
     def __init__(self, app):
         self.app = app
 
+    def flush_routes(self):
+        self._routes = routes.prepare(
+                        self.app.conf.get("CELERY_ROUTES") or {})
+
     def Queues(self, queues):
         """Create new :class:`Queues` instance, using queue defaults
         from the current configuration."""
@@ -264,8 +271,8 @@ class AMQP(object):
 
     def Router(self, queues=None, create_missing=None):
         """Returns the current task router."""
-        return routes.Router(self.app.conf.CELERY_ROUTES,
-                             queues or self.app.conf.CELERY_QUEUES,
+        return routes.Router(self.routes,
+                             queues or self.queues,
                              self.app.either("CELERY_CREATE_MISSING_QUEUES",
                                              create_missing),
                              app=self.app)
@@ -344,3 +351,9 @@ class AMQP(object):
     @queues.setter
     def queues(self, value):
         return self.Queues(value)
+
+    @property
+    def routes(self):
+        if self._routes is None:
+            self.flush_routes()
+        return self._routes

+ 0 - 3
celery/app/base.py

@@ -8,12 +8,10 @@ Application Base Class.
 :license: BSD, see LICENSE for more details.
 
 """
-import sys
 import platform as _platform
 
 from datetime import timedelta
 
-from celery import routes
 from celery.app.defaults import DEFAULTS
 from celery.datastructures import ConfigurationView
 from celery.utils import instantiate, cached_property, maybe_promise
@@ -217,7 +215,6 @@ class BaseApp(object):
                     "exchange": c["CELERY_DEFAULT_EXCHANGE"],
                     "exchange_type": c["CELERY_DEFAULT_EXCHANGE_TYPE"],
                     "binding_key": c["CELERY_DEFAULT_ROUTING_KEY"]}}
-        c["CELERY_ROUTES"] = routes.prepare(c.get("CELERY_ROUTES") or {})
         if isinstance(c["CELERY_TASK_RESULT_EXPIRES"], int):
             c["CELERY_TASK_RESULT_EXPIRES"] = timedelta(
                     seconds=c["CELERY_TASK_RESULT_EXPIRES"])