test_cache.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import sys
  4. import types
  5. from contextlib import contextmanager
  6. from kombu.utils.encoding import str_to_bytes
  7. from mock import Mock, patch
  8. from celery import current_app
  9. from celery import states
  10. from celery.backends.cache import CacheBackend, DummyClient
  11. from celery.exceptions import ImproperlyConfigured
  12. from celery.result import AsyncResult
  13. from celery.task import subtask
  14. from celery.utils import uuid
  15. from celery.tests.utils import Case, mask_modules, reset_modules
  class SomeClass(object):
      """Minimal object exposing a single ``data`` attribute.

      test_is_pickled stores an instance of this class inside a task
      result and reads ``data`` back from the retrieved result.
      """

      def __init__(self, data):
          # Keep the payload on the instance so it can be compared after
          # the object has been stored and fetched again.
          self.data = data
  class test_CacheBackend(Case):
      """Exercise CacheBackend through the ``memory://`` backend URL."""

      def setUp(self):
          # Every test gets its own backend instance and a fresh task id.
          self.tb = CacheBackend(backend='memory://')
          self.tid = uuid()

      def test_mark_as_done(self):
          # Before anything is stored the id reports PENDING and no result.
          self.assertEqual(self.tb.get_status(self.tid), states.PENDING)
          self.assertIsNone(self.tb.get_result(self.tid))

          self.tb.mark_as_done(self.tid, 42)
          self.assertEqual(self.tb.get_status(self.tid), states.SUCCESS)
          self.assertEqual(self.tb.get_result(self.tid), 42)

      def test_is_pickled(self):
          result = {'foo': 'baz', 'bar': SomeClass(12345)}
          self.tb.mark_as_done(self.tid, result)
          # The stored value must come back intact, including the
          # SomeClass instance nested inside the dict.
          rindb = self.tb.get_result(self.tid)
          self.assertEqual(rindb.get('foo'), 'baz')
          self.assertEqual(rindb.get('bar').data, 12345)

      def test_mark_as_failure(self):
          try:
              raise KeyError('foo')
          # ``except ... as`` is accepted by Python 2.6+ and Python 3; the
          # older comma form is a SyntaxError on Python 3.
          except KeyError as exception:
              self.tb.mark_as_failure(self.tid, exception)
              self.assertEqual(self.tb.get_status(self.tid), states.FAILURE)
              self.assertIsInstance(self.tb.get_result(self.tid), KeyError)

      def test_on_chord_apply(self):
          # Only checks that the call succeeds with an empty result list.
          tb = CacheBackend(backend='memory://')
          tb.on_chord_apply('group_id', [])

      @patch('celery.result.GroupResult')
      def test_on_chord_part_return(self, setresult):
          tb = CacheBackend(backend='memory://')

          # ``deps`` stands in for the restored group result and reports
          # a length of 2.
          deps = Mock()
          deps.__len__ = Mock()
          deps.__len__.return_value = 2
          setresult.restore.return_value = deps
          task = Mock()
          task.name = 'foobarbaz'
          try:
              current_app.tasks['foobarbaz'] = task
              task.request.chord = subtask(task)
              task.request.group = 'group_id'

              tb.on_chord_apply(task.request.group, [])

              # The first part returning must not join the results yet.
              self.assertFalse(deps.join_native.called)
              tb.on_chord_part_return(task)
              self.assertFalse(deps.join_native.called)

              # The second part matches len(deps): the results are joined
              # and the stored group is deleted.
              tb.on_chord_part_return(task)
              deps.join_native.assert_called_with(propagate=True)
              deps.delete.assert_called_with()
          finally:
              # Always remove the mock task from the app's task registry.
              current_app.tasks.pop('foobarbaz')

      def test_mget(self):
          self.tb.set('foo', 1)
          self.tb.set('bar', 2)

          self.assertDictEqual(self.tb.mget(['foo', 'bar']),
                               {'foo': 1, 'bar': 2})

      def test_forget(self):
          self.tb.mark_as_done(self.tid, {'foo': 'bar'})
          x = AsyncResult(self.tid, backend=self.tb)
          x.forget()
          # After forget() the result is no longer available.
          self.assertIsNone(x.result)

      def test_process_cleanup(self):
          # Only checks that the call does not raise.
          self.tb.process_cleanup()

      def test_expires_as_int(self):
          tb = CacheBackend(backend='memory://', expires=10)
          self.assertEqual(tb.expires, 10)

      def test_unknown_backend_raises_ImproperlyConfigured(self):
          with self.assertRaises(ImproperlyConfigured):
              CacheBackend(backend='unknown://')
  class MyMemcachedStringEncodingError(Exception):
      """Error raised by the fake memcached client for unicode keys."""
  class MemcachedClient(DummyClient):
      """DummyClient variant whose ``set`` refuses unicode keys."""

      def set(self, key, value, *args, **kwargs):
          # Anything that is not a ``unicode`` object (the Python 2 text
          # type) is handed straight to DummyClient.set.
          if not isinstance(key, unicode):
              parent = super(MemcachedClient, self)
              return parent.set(key, value, *args, **kwargs)
          raise MyMemcachedStringEncodingError(
              'Keys must be str, not unicode. Convert your unicode '
              'strings using mystring.encode(charset)!')
  class MockCacheMixin(object):
      """Context managers that install fake memcached client modules.

      Each manager registers a module in ``sys.modules`` whose ``Client``
      attribute is MemcachedClient, and undoes the registration on exit.
      """

      @contextmanager
      def _mock_client_module(self, name):
          """Temporarily register a fake module called *name*.

          Yields True while the fake is installed.  On exit the previous
          ``sys.modules`` entry is put back; if there was none, the fake
          entry is removed so it cannot leak into later tests.
          """
          fake = types.ModuleType(name)
          fake.Client = MemcachedClient
          # NOTE: this rebinds ``__module__`` on the shared MemcachedClient
          # class itself and is not undone on exit; the value reflects
          # whichever fake module was installed last.
          fake.Client.__module__ = fake.__name__
          prev = sys.modules.get(name)
          sys.modules[name] = fake
          try:
              yield True
          finally:
              if prev is not None:
                  sys.modules[name] = prev
              else:
                  # Nothing was registered before: drop the fake instead
                  # of leaving it importable after the block ends.
                  sys.modules.pop(name, None)

      def mock_memcache(self):
          """Return a context manager installing a fake ``memcache`` module."""
          return self._mock_client_module('memcache')

      def mock_pylibmc(self):
          """Return a context manager installing a fake ``pylibmc`` module."""
          return self._mock_client_module('pylibmc')
  class test_get_best_memcache(Case, MockCacheMixin):
      """Check which client module ``cache.get_best_memcache`` ends up using."""

      def test_pylibmc(self):
          with self.mock_pylibmc():
              with reset_modules('celery.backends.cache'):
                  from celery.backends import cache as cache_mod
                  # Clear ``_imp`` (presumably the remembered
                  # implementation) before asking for a client.
                  cache_mod._imp = [None]
                  client = cache_mod.get_best_memcache()
                  self.assertEqual(client.__module__, 'pylibmc')

      def test_memcache(self):
          with self.mock_memcache():
              with reset_modules('celery.backends.cache'):
                  # pylibmc is masked, so only the fake memcache module
                  # can be imported.
                  with mask_modules('pylibmc'):
                      from celery.backends import cache as cache_mod
                      cache_mod._imp = [None]
                      client = cache_mod.get_best_memcache()
                      self.assertEqual(client.__module__, 'memcache')

      def test_no_implementations(self):
          with mask_modules('pylibmc', 'memcache'):
              with reset_modules('celery.backends.cache'):
                  from celery.backends import cache as cache_mod
                  cache_mod._imp = [None]
                  # With both client modules masked the lookup must fail.
                  with self.assertRaises(ImproperlyConfigured):
                      cache_mod.get_best_memcache()

      def test_cached(self):
          with self.mock_pylibmc():
              with reset_modules('celery.backends.cache'):
                  from celery.backends import cache as cache_mod
                  cache_mod._imp = [None]
                  cache_mod.get_best_memcache(behaviors={'foo': 'bar'})
                  # The first call must have filled the ``_imp`` slot.
                  remembered = cache_mod._imp[0]
                  self.assertTrue(remembered)
                  cache_mod.get_best_memcache()

      def test_backends(self):
          from celery.backends.cache import backends as registry
          # Every registered entry must return something truthy when called.
          for factory in registry.values():
              self.assertTrue(factory())
  class test_memcache_key(Case, MockCacheMixin):
      """Store and fetch results using unicode and byte-string task ids."""

      def _assert_roundtrip(self, cache_mod, task_id):
          # Save 42 under ``task_id`` with SUCCESS status through a
          # 'memcache' CacheBackend and expect to read the same value back.
          expected = 42
          backend = cache_mod.CacheBackend(backend='memcache')
          backend.store_result(task_id, expected, status=states.SUCCESS)
          self.assertEqual(backend.get_result(task_id), expected)

      def test_memcache_unicode_key(self):
          with self.mock_memcache():
              with reset_modules('celery.backends.cache'):
                  with mask_modules('pylibmc'):
                      from celery.backends import cache as cache_mod
                      cache_mod._imp = [None]
                      self._assert_roundtrip(cache_mod, unicode(uuid()))

      def test_memcache_bytes_key(self):
          with self.mock_memcache():
              with reset_modules('celery.backends.cache'):
                  with mask_modules('pylibmc'):
                      from celery.backends import cache as cache_mod
                      cache_mod._imp = [None]
                      self._assert_roundtrip(cache_mod, str_to_bytes(uuid()))

      def test_pylibmc_unicode_key(self):
          with reset_modules('celery.backends.cache'):
              with self.mock_pylibmc():
                  from celery.backends import cache as cache_mod
                  cache_mod._imp = [None]
                  self._assert_roundtrip(cache_mod, unicode(uuid()))

      def test_pylibmc_bytes_key(self):
          with reset_modules('celery.backends.cache'):
              with self.mock_pylibmc():
                  from celery.backends import cache as cache_mod
                  cache_mod._imp = [None]
                  self._assert_roundtrip(cache_mod, str_to_bytes(uuid()))
  192. self.assertEqual(b.get_result(task_id), result)