cache.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.cache
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Memcache and in-memory cache result backend.
  6. """
  7. from __future__ import absolute_import
  8. from kombu.utils import cached_property
  9. from celery.exceptions import ImproperlyConfigured
  10. from celery.utils.functional import LRUCache
  11. from .base import KeyValueStoreBackend
  12. __all__ = ['CacheBackend']
  13. _imp = [None]
  14. REQUIRES_BACKEND = """\
  15. The memcached backend requires either pylibmc or python-memcached.\
  16. """
  17. UNKNOWN_BACKEND = """\
  18. The cache backend {0!r} is unknown,
  19. Please use one of the following backends instead: {1}\
  20. """
  21. def import_best_memcache():
  22. if _imp[0] is None:
  23. is_pylibmc = False
  24. try:
  25. import pylibmc as memcache
  26. is_pylibmc = True
  27. except ImportError:
  28. try:
  29. import memcache # noqa
  30. except ImportError:
  31. raise ImproperlyConfigured(REQUIRES_BACKEND)
  32. _imp[0] = (is_pylibmc, memcache)
  33. return _imp[0]
  34. def get_best_memcache(*args, **kwargs):
  35. behaviors = kwargs.pop('behaviors', None)
  36. is_pylibmc, memcache = import_best_memcache()
  37. client = memcache.Client(*args, **kwargs)
  38. if is_pylibmc and behaviors is not None:
  39. client.behaviors = behaviors
  40. return client
  41. class DummyClient(object):
  42. def __init__(self, *args, **kwargs):
  43. self.cache = LRUCache(limit=5000)
  44. def get(self, key, *args, **kwargs):
  45. return self.cache.get(key)
  46. def get_multi(self, keys):
  47. cache = self.cache
  48. return dict((k, cache[k]) for k in keys if k in cache)
  49. def set(self, key, value, *args, **kwargs):
  50. self.cache[key] = value
  51. def delete(self, key, *args, **kwargs):
  52. self.cache.pop(key, None)
  53. def incr(self, key, delta=1):
  54. return self.cache.incr(key, delta)
  55. backends = {'memcache': lambda: get_best_memcache,
  56. 'memcached': lambda: get_best_memcache,
  57. 'pylibmc': lambda: get_best_memcache,
  58. 'memory': lambda: DummyClient}
  59. class CacheBackend(KeyValueStoreBackend):
  60. servers = None
  61. supports_autoexpire = True
  62. supports_native_join = True
  63. implements_incr = True
  64. def __init__(self, app, expires=None, backend=None,
  65. options={}, url=None, **kwargs):
  66. super(CacheBackend, self).__init__(app, **kwargs)
  67. self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
  68. **options)
  69. self.backend = url or backend or self.app.conf.CELERY_CACHE_BACKEND
  70. if self.backend:
  71. self.backend, _, servers = self.backend.partition('://')
  72. self.servers = servers.rstrip('/').split(';')
  73. self.expires = self.prepare_expires(expires, type=int)
  74. try:
  75. self.Client = backends[self.backend]()
  76. except KeyError:
  77. raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
  78. self.backend, ', '.join(backends)))
  79. def get(self, key):
  80. return self.client.get(key)
  81. def mget(self, keys):
  82. return self.client.get_multi(keys)
  83. def set(self, key, value):
  84. return self.client.set(key, value, self.expires)
  85. def delete(self, key):
  86. return self.client.delete(key)
  87. def on_chord_apply(self, group_id, body, result=None, **kwargs):
  88. self.client.set(self.get_key_for_chord(group_id), '0', time=86400)
  89. self.save_group(group_id, self.app.GroupResult(group_id, result))
  90. def incr(self, key):
  91. return self.client.incr(key)
  92. @cached_property
  93. def client(self):
  94. return self.Client(self.servers, **self.options)
  95. def __reduce__(self, args=(), kwargs={}):
  96. servers = ';'.join(self.servers)
  97. backend = '{0}://{1}/'.format(self.backend, servers)
  98. kwargs.update(
  99. dict(backend=backend,
  100. expires=self.expires,
  101. options=self.options))
  102. return super(CacheBackend, self).__reduce__(args, kwargs)