cache.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from celery.datastructures import LRUCache
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery.utils import cached_property
  6. from .base import KeyValueStoreBackend
  7. _imp = [None]
  8. def import_best_memcache():
  9. if _imp[0] is None:
  10. is_pylibmc = False
  11. try:
  12. import pylibmc as memcache
  13. is_pylibmc = True
  14. except ImportError:
  15. try:
  16. import memcache # noqa
  17. except ImportError:
  18. raise ImproperlyConfigured(
  19. "Memcached backend requires either the 'pylibmc' "
  20. "or 'memcache' library")
  21. _imp[0] = (is_pylibmc, memcache)
  22. return _imp[0]
  23. def get_best_memcache(*args, **kwargs):
  24. behaviors = kwargs.pop("behaviors", None)
  25. is_pylibmc, memcache = import_best_memcache()
  26. client = memcache.Client(*args, **kwargs)
  27. if is_pylibmc and behaviors is not None:
  28. client.behaviors = behaviors
  29. return client
  30. class DummyClient(object):
  31. def __init__(self, *args, **kwargs):
  32. self.cache = LRUCache(limit=5000)
  33. def get(self, key, *args, **kwargs):
  34. return self.cache.get(key)
  35. def get_multi(self, keys):
  36. cache = self.cache
  37. return dict((k, cache[k]) for k in keys if k in cache)
  38. def set(self, key, value, *args, **kwargs):
  39. self.cache[key] = value
  40. def delete(self, key, *args, **kwargs):
  41. self.cache.pop(key, None)
  42. def incr(self, key, delta=1):
  43. return self.cache.incr(key, delta)
  44. backends = {"memcache": lambda: get_best_memcache,
  45. "memcached": lambda: get_best_memcache,
  46. "pylibmc": lambda: get_best_memcache,
  47. "memory": lambda: DummyClient}
  48. class CacheBackend(KeyValueStoreBackend):
  49. servers = None
  50. supports_native_join = True
  51. implements_incr = True
  52. def __init__(self, expires=None, backend=None, options={}, **kwargs):
  53. super(CacheBackend, self).__init__(self, **kwargs)
  54. self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
  55. **options)
  56. self.backend = backend or self.app.conf.CELERY_CACHE_BACKEND
  57. if self.backend:
  58. self.backend, _, servers = self.backend.partition("://")
  59. self.servers = servers.rstrip('/').split(";")
  60. self.expires = self.prepare_expires(expires, type=int)
  61. try:
  62. self.Client = backends[self.backend]()
  63. except KeyError:
  64. raise ImproperlyConfigured(
  65. "Unknown cache backend: %s. Please use one of the "
  66. "following backends: %s" % (self.backend,
  67. ", ".join(backends.keys())))
  68. def get(self, key):
  69. return self.client.get(key)
  70. def mget(self, keys):
  71. return self.client.get_multi(keys)
  72. def set(self, key, value):
  73. return self.client.set(key, value, self.expires)
  74. def delete(self, key):
  75. return self.client.delete(key)
  76. def on_chord_apply(self, setid, body, result=None, **kwargs):
  77. self.client.set(self.get_key_for_chord(setid), '0', time=86400)
  78. def incr(self, key):
  79. return self.client.incr(key)
  80. @cached_property
  81. def client(self):
  82. return self.Client(self.servers, **self.options)
  83. def __reduce__(self, args=(), kwargs={}):
  84. servers = ";".join(self.servers)
  85. backend = "%s://%s/" % (self.backend, servers)
  86. kwargs.update(
  87. dict(backend=backend,
  88. expires=self.expires,
  89. options=self.options))
  90. return super(CacheBackend, self).__reduce__(args, kwargs)