test_cache.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. import pytest
  6. from case import Mock, mock, patch, skip
  7. from kombu.utils.encoding import ensure_bytes, str_to_bytes
  8. from celery import group, signature, states, uuid
  9. from celery.backends.cache import CacheBackend, DummyClient, backends
  10. from celery.exceptions import ImproperlyConfigured
  11. from celery.five import bytes_if_py2, items, string, text_t
  12. PY3 = sys.version_info[0] == 3
  13. class SomeClass(object):
  14. def __init__(self, data):
  15. self.data = data
  16. class test_CacheBackend:
  17. def setup(self):
  18. self.app.conf.result_serializer = 'pickle'
  19. self.tb = CacheBackend(backend='memory://', app=self.app)
  20. self.tid = uuid()
  21. self.old_get_best_memcached = backends['memcache']
  22. backends['memcache'] = lambda: (DummyClient, ensure_bytes)
  23. def teardown(self):
  24. backends['memcache'] = self.old_get_best_memcached
  25. def test_no_backend(self):
  26. self.app.conf.cache_backend = None
  27. with pytest.raises(ImproperlyConfigured):
  28. CacheBackend(backend=None, app=self.app)
  29. def test_mark_as_done(self):
  30. assert self.tb.get_state(self.tid) == states.PENDING
  31. assert self.tb.get_result(self.tid) is None
  32. self.tb.mark_as_done(self.tid, 42)
  33. assert self.tb.get_state(self.tid) == states.SUCCESS
  34. assert self.tb.get_result(self.tid) == 42
  35. def test_is_pickled(self):
  36. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  37. self.tb.mark_as_done(self.tid, result)
  38. # is serialized properly.
  39. rindb = self.tb.get_result(self.tid)
  40. assert rindb.get('foo') == 'baz'
  41. assert rindb.get('bar').data == 12345
  42. def test_mark_as_failure(self):
  43. try:
  44. raise KeyError('foo')
  45. except KeyError as exception:
  46. self.tb.mark_as_failure(self.tid, exception)
  47. assert self.tb.get_state(self.tid) == states.FAILURE
  48. assert isinstance(self.tb.get_result(self.tid), KeyError)
  49. def test_apply_chord(self):
  50. tb = CacheBackend(backend='memory://', app=self.app)
  51. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  52. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  53. @patch('celery.result.GroupResult.restore')
  54. def test_on_chord_part_return(self, restore):
  55. tb = CacheBackend(backend='memory://', app=self.app)
  56. deps = Mock()
  57. deps.__len__ = Mock()
  58. deps.__len__.return_value = 2
  59. restore.return_value = deps
  60. task = Mock()
  61. task.name = 'foobarbaz'
  62. self.app.tasks['foobarbaz'] = task
  63. task.request.chord = signature(task)
  64. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  65. task.request.group = gid
  66. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  67. deps.join_native.assert_not_called()
  68. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  69. deps.join_native.assert_not_called()
  70. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  71. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  72. deps.delete.assert_called_with()
  73. def test_mget(self):
  74. self.tb.set('foo', 1)
  75. self.tb.set('bar', 2)
  76. assert self.tb.mget(['foo', 'bar']) == {'foo': 1, 'bar': 2}
  77. def test_forget(self):
  78. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  79. x = self.app.AsyncResult(self.tid, backend=self.tb)
  80. x.forget()
  81. assert x.result is None
  82. def test_process_cleanup(self):
  83. self.tb.process_cleanup()
  84. def test_expires_as_int(self):
  85. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  86. assert tb.expires == 10
  87. def test_unknown_backend_raises_ImproperlyConfigured(self):
  88. with pytest.raises(ImproperlyConfigured):
  89. CacheBackend(backend='unknown://', app=self.app)
  90. def test_as_uri_no_servers(self):
  91. assert self.tb.as_uri() == 'memory:///'
  92. def test_as_uri_one_server(self):
  93. backend = 'memcache://127.0.0.1:11211/'
  94. b = CacheBackend(backend=backend, app=self.app)
  95. assert b.as_uri() == backend
  96. def test_as_uri_multiple_servers(self):
  97. backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  98. b = CacheBackend(backend=backend, app=self.app)
  99. assert b.as_uri() == backend
  100. @skip.unless_module('memcached', name='python-memcached')
  101. def test_regression_worker_startup_info(self):
  102. self.app.conf.result_backend = (
  103. 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  104. )
  105. worker = self.app.Worker()
  106. with mock.stdouts():
  107. worker.on_start()
  108. assert worker.startup_info()
  109. class MyMemcachedStringEncodingError(Exception):
  110. pass
  111. class MemcachedClient(DummyClient):
  112. def set(self, key, value, *args, **kwargs):
  113. if PY3:
  114. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  115. else:
  116. key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
  117. if isinstance(key, key_t):
  118. raise MyMemcachedStringEncodingError(
  119. 'Keys must be {0}, not {1}. Convert your '
  120. 'strings using mystring.{2}(charset)!'.format(
  121. must_be, not_be, cod))
  122. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  123. class MockCacheMixin(object):
  124. @contextmanager
  125. def mock_memcache(self):
  126. memcache = types.ModuleType(bytes_if_py2('memcache'))
  127. memcache.Client = MemcachedClient
  128. memcache.Client.__module__ = memcache.__name__
  129. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  130. try:
  131. yield True
  132. finally:
  133. if prev is not None:
  134. sys.modules['memcache'] = prev
  135. @contextmanager
  136. def mock_pylibmc(self):
  137. pylibmc = types.ModuleType(bytes_if_py2('pylibmc'))
  138. pylibmc.Client = MemcachedClient
  139. pylibmc.Client.__module__ = pylibmc.__name__
  140. prev = sys.modules.get('pylibmc')
  141. sys.modules['pylibmc'] = pylibmc
  142. try:
  143. yield True
  144. finally:
  145. if prev is not None:
  146. sys.modules['pylibmc'] = prev
  147. class test_get_best_memcache(MockCacheMixin):
  148. def test_pylibmc(self):
  149. with self.mock_pylibmc():
  150. with mock.reset_modules('celery.backends.cache'):
  151. from celery.backends import cache
  152. cache._imp = [None]
  153. assert cache.get_best_memcache()[0].__module__ == 'pylibmc'
  154. def test_memcache(self):
  155. with self.mock_memcache():
  156. with mock.reset_modules('celery.backends.cache'):
  157. with mock.mask_modules('pylibmc'):
  158. from celery.backends import cache
  159. cache._imp = [None]
  160. assert (cache.get_best_memcache()[0]().__module__ ==
  161. 'memcache')
  162. def test_no_implementations(self):
  163. with mock.mask_modules('pylibmc', 'memcache'):
  164. with mock.reset_modules('celery.backends.cache'):
  165. from celery.backends import cache
  166. cache._imp = [None]
  167. with pytest.raises(ImproperlyConfigured):
  168. cache.get_best_memcache()
  169. def test_cached(self):
  170. with self.mock_pylibmc():
  171. with mock.reset_modules('celery.backends.cache'):
  172. from celery.backends import cache
  173. cache._imp = [None]
  174. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  175. assert cache._imp[0]
  176. cache.get_best_memcache()[0]()
  177. def test_backends(self):
  178. from celery.backends.cache import backends
  179. with self.mock_memcache():
  180. for name, fun in items(backends):
  181. assert fun()
  182. class test_memcache_key(MockCacheMixin):
  183. def test_memcache_unicode_key(self):
  184. with self.mock_memcache():
  185. with mock.reset_modules('celery.backends.cache'):
  186. with mock.mask_modules('pylibmc'):
  187. from celery.backends import cache
  188. cache._imp = [None]
  189. task_id, result = string(uuid()), 42
  190. b = cache.CacheBackend(backend='memcache', app=self.app)
  191. b.store_result(task_id, result, state=states.SUCCESS)
  192. assert b.get_result(task_id) == result
  193. def test_memcache_bytes_key(self):
  194. with self.mock_memcache():
  195. with mock.reset_modules('celery.backends.cache'):
  196. with mock.mask_modules('pylibmc'):
  197. from celery.backends import cache
  198. cache._imp = [None]
  199. task_id, result = str_to_bytes(uuid()), 42
  200. b = cache.CacheBackend(backend='memcache', app=self.app)
  201. b.store_result(task_id, result, state=states.SUCCESS)
  202. assert b.get_result(task_id) == result
  203. def test_pylibmc_unicode_key(self):
  204. with mock.reset_modules('celery.backends.cache'):
  205. with self.mock_pylibmc():
  206. from celery.backends import cache
  207. cache._imp = [None]
  208. task_id, result = string(uuid()), 42
  209. b = cache.CacheBackend(backend='memcache', app=self.app)
  210. b.store_result(task_id, result, state=states.SUCCESS)
  211. assert b.get_result(task_id) == result
  212. def test_pylibmc_bytes_key(self):
  213. with mock.reset_modules('celery.backends.cache'):
  214. with self.mock_pylibmc():
  215. from celery.backends import cache
  216. cache._imp = [None]
  217. task_id, result = str_to_bytes(uuid()), 42
  218. b = cache.CacheBackend(backend='memcache', app=self.app)
  219. b.store_result(task_id, result, state=states.SUCCESS)
  220. assert b.get_result(task_id) == result