Jelajahi Sumber

Reanamed *routing_table* -> *queues*, to not be confused with the new routing functionality.

Ask Solem 15 tahun lalu
induk
melakukan
a1b13b79b9

+ 2 - 2
celery/bin/celeryd.py

@@ -269,11 +269,11 @@ class Worker(object):
             include_builtins = self.loglevel <= logging.DEBUG
             include_builtins = self.loglevel <= logging.DEBUG
             tasklist = self.tasklist(include_builtins=include_builtins)
             tasklist = self.tasklist(include_builtins=include_builtins)
 
 
-        routing_table = conf.get_routing_table()
+        queues = conf.get_queues()
 
 
         return STARTUP_INFO_FMT % {
         return STARTUP_INFO_FMT % {
             "conninfo": info.format_broker_info(),
             "conninfo": info.format_broker_info(),
-            "queues": info.format_routing_table(routing_table, indent=8),
+            "queues": info.format_queues(queues, indent=8),
             "concurrency": self.concurrency,
             "concurrency": self.concurrency,
             "loglevel": conf.LOG_LEVELS[self.loglevel],
             "loglevel": conf.LOG_LEVELS[self.loglevel],
             "logfile": self.logfile or "[stderr]",
             "logfile": self.logfile or "[stderr]",

+ 3 - 3
celery/conf.py

@@ -232,7 +232,7 @@ CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
 CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
 CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
 
 
 
 
-def _init_routing_table(queues):
+def _init_queues(queues):
     """Convert configuration mapping to a table of queues digestible
     """Convert configuration mapping to a table of queues digestible
     by a :class:`carrot.messaging.ConsumerSet`."""
     by a :class:`carrot.messaging.ConsumerSet`."""
 
 
@@ -246,5 +246,5 @@ def _init_routing_table(queues):
     return dict((queue, _defaults(opts)) for queue, opts in queues.items())
     return dict((queue, _defaults(opts)) for queue, opts in queues.items())
 
 
 
 
-def get_routing_table():
-    return _init_routing_table(QUEUES)
+def get_queues():
+    return _init_queues(QUEUES)

+ 4 - 4
celery/execute/__init__.py

@@ -17,7 +17,7 @@ extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
 @with_connection
 @with_connection
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
-        routes=None, routing_table=None, **options):
+        routes=None, queues=None, **options):
     """Run a task asynchronously by the celery daemon(s).
     """Run a task asynchronously by the celery daemon(s).
 
 
     :param task: The :class:`~celery.task.base.Task` to run.
     :param task: The :class:`~celery.task.base.Task` to run.
@@ -81,8 +81,8 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     """
     """
     if routes is None:
     if routes is None:
         routes = conf.ROUTES
         routes = conf.ROUTES
-    if routing_table is None:
-        routing_table = conf.get_routing_table()
+    if queues is None:
+        queues = conf.get_queues()
 
 
     if conf.ALWAYS_EAGER:
     if conf.ALWAYS_EAGER:
         return apply(task, args, kwargs, task_id=task_id)
         return apply(task, args, kwargs, task_id=task_id)
@@ -90,7 +90,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     task = tasks[task.name] # get instance from registry
     task = tasks[task.name] # get instance from registry
 
 
     options = dict(extract_exec_options(task), **options)
     options = dict(extract_exec_options(task), **options)
-    options = route(routes, options, routing_table,
+    options = route(routes, options, queues,
                     task.name, args, kwargs)
                     task.name, args, kwargs)
     exchange = options.get("exchange")
     exchange = options.get("exchange")
     exchange_type = options.get("exchange_type")
     exchange_type = options.get("exchange_type")

+ 2 - 2
celery/messaging.py

@@ -23,7 +23,7 @@ MSG_OPTIONS = ("mandatory", "priority", "immediate",
 
 
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
-default_queue = conf.get_routing_table()[conf.DEFAULT_QUEUE]
+default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
 
 
 _queues_declared = False
 _queues_declared = False
 _exchanges_declared = {}
 _exchanges_declared = {}
@@ -256,7 +256,7 @@ def get_consumer_set(connection, queues=None, **options):
     Defaults to the queues in ``CELERY_QUEUES``.
     Defaults to the queues in ``CELERY_QUEUES``.
 
 
     """
     """
-    queues = queues or conf.get_routing_table()
+    queues = queues or conf.get_queues()
     cset = ConsumerSet(connection)
     cset = ConsumerSet(connection)
     for queue_name, queue_options in queues.items():
     for queue_name, queue_options in queues.items():
         queue_options = dict(queue_options)
         queue_options = dict(queue_options)

+ 5 - 5
celery/routes.py

@@ -12,7 +12,7 @@ class MapRoute(object):
         return self.map.get(task)
         return self.map.get(task)
 
 
 
 
-def expand_destination(route, routing_table):
+def expand_destination(route, queues):
     # The route can simply be a queue name,
     # The route can simply be a queue name,
     # this is convenient for direct exchanges.
     # this is convenient for direct exchanges.
     if isinstance(route, basestring):
     if isinstance(route, basestring):
@@ -24,7 +24,7 @@ def expand_destination(route, routing_table):
 
 
     if queue:
     if queue:
         try:
         try:
-            dest = dict(routing_table[queue])
+            dest = dict(queues[queue])
         except KeyError:
         except KeyError:
             raise RouteNotFound(
             raise RouteNotFound(
                 "Route %s does not exist in the routing table "
                 "Route %s does not exist in the routing table "
@@ -50,13 +50,13 @@ def prepare(routes):
     return map(expand_route, routes)
     return map(expand_route, routes)
 
 
 
 
-def route(routes, options, routing_table, task, args=(), kwargs={}):
+def route(routes, options, queues, task, args=(), kwargs={}):
     # Expand "queue" keys in options.
     # Expand "queue" keys in options.
-    options = expand_destination(options, routing_table)
+    options = expand_destination(options, queues)
     if routes:
     if routes:
         route = lookup_route(routes, task, args, kwargs)
         route = lookup_route(routes, task, args, kwargs)
         # Also expand "queue" keys in route.
         # Also expand "queue" keys in route.
-        return dict(options, **expand_destination(route, routing_table))
+        return dict(options, **expand_destination(route, queues))
     return options
     return options
 
 
 
 

+ 13 - 13
celery/tests/test_routes.py

@@ -7,9 +7,9 @@ from celery.utils.functional import wraps
 from celery.exceptions import RouteNotFound
 from celery.exceptions import RouteNotFound
 
 
 
 
-def E(routing_table):
+def E(queues):
     def expand(answer):
     def expand(answer):
-        return routes.expand_destination(answer, routing_table)
+        return routes.expand_destination(answer, queues)
     return expand
     return expand
 
 
 
 
@@ -28,29 +28,29 @@ def with_queues(**queues):
     return patch_fun
     return patch_fun
 
 
 
 
-a_route = {"exchange": "fooexchange",
+a_queue = {"exchange": "fooexchange",
            "exchange_type": "fanout",
            "exchange_type": "fanout",
                "binding_key": "xuzzy"}
                "binding_key": "xuzzy"}
-b_route = {"exchange": "barexchange",
+b_queue = {"exchange": "barexchange",
            "exchange_type": "topic",
            "exchange_type": "topic",
            "binding_key": "b.b.#"}
            "binding_key": "b.b.#"}
 
 
 
 
 class test_MapRoute(unittest.TestCase):
 class test_MapRoute(unittest.TestCase):
 
 
-    @with_queues(foo=a_route, bar=b_route)
+    @with_queues(foo=a_queue, bar=b_queue)
     def test_route_for_task_expanded_route(self):
     def test_route_for_task_expanded_route(self):
         expand = E(conf.QUEUES)
         expand = E(conf.QUEUES)
         route = routes.MapRoute({"celery.ping": "foo"})
         route = routes.MapRoute({"celery.ping": "foo"})
-        self.assertDictContainsSubset(a_route,
+        self.assertDictContainsSubset(a_queue,
                              expand(route.route_for_task("celery.ping")))
                              expand(route.route_for_task("celery.ping")))
         self.assertIsNone(route.route_for_task("celery.awesome"))
         self.assertIsNone(route.route_for_task("celery.awesome"))
 
 
-    @with_queues(foo=a_route, bar=b_route)
+    @with_queues(foo=a_queue, bar=b_queue)
     def test_route_for_task(self):
     def test_route_for_task(self):
         expand = E(conf.QUEUES)
         expand = E(conf.QUEUES)
-        route = routes.MapRoute({"celery.ping": b_route})
-        self.assertDictContainsSubset(b_route,
+        route = routes.MapRoute({"celery.ping": b_queue})
+        self.assertDictContainsSubset(b_queue,
                              expand(route.route_for_task("celery.ping")))
                              expand(route.route_for_task("celery.ping")))
         self.assertIsNone(route.route_for_task("celery.awesome"))
         self.assertIsNone(route.route_for_task("celery.awesome"))
 
 
@@ -62,21 +62,21 @@ class test_MapRoute(unittest.TestCase):
 
 
 class test_lookup_route(unittest.TestCase):
 class test_lookup_route(unittest.TestCase):
 
 
-    @with_queues(foo=a_route, bar=b_route)
+    @with_queues(foo=a_queue, bar=b_queue)
     def test_lookup_takes_first(self):
     def test_lookup_takes_first(self):
         expand = E(conf.QUEUES)
         expand = E(conf.QUEUES)
         R = routes.prepare(({"celery.ping": "bar"},
         R = routes.prepare(({"celery.ping": "bar"},
                             {"celery.ping": "foo"}))
                             {"celery.ping": "foo"}))
-        self.assertDictContainsSubset(b_route,
+        self.assertDictContainsSubset(b_queue,
                 expand(routes.lookup_route(R, "celery.ping",
                 expand(routes.lookup_route(R, "celery.ping",
                     args=[1, 2], kwargs={})))
                     args=[1, 2], kwargs={})))
 
 
-    @with_queues(foo=a_route, bar=b_route)
+    @with_queues(foo=a_queue, bar=b_queue)
     def test_lookup_paths_traversed(self):
     def test_lookup_paths_traversed(self):
         expand = E(conf.QUEUES)
         expand = E(conf.QUEUES)
         R = routes.prepare(({"celery.xaza": "bar"},
         R = routes.prepare(({"celery.xaza": "bar"},
                             {"celery.ping": "foo"}))
                             {"celery.ping": "foo"}))
-        self.assertDictContainsSubset(a_route,
+        self.assertDictContainsSubset(a_queue,
                 expand(routes.lookup_route(R, "celery.ping",
                 expand(routes.lookup_route(R, "celery.ping",
                     args=[1, 2], kwargs={})))
                     args=[1, 2], kwargs={})))
         self.assertIsNone(routes.lookup_route(R, "celery.poza"))
         self.assertIsNone(routes.lookup_route(R, "celery.poza"))

+ 4 - 4
celery/tests/test_utils_info.py

@@ -16,7 +16,7 @@ RANDTEXT_RES = """\
     lazy dog\
     lazy dog\
 """
 """
 
 
-ROUTE = {"queue1": {
+QUEUES = {"queue1": {
             "exchange": "exchange1",
             "exchange": "exchange1",
             "exchange_type": "type1",
             "exchange_type": "type1",
             "binding_key": "bind1"},
             "binding_key": "bind1"},
@@ -26,7 +26,7 @@ ROUTE = {"queue1": {
             "binding_key": "bind2"}}
             "binding_key": "bind2"}}
 
 
 
 
-ROUTE_FORMAT = """
+QUEUE_FORMAT = """
 . queue1 -> exchange:exchange1 (type1) binding:bind1
 . queue1 -> exchange:exchange1 (type1) binding:bind1
 . queue2 -> exchange:exchange2 (type2) binding:bind2
 . queue2 -> exchange:exchange2 (type2) binding:bind2
 """.strip()
 """.strip()
@@ -55,8 +55,8 @@ class TestInfo(unittest.TestCase):
     def test_textindent(self):
     def test_textindent(self):
         self.assertEqual(info.textindent(RANDTEXT, 4), RANDTEXT_RES)
         self.assertEqual(info.textindent(RANDTEXT, 4), RANDTEXT_RES)
 
 
-    def test_format_routing_table(self):
-        self.assertEqual(info.format_routing_table(ROUTE), ROUTE_FORMAT)
+    def test_format_queues(self):
+        self.assertEqual(info.format_queues(QUEUES), QUEUE_FORMAT)
 
 
     def test_broker_info(self):
     def test_broker_info(self):
         info.format_broker_info()
         info.format_broker_info()

+ 6 - 6
celery/utils/info.py

@@ -2,7 +2,7 @@ import math
 
 
 from celery.messaging import establish_connection
 from celery.messaging import establish_connection
 
 
-ROUTE_FORMAT = """
+QUEUE_FORMAT = """
 . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
 . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
 binding:%(binding_key)s
 binding:%(binding_key)s
 """
 """
@@ -30,12 +30,12 @@ def textindent(t, indent=0):
     return "\n".join(" " * indent + p for p in t.split("\n"))
     return "\n".join(" " * indent + p for p in t.split("\n"))
 
 
 
 
-def format_routing_table(table, indent=0):
+def format_queues(queues, indent=0):
     """Format routing table into string for log dumps."""
     """Format routing table into string for log dumps."""
-    format = lambda **route: ROUTE_FORMAT.strip() % route
-    routes = "\n".join(format(name=name, **route)
-                            for name, route in table.items())
-    return textindent(routes, indent=indent)
+    format = lambda **queue: QUEUE_FORMAT.strip() % queue
+    info = "\n".join(format(name=name, **config)
+                            for name, config in queues.items())
+    return textindent(info, indent=indent)
 
 
 
 
 def get_broker_info():
 def get_broker_info():