
Updated with new AMQP queue/exchange settings: http://wiki.github.com/ask/celery/rewriting-the-amqp-routing-options

Ask Solem 15 years ago
parent
commit
ee79e3b466

+ 74 - 32
FAQ

@@ -260,9 +260,13 @@ Use the following specific settings in your ``settings.py``:

     # The queue name to use (both queue and exchange must be set to the
     # same queue name when using STOMP)
-    CELERY_AMQP_CONSUMER_QUEUE = "/queue/celery"
-    CELERY_AMQP_EXCHANGE = "/queue/celery"
-
+    CELERY_DEFAULT_QUEUE = "/queue/celery"
+    CELERY_DEFAULT_EXCHANGE = "/queue/celery"
+
+    CELERY_QUEUES = {
+        "/queue/celery": {"exchange": "/queue/celery"}
+    }
+
 Now you can go on reading the tutorial in the README, ignoring any AMQP
 specific options.

@@ -286,10 +290,8 @@ Features

 Can I send some tasks to only some servers?
 --------------------------------------------

-**Answer:** As of now there is only one use-case that works like this,
-and that is tasks of type ``A`` can be sent to servers ``x`` and ``y``,
-while tasks of type ``B`` can be sent to server ``z``. One server can't
-handle more than one routing_key, but this is coming in a later release.
+**Answer:** Yes. You can route tasks to an arbitrary server using AMQP,
+and a worker can bind to as many queues as it wants.

 Say you have two servers, ``x``, and ``y`` that handles regular tasks,
 and one server ``z``, that only handles feed related tasks, you can use this
@@ -299,34 +301,33 @@ configuration:

    .. code-block:: python

-        BROKER_HOST = "rabbit"
-        BROKER_PORT = 5678
-        BROKER_USER = "myapp"
-        BROKER_PASSWORD = "secret"
-        BROKER_VHOST = "myapp"
-
-        CELERY_AMQP_CONSUMER_QUEUE = "regular_tasks"
-        CELERY_AMQP_EXCHANGE = "tasks"
-        CELERY_AMQP_PUBLISHER_ROUTING_KEY = "task.regular"
-        CELERY_AMQP_CONSUMER_ROUTING_KEY = "task.#"
-        CELERY_AMQP_EXCHANGE_TYPE = "topic"
+        CELERY_DEFAULT_QUEUE = "regular_tasks"
+        CELERY_QUEUES = {
+            "regular_tasks": {
+                "binding_key": "task.#",
+            },
+        }
+        CELERY_DEFAULT_EXCHANGE = "tasks"
+        CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
+        CELERY_DEFAULT_ROUTING_KEY = "task.regular"

    * Server ``z``: settings.py:

    .. code-block:: python

-        BROKER_HOST = "rabbit"
-        BROKER_PORT = 5678
-        BROKER_USER = "myapp"
-        BROKER_PASSWORD = "secret"
-        BROKER_VHOST = "myapp"
+        CELERY_DEFAULT_QUEUE = "feed_tasks"
+        CELERY_QUEUES = {
+            "feed_tasks": {
+                "binding_key": "feed.#",
+            },
+        }
+        CELERY_DEFAULT_EXCHANGE = "tasks"
+        CELERY_DEFAULT_ROUTING_KEY = "task.regular"
+        CELERY_DEFAULT_EXCHANGE_TYPE = "topic"

-        CELERY_AMQP_EXCHANGE = "tasks"
-        CELERY_AMQP_PUBLISHER_ROUTING_KEY = "task.regular"
-        CELERY_AMQP_EXCHANGE_TYPE = "topic"
-        # This is the settings different for this server:
-        CELERY_AMQP_CONSUMER_QUEUE = "feed_tasks"
-        CELERY_AMQP_CONSUMER_ROUTING_KEY = "feed.#"
+``CELERY_QUEUES`` is a map of queue names to their exchange/type/binding_key.
+If you don't set the exchange or exchange type, they will be taken from the
+``CELERY_DEFAULT_EXCHANGE``/``CELERY_DEFAULT_EXCHANGE_TYPE`` settings.

 Now to make a Task run on the ``z`` server you need to set its
 ``routing_key`` attribute so it starts with the words ``"task.feed."``:
@@ -347,11 +348,52 @@ Now to make a Task run on the ``z`` server you need to set its

 You can also override this using the ``routing_key`` argument to
 :func:`celery.task.apply_async`:

-    >>> from celery.task import apply_async
     >>> from myapp.tasks import RefreshFeedTask
-    >>> apply_async(RefreshFeedTask, args=["http://cnn.com/rss"],
-    ...             routing_key="feed.importer")
+    >>> RefreshFeedTask.apply_async(args=["http://cnn.com/rss"],
+    ...                             routing_key="feed.importer")
+
+
+If you want, you can even have your feed processing worker handle regular
+tasks as well, maybe in times when there's a lot of work to do.
+Just add a new queue to server ``z``'s ``CELERY_QUEUES``:
+
+.. code-block:: python
+
+        CELERY_QUEUES = {
+            "feed_tasks": {
+                "binding_key": "feed.#",
+            },
+            "regular_tasks": {
+                "binding_key": "task.#",
+            },
+        }
+
+Since the default exchange is ``tasks``, they will both use the same
+exchange.
+
+If you want to add another queue on a different exchange, just specify
+a custom exchange and exchange type:
+
+.. code-block:: python
+
+        CELERY_QUEUES = {
+            "feed_tasks": {
+                "binding_key": "feed.#",
+            },
+            "regular_tasks": {
+                "binding_key": "task.#",
+            },
+            "image_tasks": {
+                "binding_key": "image.compress",
+                "exchange": "mediatasks",
+                "exchange_type": "direct",
+            },
+        }
+
+Easy? No? If you're confused about these terms, you should read up on
+AMQP and RabbitMQ. It might be hard to grok the concepts of
+queues, exchanges and routing/binding keys at first, but it's all very simple,
+I assure you.

 Can I use celery without Django?
 --------------------------------
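To make the answer above concrete, here is a minimal sketch (not part of this
commit) of a task class that routes itself to the feed server; the class name
and feed URL are illustrative only:

.. code-block:: python

    from celery.task.base import Task

    class RefreshFeedTask(Task):
        # "feed.importer" matches the "feed.#" binding of server z's
        # feed_tasks queue, so only the feed worker receives this task.
        routing_key = "feed.importer"

        def run(self, feed_url, **kwargs):
            pass  # fetch and import the feed here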

+ 7 - 9
celery/bin/celerybeat.py

@@ -63,9 +63,8 @@ from celery.messaging import get_connection_info
 STARTUP_INFO_FMT = """
 Configuration ->
     . broker -> %(conninfo)s
-    . exchange -> %(exchange)s (%(exchange_type)s)
-    . consumer -> queue:%(consumer_queue)s binding:%(consumer_rkey)s
     . schedule -> %(schedule)s
+    . sys -> %(logfile)s@%(loglevel)s %(pidfile)s
 """.strip()

 OPTION_LIST = (
@@ -124,15 +123,14 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
     # Dump configuration to screen so we have some basic information
     # when users sends e-mails.

+    from celery.messaging import format_routing_table
+
+
     print(STARTUP_INFO_FMT % {
             "conninfo": get_connection_info(),
-            "exchange": conf.AMQP_EXCHANGE,
-            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
-            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
-            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
-            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
-            "loglevel": loglevel,
-            "pidfile": pidfile,
+            "logfile": logfile or "@stderr",
+            "loglevel": conf.LOG_LEVELS[loglevel],
+            "pidfile": detach and pidfile or "",
             "schedule": schedule,
     })


+ 8 - 10
celery/bin/celeryd.py

@@ -76,14 +76,15 @@ from celery.utils import noop
 from celery.worker import WorkController
 from celery.loaders import current_loader, settings
 from celery.loaders import settings
-from celery.messaging import get_connection_info
+from celery.messaging import get_connection_info, format_routing_table

 STARTUP_INFO_FMT = """
 Configuration ->
     . broker -> %(conninfo)s
-    . exchange -> %(exchange)s (%(exchange_type)s)
-    . consumer -> queue:%(consumer_queue)s binding:%(consumer_rkey)s
+    . queues ->
+%(queues)s
     . concurrency -> %(concurrency)s
+    . sys -> %(logfile)s@%(loglevel)s %(pidfile)s
     . events -> %(events)s
     . beat -> %(celerybeat)s
 """.strip()
@@ -173,14 +174,11 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,

     print(STARTUP_INFO_FMT % {
             "conninfo": get_connection_info(),
-            "exchange": conf.AMQP_EXCHANGE,
-            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
-            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
-            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
-            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
+            "queues": format_routing_table(indent=8),
             "concurrency": concurrency,
-            "loglevel": loglevel,
-            "pidfile": pidfile,
+            "loglevel": conf.LOG_LEVELS[loglevel],
+            "logfile": logfile or "@stderr",
+            "pidfile": detach and pidfile or "",
             "celerybeat": run_clockservice and "ON" or "OFF",
             "events": events and "ON" or "OFF",
     })
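With these changes the worker's startup banner lists every queue it consumes
from. Under the default configuration it would look roughly like this (the
broker line and values are hypothetical; the queue line is what
``format_routing_table(indent=8)`` produces)::

    Configuration ->
        . broker -> guest@localhost:5672/
        . queues ->
            . celery -> exchange:celery (direct) binding:celery
        . concurrency -> 8
        . sys -> celeryd.log@INFO celeryd.pid
        . events -> OFF
        . beat -> OFF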

+ 17 - 28
celery/conf.py

@@ -6,22 +6,15 @@ from celery.loaders import settings

 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'

-LOG_LEVELS = {
-    "DEBUG": logging.DEBUG,
-    "INFO": logging.INFO,
-    "WARNING": logging.WARNING,
-    "WARN": logging.WARNING,
-    "ERROR": logging.ERROR,
-    "CRITICAL": logging.CRITICAL,
-    "FATAL": logging.FATAL,
-}
+LOG_LEVELS = dict(logging._levelNames)
+LOG_LEVELS["FATAL"] = logging.FATAL
+LOG_LEVELS[logging.FATAL] = "FATAL"

 _DEFAULTS = {
-    "CELERY_AMQP_EXCHANGE": "celery",
-    "CELERY_AMQP_PUBLISHER_ROUTING_KEY": "celery",
-    "CELERY_AMQP_CONSUMER_ROUTING_KEY": "celery",
-    "CELERY_AMQP_CONSUMER_QUEUE": "celery",
-    "CELERY_AMQP_EXCHANGE_TYPE": "direct",
+    "CELERY_DEFAULT_ROUTING_KEY": "celery",
+    "CELERY_DEFAULT_QUEUE": "celery",
+    "CELERY_DEFAULT_EXCHANGE": "celery",
+    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
     "CELERYD_CONCURRENCY": 0, # defaults to cpu count
     "CELERYD_PID_FILE": "celeryd.pid",
     "CELERYD_DAEMON_LOG_FORMAT": DEFAULT_LOG_FMT,
@@ -67,20 +60,16 @@ if isinstance(TASK_RESULT_EXPIRES, int):
 SEND_CELERY_TASK_ERROR_EMAILS = _get("SEND_CELERY_TASK_ERROR_EMAILS",
                                      not settings.DEBUG)

-AMQP_EXCHANGE = _get("CELERY_AMQP_EXCHANGE")
-AMQP_EXCHANGE_TYPE = _get("CELERY_AMQP_EXCHANGE_TYPE")
-AMQP_PUBLISHER_ROUTING_KEY = _get("CELERY_AMQP_PUBLISHER_ROUTING_KEY")
-AMQP_CONSUMER_ROUTING_KEY = _get("CELERY_AMQP_CONSUMER_ROUTING_KEY")
-AMQP_CONSUMER_QUEUE = _get("CELERY_AMQP_CONSUMER_QUEUE")
-DEFAULT_AMQP_CONSUMER_QUEUES = {
-        AMQP_CONSUMER_QUEUE: {
-            "exchange": AMQP_EXCHANGE,
-            "routing_key": AMQP_CONSUMER_ROUTING_KEY,
-            "exchange_type": AMQP_EXCHANGE_TYPE,
-        }
-}
-AMQP_CONSUMER_QUEUES = _get("CELERY_AMQP_CONSUMER_QUEUES",
-                            DEFAULT_AMQP_CONSUMER_QUEUES)
+DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
+DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
+DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
+DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
+
+QUEUES = _get("CELERY_QUEUES", {DEFAULT_QUEUE: {
+                                    "exchange": DEFAULT_EXCHANGE,
+                                    "exchange_type": DEFAULT_EXCHANGE_TYPE,
+                                    "binding_key": DEFAULT_ROUTING_KEY}})
+
 AMQP_CONNECTION_TIMEOUT = _get("CELERY_AMQP_CONNECTION_TIMEOUT")
 AMQP_CONNECTION_RETRY = _get("CELERY_AMQP_CONNECTION_RETRY")
 AMQP_CONNECTION_MAX_RETRIES = _get("CELERY_AMQP_CONNECTION_MAX_RETRIES")
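With no ``CELERY_QUEUES`` in ``settings.py`` and the other defaults left
alone, ``QUEUES`` therefore evaluates to a single-entry table. A sketch of
the resulting value, for illustration only:

.. code-block:: python

    QUEUES = {
        "celery": {
            "exchange": "celery",       # CELERY_DEFAULT_EXCHANGE
            "exchange_type": "direct",  # CELERY_DEFAULT_EXCHANGE_TYPE
            "binding_key": "celery",    # CELERY_DEFAULT_ROUTING_KEY
        },
    }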

+ 37 - 10
celery/messaging.py

@@ -10,22 +10,40 @@ from billiard.utils.functional import wraps

 from celery import conf
 from celery import signals
-from celery.utils import gen_unique_id, mitemgetter, noop
+from celery.utils import gen_unique_id, mitemgetter, noop, textindent
+
+ROUTE_INFO_FORMAT = """
+. %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
+binding:%(binding_key)s
+"""
+
+

 MSG_OPTIONS = ("mandatory", "priority",
                "immediate", "routing_key",
                "serializer")

 get_msg_options = mitemgetter(*MSG_OPTIONS)
-
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))


+def routing_table():
+
+    def _defaults(opts):
+        opts.setdefault("exchange", conf.DEFAULT_EXCHANGE)
+        opts.setdefault("exchange_type", conf.DEFAULT_EXCHANGE_TYPE)
+        opts.setdefault("binding_key", "")
+        return opts
+
+    return dict((queue, _defaults(opts))
+                    for queue, opts in conf.QUEUES.items())
+_default_queue = routing_table()[conf.DEFAULT_QUEUE]
+
+
 class TaskPublisher(Publisher):
     """The AMQP Task Publisher class."""
-    exchange = conf.AMQP_EXCHANGE
-    exchange_type = conf.AMQP_EXCHANGE_TYPE
-    routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
+    exchange = _default_queue["exchange"]
+    exchange_type = _default_queue["exchange_type"]
+    routing_key = conf.DEFAULT_ROUTING_KEY
     serializer = conf.TASK_SERIALIZER

     def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
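Note that ``_defaults`` fills in a missing ``exchange``/``exchange_type``
from the ``DEFAULT_*`` settings, but a missing ``binding_key`` becomes the
empty string rather than ``DEFAULT_ROUTING_KEY``. For example, assuming
``CELERY_QUEUES = {"feeds": {}}`` (a sketch, not from the commit):

.. code-block:: python

    >>> routing_table()
    {"feeds": {"exchange": "celery",       # from DEFAULT_EXCHANGE
               "exchange_type": "direct",  # from DEFAULT_EXCHANGE_TYPE
               "binding_key": ""}}         # note: empty, not "celery"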
@@ -59,16 +77,17 @@ class TaskPublisher(Publisher):
         return task_id


-def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
+def get_consumer_set(connection, queues=None, **options):
+    queues = queues or routing_table()
     return ConsumerSet(connection, from_dict=queues, **options)


 class TaskConsumer(Consumer):
     """The AMQP Task Consumer class."""
-    queue = conf.AMQP_CONSUMER_QUEUE
-    exchange = conf.AMQP_EXCHANGE
-    routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
-    exchange_type = conf.AMQP_EXCHANGE_TYPE
+    queue = conf.DEFAULT_QUEUE
+    exchange = _default_queue["exchange"]
+    routing_key = _default_queue["binding_key"]
+    exchange_type = _default_queue["exchange_type"]
     auto_ack = False
     no_ack = False

@@ -155,3 +174,11 @@ def get_connection_info():
                 "host": broker_connection.hostname,
                 "port": port,
                 "vhost": vhost}
+
+
+def format_routing_table(table=None, indent=0):
+    table = table or routing_table()
+    format = lambda **route: ROUTE_INFO_FORMAT.strip() % route
+    routes = "\n".join(format(name=name, **route)
+                            for name, route in table.items())
+    return textindent(routes, indent=indent)
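Fed the default routing table, ``format_routing_table(indent=8)`` returns a
string along these lines (illustrative output, values from the defaults
above)::

    . celery -> exchange:celery (direct) binding:celery

with every line left-padded by eight spaces so it slots straight into the
``celeryd`` startup banner shown earlier.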

+ 4 - 5
celery/tests/test_conf.py

@@ -4,11 +4,10 @@ from django.conf import settings


 SETTING_VARS = (
-    ("CELERY_AMQP_CONSUMER_QUEUE", "AMQP_CONSUMER_QUEUE"),
-    ("CELERY_AMQP_PUBLISHER_ROUTING_KEY", "AMQP_PUBLISHER_ROUTING_KEY"),
-    ("CELERY_AMQP_CONSUMER_ROUTING_KEY", "AMQP_CONSUMER_ROUTING_KEY"),
-    ("CELERY_AMQP_EXCHANGE_TYPE", "AMQP_EXCHANGE_TYPE"),
-    ("CELERY_AMQP_EXCHANGE", "AMQP_EXCHANGE"),
+    ("CELERY_DEFAULT_QUEUE", "DEFAULT_QUEUE"),
+    ("CELERY_DEFAULT_ROUTING_KEY", "DEFAULT_ROUTING_KEY"),
+    ("CELERY_DEFAULT_EXCHANGE_TYPE", "DEFAULT_EXCHANGE_TYPE"),
+    ("CELERY_DEFAULT_EXCHANGE", "DEFAULT_EXCHANGE"),
     ("CELERYD_CONCURRENCY", "DAEMON_CONCURRENCY"),
     ("CELERYD_PID_FILE", "DAEMON_PID_FILE"),
     ("CELERYD_LOG_FILE", "DAEMON_LOG_FILE"),

+ 6 - 0
celery/utils.py

@@ -227,3 +227,9 @@ except NameError:
             if item:
                 return True
         return False
+
+
+def textindent(t, indent=0):
+    """Indent text."""
+    indent = " " * indent
+    return "\n".join(indent + p for p in t.split("\n"))
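A quick illustration of the new helper (a sketch, not in the commit):

.. code-block:: python

    >>> print(textindent("foo\nbar", indent=4))
        foo
        bar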

+ 128 - 41
docs/configuration.rst

@@ -30,14 +30,15 @@ it should contain all you need to run a basic celery set-up.
     BROKER_USER = "guest"
     BROKER_PASSWORD = "guest"

-    ## If you're doing mostly I/O you can have higher concurrency,
-    ## if mostly spending time in the CPU, try to keep it close to the
-    ## number of CPUs on your machine.
+    ## If you're doing mostly I/O you can have more processes,
+    ## but if mostly spending CPU, try to keep it close to the
+    ## number of CPUs on your machine. If not set, the number of CPUs/cores
+    ## available will be used.
     # CELERYD_CONCURRENCY = 8

-    CELERYD_LOG_FILE = "celeryd.log"
-    CELERYD_PID_FILE = "celeryd.pid"
-    CELERYD_DAEMON_LOG_LEVEL = "INFO"
+    # CELERYD_LOG_FILE = "celeryd.log"
+    # CELERYD_PID_FILE = "celeryd.pid"
+    # CELERYD_DAEMON_LOG_LEVEL = "INFO"

 Concurrency settings
 ====================
@@ -45,7 +46,7 @@ Concurrency settings
 * CELERYD_CONCURRENCY
     The number of concurrent worker processes, executing tasks simultaneously.

-    Defaults to the number of CPUs in the system.
+    Defaults to the number of CPUs/cores available.


 Task result backend settings
@@ -109,6 +110,16 @@ Example configuration
     DATABASE_NAME = "mydatabase"
     DATABASE_HOST = "localhost"

+AMQP backend settings
+=====================
+
+The AMQP backend does not have any settings yet.
+
+Example configuration
+---------------------
+
+    CELERY_BACKEND = "amqp"
+
 Cache backend settings
 ======================

@@ -252,33 +263,41 @@ Example configuration
     }


-Broker settings
-===============
+Messaging settings
+==================

-* CELERY_AMQP_EXCHANGE
+Routing
+-------

-    Name of the AMQP exchange.
+* CELERY_QUEUES
+    The mapping of queues the worker consumes from. This is a dictionary
+    of queue name/options. See :doc:`userguide/routing` for more information.

-* CELERY_AMQP_EXCHANGE_TYPE
-    The type of exchange. If the exchange type is ``direct``, all messages
-    receives all tasks. However, if the exchange type is ``topic``, you can
-    route e.g. some tasks to one server, and others to the rest.
-    See `Exchange types and the effect of bindings`_.
+    The default is a queue/exchange/binding key of ``"celery"``, with
+    exchange type ``direct``.

-    .. _`Exchange types and the effect of bindings`:
-        http://bit.ly/wpamqpexchanges
+    You don't have to care about this unless you want custom routing
+    facilities; see the example after this list.

-* CELERY_AMQP_PUBLISHER_ROUTING_KEY
-    The default AMQP routing key used when publishing tasks.
+* CELERY_DEFAULT_QUEUE
+    The queue used by default, if no custom queue is specified.
+    This queue must be listed in ``CELERY_QUEUES``.
+    The default is: ``celery``.

-* CELERY_AMQP_CONSUMER_ROUTING_KEY
-    The AMQP routing key used when consuming tasks.
+* CELERY_DEFAULT_EXCHANGE
+    Name of the default exchange to use when no custom exchange
+    is specified.
+    The default is: ``celery``.

-* CELERY_AMQP_CONSUMER_QUEUE
-    The name of the AMQP queue.
+* CELERY_DEFAULT_EXCHANGE_TYPE
+    Default exchange type used when no custom exchange is specified.
+    The default is: ``direct``.

-* CELERY_AMQP_CONSUMER_QUEUES
-    Dictionary defining multiple AMQP queues.
+* CELERY_DEFAULT_ROUTING_KEY
+    The default routing key used when sending tasks.
+    The default is: ``celery``.
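A custom mapping could look like this; the ``feed_tasks``/``feeds`` names
are examples only:

.. code-block:: python

    CELERY_QUEUES = {
        "celery": {"binding_key": "celery"},  # exchange/type use the defaults
        "feed_tasks": {
            "exchange": "feeds",
            "exchange_type": "topic",
            "binding_key": "feed.#",
        },
    }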
+
+Connection
+----------
 
 
 * CELERY_AMQP_CONNECTION_TIMEOUT
     The timeout in seconds before we give up establishing a connection
@@ -304,10 +323,6 @@ Broker settings
 Task execution settings
 =======================

-* SEND_CELERY_TASK_ERROR_EMAILS
-    If set to ``True``, errors in tasks will be sent to admins by e-mail.
-    If unset, it will send the e-mails if ``settings.DEBUG`` is False.
-
 * CELERY_ALWAYS_EAGER
     If this is ``True``, all tasks will be executed locally by blocking
     until it is finished. ``apply_async`` and ``Task.delay`` will return
@@ -334,18 +349,27 @@ Task execution settings

     Default is ``pickle``.

-* CELERY_STORE_ERRORS_EVEN_IF_IGNORED
-
-    If set, the worker stores all task errors in the result store even if
-    ``Task.ignore_result`` is on.
+Worker: celeryd
+===============

 * CELERY_IMPORTS
     A sequence of modules to import when the celery daemon starts.  This is
     useful to add tasks if you are not using django or cannot use task
     autodiscovery.

-Logging settings
-================
+* CELERY_SEND_EVENTS
+    Send events so the worker can be monitored by tools like ``celerymon``.
+
+* SEND_CELERY_TASK_ERROR_EMAILS
+    If set to ``True``, errors in tasks will be sent to admins by e-mail.
+    If unset, it will send the e-mails if ``settings.DEBUG`` is False.
+
+* CELERY_STORE_ERRORS_EVEN_IF_IGNORED
+    If set, the worker stores all task errors in the result store even if
+    ``Task.ignore_result`` is on.
+
+Logging
+-------

 * CELERYD_LOG_FILE
     The default filename the worker daemon logs messages to, can be
@@ -355,9 +379,13 @@ Logging settings
     when running in the background, detached as a daemon, the default
     logfile is ``celeryd.log``.

+    Can also be set via the ``--logfile`` argument.
+
 * CELERYD_DAEMON_LOG_LEVEL
     Worker log level, can be any of ``DEBUG``, ``INFO``, ``WARNING``,
-    ``ERROR``, ``CRITICAL``, or ``FATAL``.
+    ``ERROR``, ``CRITICAL``.
+
+    Can also be set via the ``--loglevel`` argument.

     See the :mod:`logging` module for more information.

@@ -370,9 +398,68 @@ Logging settings
     See the Python :mod:`logging` module for more information about log
     formats.

-Process settings
-================
+Process
+-------

 * CELERYD_PID_FILE
-    Full path to the daemon pid file. Default is ``celeryd.pid``.
-    Can be overridden using the ``--pidfile`` option to ``celeryd``.
+    Full path to the pid file. Default is ``celeryd.pid``.
+    Can also be set via the ``--pidfile`` argument.
+
+Periodic Task Server: celerybeat
+================================
+
+* CELERYBEAT_SCHEDULE_FILENAME
+
+    Name of the file celerybeat stores the current schedule in.
+    Can be a relative or absolute path, but be aware that the suffix ``.db``
+    will be appended to the filename.
+
+    Can also be set via the ``--schedule`` argument.
+
+* CELERYBEAT_MAX_LOOP_INTERVAL
+
+    The maximum number of seconds celerybeat can sleep between checking
+    the schedule. Default is 300 seconds (5 minutes).
+
+* CELERYBEAT_LOG_FILE
+    The default filename to log messages to, can be
+    overridden using the ``--logfile`` option.
+
+    The default is to log using ``stderr`` if running in the foreground;
+    when running in the background, detached as a daemon, the default
+    logfile is ``celerybeat.log``.
+
+* CELERYBEAT_LOG_LEVEL
+    Logging level. Can be any of ``DEBUG``, ``INFO``, ``WARNING``,
+    ``ERROR``, or ``CRITICAL``.
+
+    Can also be set via the ``--loglevel`` argument.
+
+    See the :mod:`logging` module for more information.
+
+* CELERYBEAT_PID_FILE
+    Full path to celerybeat's pid file. Default is ``celerybeat.pid``.
+    Can also be set via the ``--pidfile`` argument.
+
+Monitor Server: celerymon
+=========================
+
+* CELERYMON_LOG_FILE
+    The default filename to log messages to, can be
+    overridden using the ``--logfile`` option.
+
+    The default is to log using ``stderr`` if running in the foreground;
+    when running in the background, detached as a daemon, the default
+    logfile is ``celerymon.log``.
+
+* CELERYMON_LOG_LEVEL
+    Logging level. Can be any of ``DEBUG``, ``INFO``, ``WARNING``,
+    ``ERROR``, or ``CRITICAL``.
+
+    See the :mod:`logging` module for more information.
+
+* CELERYMON_PID_FILE
+    Full path to celerymon's pid file. Default is ``celerymon.pid``.
+    Can be overridden using the ``--pidfile`` option to ``celerymon``.

+ 10 - 14
docs/reference/celery.conf.rst

@@ -2,29 +2,25 @@
 Configuration - celery.conf
 ============================

-.. data:: AMQP_EXCHANGE
+.. data:: QUEUES

-    Name of the AMQP exchange.
+    Queue name/options mapping.

-.. data:: AMQP_EXCHANGE_TYPE
+.. data:: DEFAULT_QUEUE

-    The exchange type.
+    Name of the default queue.

-.. data:: AMQP_PUBLISHER_ROUTING_KEY
+.. data:: DEFAULT_EXCHANGE

-    The default AMQP routing key used when publishing tasks.
+    Default exchange.

-.. data:: AMQP_CONSUMER_ROUTING_KEY
+.. data:: DEFAULT_EXCHANGE_TYPE

-    The AMQP routing key used when consuming tasks.
+    Default exchange type.

-.. data:: AMQP_CONSUMER_QUEUE
+.. data:: DEFAULT_ROUTING_KEY

-    The name of the AMQP queue.
-
-.. data:: AMQP_CONSUMER_QUEUES
-
-    Dictionary defining multiple AMQP queues.
+    Default routing key used when sending tasks.

 .. data:: AMQP_CONNECTION_TIMEOUT


+ 13 - 4
docs/tutorials/otherqueues.rst

@@ -74,8 +74,17 @@ Important notes
 ---------------

 These message queues does not have the concept of exchanges and routing keys,
-there's only the queue entity. As a result of this you need to set the name of
-the exchange to be the same as the queue::
+there's only the queue entity. As a result of this you need to set the
+name of the exchange to be the same as the queue::

-    CELERY_AMQP_CONSUMER_QUEUE = "tasks"
-    CELERY_AMQP_EXCHANGE = "tasks"
+    CELERY_DEFAULT_EXCHANGE = "tasks"
+
+or in a custom queue-mapping::
+
+    CELERY_QUEUES = {
+        "tasks": {"exchange": "tasks"},
+        "feeds": {"exchange": "feeds"},
+    }
+
+This isn't a problem if you use the default queue setting, as the default is
+already using the same name for queue/exchange.

+ 5 - 3
testproj/settings.py

@@ -40,9 +40,11 @@ BROKER_PASSWORD = "guest"
 TT_HOST = "localhost"
 TT_PORT = 1978

-CELERY_AMQP_EXCHANGE = "testcelery"
-CELERY_AMQP_ROUTING_KEY = "testcelery"
-CELERY_AMQP_CONSUMER_QUEUE = "testcelery"
+CELERY_DEFAULT_EXCHANGE = "testcelery"
+CELERY_DEFAULT_ROUTING_KEY = "testcelery"
+CELERY_DEFAULT_QUEUE = "testcelery"
+
+CELERY_QUEUES = {"testcelery": {"binding_key": "testcelery"}}

 MANAGERS = ADMINS