浏览代码

Use prepare_expires in other backends as well

Ask Solem 14 年之前
父节点
当前提交
e482db8eac

+ 1 - 5
celery/backends/amqp.py

@@ -5,7 +5,6 @@ import socket
 import threading
 import time
 
-from datetime import timedelta
 from itertools import count
 
 from kombu.entity import Exchange, Queue
@@ -14,7 +13,6 @@ from kombu.messaging import Consumer, Producer
 from celery import states
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import TimeoutError
-from celery.utils import timeutils
 
 
 class BacklogLimitExceeded(Exception):
@@ -58,10 +56,8 @@ class AMQPBackend(BaseDictBackend):
         self.auto_delete = auto_delete
         self.expires = (conf.CELERY_AMQP_TASK_RESULT_EXPIRES if expires is None
                                                              else expires)
-        if isinstance(self.expires, timedelta):
-            self.expires = timeutils.timedelta_seconds(self.expires)
         if self.expires is not None:
-            self.expires = int(self.expires)
+            self.expires = self.prepare_expires(self.expires)
             # x-expires requires RabbitMQ 2.1.0 or higher.
             self.queue_arguments["x-expires"] = self.expires * 1000.0
         self.connection_max = (connection_max or

+ 1 - 7
celery/backends/cache.py

@@ -1,10 +1,7 @@
-from datetime import timedelta
-
 from kombu.utils import cached_property
 
 from celery.backends.base import KeyValueStoreBackend
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import timeutils
 from celery.datastructures import LocalCache
 
 _imp = [None]
@@ -62,15 +59,12 @@ class CacheBackend(KeyValueStoreBackend):
     def __init__(self, expires=None, backend=None, options={}, **kwargs):
         super(CacheBackend, self).__init__(self, **kwargs)
 
-        self.expires = expires or self.app.conf.CELERY_TASK_RESULT_EXPIRES
-        if isinstance(self.expires, timedelta):
-            self.expires = timeutils.timedelta_seconds(self.expires)
         self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
                             **options)
 
         backend = backend or self.app.conf.CELERY_CACHE_BACKEND
-        self.expires = int(self.expires)
         self.backend, _, servers = backend.partition("://")
+        self.expires = self.prepare_expires(expires, type=int)
         self.servers = servers.rstrip('/').split(";")
         try:
             self.Client = backends[self.backend]()

+ 2 - 3
celery/backends/cassandra.py

@@ -47,8 +47,7 @@ class CassandraBackend(BaseDictBackend):
         self.logger = self.app.log.setup_logger(
                             name="celery.backends.cassandra")
 
-        self.result_expires = kwargs.get("result_expires") or \
-                                maybe_timedelta(
+        self.expires = kwargs.get("expires") or maybe_timedelta(
                                     self.app.conf.CELERY_TASK_RESULT_EXPIRES)
 
         if not pycassa:
@@ -129,7 +128,7 @@ class CassandraBackend(BaseDictBackend):
                     "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
                     "traceback": pickle.dumps(traceback)}
             cf.insert(task_id, meta,
-                      ttl=timedelta_seconds(self.result_expires))
+                      ttl=timedelta_seconds(self.expires))
 
         return self._retry_on_error(_do_store)
 

+ 3 - 5
celery/backends/database.py

@@ -22,12 +22,10 @@ _sqlalchemy_installed()
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
 
-    def __init__(self, dburi=None, result_expires=None,
+    def __init__(self, dburi=None, expires=None,
             engine_options=None, **kwargs):
         super(DatabaseBackend, self).__init__(**kwargs)
-        self.result_expires = result_expires or \
-                                maybe_timedelta(
-                                    self.app.conf.CELERY_TASK_RESULT_EXPIRES)
+        self.expires = maybe_timedelta(self.prepare_expires(expires))
         self.dburi = dburi or self.app.conf.CELERY_RESULT_DBURI
         self.engine_options = dict(engine_options or {},
                         **self.app.conf.CELERY_RESULT_ENGINE_OPTIONS or {})
@@ -115,7 +113,7 @@ class DatabaseBackend(BaseDictBackend):
     def cleanup(self):
         """Delete expired metadata."""
         session = self.ResultSession()
-        expires = self.result_expires
+        expires = self.expires
         try:
             session.query(Task).filter(
                     Task.date_done < (datetime.now() - expires)).delete()

+ 2 - 3
celery/backends/mongodb.py

@@ -35,8 +35,7 @@ class MongoBackend(BaseDictBackend):
 
         """
         super(MongoBackend, self).__init__(*args, **kwargs)
-        self.result_expires = kwargs.get("result_expires") or \
-                                maybe_timedelta(
+        self.expires = kwargs.get("expires") or maybe_timedelta(
                                     self.app.conf.CELERY_TASK_RESULT_EXPIRES)
 
         if not pymongo:
@@ -134,6 +133,6 @@ class MongoBackend(BaseDictBackend):
         taskmeta_collection = db[self.mongodb_taskmeta_collection]
         taskmeta_collection.remove({
                 "date_done": {
-                    "$lt": datetime.now() - self.result_expires,
+                    "$lt": datetime.now() - self.expires,
                  }
         })

+ 1 - 1
celery/utils/timeutils.py

@@ -20,7 +20,7 @@ TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
 
 def maybe_timedelta(delta):
     """Coerces integer to timedelta if `delta` is an integer."""
-    if isinstance(delta, int):
+    if isinstance(delta, (int, float)):
         return timedelta(seconds=delta)
     return delta