test_cache.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. import pytest
  6. from case import Mock, mock, patch, skip
  7. from celery import group, signature, states, uuid
  8. from celery.backends.cache import CacheBackend, DummyClient, backends
  9. from celery.exceptions import ImproperlyConfigured
  10. from celery.five import bytes_if_py2, items, string, text_t
  11. from kombu.utils.encoding import ensure_bytes, str_to_bytes
  12. PY3 = sys.version_info[0] == 3
  13. class SomeClass(object):
  14. def __init__(self, data):
  15. self.data = data
  16. class test_CacheBackend:
  17. def setup(self):
  18. self.app.conf.result_serializer = 'pickle'
  19. self.tb = CacheBackend(backend='memory://', app=self.app)
  20. self.tid = uuid()
  21. self.old_get_best_memcached = backends['memcache']
  22. backends['memcache'] = lambda: (DummyClient, ensure_bytes)
  23. def teardown(self):
  24. backends['memcache'] = self.old_get_best_memcached
  25. def test_no_backend(self):
  26. self.app.conf.cache_backend = None
  27. with pytest.raises(ImproperlyConfigured):
  28. CacheBackend(backend=None, app=self.app)
  29. def test_mark_as_done(self):
  30. assert self.tb.get_state(self.tid) == states.PENDING
  31. assert self.tb.get_result(self.tid) is None
  32. self.tb.mark_as_done(self.tid, 42)
  33. assert self.tb.get_state(self.tid) == states.SUCCESS
  34. assert self.tb.get_result(self.tid) == 42
  35. def test_is_pickled(self):
  36. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  37. self.tb.mark_as_done(self.tid, result)
  38. # is serialized properly.
  39. rindb = self.tb.get_result(self.tid)
  40. assert rindb.get('foo') == 'baz'
  41. assert rindb.get('bar').data == 12345
  42. def test_mark_as_failure(self):
  43. try:
  44. raise KeyError('foo')
  45. except KeyError as exception:
  46. self.tb.mark_as_failure(self.tid, exception)
  47. assert self.tb.get_state(self.tid) == states.FAILURE
  48. assert isinstance(self.tb.get_result(self.tid), KeyError)
  49. def test_apply_chord(self):
  50. tb = CacheBackend(backend='memory://', app=self.app)
  51. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  52. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  53. @patch('celery.result.GroupResult.restore')
  54. def test_on_chord_part_return(self, restore):
  55. tb = CacheBackend(backend='memory://', app=self.app)
  56. deps = Mock()
  57. deps.__len__ = Mock()
  58. deps.__len__.return_value = 2
  59. restore.return_value = deps
  60. task = Mock()
  61. task.name = 'foobarbaz'
  62. self.app.tasks['foobarbaz'] = task
  63. task.request.chord = signature(task)
  64. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  65. task.request.group = gid
  66. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  67. deps.join_native.assert_not_called()
  68. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  69. deps.join_native.assert_not_called()
  70. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  71. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  72. deps.delete.assert_called_with()
  73. def test_mget(self):
  74. self.tb.set('foo', 1)
  75. self.tb.set('bar', 2)
  76. assert self.tb.mget(['foo', 'bar']) == {'foo': 1, 'bar': 2}
  77. def test_forget(self):
  78. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  79. x = self.app.AsyncResult(self.tid, backend=self.tb)
  80. x.forget()
  81. assert x.result is None
  82. def test_process_cleanup(self):
  83. self.tb.process_cleanup()
  84. def test_expires_as_int(self):
  85. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  86. assert tb.expires == 10
  87. def test_unknown_backend_raises_ImproperlyConfigured(self):
  88. with pytest.raises(ImproperlyConfigured):
  89. CacheBackend(backend='unknown://', app=self.app)
  90. def test_as_uri_no_servers(self):
  91. assert self.tb.as_uri() == 'memory:///'
  92. def test_as_uri_one_server(self):
  93. backend = 'memcache://127.0.0.1:11211/'
  94. b = CacheBackend(backend=backend, app=self.app)
  95. assert b.as_uri() == backend
  96. def test_as_uri_multiple_servers(self):
  97. backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  98. b = CacheBackend(backend=backend, app=self.app)
  99. assert b.as_uri() == backend
  100. @skip.unless_module('memcached', name='python-memcached')
  101. def test_regression_worker_startup_info(self):
  102. self.app.conf.result_backend = (
  103. 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  104. )
  105. worker = self.app.Worker()
  106. with mock.stdouts():
  107. worker.on_start()
  108. assert worker.startup_info()
  109. class MyMemcachedStringEncodingError(Exception):
  110. pass
  111. class MemcachedClient(DummyClient):
  112. def set(self, key, value, *args, **kwargs):
  113. if PY3:
  114. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  115. else:
  116. key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
  117. if isinstance(key, key_t):
  118. raise MyMemcachedStringEncodingError(
  119. 'Keys must be {0}, not {1}. Convert your '
  120. 'strings using mystring.{2}(charset)!'.format(
  121. must_be, not_be, cod))
  122. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  123. class MockCacheMixin(object):
  124. @contextmanager
  125. def mock_memcache(self):
  126. memcache = types.ModuleType(bytes_if_py2('memcache'))
  127. memcache.Client = MemcachedClient
  128. memcache.Client.__module__ = memcache.__name__
  129. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  130. try:
  131. yield True
  132. finally:
  133. if prev is not None:
  134. sys.modules['memcache'] = prev
  135. @contextmanager
  136. def mock_pylibmc(self):
  137. pylibmc = types.ModuleType(bytes_if_py2('pylibmc'))
  138. pylibmc.Client = MemcachedClient
  139. pylibmc.Client.__module__ = pylibmc.__name__
  140. prev = sys.modules.get('pylibmc')
  141. sys.modules['pylibmc'] = pylibmc
  142. try:
  143. yield True
  144. finally:
  145. if prev is not None:
  146. sys.modules['pylibmc'] = prev
  147. class test_get_best_memcache(MockCacheMixin):
  148. def test_pylibmc(self):
  149. with self.mock_pylibmc():
  150. with mock.reset_modules('celery.backends.cache'):
  151. from celery.backends import cache
  152. cache._imp = [None]
  153. assert cache.get_best_memcache()[0].__module__ == 'pylibmc'
  154. def test_memcache(self):
  155. with self.mock_memcache():
  156. with mock.reset_modules('celery.backends.cache'):
  157. with mock.mask_modules('pylibmc'):
  158. from celery.backends import cache
  159. cache._imp = [None]
  160. assert (cache.get_best_memcache()[0]().__module__ ==
  161. 'memcache')
  162. def test_no_implementations(self):
  163. with mock.mask_modules('pylibmc', 'memcache'):
  164. with mock.reset_modules('celery.backends.cache'):
  165. from celery.backends import cache
  166. cache._imp = [None]
  167. with pytest.raises(ImproperlyConfigured):
  168. cache.get_best_memcache()
  169. def test_cached(self):
  170. with self.mock_pylibmc():
  171. with mock.reset_modules('celery.backends.cache'):
  172. from celery.backends import cache
  173. cache._imp = [None]
  174. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  175. assert cache._imp[0]
  176. cache.get_best_memcache()[0]()
  177. def test_backends(self):
  178. from celery.backends.cache import backends
  179. with self.mock_memcache():
  180. for name, fun in items(backends):
  181. assert fun()
  182. class test_memcache_key(MockCacheMixin):
  183. def test_memcache_unicode_key(self):
  184. with self.mock_memcache():
  185. with mock.reset_modules('celery.backends.cache'):
  186. with mock.mask_modules('pylibmc'):
  187. from celery.backends import cache
  188. cache._imp = [None]
  189. task_id, result = string(uuid()), 42
  190. b = cache.CacheBackend(backend='memcache', app=self.app)
  191. b.store_result(task_id, result, state=states.SUCCESS)
  192. assert b.get_result(task_id) == result
  193. def test_memcache_bytes_key(self):
  194. with self.mock_memcache():
  195. with mock.reset_modules('celery.backends.cache'):
  196. with mock.mask_modules('pylibmc'):
  197. from celery.backends import cache
  198. cache._imp = [None]
  199. task_id, result = str_to_bytes(uuid()), 42
  200. b = cache.CacheBackend(backend='memcache', app=self.app)
  201. b.store_result(task_id, result, state=states.SUCCESS)
  202. assert b.get_result(task_id) == result
  203. def test_pylibmc_unicode_key(self):
  204. with mock.reset_modules('celery.backends.cache'):
  205. with self.mock_pylibmc():
  206. from celery.backends import cache
  207. cache._imp = [None]
  208. task_id, result = string(uuid()), 42
  209. b = cache.CacheBackend(backend='memcache', app=self.app)
  210. b.store_result(task_id, result, state=states.SUCCESS)
  211. assert b.get_result(task_id) == result
  212. def test_pylibmc_bytes_key(self):
  213. with mock.reset_modules('celery.backends.cache'):
  214. with self.mock_pylibmc():
  215. from celery.backends import cache
  216. cache._imp = [None]
  217. task_id, result = str_to_bytes(uuid()), 42
  218. b = cache.CacheBackend(backend='memcache', app=self.app)
  219. b.store_result(task_id, result, state=states.SUCCESS)
  220. assert b.get_result(task_id) == result