test_cache.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import sys
  2. import unittest
  3. from billiard.serialization import pickle
  4. from django.core.cache.backends.base import InvalidCacheBackendError
  5. from celery import result
  6. from celery import states
  7. from celery.utils import gen_unique_id
  8. from celery.backends.cache import CacheBackend
  9. from celery.datastructures import ExceptionInfo
  10. class SomeClass(object):
  11. def __init__(self, data):
  12. self.data = data
  13. class TestCacheBackend(unittest.TestCase):
  14. def test_mark_as_done(self):
  15. cb = CacheBackend()
  16. tid = gen_unique_id()
  17. self.assertFalse(cb.is_successful(tid))
  18. self.assertEquals(cb.get_status(tid), states.PENDING)
  19. self.assertEquals(cb.get_result(tid), None)
  20. cb.mark_as_done(tid, 42)
  21. self.assertTrue(cb.is_successful(tid))
  22. self.assertEquals(cb.get_status(tid), states.SUCCESS)
  23. self.assertEquals(cb.get_result(tid), 42)
  24. self.assertTrue(cb.get_result(tid), 42)
  25. def test_save_restore_taskset(self):
  26. backend = CacheBackend()
  27. taskset_id = gen_unique_id()
  28. subtask_ids = [gen_unique_id() for i in range(10)]
  29. subtasks = map(result.AsyncResult, subtask_ids)
  30. res = result.TaskSetResult(taskset_id, subtasks)
  31. res.save(backend=backend)
  32. saved = result.TaskSetResult.restore(taskset_id, backend=backend)
  33. self.assertEquals(saved.subtasks, subtasks)
  34. self.assertEquals(saved.taskset_id, taskset_id)
  35. def test_is_pickled(self):
  36. cb = CacheBackend()
  37. tid2 = gen_unique_id()
  38. result = {"foo": "baz", "bar": SomeClass(12345)}
  39. cb.mark_as_done(tid2, result)
  40. # is serialized properly.
  41. rindb = cb.get_result(tid2)
  42. self.assertEquals(rindb.get("foo"), "baz")
  43. self.assertEquals(rindb.get("bar").data, 12345)
  44. def test_mark_as_failure(self):
  45. cb = CacheBackend()
  46. einfo = None
  47. tid3 = gen_unique_id()
  48. try:
  49. raise KeyError("foo")
  50. except KeyError, exception:
  51. einfo = ExceptionInfo(sys.exc_info())
  52. pass
  53. cb.mark_as_failure(tid3, exception, traceback=einfo.traceback)
  54. self.assertFalse(cb.is_successful(tid3))
  55. self.assertEquals(cb.get_status(tid3), states.FAILURE)
  56. self.assertTrue(isinstance(cb.get_result(tid3), KeyError))
  57. self.assertEquals(cb.get_traceback(tid3), einfo.traceback)
  58. def test_process_cleanup(self):
  59. cb = CacheBackend()
  60. cb.process_cleanup()
  61. class TestCustomCacheBackend(unittest.TestCase):
  62. def test_custom_cache_backend(self):
  63. from celery import conf
  64. prev_backend = conf.CELERY_CACHE_BACKEND
  65. prev_module = sys.modules["celery.backends.cache"]
  66. conf.CELERY_CACHE_BACKEND = "dummy://"
  67. sys.modules.pop("celery.backends.cache")
  68. try:
  69. from celery.backends.cache import cache
  70. from django.core.cache import cache as django_cache
  71. self.assertEquals(cache.__class__.__module__,
  72. "django.core.cache.backends.dummy")
  73. self.assertTrue(cache is not django_cache)
  74. finally:
  75. conf.CELERY_CACHE_BACKEND = prev_backend
  76. sys.modules["celery.backends.cache"] = prev_module
  77. class TestMemcacheWrapper(unittest.TestCase):
  78. def test_memcache_wrapper(self):
  79. try:
  80. from django.core.cache.backends import memcached
  81. from django.core.cache.backends import locmem
  82. except InvalidCacheBackendError:
  83. sys.stderr.write(
  84. "\n* Memcache library is not installed. Skipping test.\n")
  85. return
  86. prev_cache_cls = memcached.CacheClass
  87. memcached.CacheClass = locmem.CacheClass
  88. prev_backend_module = sys.modules.pop("celery.backends.cache")
  89. try:
  90. from celery.backends.cache import cache, DjangoMemcacheWrapper
  91. self.assertTrue(isinstance(cache, DjangoMemcacheWrapper))
  92. key = "cu.test_memcache_wrapper"
  93. val = "The quick brown fox."
  94. default = "The lazy dog."
  95. self.assertEquals(cache.get(key, default=default), default)
  96. cache.set(key, val)
  97. self.assertEquals(pickle.loads(cache.get(key, default=default)),
  98. val)
  99. finally:
  100. memcached.CacheClass = prev_cache_cls
  101. sys.modules["celery.backends.cache"] = prev_backend_module