cache.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. # -*- coding: utf-8 -*-
  2. """Memcached and in-memory cache result backend."""
  3. from kombu.utils.encoding import bytes_to_str, ensure_bytes
  4. from kombu.utils.objects import cached_property
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery.utils.functional import LRUCache
  7. from .base import KeyValueStoreBackend
  8. __all__ = ['CacheBackend']
  9. _imp = [None]
  10. REQUIRES_BACKEND = """\
  11. The Memcached backend requires either pylibmc or python-memcached.\
  12. """
  13. UNKNOWN_BACKEND = """\
  14. The cache backend {0!r} is unknown,
  15. Please use one of the following backends instead: {1}\
  16. """
  17. def import_best_memcache():
  18. if _imp[0] is None:
  19. is_pylibmc = False
  20. try:
  21. import pylibmc as memcache
  22. is_pylibmc = True
  23. except ImportError:
  24. try:
  25. import memcache # noqa
  26. except ImportError:
  27. raise ImproperlyConfigured(REQUIRES_BACKEND)
  28. _imp[0] = (is_pylibmc, memcache, bytes_to_str)
  29. return _imp[0]
  30. def get_best_memcache(*args, **kwargs):
  31. # pylint: disable=unpacking-non-sequence
  32. # This is most definitely a sequence, but pylint thinks it's not.
  33. is_pylibmc, memcache, key_t = import_best_memcache()
  34. Client = _Client = memcache.Client
  35. if not is_pylibmc:
  36. def Client(*args, **kwargs): # noqa
  37. kwargs.pop('behaviors', None)
  38. return _Client(*args, **kwargs)
  39. return Client, key_t
  40. class DummyClient:
  41. def __init__(self, *args, **kwargs):
  42. self.cache = LRUCache(limit=5000)
  43. def get(self, key, *args, **kwargs):
  44. return self.cache.get(key)
  45. def get_multi(self, keys):
  46. cache = self.cache
  47. return {k: cache[k] for k in keys if k in cache}
  48. def set(self, key, value, *args, **kwargs):
  49. self.cache[key] = value
  50. def delete(self, key, *args, **kwargs):
  51. self.cache.pop(key, None)
  52. def incr(self, key, delta=1):
  53. return self.cache.incr(key, delta)
  54. backends = {
  55. 'memcache': get_best_memcache,
  56. 'memcached': get_best_memcache,
  57. 'pylibmc': get_best_memcache,
  58. 'memory': lambda: (DummyClient, ensure_bytes),
  59. }
  60. class CacheBackend(KeyValueStoreBackend):
  61. """Cache result backend."""
  62. servers = None
  63. supports_autoexpire = True
  64. supports_native_join = True
  65. implements_incr = True
  66. def __init__(self, app, expires=None, backend=None,
  67. options={}, url=None, **kwargs):
  68. super().__init__(app, **kwargs)
  69. self.url = url
  70. self.options = dict(self.app.conf.cache_backend_options,
  71. **options)
  72. self.backend = url or backend or self.app.conf.cache_backend
  73. if self.backend:
  74. self.backend, _, servers = self.backend.partition('://')
  75. self.servers = servers.rstrip('/').split(';')
  76. self.expires = self.prepare_expires(expires, type=int)
  77. try:
  78. self.Client, self.key_t = backends[self.backend]()
  79. except KeyError:
  80. raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
  81. self.backend, ', '.join(backends)))
  82. self._encode_prefixes() # rencode the keyprefixes
  83. def get(self, key):
  84. return self.client.get(key)
  85. def mget(self, keys):
  86. return self.client.get_multi(keys)
  87. def set(self, key, value):
  88. return self.client.set(key, value, self.expires)
  89. def delete(self, key):
  90. return self.client.delete(key)
  91. def _apply_chord_incr(self, header, partial_args, group_id, body, **opts):
  92. self.client.set(self.get_key_for_chord(group_id), 0, time=86400)
  93. return super()._apply_chord_incr(
  94. header, partial_args, group_id, body, **opts)
  95. def incr(self, key):
  96. return self.client.incr(key)
  97. @cached_property
  98. def client(self):
  99. return self.Client(self.servers, **self.options)
  100. def __reduce__(self, args=(), kwargs={}):
  101. servers = ';'.join(self.servers)
  102. backend = '{0}://{1}/'.format(self.backend, servers)
  103. kwargs.update(
  104. dict(backend=backend,
  105. expires=self.expires,
  106. options=self.options))
  107. return super().__reduce__(args, kwargs)
  108. def as_uri(self, *args, **kwargs):
  109. """Return the backend as an URI.
  110. This properly handles the case of multiple servers.
  111. """
  112. servers = ';'.join(self.servers)
  113. return '{0}://{1}/'.format(self.backend, servers)