cache.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. from kombu.utils import cached_property
  2. from celery.backends.base import KeyValueStoreBackend
  3. from celery.exceptions import ImproperlyConfigured
  4. from celery.datastructures import LocalCache
  5. _imp = [None]
  6. def import_best_memcache():
  7. if _imp[0] is None:
  8. is_pylibmc = False
  9. try:
  10. import pylibmc as memcache
  11. is_pylibmc = True
  12. except ImportError:
  13. try:
  14. import memcache # noqa
  15. except ImportError:
  16. raise ImproperlyConfigured(
  17. "Memcached backend requires either the 'pylibmc' "
  18. "or 'memcache' library")
  19. _imp[0] = is_pylibmc, memcache
  20. return _imp[0]
  21. def get_best_memcache(*args, **kwargs):
  22. behaviors = kwargs.pop("behaviors", None)
  23. is_pylibmc, memcache = import_best_memcache()
  24. client = memcache.Client(*args, **kwargs)
  25. if is_pylibmc and behaviors is not None:
  26. client.behaviors = behaviors
  27. return client
  28. class DummyClient(object):
  29. def __init__(self, *args, **kwargs):
  30. self.cache = LocalCache(5000)
  31. def get(self, key, *args, **kwargs):
  32. return self.cache.get(key)
  33. def get_multi(self, keys):
  34. cache = self.cache
  35. return dict((k, cache[k]) for k in keys if k in cache)
  36. def set(self, key, value, *args, **kwargs):
  37. self.cache[key] = value
  38. def delete(self, key, *args, **kwargs):
  39. self.cache.pop(key, None)
  40. backends = {"memcache": lambda: get_best_memcache,
  41. "memcached": lambda: get_best_memcache,
  42. "pylibmc": lambda: get_best_memcache,
  43. "memory": lambda: DummyClient}
  44. class CacheBackend(KeyValueStoreBackend):
  45. def __init__(self, expires=None, backend=None, options={}, **kwargs):
  46. super(CacheBackend, self).__init__(self, **kwargs)
  47. self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
  48. **options)
  49. backend = backend or self.app.conf.CELERY_CACHE_BACKEND
  50. self.backend, _, servers = backend.partition("://")
  51. self.expires = self.prepare_expires(expires, type=int)
  52. self.servers = servers.rstrip('/').split(";")
  53. try:
  54. self.Client = backends[self.backend]()
  55. except KeyError:
  56. raise ImproperlyConfigured(
  57. "Unknown cache backend: %s. Please use one of the "
  58. "following backends: %s" % (self.backend,
  59. ", ".join(backends.keys())))
  60. def get(self, key):
  61. return self.client.get(key)
  62. def mget(self, keys):
  63. return self.client.get_multi(keys)
  64. def set(self, key, value):
  65. return self.client.set(key, value, self.expires)
  66. def delete(self, key):
  67. return self.client.delete(key)
  68. @cached_property
  69. def client(self):
  70. return self.Client(self.servers, **self.options)
  71. def __reduce__(self, args=(), kwargs={}):
  72. servers = ";".join(self.servers)
  73. backend = "%s://%s/" % (self.backend, servers)
  74. kwargs.update(
  75. dict(backend=backend,
  76. expires=self.expires,
  77. options=self.options))
  78. return super(CacheBackend, self).__reduce__(args, kwargs)