test_cache.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import sys
  4. import types
  5. from contextlib import contextmanager
  6. from case import Mock, mock, patch, skip
  7. from kombu.utils.encoding import str_to_bytes, ensure_bytes
  8. from celery import states
  9. from celery import group, signature, uuid
  10. from celery.backends.cache import CacheBackend, DummyClient, backends
  11. from celery.exceptions import ImproperlyConfigured
  12. from celery.five import items, bytes_if_py2, string, text_t
  13. PY3 = sys.version_info[0] == 3
  14. class SomeClass(object):
  15. def __init__(self, data):
  16. self.data = data
  17. class test_CacheBackend:
  18. def setup(self):
  19. self.app.conf.result_serializer = 'pickle'
  20. self.tb = CacheBackend(backend='memory://', app=self.app)
  21. self.tid = uuid()
  22. self.old_get_best_memcached = backends['memcache']
  23. backends['memcache'] = lambda: (DummyClient, ensure_bytes)
  24. def teardown(self):
  25. backends['memcache'] = self.old_get_best_memcached
  26. def test_no_backend(self):
  27. self.app.conf.cache_backend = None
  28. with pytest.raises(ImproperlyConfigured):
  29. CacheBackend(backend=None, app=self.app)
  30. def test_mark_as_done(self):
  31. assert self.tb.get_state(self.tid) == states.PENDING
  32. assert self.tb.get_result(self.tid) is None
  33. self.tb.mark_as_done(self.tid, 42)
  34. assert self.tb.get_state(self.tid) == states.SUCCESS
  35. assert self.tb.get_result(self.tid) == 42
  36. def test_is_pickled(self):
  37. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  38. self.tb.mark_as_done(self.tid, result)
  39. # is serialized properly.
  40. rindb = self.tb.get_result(self.tid)
  41. assert rindb.get('foo') == 'baz'
  42. assert rindb.get('bar').data == 12345
  43. def test_mark_as_failure(self):
  44. try:
  45. raise KeyError('foo')
  46. except KeyError as exception:
  47. self.tb.mark_as_failure(self.tid, exception)
  48. assert self.tb.get_state(self.tid) == states.FAILURE
  49. assert isinstance(self.tb.get_result(self.tid), KeyError)
  50. def test_apply_chord(self):
  51. tb = CacheBackend(backend='memory://', app=self.app)
  52. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  53. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  54. @patch('celery.result.GroupResult.restore')
  55. def test_on_chord_part_return(self, restore):
  56. tb = CacheBackend(backend='memory://', app=self.app)
  57. deps = Mock()
  58. deps.__len__ = Mock()
  59. deps.__len__.return_value = 2
  60. restore.return_value = deps
  61. task = Mock()
  62. task.name = 'foobarbaz'
  63. self.app.tasks['foobarbaz'] = task
  64. task.request.chord = signature(task)
  65. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  66. task.request.group = gid
  67. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  68. deps.join_native.assert_not_called()
  69. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  70. deps.join_native.assert_not_called()
  71. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  72. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  73. deps.delete.assert_called_with()
  74. def test_mget(self):
  75. self.tb.set('foo', 1)
  76. self.tb.set('bar', 2)
  77. assert self.tb.mget(['foo', 'bar']) == {'foo': 1, 'bar': 2}
  78. def test_forget(self):
  79. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  80. x = self.app.AsyncResult(self.tid, backend=self.tb)
  81. x.forget()
  82. assert x.result is None
  83. def test_process_cleanup(self):
  84. self.tb.process_cleanup()
  85. def test_expires_as_int(self):
  86. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  87. assert tb.expires == 10
  88. def test_unknown_backend_raises_ImproperlyConfigured(self):
  89. with pytest.raises(ImproperlyConfigured):
  90. CacheBackend(backend='unknown://', app=self.app)
  91. def test_as_uri_no_servers(self):
  92. assert self.tb.as_uri() == 'memory:///'
  93. def test_as_uri_one_server(self):
  94. backend = 'memcache://127.0.0.1:11211/'
  95. b = CacheBackend(backend=backend, app=self.app)
  96. assert b.as_uri() == backend
  97. def test_as_uri_multiple_servers(self):
  98. backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  99. b = CacheBackend(backend=backend, app=self.app)
  100. assert b.as_uri() == backend
  101. @skip.unless_module('memcached', name='python-memcached')
  102. def test_regression_worker_startup_info(self):
  103. self.app.conf.result_backend = (
  104. 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  105. )
  106. worker = self.app.Worker()
  107. with mock.stdouts():
  108. worker.on_start()
  109. assert worker.startup_info()
  110. class MyMemcachedStringEncodingError(Exception):
  111. pass
  112. class MemcachedClient(DummyClient):
  113. def set(self, key, value, *args, **kwargs):
  114. if PY3:
  115. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  116. else:
  117. key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
  118. if isinstance(key, key_t):
  119. raise MyMemcachedStringEncodingError(
  120. 'Keys must be {0}, not {1}. Convert your '
  121. 'strings using mystring.{2}(charset)!'.format(
  122. must_be, not_be, cod))
  123. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  124. class MockCacheMixin(object):
  125. @contextmanager
  126. def mock_memcache(self):
  127. memcache = types.ModuleType(bytes_if_py2('memcache'))
  128. memcache.Client = MemcachedClient
  129. memcache.Client.__module__ = memcache.__name__
  130. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  131. try:
  132. yield True
  133. finally:
  134. if prev is not None:
  135. sys.modules['memcache'] = prev
  136. @contextmanager
  137. def mock_pylibmc(self):
  138. pylibmc = types.ModuleType(bytes_if_py2('pylibmc'))
  139. pylibmc.Client = MemcachedClient
  140. pylibmc.Client.__module__ = pylibmc.__name__
  141. prev = sys.modules.get('pylibmc')
  142. sys.modules['pylibmc'] = pylibmc
  143. try:
  144. yield True
  145. finally:
  146. if prev is not None:
  147. sys.modules['pylibmc'] = prev
  148. class test_get_best_memcache(MockCacheMixin):
  149. def test_pylibmc(self):
  150. with self.mock_pylibmc():
  151. with mock.reset_modules('celery.backends.cache'):
  152. from celery.backends import cache
  153. cache._imp = [None]
  154. assert cache.get_best_memcache()[0].__module__ == 'pylibmc'
  155. def test_memcache(self):
  156. with self.mock_memcache():
  157. with mock.reset_modules('celery.backends.cache'):
  158. with mock.mask_modules('pylibmc'):
  159. from celery.backends import cache
  160. cache._imp = [None]
  161. assert (cache.get_best_memcache()[0]().__module__ ==
  162. 'memcache')
  163. def test_no_implementations(self):
  164. with mock.mask_modules('pylibmc', 'memcache'):
  165. with mock.reset_modules('celery.backends.cache'):
  166. from celery.backends import cache
  167. cache._imp = [None]
  168. with pytest.raises(ImproperlyConfigured):
  169. cache.get_best_memcache()
  170. def test_cached(self):
  171. with self.mock_pylibmc():
  172. with mock.reset_modules('celery.backends.cache'):
  173. from celery.backends import cache
  174. cache._imp = [None]
  175. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  176. assert cache._imp[0]
  177. cache.get_best_memcache()[0]()
  178. def test_backends(self):
  179. from celery.backends.cache import backends
  180. with self.mock_memcache():
  181. for name, fun in items(backends):
  182. assert fun()
  183. class test_memcache_key(MockCacheMixin):
  184. def test_memcache_unicode_key(self):
  185. with self.mock_memcache():
  186. with mock.reset_modules('celery.backends.cache'):
  187. with mock.mask_modules('pylibmc'):
  188. from celery.backends import cache
  189. cache._imp = [None]
  190. task_id, result = string(uuid()), 42
  191. b = cache.CacheBackend(backend='memcache', app=self.app)
  192. b.store_result(task_id, result, state=states.SUCCESS)
  193. assert b.get_result(task_id) == result
  194. def test_memcache_bytes_key(self):
  195. with self.mock_memcache():
  196. with mock.reset_modules('celery.backends.cache'):
  197. with mock.mask_modules('pylibmc'):
  198. from celery.backends import cache
  199. cache._imp = [None]
  200. task_id, result = str_to_bytes(uuid()), 42
  201. b = cache.CacheBackend(backend='memcache', app=self.app)
  202. b.store_result(task_id, result, state=states.SUCCESS)
  203. assert b.get_result(task_id) == result
  204. def test_pylibmc_unicode_key(self):
  205. with mock.reset_modules('celery.backends.cache'):
  206. with self.mock_pylibmc():
  207. from celery.backends import cache
  208. cache._imp = [None]
  209. task_id, result = string(uuid()), 42
  210. b = cache.CacheBackend(backend='memcache', app=self.app)
  211. b.store_result(task_id, result, state=states.SUCCESS)
  212. assert b.get_result(task_id) == result
  213. def test_pylibmc_bytes_key(self):
  214. with mock.reset_modules('celery.backends.cache'):
  215. with self.mock_pylibmc():
  216. from celery.backends import cache
  217. cache._imp = [None]
  218. task_id, result = str_to_bytes(uuid()), 42
  219. b = cache.CacheBackend(backend='memcache', app=self.app)
  220. b.store_result(task_id, result, state=states.SUCCESS)
  221. assert b.get_result(task_id) == result