datastructures.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. """
  2. celery.datastructures
  3. =====================
  4. Custom types and data structures.
  5. """
  6. from __future__ import absolute_import
  7. from __future__ import with_statement
  8. import sys
  9. import time
  10. import traceback
  11. from itertools import chain
  12. from threading import RLock
  13. from .utils.compat import UserDict, OrderedDict
  14. __all__ = ["AttributeDictMixin", "AttributeDict", "DictAttribute",
  15. "ConfigurationView", "ExceptionInfo", "LimitedSet",
  16. "LRUCache", "TokenBucket"]
  17. class AttributeDictMixin(object):
  18. """Adds attribute access to mappings.
  19. `d.key -> d[key]`
  20. """
  21. def __getattr__(self, key):
  22. """`d.key -> d[key]`"""
  23. try:
  24. return self[key]
  25. except KeyError:
  26. raise AttributeError("'%s' object has no attribute '%s'" % (
  27. self.__class__.__name__, key))
  28. def __setattr__(self, key, value):
  29. """`d[key] = value -> d.key = value`"""
  30. self[key] = value
  31. class AttributeDict(dict, AttributeDictMixin):
  32. """Dict subclass with attribute access."""
  33. pass
  34. class DictAttribute(object):
  35. """Dict interface to attributes.
  36. `obj[k] -> obj.k`
  37. """
  38. def __init__(self, obj):
  39. self.obj = obj
  40. def get(self, key, default=None):
  41. try:
  42. return self[key]
  43. except KeyError:
  44. return default
  45. def setdefault(self, key, default):
  46. try:
  47. return self[key]
  48. except KeyError:
  49. self[key] = default
  50. return default
  51. def __getitem__(self, key):
  52. try:
  53. return getattr(self.obj, key)
  54. except AttributeError:
  55. raise KeyError(key)
  56. def __setitem__(self, key, value):
  57. setattr(self.obj, key, value)
  58. def __contains__(self, key):
  59. return hasattr(self.obj, key)
  60. def _iterate_items(self):
  61. return vars(self.obj).iteritems()
  62. iteritems = _iterate_items
  63. if sys.version_info >= (3, 0):
  64. items = _iterate_items
  65. else:
  66. def items(self):
  67. return list(self._iterate_items())
  68. class ConfigurationView(AttributeDictMixin):
  69. """A view over an applications configuration dicts.
  70. If the key does not exist in ``changes``, the ``defaults`` dict
  71. is consulted.
  72. :param changes: Dict containing changes to the configuration.
  73. :param defaults: Dict containing the default configuration.
  74. """
  75. changes = None
  76. defaults = None
  77. _order = None
  78. def __init__(self, changes, defaults):
  79. self.__dict__.update(changes=changes, defaults=defaults,
  80. _order=[changes] + defaults)
  81. def __getitem__(self, key):
  82. for d in self._order:
  83. try:
  84. return d[key]
  85. except KeyError:
  86. pass
  87. raise KeyError(key)
  88. def __setitem__(self, key, value):
  89. self.changes[key] = value
  90. def get(self, key, default=None):
  91. try:
  92. return self[key]
  93. except KeyError:
  94. return default
  95. def setdefault(self, key, default):
  96. try:
  97. return self[key]
  98. except KeyError:
  99. self[key] = default
  100. return default
  101. def update(self, *args, **kwargs):
  102. return self.changes.update(*args, **kwargs)
  103. def __contains__(self, key):
  104. for d in self._order:
  105. if key in d:
  106. return True
  107. return False
  108. def __repr__(self):
  109. return repr(dict(self.iteritems()))
  110. def __iter__(self):
  111. return self.iterkeys()
  112. def _iter(self, op):
  113. # defaults must be first in the stream, so values in
  114. # changes takes precedence.
  115. return chain(*[op(d) for d in reversed(self._order)])
  116. def _iterate_keys(self):
  117. return self._iter(lambda d: d.iterkeys())
  118. iterkeys = _iterate_keys
  119. def _iterate_items(self):
  120. return self._iter(lambda d: d.iteritems())
  121. iteritems = _iterate_items
  122. def _iterate_values(self):
  123. return self._iter(lambda d: d.itervalues())
  124. itervalues = _iterate_values
  125. def keys(self):
  126. return list(self._iterate_keys())
  127. def items(self):
  128. return list(self._iterate_items())
  129. def values(self):
  130. return list(self._iterate_values())
  131. class _Code(object):
  132. def __init__(self, code):
  133. self.co_filename = code.co_filename
  134. self.co_name = code.co_name
  135. class _Frame(object):
  136. Code = _Code
  137. def __init__(self, frame):
  138. self.f_globals = {"__file__": frame.f_globals["__file__"]}
  139. self.f_code = self.Code(frame.f_code)
  140. class Traceback(object):
  141. Frame = _Frame
  142. def __init__(self, tb):
  143. self.tb_frame = self.Frame(tb.tb_frame)
  144. self.tb_lineno = tb.tb_lineno
  145. if tb.tb_next is None:
  146. self.tb_next = None
  147. else:
  148. self.tb_next = Traceback(tb.tb_next)
  149. class ExceptionInfo(object):
  150. """Exception wrapping an exception and its traceback.
  151. :param exc_info: The exception info tuple as returned by
  152. :func:`sys.exc_info`.
  153. """
  154. #: Exception type.
  155. type = None
  156. #: Exception instance.
  157. exception = None
  158. #: Pickleable traceback instance for use with :mod:`traceback`
  159. tb = None
  160. #: String representation of the traceback.
  161. traceback = None
  162. def __init__(self, exc_info):
  163. self.type, self.exception, tb = exc_info
  164. self.tb = Traceback(tb)
  165. self.traceback = ''.join(traceback.format_exception(*exc_info))
  166. def __str__(self):
  167. return self.traceback
  168. def __repr__(self):
  169. return "<ExceptionInfo: %r>" % (self.exception, )
  170. @property
  171. def exc_info(self):
  172. return self.type, self.exception, self.tb
  173. class LimitedSet(object):
  174. """Kind-of Set with limitations.
  175. Good for when you need to test for membership (`a in set`),
  176. but the list might become to big, so you want to limit it so it doesn't
  177. consume too much resources.
  178. :keyword maxlen: Maximum number of members before we start
  179. evicting expired members.
  180. :keyword expires: Time in seconds, before a membership expires.
  181. """
  182. __slots__ = ("maxlen", "expires", "_data")
  183. def __init__(self, maxlen=None, expires=None):
  184. self.maxlen = maxlen
  185. self.expires = expires
  186. self._data = {}
  187. def add(self, value):
  188. """Add a new member."""
  189. self._expire_item()
  190. self._data[value] = time.time()
  191. def clear(self):
  192. """Remove all members"""
  193. self._data.clear()
  194. def pop_value(self, value):
  195. """Remove membership by finding value."""
  196. self._data.pop(value, None)
  197. def _expire_item(self):
  198. """Hunt down and remove an expired item."""
  199. while 1:
  200. if self.maxlen and len(self) >= self.maxlen:
  201. value, when = self.first
  202. if not self.expires or time.time() > when + self.expires:
  203. try:
  204. self.pop_value(value)
  205. except TypeError: # pragma: no cover
  206. continue
  207. break
  208. def __contains__(self, value):
  209. return value in self._data
  210. def update(self, other):
  211. if isinstance(other, self.__class__):
  212. self._data.update(other._data)
  213. else:
  214. self._data.update(other)
  215. def as_dict(self):
  216. return self._data
  217. def __iter__(self):
  218. return iter(self._data.keys())
  219. def __len__(self):
  220. return len(self._data.keys())
  221. def __repr__(self):
  222. return "LimitedSet([%s])" % (repr(self._data.keys()))
  223. @property
  224. def chronologically(self):
  225. return sorted(self._data.items(), key=lambda (value, when): when)
  226. @property
  227. def first(self):
  228. """Get the oldest member."""
  229. return self.chronologically[0]
  230. class LRUCache(UserDict):
  231. """LRU Cache implementation using a doubly linked list to track access.
  232. :keyword limit: The maximum number of keys to keep in the cache.
  233. When a new key is inserted and the limit has been exceeded,
  234. the *Least Recently Used* key will be discarded from the
  235. cache.
  236. """
  237. def __init__(self, limit=None):
  238. self.limit = limit
  239. self.mutex = RLock()
  240. self.data = OrderedDict()
  241. def __getitem__(self, key):
  242. with self.mutex:
  243. value = self[key] = self.data.pop(key)
  244. return value
  245. def keys(self):
  246. # userdict.keys in py3k calls __getitem__
  247. return self.data.keys()
  248. def values(self):
  249. return list(self._iterate_values())
  250. def items(self):
  251. return list(self._iterate_items())
  252. def __setitem__(self, key, value):
  253. # remove least recently used key.
  254. with self.mutex:
  255. if self.limit and len(self.data) >= self.limit:
  256. self.data.pop(iter(self.data).next())
  257. self.data[key] = value
  258. def __iter__(self):
  259. return self.data.iterkeys()
  260. def _iterate_items(self):
  261. for k in self.data:
  262. try:
  263. yield (k, self.data[k])
  264. except KeyError:
  265. pass
  266. iteritems = _iterate_items
  267. def _iterate_values(self):
  268. for k in self.data:
  269. try:
  270. yield self.data[k]
  271. except KeyError:
  272. pass
  273. itervalues = _iterate_values
  274. class TokenBucket(object):
  275. """Token Bucket Algorithm.
  276. See http://en.wikipedia.org/wiki/Token_Bucket
  277. Most of this code was stolen from an entry in the ASPN Python Cookbook:
  278. http://code.activestate.com/recipes/511490/
  279. .. admonition:: Thread safety
  280. This implementation may not be thread safe.
  281. """
  282. #: The rate in tokens/second that the bucket will be refilled
  283. fill_rate = None
  284. #: Maximum number of tokensin the bucket.
  285. capacity = 1
  286. #: Timestamp of the last time a token was taken out of the bucket.
  287. timestamp = None
  288. def __init__(self, fill_rate, capacity=1):
  289. self.capacity = float(capacity)
  290. self._tokens = capacity
  291. self.fill_rate = float(fill_rate)
  292. self.timestamp = time.time()
  293. def can_consume(self, tokens=1):
  294. """Returns :const:`True` if `tokens` number of tokens can be consumed
  295. from the bucket."""
  296. if tokens <= self._get_tokens():
  297. self._tokens -= tokens
  298. return True
  299. return False
  300. def expected_time(self, tokens=1):
  301. """Returns the expected time in seconds when a new token should be
  302. available.
  303. .. admonition:: Warning
  304. This consumes a token from the bucket.
  305. """
  306. _tokens = self._get_tokens()
  307. tokens = max(tokens, _tokens)
  308. return (tokens - _tokens) / self.fill_rate
  309. def _get_tokens(self):
  310. if self._tokens < self.capacity:
  311. now = time.time()
  312. delta = self.fill_rate * (now - self.timestamp)
  313. self._tokens = min(self.capacity, self._tokens + delta)
  314. self.timestamp = now
  315. return self._tokens