# test_cache.py (10.0 KB)
from __future__ import absolute_import, unicode_literals

import sys
import types
from contextlib import contextmanager

import pytest
from case import Mock, mock, patch, skip
from kombu.utils.encoding import ensure_bytes, str_to_bytes

from celery import signature, states, uuid
from celery.backends.cache import CacheBackend, DummyClient, backends
from celery.exceptions import ImproperlyConfigured
from celery.five import bytes_if_py2, items, string, text_t
  12. PY3 = sys.version_info[0] == 3
  13. class SomeClass(object):
  14. def __init__(self, data):
  15. self.data = data
  16. class test_CacheBackend:
  17. def setup(self):
  18. self.app.conf.result_serializer = 'pickle'
  19. self.tb = CacheBackend(backend='memory://', app=self.app)
  20. self.tid = uuid()
  21. self.old_get_best_memcached = backends['memcache']
  22. backends['memcache'] = lambda: (DummyClient, ensure_bytes)
  23. def teardown(self):
  24. backends['memcache'] = self.old_get_best_memcached
  25. def test_no_backend(self):
  26. self.app.conf.cache_backend = None
  27. with pytest.raises(ImproperlyConfigured):
  28. CacheBackend(backend=None, app=self.app)
  29. def test_mark_as_done(self):
  30. assert self.tb.get_state(self.tid) == states.PENDING
  31. assert self.tb.get_result(self.tid) is None
  32. self.tb.mark_as_done(self.tid, 42)
  33. assert self.tb.get_state(self.tid) == states.SUCCESS
  34. assert self.tb.get_result(self.tid) == 42
  35. def test_is_pickled(self):
  36. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  37. self.tb.mark_as_done(self.tid, result)
  38. # is serialized properly.
  39. rindb = self.tb.get_result(self.tid)
  40. assert rindb.get('foo') == 'baz'
  41. assert rindb.get('bar').data == 12345
  42. def test_mark_as_failure(self):
  43. try:
  44. raise KeyError('foo')
  45. except KeyError as exception:
  46. self.tb.mark_as_failure(self.tid, exception)
  47. assert self.tb.get_state(self.tid) == states.FAILURE
  48. assert isinstance(self.tb.get_result(self.tid), KeyError)
  49. def test_apply_chord(self):
  50. tb = CacheBackend(backend='memory://', app=self.app)
  51. result = self.app.GroupResult(
  52. uuid(),
  53. [self.app.AsyncResult(uuid()) for _ in range(3)],
  54. )
  55. tb.apply_chord(result, None)
  56. assert self.app.GroupResult.restore(result.id, backend=tb) == result
  57. @patch('celery.result.GroupResult.restore')
  58. def test_on_chord_part_return(self, restore):
  59. tb = CacheBackend(backend='memory://', app=self.app)
  60. deps = Mock()
  61. deps.__len__ = Mock()
  62. deps.__len__.return_value = 2
  63. restore.return_value = deps
  64. task = Mock()
  65. task.name = 'foobarbaz'
  66. self.app.tasks['foobarbaz'] = task
  67. task.request.chord = signature(task)
  68. result = self.app.GroupResult(
  69. uuid(),
  70. [self.app.AsyncResult(uuid()) for _ in range(3)],
  71. )
  72. task.request.group = result.id
  73. tb.apply_chord(result, None)
  74. deps.join_native.assert_not_called()
  75. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  76. deps.join_native.assert_not_called()
  77. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  78. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  79. deps.delete.assert_called_with()
  80. def test_mget(self):
  81. self.tb.set('foo', 1)
  82. self.tb.set('bar', 2)
  83. assert self.tb.mget(['foo', 'bar']) == {'foo': 1, 'bar': 2}
  84. def test_forget(self):
  85. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  86. x = self.app.AsyncResult(self.tid, backend=self.tb)
  87. x.forget()
  88. assert x.result is None
  89. def test_process_cleanup(self):
  90. self.tb.process_cleanup()
  91. def test_expires_as_int(self):
  92. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  93. assert tb.expires == 10
  94. def test_unknown_backend_raises_ImproperlyConfigured(self):
  95. with pytest.raises(ImproperlyConfigured):
  96. CacheBackend(backend='unknown://', app=self.app)
  97. def test_as_uri_no_servers(self):
  98. assert self.tb.as_uri() == 'memory:///'
  99. def test_as_uri_one_server(self):
  100. backend = 'memcache://127.0.0.1:11211/'
  101. b = CacheBackend(backend=backend, app=self.app)
  102. assert b.as_uri() == backend
  103. def test_as_uri_multiple_servers(self):
  104. backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  105. b = CacheBackend(backend=backend, app=self.app)
  106. assert b.as_uri() == backend
  107. @skip.unless_module('memcached', name='python-memcached')
  108. def test_regression_worker_startup_info(self):
  109. self.app.conf.result_backend = (
  110. 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  111. )
  112. worker = self.app.Worker()
  113. with mock.stdouts():
  114. worker.on_start()
  115. assert worker.startup_info()
  116. class MyMemcachedStringEncodingError(Exception):
  117. pass
  118. class MemcachedClient(DummyClient):
  119. def set(self, key, value, *args, **kwargs):
  120. if PY3:
  121. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  122. else:
  123. key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
  124. if isinstance(key, key_t):
  125. raise MyMemcachedStringEncodingError(
  126. 'Keys must be {0}, not {1}. Convert your '
  127. 'strings using mystring.{2}(charset)!'.format(
  128. must_be, not_be, cod))
  129. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  130. class MockCacheMixin(object):
  131. @contextmanager
  132. def mock_memcache(self):
  133. memcache = types.ModuleType(bytes_if_py2('memcache'))
  134. memcache.Client = MemcachedClient
  135. memcache.Client.__module__ = memcache.__name__
  136. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  137. try:
  138. yield True
  139. finally:
  140. if prev is not None:
  141. sys.modules['memcache'] = prev
  142. @contextmanager
  143. def mock_pylibmc(self):
  144. pylibmc = types.ModuleType(bytes_if_py2('pylibmc'))
  145. pylibmc.Client = MemcachedClient
  146. pylibmc.Client.__module__ = pylibmc.__name__
  147. prev = sys.modules.get('pylibmc')
  148. sys.modules['pylibmc'] = pylibmc
  149. try:
  150. yield True
  151. finally:
  152. if prev is not None:
  153. sys.modules['pylibmc'] = prev
  154. class test_get_best_memcache(MockCacheMixin):
  155. def test_pylibmc(self):
  156. with self.mock_pylibmc():
  157. with mock.reset_modules('celery.backends.cache'):
  158. from celery.backends import cache
  159. cache._imp = [None]
  160. assert cache.get_best_memcache()[0].__module__ == 'pylibmc'
  161. def test_memcache(self):
  162. with self.mock_memcache():
  163. with mock.reset_modules('celery.backends.cache'):
  164. with mock.mask_modules('pylibmc'):
  165. from celery.backends import cache
  166. cache._imp = [None]
  167. assert (cache.get_best_memcache()[0]().__module__ ==
  168. 'memcache')
  169. def test_no_implementations(self):
  170. with mock.mask_modules('pylibmc', 'memcache'):
  171. with mock.reset_modules('celery.backends.cache'):
  172. from celery.backends import cache
  173. cache._imp = [None]
  174. with pytest.raises(ImproperlyConfigured):
  175. cache.get_best_memcache()
  176. def test_cached(self):
  177. with self.mock_pylibmc():
  178. with mock.reset_modules('celery.backends.cache'):
  179. from celery.backends import cache
  180. cache._imp = [None]
  181. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  182. assert cache._imp[0]
  183. cache.get_best_memcache()[0]()
  184. def test_backends(self):
  185. from celery.backends.cache import backends
  186. with self.mock_memcache():
  187. for name, fun in items(backends):
  188. assert fun()
  189. class test_memcache_key(MockCacheMixin):
  190. def test_memcache_unicode_key(self):
  191. with self.mock_memcache():
  192. with mock.reset_modules('celery.backends.cache'):
  193. with mock.mask_modules('pylibmc'):
  194. from celery.backends import cache
  195. cache._imp = [None]
  196. task_id, result = string(uuid()), 42
  197. b = cache.CacheBackend(backend='memcache', app=self.app)
  198. b.store_result(task_id, result, state=states.SUCCESS)
  199. assert b.get_result(task_id) == result
  200. def test_memcache_bytes_key(self):
  201. with self.mock_memcache():
  202. with mock.reset_modules('celery.backends.cache'):
  203. with mock.mask_modules('pylibmc'):
  204. from celery.backends import cache
  205. cache._imp = [None]
  206. task_id, result = str_to_bytes(uuid()), 42
  207. b = cache.CacheBackend(backend='memcache', app=self.app)
  208. b.store_result(task_id, result, state=states.SUCCESS)
  209. assert b.get_result(task_id) == result
  210. def test_pylibmc_unicode_key(self):
  211. with mock.reset_modules('celery.backends.cache'):
  212. with self.mock_pylibmc():
  213. from celery.backends import cache
  214. cache._imp = [None]
  215. task_id, result = string(uuid()), 42
  216. b = cache.CacheBackend(backend='memcache', app=self.app)
  217. b.store_result(task_id, result, state=states.SUCCESS)
  218. assert b.get_result(task_id) == result
  219. def test_pylibmc_bytes_key(self):
  220. with mock.reset_modules('celery.backends.cache'):
  221. with self.mock_pylibmc():
  222. from celery.backends import cache
  223. cache._imp = [None]
  224. task_id, result = str_to_bytes(uuid()), 42
  225. b = cache.CacheBackend(backend='memcache', app=self.app)
  226. b.store_result(task_id, result, state=states.SUCCESS)
  227. assert b.get_result(task_id) == result