Browse Source

Do not emit backend_cleanup periodic task for backends with autoexpire. Closes #1031

Ask Solem 12 years ago
parent
commit
30dc635e21

+ 1 - 0
celery/backends/amqp.py

@@ -46,6 +46,7 @@ class AMQPBackend(BaseDictBackend):
 
     BacklogLimitExceeded = BacklogLimitExceeded
 
+    supports_autoexpire = True
     supports_native_join = True
 
     retry_policy = {

+ 5 - 0
celery/backends/base.py

@@ -61,6 +61,11 @@ class BaseBackend(object):
     #: If true the backend must implement :meth:`get_many`.
     supports_native_join = False
 
+    #: If true the backend must automatically expire results.
+    #: The daily backend_cleanup periodic task will not be triggered
+    #: in this case.
+    supports_autoexpire = False
+
     def __init__(self, *args, **kwargs):
         from celery.app import app_or_default
         self.app = app_or_default(kwargs.get('app'))

+ 1 - 0
celery/backends/cache.py

@@ -81,6 +81,7 @@ backends = {'memcache': lambda: get_best_memcache,
 
 class CacheBackend(KeyValueStoreBackend):
     servers = None
+    supports_autoexpire = True
     supports_native_join = True
     implements_incr = True
 

+ 1 - 0
celery/backends/cassandra.py

@@ -45,6 +45,7 @@ class CassandraBackend(BaseDictBackend):
     detailed_mode = False
     _retry_timeout = 300
     _retry_wait = 3
+    supports_autoexpire = True
 
     def __init__(self, servers=None, keyspace=None, column_family=None,
                  cassandra_options=None, detailed_mode=False, **kwargs):

+ 2 - 0
celery/backends/mongodb.py

@@ -47,6 +47,8 @@ class MongoBackend(BaseDictBackend):
     mongodb_taskmeta_collection = 'celery_taskmeta'
     mongodb_max_pool_size = 10
 
+    supports_autoexpire = False
+
     def __init__(self, *args, **kwargs):
         """Initialize MongoDB backend instance.
 

+ 1 - 0
celery/backends/redis.py

@@ -44,6 +44,7 @@ class RedisBackend(KeyValueStoreBackend):
     #: Maximium number of connections in the pool.
     max_connections = None
 
+    supports_autoexpire = True
     supports_native_join = True
     implements_incr = True
 

+ 2 - 1
celery/beat.py

@@ -164,7 +164,8 @@ class Scheduler(object):
 
     def install_default_entries(self, data):
         entries = {}
-        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
+        if self.app.conf.CELERY_TASK_RESULT_EXPIRES and \
+                not self.app.backend.supports_autoexpire:
             if 'celery.backend_cleanup' not in data:
                 entries['celery.backend_cleanup'] = {
                     'task': 'celery.backend_cleanup',

+ 11 - 0
celery/tests/app/test_beat.py

@@ -7,6 +7,7 @@ from datetime import datetime, timedelta
 from mock import Mock, call, patch
 from nose import SkipTest
 
+from celery import current_app
 from celery import beat
 from celery import task
 from celery.result import AsyncResult
@@ -221,11 +222,21 @@ class test_Scheduler(Case):
             s = mScheduler()
             s.install_default_entries({})
             self.assertNotIn('celery.backend_cleanup', s.data)
+        current_app.backend.supports_autoexpire = False
         with patch_settings(CELERY_TASK_RESULT_EXPIRES=30,
                             CELERYBEAT_SCHEDULE={}):
             s = mScheduler()
             s.install_default_entries({})
             self.assertIn('celery.backend_cleanup', s.data)
+        current_app.backend.supports_autoexpire = True
+        try:
+            with patch_settings(CELERY_TASK_RESULT_EXPIRES=31,
+                                CELERYBEAT_SCHEDULE={}):
+                s = mScheduler()
+                s.install_default_entries({})
+                self.assertNotIn('celery.backend_cleanup', s.data)
+        finally:
+            current_app.backend.supports_autoexpire = False
 
     def test_due_tick(self):
         scheduler = mScheduler()