# cache.py (4.8 KB)
  1. # -*- coding: utf-8 -*-
  2. """Memcached and in-memory cache result backend."""
  3. from __future__ import absolute_import, unicode_literals
  4. import sys
  5. from kombu.utils.encoding import bytes_to_str, ensure_bytes
  6. from kombu.utils.objects import cached_property
  7. from celery.exceptions import ImproperlyConfigured
  8. from celery.utils.functional import LRUCache
  9. from .base import KeyValueStoreBackend
  10. __all__ = ('CacheBackend',)
  11. _imp = [None]
  12. PY3 = sys.version_info[0] == 3
  13. REQUIRES_BACKEND = """\
  14. The Memcached backend requires either pylibmc or python-memcached.\
  15. """
  16. UNKNOWN_BACKEND = """\
  17. The cache backend {0!r} is unknown,
  18. Please use one of the following backends instead: {1}\
  19. """
  20. def import_best_memcache():
  21. if _imp[0] is None:
  22. is_pylibmc, memcache_key_t = False, ensure_bytes
  23. try:
  24. import pylibmc as memcache
  25. is_pylibmc = True
  26. except ImportError:
  27. try:
  28. import memcache # noqa
  29. except ImportError:
  30. raise ImproperlyConfigured(REQUIRES_BACKEND)
  31. if PY3: # pragma: no cover
  32. memcache_key_t = bytes_to_str
  33. _imp[0] = (is_pylibmc, memcache, memcache_key_t)
  34. return _imp[0]
  35. def get_best_memcache(*args, **kwargs):
  36. # pylint: disable=unpacking-non-sequence
  37. # This is most definitely a sequence, but pylint thinks it's not.
  38. is_pylibmc, memcache, key_t = import_best_memcache()
  39. Client = _Client = memcache.Client
  40. if not is_pylibmc:
  41. def Client(*args, **kwargs): # noqa
  42. kwargs.pop('behaviors', None)
  43. return _Client(*args, **kwargs)
  44. return Client, key_t
  45. class DummyClient(object):
  46. def __init__(self, *args, **kwargs):
  47. self.cache = LRUCache(limit=5000)
  48. def get(self, key, *args, **kwargs):
  49. return self.cache.get(key)
  50. def get_multi(self, keys):
  51. cache = self.cache
  52. return {k: cache[k] for k in keys if k in cache}
  53. def set(self, key, value, *args, **kwargs):
  54. self.cache[key] = value
  55. def delete(self, key, *args, **kwargs):
  56. self.cache.pop(key, None)
  57. def incr(self, key, delta=1):
  58. return self.cache.incr(key, delta)
  59. def touch(self, key, expire):
  60. pass
  61. backends = {
  62. 'memcache': get_best_memcache,
  63. 'memcached': get_best_memcache,
  64. 'pylibmc': get_best_memcache,
  65. 'memory': lambda: (DummyClient, ensure_bytes),
  66. }
  67. class CacheBackend(KeyValueStoreBackend):
  68. """Cache result backend."""
  69. servers = None
  70. supports_autoexpire = True
  71. supports_native_join = True
  72. implements_incr = True
  73. def __init__(self, app, expires=None, backend=None,
  74. options={}, url=None, **kwargs):
  75. super(CacheBackend, self).__init__(app, **kwargs)
  76. self.url = url
  77. self.options = dict(self.app.conf.cache_backend_options,
  78. **options)
  79. self.backend = url or backend or self.app.conf.cache_backend
  80. if self.backend:
  81. self.backend, _, servers = self.backend.partition('://')
  82. self.servers = servers.rstrip('/').split(';')
  83. self.expires = self.prepare_expires(expires, type=int)
  84. try:
  85. self.Client, self.key_t = backends[self.backend]()
  86. except KeyError:
  87. raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
  88. self.backend, ', '.join(backends)))
  89. self._encode_prefixes() # rencode the keyprefixes
  90. def get(self, key):
  91. return self.client.get(key)
  92. def mget(self, keys):
  93. return self.client.get_multi(keys)
  94. def set(self, key, value):
  95. return self.client.set(key, value, self.expires)
  96. def delete(self, key):
  97. return self.client.delete(key)
  98. def _apply_chord_incr(self, header_result, body, **kwargs):
  99. chord_key = self.get_key_for_chord(header_result.id)
  100. self.client.set(chord_key, 0, time=self.expires)
  101. return super(CacheBackend, self)._apply_chord_incr(
  102. header_result, body, **kwargs)
  103. def incr(self, key):
  104. return self.client.incr(key)
  105. def expire(self, key, value):
  106. return self.client.touch(key, value)
  107. @cached_property
  108. def client(self):
  109. return self.Client(self.servers, **self.options)
  110. def __reduce__(self, args=(), kwargs={}):
  111. servers = ';'.join(self.servers)
  112. backend = '{0}://{1}/'.format(self.backend, servers)
  113. kwargs.update(
  114. {'backend': backend,
  115. 'expires': self.expires,
  116. 'options': self.options})
  117. return super(CacheBackend, self).__reduce__(args, kwargs)
  118. def as_uri(self, *args, **kwargs):
  119. """Return the backend as an URI.
  120. This properly handles the case of multiple servers.
  121. """
  122. servers = ';'.join(self.servers)
  123. return '{0}://{1}/'.format(self.backend, servers)