test_cache.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. import pytest
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. from case import Mock, mock, patch, skip
  6. from kombu.utils.encoding import str_to_bytes, ensure_bytes
  7. from celery import states
  8. from celery import group, signature, uuid
  9. from celery.backends.cache import CacheBackend, DummyClient, backends
  10. from celery.exceptions import ImproperlyConfigured
  11. class SomeClass:
  12. def __init__(self, data):
  13. self.data = data
  14. class test_CacheBackend:
  15. def setup(self):
  16. self.app.conf.result_serializer = 'pickle'
  17. self.tb = CacheBackend(backend='memory://', app=self.app)
  18. self.tid = uuid()
  19. self.old_get_best_memcached = backends['memcache']
  20. backends['memcache'] = lambda: (DummyClient, ensure_bytes)
  21. def teardown(self):
  22. backends['memcache'] = self.old_get_best_memcached
  23. def test_no_backend(self):
  24. self.app.conf.cache_backend = None
  25. with pytest.raises(ImproperlyConfigured):
  26. CacheBackend(backend=None, app=self.app)
  27. def test_mark_as_done(self):
  28. assert self.tb.get_state(self.tid) == states.PENDING
  29. assert self.tb.get_result(self.tid) is None
  30. self.tb.mark_as_done(self.tid, 42)
  31. assert self.tb.get_state(self.tid) == states.SUCCESS
  32. assert self.tb.get_result(self.tid) == 42
  33. def test_is_pickled(self):
  34. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  35. self.tb.mark_as_done(self.tid, result)
  36. # is serialized properly.
  37. rindb = self.tb.get_result(self.tid)
  38. assert rindb.get('foo') == 'baz'
  39. assert rindb.get('bar').data == 12345
  40. def test_mark_as_failure(self):
  41. try:
  42. raise KeyError('foo')
  43. except KeyError as exception:
  44. self.tb.mark_as_failure(self.tid, exception)
  45. assert self.tb.get_state(self.tid) == states.FAILURE
  46. assert isinstance(self.tb.get_result(self.tid), KeyError)
  47. def test_apply_chord(self):
  48. tb = CacheBackend(backend='memory://', app=self.app)
  49. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  50. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  51. @patch('celery.result.GroupResult.restore')
  52. def test_on_chord_part_return(self, restore):
  53. tb = CacheBackend(backend='memory://', app=self.app)
  54. deps = Mock()
  55. deps.__len__ = Mock()
  56. deps.__len__.return_value = 2
  57. restore.return_value = deps
  58. task = Mock()
  59. task.name = 'foobarbaz'
  60. self.app.tasks['foobarbaz'] = task
  61. task.request.chord = signature(task)
  62. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  63. task.request.group = gid
  64. tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
  65. deps.join_native.assert_not_called()
  66. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  67. deps.join_native.assert_not_called()
  68. tb.on_chord_part_return(task.request, 'SUCCESS', 10)
  69. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  70. deps.delete.assert_called_with()
  71. def test_mget(self):
  72. self.tb.set('foo', 1)
  73. self.tb.set('bar', 2)
  74. assert self.tb.mget(['foo', 'bar']) == {'foo': 1, 'bar': 2}
  75. def test_forget(self):
  76. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  77. x = self.app.AsyncResult(self.tid, backend=self.tb)
  78. x.forget()
  79. assert x.result is None
  80. def test_process_cleanup(self):
  81. self.tb.process_cleanup()
  82. def test_expires_as_int(self):
  83. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  84. assert tb.expires == 10
  85. def test_unknown_backend_raises_ImproperlyConfigured(self):
  86. with pytest.raises(ImproperlyConfigured):
  87. CacheBackend(backend='unknown://', app=self.app)
  88. def test_as_uri_no_servers(self):
  89. assert self.tb.as_uri() == 'memory:///'
  90. def test_as_uri_one_server(self):
  91. backend = 'memcache://127.0.0.1:11211/'
  92. b = CacheBackend(backend=backend, app=self.app)
  93. assert b.as_uri() == backend
  94. def test_as_uri_multiple_servers(self):
  95. backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  96. b = CacheBackend(backend=backend, app=self.app)
  97. assert b.as_uri() == backend
  98. @skip.unless_module('memcached', name='python-memcached')
  99. def test_regression_worker_startup_info(self):
  100. self.app.conf.result_backend = (
  101. 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
  102. )
  103. worker = self.app.Worker()
  104. with mock.stdouts():
  105. worker.on_start()
  106. assert worker.startup_info()
  107. class MyMemcachedStringEncodingError(Exception):
  108. pass
  109. class MemcachedClient(DummyClient):
  110. def set(self, key, value, *args, **kwargs):
  111. key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
  112. if isinstance(key, key_t):
  113. raise MyMemcachedStringEncodingError(
  114. 'Keys must be {0}, not {1}. Convert your '
  115. 'strings using mystring.{2}(charset)!'.format(
  116. must_be, not_be, cod))
  117. return super().set(key, value, *args, **kwargs)
  118. class MockCacheMixin:
  119. @contextmanager
  120. def mock_memcache(self):
  121. memcache = types.ModuleType('memcache')
  122. memcache.Client = MemcachedClient
  123. memcache.Client.__module__ = memcache.__name__
  124. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  125. try:
  126. yield True
  127. finally:
  128. if prev is not None:
  129. sys.modules['memcache'] = prev
  130. @contextmanager
  131. def mock_pylibmc(self):
  132. pylibmc = types.ModuleType('pylibmc')
  133. pylibmc.Client = MemcachedClient
  134. pylibmc.Client.__module__ = pylibmc.__name__
  135. prev = sys.modules.get('pylibmc')
  136. sys.modules['pylibmc'] = pylibmc
  137. try:
  138. yield True
  139. finally:
  140. if prev is not None:
  141. sys.modules['pylibmc'] = prev
  142. class test_get_best_memcache(MockCacheMixin):
  143. def test_pylibmc(self):
  144. with self.mock_pylibmc():
  145. with mock.reset_modules('celery.backends.cache'):
  146. from celery.backends import cache
  147. cache._imp = [None]
  148. assert cache.get_best_memcache()[0].__module__ == 'pylibmc'
  149. def test_memcache(self):
  150. with self.mock_memcache():
  151. with mock.reset_modules('celery.backends.cache'):
  152. with mock.mask_modules('pylibmc'):
  153. from celery.backends import cache
  154. cache._imp = [None]
  155. assert (cache.get_best_memcache()[0]().__module__ ==
  156. 'memcache')
  157. def test_no_implementations(self):
  158. with mock.mask_modules('pylibmc', 'memcache'):
  159. with mock.reset_modules('celery.backends.cache'):
  160. from celery.backends import cache
  161. cache._imp = [None]
  162. with pytest.raises(ImproperlyConfigured):
  163. cache.get_best_memcache()
  164. def test_cached(self):
  165. with self.mock_pylibmc():
  166. with mock.reset_modules('celery.backends.cache'):
  167. from celery.backends import cache
  168. cache._imp = [None]
  169. cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
  170. assert cache._imp[0]
  171. cache.get_best_memcache()[0]()
  172. def test_backends(self):
  173. from celery.backends.cache import backends
  174. with self.mock_memcache():
  175. for name, fun in backends.items():
  176. assert fun()
  177. class test_memcache_key(MockCacheMixin):
  178. def test_memcache_unicode_key(self):
  179. with self.mock_memcache():
  180. with mock.reset_modules('celery.backends.cache'):
  181. with mock.mask_modules('pylibmc'):
  182. from celery.backends import cache
  183. cache._imp = [None]
  184. task_id, result = str(uuid()), 42
  185. b = cache.CacheBackend(backend='memcache', app=self.app)
  186. b.store_result(task_id, result, state=states.SUCCESS)
  187. assert b.get_result(task_id) == result
  188. def test_memcache_bytes_key(self):
  189. with self.mock_memcache():
  190. with mock.reset_modules('celery.backends.cache'):
  191. with mock.mask_modules('pylibmc'):
  192. from celery.backends import cache
  193. cache._imp = [None]
  194. task_id, result = str_to_bytes(uuid()), 42
  195. b = cache.CacheBackend(backend='memcache', app=self.app)
  196. b.store_result(task_id, result, state=states.SUCCESS)
  197. assert b.get_result(task_id) == result
  198. def test_pylibmc_unicode_key(self):
  199. with mock.reset_modules('celery.backends.cache'):
  200. with self.mock_pylibmc():
  201. from celery.backends import cache
  202. cache._imp = [None]
  203. task_id, result = str(uuid()), 42
  204. b = cache.CacheBackend(backend='memcache', app=self.app)
  205. b.store_result(task_id, result, state=states.SUCCESS)
  206. assert b.get_result(task_id) == result
  207. def test_pylibmc_bytes_key(self):
  208. with mock.reset_modules('celery.backends.cache'):
  209. with self.mock_pylibmc():
  210. from celery.backends import cache
  211. cache._imp = [None]
  212. task_id, result = str_to_bytes(uuid()), 42
  213. b = cache.CacheBackend(backend='memcache', app=self.app)
  214. b.store_result(task_id, result, state=states.SUCCESS)
  215. assert b.get_result(task_id) == result