test_cache.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. from __future__ import absolute_import
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. from kombu.utils.encoding import str_to_bytes
  6. from celery import signature
  7. from celery import states
  8. from celery import group
  9. from celery.backends.cache import CacheBackend, DummyClient
  10. from celery.exceptions import ImproperlyConfigured
  11. from celery.five import items, string, text_t
  12. from celery.utils import uuid
  13. from celery.tests.case import (
  14. AppCase, Mock, mask_modules, patch, reset_modules,
  15. )
  16. PY3 = sys.version_info[0] == 3
  17. class SomeClass(object):
  18. def __init__(self, data):
  19. self.data = data
  20. class test_CacheBackend(AppCase):
  21. def setup(self):
  22. self.tb = CacheBackend(backend='memory://', app=self.app)
  23. self.tid = uuid()
  24. def test_no_backend(self):
  25. self.app.conf.CELERY_CACHE_BACKEND = None
  26. with self.assertRaises(ImproperlyConfigured):
  27. CacheBackend(backend=None, app=self.app)
  28. def test_mark_as_done(self):
  29. self.assertEqual(self.tb.get_status(self.tid), states.PENDING)
  30. self.assertIsNone(self.tb.get_result(self.tid))
  31. self.tb.mark_as_done(self.tid, 42)
  32. self.assertEqual(self.tb.get_status(self.tid), states.SUCCESS)
  33. self.assertEqual(self.tb.get_result(self.tid), 42)
  34. def test_is_pickled(self):
  35. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  36. self.tb.mark_as_done(self.tid, result)
  37. # is serialized properly.
  38. rindb = self.tb.get_result(self.tid)
  39. self.assertEqual(rindb.get('foo'), 'baz')
  40. self.assertEqual(rindb.get('bar').data, 12345)
  41. def test_mark_as_failure(self):
  42. try:
  43. raise KeyError('foo')
  44. except KeyError as exception:
  45. self.tb.mark_as_failure(self.tid, exception)
  46. self.assertEqual(self.tb.get_status(self.tid), states.FAILURE)
  47. self.assertIsInstance(self.tb.get_result(self.tid), KeyError)
  48. def test_apply_chord(self):
  49. tb = CacheBackend(backend='memory://', app=self.app)
  50. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  51. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  52. @patch('celery.result.GroupResult.restore')
  53. def test_on_chord_part_return(self, restore):
  54. tb = CacheBackend(backend='memory://', app=self.app)
  55. deps = Mock()
  56. deps.__len__ = Mock()
  57. deps.__len__.return_value = 2
  58. restore.return_value = deps
  59. task = Mock()
  60. task.name = 'foobarbaz'
  61. self.app.tasks['foobarbaz'] = task
  62. task.request.chord = signature(task)
  63. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  64. task.request.group = gid
  65. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  66. self.assertFalse(deps.join_native.called)
  67. tb.on_chord_part_return(task)
  68. self.assertFalse(deps.join_native.called)
  69. tb.on_chord_part_return(task)
  70. deps.join_native.assert_called_with(propagate=True)
  71. deps.delete.assert_called_with()
  72. def test_mget(self):
  73. self.tb.set('foo', 1)
  74. self.tb.set('bar', 2)
  75. self.assertDictEqual(self.tb.mget(['foo', 'bar']),
  76. {'foo': 1, 'bar': 2})
  77. def test_forget(self):
  78. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  79. x = self.app.AsyncResult(self.tid, backend=self.tb)
  80. x.forget()
  81. self.assertIsNone(x.result)
  82. def test_process_cleanup(self):
  83. self.tb.process_cleanup()
  84. def test_expires_as_int(self):
  85. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  86. self.assertEqual(tb.expires, 10)
  87. def test_unknown_backend_raises_ImproperlyConfigured(self):
  88. with self.assertRaises(ImproperlyConfigured):
  89. CacheBackend(backend='unknown://', app=self.app)
  90. class MyMemcachedStringEncodingError(Exception):
  91. pass
  92. class MemcachedClient(DummyClient):
  93. def set(self, key, value, *args, **kwargs):
  94. if PY3:
  95. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  96. else:
  97. key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
  98. if isinstance(key, key_t):
  99. raise MyMemcachedStringEncodingError(
  100. 'Keys must be {0}, not {1}. Convert your '
  101. 'strings using mystring.{2}(charset)!'.format(
  102. must_be, not_be, cod))
  103. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  104. class MockCacheMixin(object):
  105. @contextmanager
  106. def mock_memcache(self):
  107. memcache = types.ModuleType('memcache')
  108. memcache.Client = MemcachedClient
  109. memcache.Client.__module__ = memcache.__name__
  110. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  111. try:
  112. yield True
  113. finally:
  114. if prev is not None:
  115. sys.modules['memcache'] = prev
  116. @contextmanager
  117. def mock_pylibmc(self):
  118. pylibmc = types.ModuleType('pylibmc')
  119. pylibmc.Client = MemcachedClient
  120. pylibmc.Client.__module__ = pylibmc.__name__
  121. prev = sys.modules.get('pylibmc')
  122. sys.modules['pylibmc'] = pylibmc
  123. try:
  124. yield True
  125. finally:
  126. if prev is not None:
  127. sys.modules['pylibmc'] = prev
  128. class test_get_best_memcache(AppCase, MockCacheMixin):
  129. def test_pylibmc(self):
  130. with self.mock_pylibmc():
  131. with reset_modules('celery.backends.cache'):
  132. from celery.backends import cache
  133. cache._imp = [None]
  134. self.assertEqual(cache.get_best_memcache()[0].__module__,
  135. 'pylibmc')
  136. def test_memcache(self):
  137. with self.mock_memcache():
  138. with reset_modules('celery.backends.cache'):
  139. with mask_modules('pylibmc'):
  140. from celery.backends import cache
  141. cache._imp = [None]
  142. self.assertEqual(cache.get_best_memcache()[0]().__module__,
  143. 'memcache')
  144. def test_no_implementations(self):
  145. with mask_modules('pylibmc', 'memcache'):
  146. with reset_modules('celery.backends.cache'):
  147. from celery.backends import cache
  148. cache._imp = [None]
  149. with self.assertRaises(ImproperlyConfigured):
  150. cache.get_best_memcache()
  151. def test_cached(self):
  152. with self.mock_pylibmc():
  153. with reset_modules('celery.backends.cache'):
  154. from celery.backends import cache
  155. cache._imp = [None]
  156. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  157. self.assertTrue(cache._imp[0])
  158. cache.get_best_memcache()[0]()
  159. def test_backends(self):
  160. from celery.backends.cache import backends
  161. with self.mock_memcache():
  162. for name, fun in items(backends):
  163. self.assertTrue(fun())
  164. class test_memcache_key(AppCase, MockCacheMixin):
  165. def test_memcache_unicode_key(self):
  166. with self.mock_memcache():
  167. with reset_modules('celery.backends.cache'):
  168. with mask_modules('pylibmc'):
  169. from celery.backends import cache
  170. cache._imp = [None]
  171. task_id, result = string(uuid()), 42
  172. b = cache.CacheBackend(backend='memcache', app=self.app)
  173. b.store_result(task_id, result, status=states.SUCCESS)
  174. self.assertEqual(b.get_result(task_id), result)
  175. def test_memcache_bytes_key(self):
  176. with self.mock_memcache():
  177. with reset_modules('celery.backends.cache'):
  178. with mask_modules('pylibmc'):
  179. from celery.backends import cache
  180. cache._imp = [None]
  181. task_id, result = str_to_bytes(uuid()), 42
  182. b = cache.CacheBackend(backend='memcache', app=self.app)
  183. b.store_result(task_id, result, status=states.SUCCESS)
  184. self.assertEqual(b.get_result(task_id), result)
  185. def test_pylibmc_unicode_key(self):
  186. with reset_modules('celery.backends.cache'):
  187. with self.mock_pylibmc():
  188. from celery.backends import cache
  189. cache._imp = [None]
  190. task_id, result = string(uuid()), 42
  191. b = cache.CacheBackend(backend='memcache', app=self.app)
  192. b.store_result(task_id, result, status=states.SUCCESS)
  193. self.assertEqual(b.get_result(task_id), result)
  194. def test_pylibmc_bytes_key(self):
  195. with reset_modules('celery.backends.cache'):
  196. with self.mock_pylibmc():
  197. from celery.backends import cache
  198. cache._imp = [None]
  199. task_id, result = str_to_bytes(uuid()), 42
  200. b = cache.CacheBackend(backend='memcache', app=self.app)
  201. b.store_result(task_id, result, status=states.SUCCESS)
  202. self.assertEqual(b.get_result(task_id), result)