cache.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. from datetime import timedelta
  2. from kombu.utils import partition
  3. from celery.backends.base import KeyValueStoreBackend
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery.utils import cached_property
  6. from celery.utils import timeutils
  7. from celery.datastructures import LocalCache
  8. _imp = [None]
  9. def import_best_memcache():
  10. if _imp[0] is None:
  11. is_pylibmc = False
  12. try:
  13. import pylibmc as memcache
  14. is_pylibmc = True
  15. except ImportError:
  16. try:
  17. import memcache
  18. except ImportError:
  19. raise ImproperlyConfigured(
  20. "Memcached backend requires either the 'pylibmc' "
  21. "or 'memcache' library")
  22. _imp[0] = is_pylibmc, memcache
  23. return _imp[0]
  24. def get_best_memcache(*args, **kwargs):
  25. behaviors = kwargs.pop("behaviors", None)
  26. is_pylibmc, memcache = import_best_memcache()
  27. client = memcache.Client(*args, **kwargs)
  28. if is_pylibmc and behaviors is not None:
  29. client.behaviors = behaviors
  30. return client
  31. class DummyClient(object):
  32. def __init__(self, *args, **kwargs):
  33. self.cache = LocalCache(5000)
  34. def get(self, key, *args, **kwargs):
  35. return self.cache.get(key)
  36. def set(self, key, value, *args, **kwargs):
  37. self.cache[key] = value
  38. def delete(self, key, *args, **kwargs):
  39. self.cache.pop(key, None)
  40. backends = {"memcache": lambda: get_best_memcache,
  41. "memcached": lambda: get_best_memcache,
  42. "pylibmc": lambda: get_best_memcache,
  43. "memory": lambda: DummyClient}
  44. class CacheBackend(KeyValueStoreBackend):
  45. def __init__(self, expires=None, backend=None, options={}, **kwargs):
  46. super(CacheBackend, self).__init__(self, **kwargs)
  47. self.expires = expires or self.app.conf.CELERY_TASK_RESULT_EXPIRES
  48. if isinstance(self.expires, timedelta):
  49. self.expires = timeutils.timedelta_seconds(self.expires)
  50. self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
  51. **options)
  52. backend = backend or self.app.conf.CELERY_CACHE_BACKEND
  53. self.expires = int(self.expires)
  54. self.backend, _, servers = partition(backend, "://")
  55. self.servers = servers.split(";")
  56. try:
  57. self.Client = backends[self.backend]()
  58. except KeyError:
  59. raise ImproperlyConfigured(
  60. "Unknown cache backend: %s. Please use one of the "
  61. "following backends: %s" % (self.backend,
  62. ", ".join(backends.keys())))
  63. def get(self, key):
  64. return self.client.get(key)
  65. def set(self, key, value):
  66. return self.client.set(key, value, self.expires)
  67. def delete(self, key):
  68. return self.client.delete(key)
  69. @cached_property
  70. def client(self):
  71. return self.Client(self.servers, **self.options)