Browse Source

All result backends now implement __reduce__ so they can safely be pickled.

Closes #441
rnoel 13 years ago
parent
commit
187513368d

+ 12 - 0
celery/backends/amqp.py

@@ -234,3 +234,15 @@ class AMQPBackend(BaseDictBackend):
     def delete_taskset(self, taskset_id):
         raise NotImplementedError(
                 "delete_taskset is not supported by this backend.")
+
+    def __reduce__(self, args=(), kwargs={}):
+        kwargs.update(
+            dict(connection=self._connection, 
+                 exchange=self.exchange.name,
+                 exchange_type=self.exchange.type,
+                 persistent=self.persistent,
+                 serializer=self.serializer,
+                 auto_delete=self.auto_delete,
+                 expires=self.expires,
+                 connection_max=self.connection_max))
+        return super(AMQPBackend, self).__reduce__(args, kwargs)

+ 6 - 2
celery/backends/base.py

@@ -12,6 +12,10 @@ from celery.utils.serialization import get_pickleable_exception
 from celery.utils.serialization import create_exception_cls
 from celery.datastructures import LocalCache
 
+def unpickle_backend(cls, args, kwargs):
+  """Returns an unpickled backend."""
+  return cls(*args, **kwargs)
+
 
 class BaseBackend(object):
     """Base backend class."""
@@ -181,8 +185,8 @@ class BaseBackend(object):
         tasks["celery.chord_unlock"].apply_async((setid, body, ), kwargs,
                                                  countdown=1)
 
-    def __reduce__(self):
-        return (self.__class__, ())
+    def __reduce__(self, args=(), kwargs={}):
+        return (unpickle_backend, (self.__class__, args, kwargs))
 
 
 class BaseDictBackend(BaseBackend):

+ 9 - 0
celery/backends/cache.py

@@ -93,3 +93,12 @@ class CacheBackend(KeyValueStoreBackend):
     @cached_property
     def client(self):
         return self.Client(self.servers, **self.options)
+
+    def __reduce__(self, args=(), kwargs={}):
+        servers = ";".join(self.servers)
+        backend = "%s://%s/" % (self.backend, servers)
+        kwargs.update(
+            dict(backend=backend, 
+                 expires=self.expires,
+                 options=self.options))
+        return super(CacheBackend, self).__reduce__(args, kwargs)

+ 8 - 0
celery/backends/cassandra.py

@@ -151,3 +151,11 @@ class CassandraBackend(BaseDictBackend):
             return meta
 
         return self._retry_on_error(_do_get)
+
+    def __reduce__(self, args=(), kwargs={}):
+        kwargs.update(
+            dict(servers=self.servers, 
+                 keyspace=self.keyspace,
+                 column_family=self.column_family,
+                 cassandra_options=self.cassandra_options))
+        return super(CassandraBackend, self).__reduce__(args, kwargs)

+ 7 - 0
celery/backends/database.py

@@ -129,3 +129,10 @@ class DatabaseBackend(BaseDictBackend):
             session.commit()
         finally:
             session.close()
+
+    def __reduce__(self, args=(), kwargs={}):
+        kwargs.update(
+            dict(dburi=self.dburi, 
+                 expires=self.expires,
+                 engine_options=self.engine_options))
+        return super(DatabaseBackend, self).__reduce__(args, kwargs)

+ 5 - 0
celery/backends/mongodb.py

@@ -136,3 +136,8 @@ class MongoBackend(BaseDictBackend):
                     "$lt": datetime.now() - self.expires,
                  }
         })
+
+    def __reduce__(self, args=(), kwargs={}):
+        kwargs.update(
+            dict(expires=self.expires))
+        return super(MongoBackend, self).__reduce__(args, kwargs)

+ 13 - 2
celery/backends/pyredis.py

@@ -17,6 +17,17 @@ class RedisBackend(redis.RedisBackend):
         self.redis_port = redis_port
         self.redis_db = redis_db
         self.redis_password = redis_password
-        super(RedisBackend, self).__init__(host=redis_host,
+        # Changed in order to avoid duplicated arguments
+        super(RedisBackend, self).__init__(**dict(kwargs, host=redis_host,
                                            port=redis_port, db=redis_db,
-                                           password=redis_password, **kwargs)
+                                           password=redis_password))
+
+    def __reduce__(self, args=(), kwargs={}):
+        # Not very usefull, but without the following, the redis_* attributes 
+        # would not be set.
+        kwargs.update(
+            dict(redis_host=self.redis_host, 
+                 redis_port=redis_self.redis_port,
+                 redis_db=redis_self.redis_db,
+                 redis_password=self.redis_password))
+        return super(RedisBackend, self).__reduce__(args, kwargs)

+ 9 - 0
celery/backends/redis.py

@@ -91,3 +91,12 @@ class RedisBackend(KeyValueStoreBackend):
     def client(self):
         return self.redis.Redis(host=self.host, port=self.port,
                                 db=self.db, password=self.password)
+
+    def __reduce__(self, args=(), kwargs={}):
+        kwargs.update(
+            dict(host=self.host, 
+                 port=self.port,
+                 db=self.db,
+                 password=self.password,
+                 expires=self.expires))
+        return super(RedisBackend, self).__reduce__(args, kwargs)

+ 6 - 0
celery/backends/tyrant.py

@@ -82,3 +82,9 @@ class TyrantBackend(KeyValueStoreBackend):
 
     def delete(self, key):
         self.open().pop(key, None)
+
+    def __reduce__(self, args=(), kwargs={}):
+        kwargs.update(
+            dict(tyrant_host=self.tyrant_host, 
+                 tyrant_port=self.tyrant_port))
+        return super(TyrantBackend, self).__reduce__(args, kwargs)