test_cache.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. from kombu.utils.encoding import str_to_bytes, ensure_bytes
  6. from celery import signature
  7. from celery import states
  8. from celery import group
  9. from celery.backends.cache import CacheBackend, DummyClient, backends
  10. from celery.exceptions import ImproperlyConfigured
  11. from celery.five import items, module_name_t, string, text_t
  12. from celery.utils import uuid
  13. from celery.tests.case import (
  14. AppCase, Mock, override_stdouts, mask_modules, patch, reset_modules,
  15. )
  16. PY3 = sys.version_info[0] == 3
  17. class SomeClass(object):
  18. def __init__(self, data):
  19. self.data = data
  20. class test_CacheBackend(AppCase):
  21. def setup(self):
  22. self.app.conf.result_serializer = 'pickle'
  23. self.tb = CacheBackend(backend='memory://', app=self.app)
  24. self.tid = uuid()
  25. self.old_get_best_memcached = backends['memcache']
  26. backends['memcache'] = lambda: (DummyClient, ensure_bytes)
  27. def teardown(self):
  28. backends['memcache'] = self.old_get_best_memcached
  29. def test_no_backend(self):
  30. self.app.conf.cache_backend = None
  31. with self.assertRaises(ImproperlyConfigured):
  32. CacheBackend(backend=None, app=self.app)
  33. def test_mark_as_done(self):
  34. self.assertEqual(self.tb.get_state(self.tid), states.PENDING)
  35. self.assertIsNone(self.tb.get_result(self.tid))
  36. self.tb.mark_as_done(self.tid, 42)
  37. self.assertEqual(self.tb.get_state(self.tid), states.SUCCESS)
  38. self.assertEqual(self.tb.get_result(self.tid), 42)
  39. def test_is_pickled(self):
  40. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  41. self.tb.mark_as_done(self.tid, result)
  42. # is serialized properly.
  43. rindb = self.tb.get_result(self.tid)
  44. self.assertEqual(rindb.get('foo'), 'baz')
  45. self.assertEqual(rindb.get('bar').data, 12345)
  46. def test_mark_as_failure(self):
  47. try:
  48. raise KeyError('foo')
  49. except KeyError as exception:
  50. self.tb.mark_as_failure(self.tid, exception)
  51. self.assertEqual(self.tb.get_state(self.tid), states.FAILURE)
  52. self.assertIsInstance(self.tb.get_result(self.tid), KeyError)
  53. def test_apply_chord(self):
  54. tb = CacheBackend(backend='memory://', app=self.app)
  55. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  56. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  57. @patch('celery.result.GroupResult.restore')
  58. def test_on_chord_part_return(self, restore):
  59. tb = CacheBackend(backend='memory://', app=self.app)
  60. deps = Mock()
  61. deps.__len__ = Mock()
  62. deps.__len__.return_value = 2
  63. restore.return_value = deps
  64. task = Mock()
  65. task.name = 'foobarbaz'
  66. self.app.tasks['foobarbaz'] = task
  67. task.request.chord = signature(task)
  68. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  69. task.request.group = gid
  70. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  71. self.assertFalse(deps.join_native.called)
  72. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  73. self.assertFalse(deps.join_native.called)
  74. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  75. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  76. deps.delete.assert_called_with()
  77. def test_mget(self):
  78. self.tb.set('foo', 1)
  79. self.tb.set('bar', 2)
  80. self.assertDictEqual(self.tb.mget(['foo', 'bar']),
  81. {'foo': 1, 'bar': 2})
  82. def test_forget(self):
  83. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  84. x = self.app.AsyncResult(self.tid, backend=self.tb)
  85. x.forget()
  86. self.assertIsNone(x.result)
  87. def test_process_cleanup(self):
  88. self.tb.process_cleanup()
  89. def test_expires_as_int(self):
  90. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  91. self.assertEqual(tb.expires, 10)
  92. def test_unknown_backend_raises_ImproperlyConfigured(self):
  93. with self.assertRaises(ImproperlyConfigured):
  94. CacheBackend(backend='unknown://', app=self.app)
  95. def test_as_uri_no_servers(self):
  96. self.assertEqual(self.tb.as_uri(), 'memory:///')
  97. def test_as_uri_one_server(self):
  98. backend = 'memcache://127.0.0.1:11211/'
  99. b = CacheBackend(backend=backend, app=self.app)
  100. self.assertEqual(b.as_uri(), backend)
  101. def test_as_uri_multiple_servers(self):
  102. backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  103. b = CacheBackend(backend=backend, app=self.app)
  104. self.assertEqual(b.as_uri(), backend)
  105. @override_stdouts
  106. def test_regression_worker_startup_info(self):
  107. self.app.conf.result_backend = (
  108. 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  109. )
  110. worker = self.app.Worker()
  111. worker.on_start()
  112. self.assertTrue(worker.startup_info())
  113. class MyMemcachedStringEncodingError(Exception):
  114. pass
  115. class MemcachedClient(DummyClient):
  116. def set(self, key, value, *args, **kwargs):
  117. if PY3:
  118. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  119. else:
  120. key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
  121. if isinstance(key, key_t):
  122. raise MyMemcachedStringEncodingError(
  123. 'Keys must be {0}, not {1}. Convert your '
  124. 'strings using mystring.{2}(charset)!'.format(
  125. must_be, not_be, cod))
  126. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  127. class MockCacheMixin(object):
  128. @contextmanager
  129. def mock_memcache(self):
  130. memcache = types.ModuleType(module_name_t('memcache'))
  131. memcache.Client = MemcachedClient
  132. memcache.Client.__module__ = memcache.__name__
  133. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  134. try:
  135. yield True
  136. finally:
  137. if prev is not None:
  138. sys.modules['memcache'] = prev
  139. @contextmanager
  140. def mock_pylibmc(self):
  141. pylibmc = types.ModuleType(module_name_t('pylibmc'))
  142. pylibmc.Client = MemcachedClient
  143. pylibmc.Client.__module__ = pylibmc.__name__
  144. prev = sys.modules.get('pylibmc')
  145. sys.modules['pylibmc'] = pylibmc
  146. try:
  147. yield True
  148. finally:
  149. if prev is not None:
  150. sys.modules['pylibmc'] = prev
  151. class test_get_best_memcache(AppCase, MockCacheMixin):
  152. def test_pylibmc(self):
  153. with self.mock_pylibmc():
  154. with reset_modules('celery.backends.cache'):
  155. from celery.backends import cache
  156. cache._imp = [None]
  157. self.assertEqual(cache.get_best_memcache()[0].__module__,
  158. 'pylibmc')
  159. def test_memcache(self):
  160. with self.mock_memcache():
  161. with reset_modules('celery.backends.cache'):
  162. with mask_modules('pylibmc'):
  163. from celery.backends import cache
  164. cache._imp = [None]
  165. self.assertEqual(cache.get_best_memcache()[0]().__module__,
  166. 'memcache')
  167. def test_no_implementations(self):
  168. with mask_modules('pylibmc', 'memcache'):
  169. with reset_modules('celery.backends.cache'):
  170. from celery.backends import cache
  171. cache._imp = [None]
  172. with self.assertRaises(ImproperlyConfigured):
  173. cache.get_best_memcache()
  174. def test_cached(self):
  175. with self.mock_pylibmc():
  176. with reset_modules('celery.backends.cache'):
  177. from celery.backends import cache
  178. cache._imp = [None]
  179. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  180. self.assertTrue(cache._imp[0])
  181. cache.get_best_memcache()[0]()
  182. def test_backends(self):
  183. from celery.backends.cache import backends
  184. with self.mock_memcache():
  185. for name, fun in items(backends):
  186. self.assertTrue(fun())
  187. class test_memcache_key(AppCase, MockCacheMixin):
  188. def test_memcache_unicode_key(self):
  189. with self.mock_memcache():
  190. with reset_modules('celery.backends.cache'):
  191. with mask_modules('pylibmc'):
  192. from celery.backends import cache
  193. cache._imp = [None]
  194. task_id, result = string(uuid()), 42
  195. b = cache.CacheBackend(backend='memcache', app=self.app)
  196. b.store_result(task_id, result, state=states.SUCCESS)
  197. self.assertEqual(b.get_result(task_id), result)
  198. def test_memcache_bytes_key(self):
  199. with self.mock_memcache():
  200. with reset_modules('celery.backends.cache'):
  201. with mask_modules('pylibmc'):
  202. from celery.backends import cache
  203. cache._imp = [None]
  204. task_id, result = str_to_bytes(uuid()), 42
  205. b = cache.CacheBackend(backend='memcache', app=self.app)
  206. b.store_result(task_id, result, state=states.SUCCESS)
  207. self.assertEqual(b.get_result(task_id), result)
  208. def test_pylibmc_unicode_key(self):
  209. with reset_modules('celery.backends.cache'):
  210. with self.mock_pylibmc():
  211. from celery.backends import cache
  212. cache._imp = [None]
  213. task_id, result = string(uuid()), 42
  214. b = cache.CacheBackend(backend='memcache', app=self.app)
  215. b.store_result(task_id, result, state=states.SUCCESS)
  216. self.assertEqual(b.get_result(task_id), result)
  217. def test_pylibmc_bytes_key(self):
  218. with reset_modules('celery.backends.cache'):
  219. with self.mock_pylibmc():
  220. from celery.backends import cache
  221. cache._imp = [None]
  222. task_id, result = str_to_bytes(uuid()), 42
  223. b = cache.CacheBackend(backend='memcache', app=self.app)
  224. b.store_result(task_id, result, state=states.SUCCESS)
  225. self.assertEqual(b.get_result(task_id), result)