Bläddra i källkod

Cleans up the redis backend, configuration keys now CELERY_REDIS_* instead of REDIS_*

Ask Solem 14 år sedan
förälder
incheckning
ca43b67523

+ 4 - 6
celery/app/defaults.py

@@ -80,6 +80,10 @@ NAMESPACES = {
         "MAX_CACHED_RESULTS": Option(5000, type="int"),
         "MESSAGE_COMPRESSION": Option(None, type="string"),
         "MONGODB_BACKEND_SETTINGS": Option(None, type="dict"),
+        "REDIS_HOST": Option(None, type="string"),
+        "REDIS_PORT": Option(None, type="int"),
+        "REDIS_DB": Option(None, type="int"),
+        "REDIS_PASSWORD": Option(None, type="string"),
         "RESULT_BACKEND": Option(None, type="string"),
         "RESULT_DBURI": Option(),
         "RESULT_ENGINE_OPTIONS": Option(None, type="dict"),
@@ -149,12 +153,6 @@ NAMESPACES = {
     },
     "SERVER_EMAIL": Option("celery@localhost"),
     "ADMINS": Option((), type="tuple"),
-    "REDIS": {
-        "HOST": Option(None, type="string"),
-        "PORT": Option(None, type="int"),
-        "DB": Option(None, type="int"),
-        "PASSWORD": Option(None, type="string"),
-    },
     "TT": {
         "HOST": Option(None, type="string"),
         "PORT": Option(None, type="int"),

+ 1 - 1
celery/backends/__init__.py

@@ -5,7 +5,7 @@ from celery.utils import get_cls_by_name
 BACKEND_ALIASES = {
     "amqp": "celery.backends.amqp.AMQPBackend",
     "cache": "celery.backends.cache.CacheBackend",
-    "redis": "celery.backends.pyredis.RedisBackend",
+    "redis": "celery.backends.redis.RedisBackend",
     "mongodb": "celery.backends.mongodb.MongoBackend",
     "tyrant": "celery.backends.tyrant.TyrantBackend",
     "database": "celery.backends.database.DatabaseBackend",

+ 12 - 0
celery/backends/base.py

@@ -1,8 +1,11 @@
 """celery.backends.base"""
 import time
 
+from datetime import timedelta
+
 from celery import states
 from celery.exceptions import TimeoutError, TaskRevokedError
+from celery.utils import timeutils
 from celery.utils.serialization import pickle, get_pickled_exception
 from celery.utils.serialization import get_pickleable_exception
 from celery.datastructures import LocalCache
@@ -20,6 +23,15 @@ class BaseBackend(object):
         from celery.app import app_or_default
         self.app = app_or_default(kwargs.get("app"))
 
+    def prepare_expires(self, value, type=None):
+        if value is None:
+            value = self.app.conf.CELERY_TASK_RESULT_EXPIRES
+        if isinstance(value, timedelta):
+            value = timeutils.timedelta_seconds(value)
+        if value is not None and type:
+            return type(value)
+        return value
+
     def encode_result(self, result, status):
         if status in self.EXCEPTION_STATES:
             return self.prepare_exception(result)

+ 15 - 98
celery/backends/pyredis.py

@@ -1,105 +1,22 @@
-from datetime import timedelta
+"""
+This is here for backwards compatibility only.
 
-from kombu.utils import cached_property
+Please use :class:`celery.backends.redis.RedisBackend` instead.
 
-from celery.backends.base import KeyValueStoreBackend
-from celery.exceptions import ImproperlyConfigured
-from celery.utils import timeutils
+"""
+from __future__ import absolute_import
 
-try:
-    import redis
-    from redis.exceptions import ConnectionError
-except ImportError:
-    redis = None            # noqa
-    ConnectionError = None  # noqa
+from celery.backends import redis
 
 
-class RedisBackend(KeyValueStoreBackend):
-    """Redis task result store."""
-
-    #: redis-py client module.
-    redis = redis
-
-    #: default Redis server hostname (`localhost`).
-    redis_host = "localhost"
-
-    #: default Redis server port (6379)
-    redis_port = 6379
-    redis_db = 0
-
-    #: default Redis password (:const:`None`)
-    redis_password = None
+class RedisBackend(redis.RedisBackend):
 
     def __init__(self, redis_host=None, redis_port=None, redis_db=None,
-            redis_password=None,
-            expires=None, **kwargs):
-        super(RedisBackend, self).__init__(**kwargs)
-        if self.redis is None:
-            raise ImproperlyConfigured(
-                    "You need to install the redis library in order to use "
-                  + "Redis result store backend.")
-
-        self.redis_host = (redis_host or
-                           self.app.conf.get("REDIS_HOST") or
-                           self.redis_host)
-        self.redis_port = (redis_port or
-                           self.app.conf.get("REDIS_PORT") or
-                           self.redis_port)
-        self.redis_db = (redis_db or
-                         self.app.conf.get("REDIS_DB") or
-                         self.redis_db)
-        self.redis_password = (redis_password or
-                               self.app.conf.get("REDIS_PASSWORD") or
-                               self.redis_password)
-        self.expires = expires
-        if self.expires is None:
-            self.expires = self.app.conf.CELERY_TASK_RESULT_EXPIRES
-        if isinstance(self.expires, timedelta):
-            self.expires = timeutils.timedelta_seconds(self.expires)
-        if self.expires is not None:
-            self.expires = int(self.expires)
-        self.redis_port = int(self.redis_port)
-
-    def get(self, key):
-        return self.client.get(key)
-
-    def set(self, key, value):
-        client = self.client
-        client.set(key, value)
-        if self.expires is not None:
-            client.expire(key, self.expires)
-
-    def delete(self, key):
-        self.client.delete(key)
-
-    def close(self):
-        """Closes the Redis connection."""
-        del(self.client)
-
-    def process_cleanup(self):
-        self.close()
-
-    def on_chord_apply(self, *args, **kwargs):
-        pass
-
-    def on_chord_part_return(self, task, keyprefix="chord-unlock-%s"):
-        from celery.task.sets import subtask
-        from celery.result import TaskSetResult
-        setid = task.request.taskset
-        key = keyprefix % setid
-        deps = TaskSetResult.restore(setid, backend=task.backend)
-        if self.client.incr(key) >= deps.total:
-            subtask(task.request.chord).delay(deps.join())
-            deps.delete()
-        self.client.expire(key, 86400)
-
-    @cached_property
-    def client(self):
-        return self.redis.Redis(host=self.redis_host,
-                                port=self.redis_port,
-                                db=self.redis_db,
-                                password=self.redis_password)
-
-    @client.deleter  # noqa
-    def client(self, client):
-        client.connection.disconnect()
+            redis_password=None, **kwargs):
+        self.redis_host = redis_host
+        self.redis_port = redis_port
+        self.redis_db = redis_db
+        self.redis_password = redis_password
+        super(RedisBackend, self).__init__(host=redis_host,
+                                           port=redis_port, db=redis_db,
+                                           password=redis_password, **kwargs)

+ 97 - 0
celery/backends/redis.py

@@ -0,0 +1,97 @@
+from __future__ import absolute_import
+
+from kombu.utils import cached_property
+
+from celery.backends.base import KeyValueStoreBackend
+from celery.exceptions import ImproperlyConfigured
+
+try:
+    import redis
+    from redis.exceptions import ConnectionError
+except ImportError:
+    redis = None            # noqa
+    ConnectionError = None  # noqa
+
+
+class RedisBackend(KeyValueStoreBackend):
+    """Redis task result store."""
+
+    #: redis-py client module.
+    redis = redis
+
+    #: default Redis server hostname (`localhost`).
+    host = "localhost"
+
+    #: default Redis server port (6379)
+    port = 6379
+
+    #: default Redis db number (0)
+    db = 0
+
+    #: default Redis password (:const:`None`)
+    password = None
+
+    def __init__(self, host=None, port=None, db=None, password=None,
+            expires=None, **kwargs):
+        super(RedisBackend, self).__init__(**kwargs)
+        conf = self.app.conf
+        if self.redis is None:
+            raise ImproperlyConfigured(
+                    "You need to install the redis library in order to use "
+                  + "Redis result store backend.")
+
+        # For compatability with the old REDIS_* configuration keys.
+        def _get(key):
+            for prefix in "REDIS_%s", "CELERY_REDIS_%s":
+                try:
+                    return conf[prefix % key]
+                except KeyError:
+                    pass
+
+        self.host = host or _get("HOST") or self.host
+        self.port = int(port or _get("PORT") or self.port)
+        self.db = db or _get("DB") or self.db
+        self.password = password or _get("PASSWORD") or self.password
+        self.expires = self.prepare_expires(expires, type=int)
+
+    def get(self, key):
+        return self.client.get(key)
+
+    def set(self, key, value):
+        client = self.client
+        client.set(key, value)
+        if self.expires is not None:
+            client.expire(key, self.expires)
+
+    def delete(self, key):
+        self.client.delete(key)
+
+    def close(self):
+        """Closes the Redis connection."""
+        del(self.client)
+
+    def process_cleanup(self):
+        self.close()
+
+    def on_chord_apply(self, *args, **kwargs):
+        pass
+
+    def on_chord_part_return(self, task, keyprefix="chord-unlock-%s"):
+        from celery.task.sets import subtask
+        from celery.result import TaskSetResult
+        setid = task.request.taskset
+        key = keyprefix % setid
+        deps = TaskSetResult.restore(setid, backend=task.backend)
+        if self.client.incr(key) >= deps.total:
+            subtask(task.request.chord).delay(deps.join())
+            deps.delete()
+        self.client.expire(key, 86400)
+
+    @cached_property
+    def client(self):
+        return self.redis.Redis(host=self.host, port=self.port,
+                                db=self.db, password=self.password)
+
+    @client.deleter  # noqa
+    def client(self, client):
+        client.connection.disconnect()

+ 2 - 2
celery/tests/test_backends/test_database.py

@@ -179,9 +179,9 @@ class test_DatabaseBackend(unittest.TestCase):
             tb.save_taskset(gen_unique_id(), {"foo": "bar"})
         s = tb.ResultSession()
         for t in s.query(Task).all():
-            t.date_done = datetime.now() - tb.result_expires * 2
+            t.date_done = datetime.now() - tb.expires * 2
         for t in s.query(TaskSet).all():
-            t.date_done = datetime.now() - tb.result_expires * 2
+            t.date_done = datetime.now() - tb.expires * 2
         s.commit()
         s.close()
 

+ 12 - 12
celery/tests/test_backends/test_redis.py

@@ -1,4 +1,4 @@
-from __future__ import with_statement
+from __future__ import absolute_import, with_statement
 
 import sys
 import socket
@@ -10,8 +10,8 @@ from celery.exceptions import ImproperlyConfigured
 
 from celery import states
 from celery.utils import gen_unique_id
-from celery.backends import pyredis
-from celery.backends.pyredis import RedisBackend
+from celery.backends import redis
+from celery.backends.redis import RedisBackend
 
 from celery.tests.utils import mask_modules
 
@@ -39,7 +39,7 @@ def get_redis_or_SkipTest():
             sys.stderr.write("\n" + _no_redis_msg % reason + "\n")
             _no_redis_msg_emitted = True
 
-    if pyredis.redis is None:
+    if redis.redis is None:
         emit_no_redis_msg("not installed")
         raise SkipTest("redis library not installed")
     try:
@@ -108,19 +108,19 @@ class TestRedisBackend(unittest.TestCase):
 class TestRedisBackendNoRedis(unittest.TestCase):
 
     def test_redis_None_if_redis_not_installed(self):
-        prev = sys.modules.pop("celery.backends.pyredis")
+        prev = sys.modules.pop("celery.backends.redis")
         try:
             with mask_modules("redis"):
-                from celery.backends.pyredis import redis
+                from celery.backends.redis import redis
                 self.assertIsNone(redis)
         finally:
-            sys.modules["celery.backends.pyredis"] = prev
+            sys.modules["celery.backends.redis"] = prev
 
     def test_constructor_raises_if_redis_not_installed(self):
-        from celery.backends import pyredis
-        prev = pyredis.RedisBackend.redis
-        pyredis.RedisBackend.redis = None
+        from celery.backends import redis
+        prev = redis.RedisBackend.redis
+        redis.RedisBackend.redis = None
         try:
-            self.assertRaises(ImproperlyConfigured, pyredis.RedisBackend)
+            self.assertRaises(ImproperlyConfigured, redis.RedisBackend)
         finally:
-            pyredis.RedisBackend.redis = prev
+            redis.RedisBackend.redis = prev

+ 2 - 2
celery/tests/test_backends/test_redis_unit.py

@@ -45,9 +45,9 @@ class redis(object):
 class test_RedisBackend(unittest.TestCase):
 
     def get_backend(self):
-        from celery.backends import pyredis
+        from celery.backends import redis
 
-        class RedisBackend(pyredis.RedisBackend):
+        class RedisBackend(redis.RedisBackend):
             redis = redis
 
         return RedisBackend

+ 2 - 0
celery/tests/utils.py

@@ -1,3 +1,5 @@
+from __future__ import absolute_import
+
 try:
     import unittest
     unittest.skip