瀏覽代碼

Missing queue definitions are now created automatically.

You can disable this using the CELERY_CREATE_MISSING_QUEUES setting.

The missing queues are created with the following options:

    CELERY_QUEUES[name] = {"exchange": name,
                           "exchange_type": "direct",
                           "routing_key": "name}
Ask Solem 14 年之前
父節點
當前提交
d1cf157211
共有 5 個文件被更改,包括 95 次插入7 次删除
  1. 9 0
      celery/bin/celeryd.py
  2. 2 0
      celery/conf.py
  3. 2 2
      celery/exceptions.py
  4. 19 5
      celery/routes.py
  5. 63 0
      docs/userguide/routing.rst

+ 9 - 0
celery/bin/celeryd.py

@@ -79,6 +79,7 @@ from celery.utils import info
 from celery.utils import get_full_cls_name
 from celery.worker import WorkController
 from celery.exceptions import ImproperlyConfigured
+from celery.routes import Router
 
 STARTUP_INFO_FMT = """
 Configuration ->
@@ -227,6 +228,14 @@ class Worker(object):
             conf.QUEUES = dict((queue, options)
                                 for queue, options in conf.QUEUES.items()
                                     if queue in self.queues)
+            for queue in self.queues:
+                if queue not in conf.QUEUES:
+                    if conf.CREATE_MISSING_QUEUES:
+                        Router(queues=conf.QUEUES).add_queue(queue)
+                        print("QUEUES: %s" % conf.QUEUES)
+                    else:
+                        raise ImproperlyConfigured(
+                            "Queue '%s' not defined in CELERY_QUEUES" % queue)
 
     def init_loader(self):
         from celery.loaders import current_loader, load_settings

+ 2 - 0
celery/conf.py

@@ -35,6 +35,7 @@ _DEFAULTS = {
     "CELERYD_TASK_SOFT_TIME_LIMIT": None,
     "CELERYD_MAX_TASKS_PER_CHILD": None,
     "CELERY_ROUTES": None,
+    "CELERY_CREATE_MISSING_QUEUES": True,
     "CELERY_DEFAULT_ROUTING_KEY": "celery",
     "CELERY_DEFAULT_QUEUE": "celery",
     "CELERY_DEFAULT_EXCHANGE": "celery",
@@ -199,6 +200,7 @@ QUEUES = _get("CELERY_QUEUES") or {DEFAULT_QUEUE: {
                                        "exchange": DEFAULT_EXCHANGE,
                                        "exchange_type": DEFAULT_EXCHANGE_TYPE,
                                        "binding_key": DEFAULT_ROUTING_KEY}}
+CREATE_MISSING_QUEUES = _get("CELERY_CREATE_MISSING_QUEUES")
 ROUTES = routes.prepare(_get("CELERY_ROUTES") or [])
 # :--- Broadcast queue settings                     <-   --   --- - ----- -- #
 

+ 2 - 2
celery/exceptions.py

@@ -9,8 +9,8 @@ Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 
 
-class RouteNotFound(KeyError):
-    """Task routed to a queue not in the routing table (CELERY_QUEUES)."""
+class QueueNotFound(KeyError):
+    """Task routed to a queue not in CELERY_QUEUES."""
 
 
 class TimeLimitExceeded(Exception):

+ 19 - 5
celery/routes.py

@@ -1,4 +1,4 @@
-from celery.exceptions import RouteNotFound
+from celery.exceptions import QueueNotFound
 from celery.utils import instantiate, firstmethod
 
 _first_route = firstmethod("route_for_task")
@@ -16,9 +16,21 @@ class MapRoute(object):
 
 class Router(object):
 
-    def __init__(self, routes, queues):
+    def __init__(self, routes=None, queues=None, create_missing=False):
+        if queues is None:
+            queues = {}
+        if routes is None:
+            routes = []
         self.queues = queues
         self.routes = routes
+        self.create_missing = create_missing
+
+    def add_queue(self, queue):
+        q = self.queues[queue] = {"binding_key": queue,
+                                  "routing_key": queue,
+                                  "exchange": queue,
+                                  "exchange_type": "direct"}
+        return q
 
     def route(self, options, task, args=(), kwargs={}):
         # Expand "queue" keys in options.
@@ -44,9 +56,11 @@ class Router(object):
             try:
                 dest = dict(self.queues[queue])
             except KeyError:
-                raise RouteNotFound(
-                    "Route %s does not exist in the routing table "
-                    "(CELERY_QUEUES)" % route)
+                if self.create_missing:
+                    dest = self.add_queue(queue)
+                else:
+                    raise QueueNotFound(
+                        "Queue '%s' is not defined in CELERY_QUEUES" % queue)
             dest.setdefault("routing_key", dest.get("binding_key"))
             return dict(route, **dest)
 

+ 63 - 0
docs/userguide/routing.rst

@@ -14,6 +14,69 @@ respective documenation for more information, or contact the `mailinglist`_.
 Basics
 ======
 
+Automatic routing
+-----------------
+
+The simplest way to do routing is to use the ``CELERY_CREATE_MISSING_QUEUES``
+setting (on by default).
+
+When this setting is on a named queue that is not already defined in
+``CELERY_QUEUES`` will be created automatically. This makes it easy to perform
+simple routing tasks.
+
+Say you have two servers, ``x``, and ``y`` that handles regular tasks,
+and one server ``z``, that only handles feed related tasks, you can use this
+configuration:
+
+    CELERY_ROUTES = {"feed.tasks.import_feed": "feeds"}
+
+With this route enabled import feed tasks will be routed to the
+``"feeds"`` queue, while all other tasks will be routed to the default queue
+(named ``"celery"``, for historic reasons).
+
+Now you can start server ``z`` to only process the feeds queue like this::
+
+    (z)$ celeryd -Q feeds
+
+You can specify as many queues as you want, so you can make this server
+process the default queue as well::
+
+    (z)$ celeryd -Q feeds,celery
+
+Changing the name of the default queue
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can change the name of the default queue by using the following
+configuration:
+
+.. code-block:: python
+
+    CELERY_QUEUES = {"default": {"exchange": "default",
+                                 "binding_key": "default"}}
+    CELERY_DEFAULT_QUEUE = "default"
+
+How the queues are defined
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The point with this feature is to hide the complex AMQP protocol for users
+with only basic needs. However, you may still be interested in how these queues
+are defined.
+
+A queue named ``"video"`` will be created with the following settings:
+
+.. code-block:: python
+
+    {"exchange": "video",
+     "exchange_type": "direct",
+     "routing_key": "video"}
+
+The non-AMQP backends like ``ghettoq`` does not support exchanges, so they
+require the exchange to have the same name as the queue. Using this design
+ensures it will work for them as well.
+
+Manual routing
+--------------
+
 Say you have two servers, ``x``, and ``y`` that handles regular tasks,
 and one server ``z``, that only handles feed related tasks, you can use this
 configuration: