Sfoglia il codice sorgente

Simplify configuration a bit.

The Celery instance can now be created with a broker URL:

    celery = Celery(broker="redis://")

in addition the result backend can now be set using an URL:

    CELERY_RESULT_BACKEND = "redis://foo:312/1"

(only support by redis so far, but other backends can easily add support)
Ask Solem 13 anni fa
parent
commit
28f35e5983

+ 13 - 5
celery/app/base.py

@@ -27,7 +27,7 @@ from kombu.clocks import LamportClock
 
 from .. import datastructures
 from .. import platforms
-from ..backends import get_backend_cls
+from ..backends import get_backend_by_url
 from ..exceptions import AlwaysEagerIgnored
 from ..loaders import get_loader_cls
 from ..local import maybe_evaluate
@@ -116,7 +116,7 @@ class BaseApp(object):
     def __init__(self, main=None, loader=None, backend=None,
             amqp=None, events=None, log=None, control=None,
             set_as_current=True, accept_magic_kwargs=False,
-            tasks=None, **kwargs):
+            tasks=None, broker=None, **kwargs):
         self.clock = LamportClock()
         self.main = main
         self.amqp_cls = amqp or self.amqp_cls
@@ -133,6 +133,12 @@ class BaseApp(object):
         self._pending = deque()
         self._tasks = instantiate(self.registry_cls)
 
+        # these options are moved to the config to
+        # simplify pickling of the app object.
+        self._preconf = {}
+        if broker:
+            self._preconf["BROKER_URL"] = broker
+
         self.on_init()
 
     def on_init(self):
@@ -303,6 +309,7 @@ class BaseApp(object):
 
     def prepare_config(self, c):
         """Prepare configuration before it is merged with the defaults."""
+        c.update(self._preconf)
         return find_deprecated_settings(c)
 
     def now(self):
@@ -332,9 +339,10 @@ class BaseApp(object):
         return first(None, values) or self.conf.get(default_key)
 
     def _get_backend(self):
-        return get_backend_cls(
-                    self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
-                    loader=self.loader)(app=self)
+        backend, url = get_backend_by_url(
+                self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
+                self.loader)
+        return backend(app=self, url=url)
 
     def _get_config(self):
         return Settings({}, [self.prepare_config(self.loader.conf),

+ 11 - 0
celery/backends/__init__.py

@@ -3,6 +3,8 @@ from __future__ import absolute_import
 
 import sys
 
+from kombu.utils.url import _parse_url
+
 from .. import current_app
 from ..local import Proxy
 from ..utils.imports import symbol_by_name
@@ -37,5 +39,14 @@ def get_backend_cls(backend=None, loader=None):
                     backend, exc)), sys.exc_info()[2]
 
 
+def get_backend_by_url(backend=None, loader=None):
+    url = None
+    if backend and '://' in backend:
+        url = backend
+        backend, _, _, _, _, _, _ = _parse_url(url)
+    return get_backend_cls(backend, loader), url
+
+
+
 # deprecate this.
 default_backend = Proxy(lambda: current_app.backend)

+ 13 - 6
celery/backends/redis.py

@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
+from kombu.utils.url import _parse_url
+
 from ..exceptions import ImproperlyConfigured
 from ..utils import cached_property
 
@@ -38,7 +40,7 @@ class RedisBackend(KeyValueStoreBackend):
     supports_native_join = True
 
     def __init__(self, host=None, port=None, db=None, password=None,
-            expires=None, max_connections=None, **kwargs):
+            expires=None, max_connections=None, url=None, **kwargs):
         super(RedisBackend, self).__init__(**kwargs)
         conf = self.app.conf
         if self.redis is None:
@@ -53,11 +55,16 @@ class RedisBackend(KeyValueStoreBackend):
                     return conf[prefix % key]
                 except KeyError:
                     pass
-
-        self.host = host or _get("HOST") or self.host
-        self.port = int(port or _get("PORT") or self.port)
-        self.db = db or _get("DB") or self.db
-        self.password = password or _get("PASSWORD") or self.password
+        if host and '://' in host:
+            url, host = host, None
+        self.url = url
+        uhost = uport = upass = udb = None
+        if url:
+            _, uhost, uport, _, upass, udb, _ = _parse_url(url)
+        self.host = uhost or host or _get("HOST") or self.host
+        self.port = int(uport or port or _get("PORT") or self.port)
+        self.db = udb or db or _get("DB") or self.db
+        self.password = upass or password or _get("PASSWORD") or self.password
         self.expires = self.prepare_expires(expires, type=int)
         self.max_connections = (max_connections
                                 or _get("MAX_CONNECTIONS")

+ 1 - 4
docs/getting-started/brokers/redis.rst

@@ -43,10 +43,7 @@ Results
 If you also want to store the state and return values of tasks in Redis,
 you should configure these settings::
 
-    CELERY_RESULT_BACKEND = "redis"
-    CELERY_REDIS_HOST = "localhost"
-    CELERY_REDIS_PORT = 6379
-    CELERY_REDIS_DB = 0
+    CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
 
 For a complete list of options supported by the Redis result backend see
 :ref:`conf-redis-result-backend`