123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- from __future__ import absolute_import, unicode_literals
- import sys
- import types
- from contextlib import contextmanager
- import pytest
- from case import Mock, mock, patch, skip
- from kombu.utils.encoding import ensure_bytes, str_to_bytes
- from celery import signature, states, uuid
- from celery.backends.cache import CacheBackend, DummyClient, backends
- from celery.exceptions import ImproperlyConfigured
- from celery.five import PY3, bytes_if_py2, items, string, text_t
- class SomeClass(object):
- def __init__(self, data):
- self.data = data
- class test_CacheBackend:
- def setup(self):
- self.app.conf.result_serializer = 'pickle'
- self.tb = CacheBackend(backend='memory://', app=self.app)
- self.tid = uuid()
- self.old_get_best_memcached = backends['memcache']
- backends['memcache'] = lambda: (DummyClient, ensure_bytes)
- def teardown(self):
- backends['memcache'] = self.old_get_best_memcached
- def test_no_backend(self):
- self.app.conf.cache_backend = None
- with pytest.raises(ImproperlyConfigured):
- CacheBackend(backend=None, app=self.app)
- def test_mark_as_done(self):
- assert self.tb.get_state(self.tid) == states.PENDING
- assert self.tb.get_result(self.tid) is None
- self.tb.mark_as_done(self.tid, 42)
- assert self.tb.get_state(self.tid) == states.SUCCESS
- assert self.tb.get_result(self.tid) == 42
- def test_is_pickled(self):
- result = {'foo': 'baz', 'bar': SomeClass(12345)}
- self.tb.mark_as_done(self.tid, result)
- # is serialized properly.
- rindb = self.tb.get_result(self.tid)
- assert rindb.get('foo') == 'baz'
- assert rindb.get('bar').data == 12345
- def test_mark_as_failure(self):
- try:
- raise KeyError('foo')
- except KeyError as exception:
- self.tb.mark_as_failure(self.tid, exception)
- assert self.tb.get_state(self.tid) == states.FAILURE
- assert isinstance(self.tb.get_result(self.tid), KeyError)
- def test_apply_chord(self):
- tb = CacheBackend(backend='memory://', app=self.app)
- result = self.app.GroupResult(
- uuid(),
- [self.app.AsyncResult(uuid()) for _ in range(3)],
- )
- tb.apply_chord(result, None)
- assert self.app.GroupResult.restore(result.id, backend=tb) == result
- @patch('celery.result.GroupResult.restore')
- def test_on_chord_part_return(self, restore):
- tb = CacheBackend(backend='memory://', app=self.app)
- deps = Mock()
- deps.__len__ = Mock()
- deps.__len__.return_value = 2
- restore.return_value = deps
- task = Mock()
- task.name = 'foobarbaz'
- self.app.tasks['foobarbaz'] = task
- task.request.chord = signature(task)
- result = self.app.GroupResult(
- uuid(),
- [self.app.AsyncResult(uuid()) for _ in range(3)],
- )
- task.request.group = result.id
- tb.apply_chord(result, None)
- deps.join_native.assert_not_called()
- tb.on_chord_part_return(task.request, 'SUCCESS', 10)
- deps.join_native.assert_not_called()
- tb.on_chord_part_return(task.request, 'SUCCESS', 10)
- deps.join_native.assert_called_with(propagate=True, timeout=3.0)
- deps.delete.assert_called_with()
- def test_mget(self):
- self.tb.set('foo', 1)
- self.tb.set('bar', 2)
- assert self.tb.mget(['foo', 'bar']) == {'foo': 1, 'bar': 2}
- def test_forget(self):
- self.tb.mark_as_done(self.tid, {'foo': 'bar'})
- x = self.app.AsyncResult(self.tid, backend=self.tb)
- x.forget()
- assert x.result is None
- def test_process_cleanup(self):
- self.tb.process_cleanup()
- def test_expires_as_int(self):
- tb = CacheBackend(backend='memory://', expires=10, app=self.app)
- assert tb.expires == 10
- def test_unknown_backend_raises_ImproperlyConfigured(self):
- with pytest.raises(ImproperlyConfigured):
- CacheBackend(backend='unknown://', app=self.app)
- def test_as_uri_no_servers(self):
- assert self.tb.as_uri() == 'memory:///'
- def test_as_uri_one_server(self):
- backend = 'memcache://127.0.0.1:11211/'
- b = CacheBackend(backend=backend, app=self.app)
- assert b.as_uri() == backend
- def test_as_uri_multiple_servers(self):
- backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
- b = CacheBackend(backend=backend, app=self.app)
- assert b.as_uri() == backend
- @skip.unless_module('memcached', name='python-memcached')
- def test_regression_worker_startup_info(self):
- self.app.conf.result_backend = (
- 'cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
- )
- worker = self.app.Worker()
- with mock.stdouts():
- worker.on_start()
- assert worker.startup_info()
- class MyMemcachedStringEncodingError(Exception):
- pass
- class MemcachedClient(DummyClient):
- def set(self, key, value, *args, **kwargs):
- if PY3:
- key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
- else:
- key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
- if isinstance(key, key_t):
- raise MyMemcachedStringEncodingError(
- 'Keys must be {0}, not {1}. Convert your '
- 'strings using mystring.{2}(charset)!'.format(
- must_be, not_be, cod))
- return super(MemcachedClient, self).set(key, value, *args, **kwargs)
- class MockCacheMixin(object):
- @contextmanager
- def mock_memcache(self):
- memcache = types.ModuleType(bytes_if_py2('memcache'))
- memcache.Client = MemcachedClient
- memcache.Client.__module__ = memcache.__name__
- prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
- try:
- yield True
- finally:
- if prev is not None:
- sys.modules['memcache'] = prev
- @contextmanager
- def mock_pylibmc(self):
- pylibmc = types.ModuleType(bytes_if_py2('pylibmc'))
- pylibmc.Client = MemcachedClient
- pylibmc.Client.__module__ = pylibmc.__name__
- prev = sys.modules.get('pylibmc')
- sys.modules['pylibmc'] = pylibmc
- try:
- yield True
- finally:
- if prev is not None:
- sys.modules['pylibmc'] = prev
- class test_get_best_memcache(MockCacheMixin):
- def test_pylibmc(self):
- with self.mock_pylibmc():
- with mock.reset_modules('celery.backends.cache'):
- from celery.backends import cache
- cache._imp = [None]
- assert cache.get_best_memcache()[0].__module__ == 'pylibmc'
- def test_memcache(self):
- with self.mock_memcache():
- with mock.reset_modules('celery.backends.cache'):
- with mock.mask_modules('pylibmc'):
- from celery.backends import cache
- cache._imp = [None]
- assert (cache.get_best_memcache()[0]().__module__ ==
- 'memcache')
- def test_no_implementations(self):
- with mock.mask_modules('pylibmc', 'memcache'):
- with mock.reset_modules('celery.backends.cache'):
- from celery.backends import cache
- cache._imp = [None]
- with pytest.raises(ImproperlyConfigured):
- cache.get_best_memcache()
- def test_cached(self):
- with self.mock_pylibmc():
- with mock.reset_modules('celery.backends.cache'):
- from celery.backends import cache
- cache._imp = [None]
- cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
- assert cache._imp[0]
- cache.get_best_memcache()[0]()
- def test_backends(self):
- from celery.backends.cache import backends
- with self.mock_memcache():
- for name, fun in items(backends):
- assert fun()
- class test_memcache_key(MockCacheMixin):
- def test_memcache_unicode_key(self):
- with self.mock_memcache():
- with mock.reset_modules('celery.backends.cache'):
- with mock.mask_modules('pylibmc'):
- from celery.backends import cache
- cache._imp = [None]
- task_id, result = string(uuid()), 42
- b = cache.CacheBackend(backend='memcache', app=self.app)
- b.store_result(task_id, result, state=states.SUCCESS)
- assert b.get_result(task_id) == result
- def test_memcache_bytes_key(self):
- with self.mock_memcache():
- with mock.reset_modules('celery.backends.cache'):
- with mock.mask_modules('pylibmc'):
- from celery.backends import cache
- cache._imp = [None]
- task_id, result = str_to_bytes(uuid()), 42
- b = cache.CacheBackend(backend='memcache', app=self.app)
- b.store_result(task_id, result, state=states.SUCCESS)
- assert b.get_result(task_id) == result
- def test_pylibmc_unicode_key(self):
- with mock.reset_modules('celery.backends.cache'):
- with self.mock_pylibmc():
- from celery.backends import cache
- cache._imp = [None]
- task_id, result = string(uuid()), 42
- b = cache.CacheBackend(backend='memcache', app=self.app)
- b.store_result(task_id, result, state=states.SUCCESS)
- assert b.get_result(task_id) == result
- def test_pylibmc_bytes_key(self):
- with mock.reset_modules('celery.backends.cache'):
- with self.mock_pylibmc():
- from celery.backends import cache
- cache._imp = [None]
- task_id, result = str_to_bytes(uuid()), 42
- b = cache.CacheBackend(backend='memcache', app=self.app)
- b.store_result(task_id, result, state=states.SUCCESS)
- assert b.get_result(task_id) == result
|