123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import sys
- import unittest
- from billiard.serialization import pickle
- from django.core.cache.backends.base import InvalidCacheBackendError
- from celery import result
- from celery import states
- from celery.utils import gen_unique_id
- from celery.backends.cache import CacheBackend
- from celery.datastructures import ExceptionInfo
- class SomeClass(object):
- def __init__(self, data):
- self.data = data
- class TestCacheBackend(unittest.TestCase):
- def test_mark_as_done(self):
- cb = CacheBackend()
- tid = gen_unique_id()
- self.assertFalse(cb.is_successful(tid))
- self.assertEquals(cb.get_status(tid), states.PENDING)
- self.assertEquals(cb.get_result(tid), None)
- cb.mark_as_done(tid, 42)
- self.assertTrue(cb.is_successful(tid))
- self.assertEquals(cb.get_status(tid), states.SUCCESS)
- self.assertEquals(cb.get_result(tid), 42)
- self.assertTrue(cb.get_result(tid), 42)
- def test_save_restore_taskset(self):
- backend = CacheBackend()
- taskset_id = gen_unique_id()
- subtask_ids = [gen_unique_id() for i in range(10)]
- subtasks = map(result.AsyncResult, subtask_ids)
- res = result.TaskSetResult(taskset_id, subtasks)
- res.save(backend=backend)
- saved = result.TaskSetResult.restore(taskset_id, backend=backend)
- self.assertEquals(saved.subtasks, subtasks)
- self.assertEquals(saved.taskset_id, taskset_id)
- def test_is_pickled(self):
- cb = CacheBackend()
- tid2 = gen_unique_id()
- result = {"foo": "baz", "bar": SomeClass(12345)}
- cb.mark_as_done(tid2, result)
- # is serialized properly.
- rindb = cb.get_result(tid2)
- self.assertEquals(rindb.get("foo"), "baz")
- self.assertEquals(rindb.get("bar").data, 12345)
- def test_mark_as_failure(self):
- cb = CacheBackend()
- einfo = None
- tid3 = gen_unique_id()
- try:
- raise KeyError("foo")
- except KeyError, exception:
- einfo = ExceptionInfo(sys.exc_info())
- pass
- cb.mark_as_failure(tid3, exception, traceback=einfo.traceback)
- self.assertFalse(cb.is_successful(tid3))
- self.assertEquals(cb.get_status(tid3), states.FAILURE)
- self.assertTrue(isinstance(cb.get_result(tid3), KeyError))
- self.assertEquals(cb.get_traceback(tid3), einfo.traceback)
- def test_process_cleanup(self):
- cb = CacheBackend()
- cb.process_cleanup()
- class TestCustomCacheBackend(unittest.TestCase):
- def test_custom_cache_backend(self):
- from celery import conf
- prev_backend = conf.CELERY_CACHE_BACKEND
- prev_module = sys.modules["celery.backends.cache"]
- conf.CELERY_CACHE_BACKEND = "dummy://"
- sys.modules.pop("celery.backends.cache")
- try:
- from celery.backends.cache import cache
- from django.core.cache import cache as django_cache
- self.assertEquals(cache.__class__.__module__,
- "django.core.cache.backends.dummy")
- self.assertTrue(cache is not django_cache)
- finally:
- conf.CELERY_CACHE_BACKEND = prev_backend
- sys.modules["celery.backends.cache"] = prev_module
- class TestMemcacheWrapper(unittest.TestCase):
- def test_memcache_wrapper(self):
- try:
- from django.core.cache.backends import memcached
- from django.core.cache.backends import locmem
- except InvalidCacheBackendError:
- sys.stderr.write(
- "\n* Memcache library is not installed. Skipping test.\n")
- return
- prev_cache_cls = memcached.CacheClass
- memcached.CacheClass = locmem.CacheClass
- prev_backend_module = sys.modules.pop("celery.backends.cache")
- try:
- from celery.backends.cache import cache, DjangoMemcacheWrapper
- self.assertTrue(isinstance(cache, DjangoMemcacheWrapper))
- key = "cu.test_memcache_wrapper"
- val = "The quick brown fox."
- default = "The lazy dog."
- self.assertEquals(cache.get(key, default=default), default)
- cache.set(key, val)
- self.assertEquals(pickle.loads(cache.get(key, default=default)),
- val)
- finally:
- memcached.CacheClass = prev_cache_cls
- sys.modules["celery.backends.cache"] = prev_backend_module
|