datastructures.py 11 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import sys
  12. import time
  13. import traceback
  14. from itertools import chain
  15. from threading import RLock
  16. from .utils.compat import UserDict, OrderedDict
  17. class AttributeDictMixin(object):
  18. """Adds attribute access to mappings.
  19. `d.key -> d[key]`
  20. """
  21. def __getattr__(self, key):
  22. """`d.key -> d[key]`"""
  23. try:
  24. return self[key]
  25. except KeyError:
  26. raise AttributeError("'%s' object has no attribute '%s'" % (
  27. self.__class__.__name__, key))
  28. def __setattr__(self, key, value):
  29. """`d[key] = value -> d.key = value`"""
  30. self[key] = value
  31. class AttributeDict(dict, AttributeDictMixin):
  32. """Dict subclass with attribute access."""
  33. pass
  34. class DictAttribute(object):
  35. """Dict interface to attributes.
  36. `obj[k] -> obj.k`
  37. """
  38. def __init__(self, obj):
  39. self.obj = obj
  40. def get(self, key, default=None):
  41. try:
  42. return self[key]
  43. except KeyError:
  44. return default
  45. def setdefault(self, key, default):
  46. try:
  47. return self[key]
  48. except KeyError:
  49. self[key] = default
  50. return default
  51. def __getitem__(self, key):
  52. try:
  53. return getattr(self.obj, key)
  54. except AttributeError:
  55. raise KeyError(key)
  56. def __setitem__(self, key, value):
  57. setattr(self.obj, key, value)
  58. def __contains__(self, key):
  59. return hasattr(self.obj, key)
  60. def _iterate_items(self):
  61. return vars(self.obj).iteritems()
  62. iteritems = _iterate_items
  63. if sys.version_info >= (3, 0):
  64. items = _iterate_items
  65. else:
  66. def items(self):
  67. return list(self._iterate_items())
  68. class ConfigurationView(AttributeDictMixin):
  69. """A view over an applications configuration dicts.
  70. If the key does not exist in ``changes``, the ``defaults`` dict
  71. is consulted.
  72. :param changes: Dict containing changes to the configuration.
  73. :param defaults: Dict containing the default configuration.
  74. """
  75. changes = None
  76. defaults = None
  77. _order = None
  78. def __init__(self, changes, defaults):
  79. self.__dict__.update(changes=changes, defaults=defaults,
  80. _order=[changes] + defaults)
  81. def __getitem__(self, key):
  82. for d in self._order:
  83. try:
  84. return d[key]
  85. except KeyError:
  86. pass
  87. raise KeyError(key)
  88. def __setitem__(self, key, value):
  89. self.changes[key] = value
  90. def get(self, key, default=None):
  91. try:
  92. return self[key]
  93. except KeyError:
  94. return default
  95. def setdefault(self, key, default):
  96. try:
  97. return self[key]
  98. except KeyError:
  99. self[key] = default
  100. return default
  101. def update(self, *args, **kwargs):
  102. return self.changes.update(*args, **kwargs)
  103. def __contains__(self, key):
  104. for d in self._order:
  105. if key in d:
  106. return True
  107. return False
  108. def __repr__(self):
  109. return repr(dict(self.iteritems()))
  110. def __iter__(self):
  111. return self.iterkeys()
  112. def _iter(self, op):
  113. # defaults must be first in the stream, so values in
  114. # changes takes precedence.
  115. return chain(*[op(d) for d in reversed(self._order)])
  116. def _iterate_keys(self):
  117. return self._iter(lambda d: d.iterkeys())
  118. iterkeys = _iterate_keys
  119. def _iterate_items(self):
  120. return self._iter(lambda d: d.iteritems())
  121. iteritems = _iterate_items
  122. def _iterate_values(self):
  123. return self._iter(lambda d: d.itervalues())
  124. itervalues = _iterate_values
  125. def keys(self):
  126. return list(self._iterate_keys())
  127. def items(self):
  128. return list(self._iterate_items())
  129. def values(self):
  130. return list(self._iterate_values())
  131. class _Code(object):
  132. def __init__(self, code):
  133. self.co_filename = code.co_filename
  134. self.co_name = code.co_name
  135. class _Frame(object):
  136. Code = _Code
  137. def __init__(self, frame):
  138. self.f_globals = {
  139. "__file__": frame.f_globals.get("__file__", "__main__"),
  140. }
  141. self.f_code = self.Code(frame.f_code)
  142. class Traceback(object):
  143. Frame = _Frame
  144. def __init__(self, tb):
  145. self.tb_frame = self.Frame(tb.tb_frame)
  146. self.tb_lineno = tb.tb_lineno
  147. if tb.tb_next is None:
  148. self.tb_next = None
  149. else:
  150. self.tb_next = Traceback(tb.tb_next)
  151. class ExceptionInfo(object):
  152. """Exception wrapping an exception and its traceback.
  153. :param exc_info: The exception info tuple as returned by
  154. :func:`sys.exc_info`.
  155. """
  156. #: Exception type.
  157. type = None
  158. #: Exception instance.
  159. exception = None
  160. #: Pickleable traceback instance for use with :mod:`traceback`
  161. tb = None
  162. #: String representation of the traceback.
  163. traceback = None
  164. def __init__(self, exc_info):
  165. self.type, self.exception, tb = exc_info
  166. self.tb = Traceback(tb)
  167. self.traceback = ''.join(traceback.format_exception(*exc_info))
  168. def __str__(self):
  169. return self.traceback
  170. def __repr__(self):
  171. return "<ExceptionInfo: %r>" % (self.exception, )
  172. @property
  173. def exc_info(self):
  174. return self.type, self.exception, self.tb
  175. class LimitedSet(object):
  176. """Kind-of Set with limitations.
  177. Good for when you need to test for membership (`a in set`),
  178. but the list might become to big, so you want to limit it so it doesn't
  179. consume too much resources.
  180. :keyword maxlen: Maximum number of members before we start
  181. evicting expired members.
  182. :keyword expires: Time in seconds, before a membership expires.
  183. """
  184. __slots__ = ("maxlen", "expires", "_data")
  185. def __init__(self, maxlen=None, expires=None):
  186. self.maxlen = maxlen
  187. self.expires = expires
  188. self._data = {}
  189. def add(self, value):
  190. """Add a new member."""
  191. self._expire_item()
  192. self._data[value] = time.time()
  193. def clear(self):
  194. """Remove all members"""
  195. self._data.clear()
  196. def pop_value(self, value):
  197. """Remove membership by finding value."""
  198. self._data.pop(value, None)
  199. def _expire_item(self):
  200. """Hunt down and remove an expired item."""
  201. while 1:
  202. if self.maxlen and len(self) >= self.maxlen:
  203. value, when = self.first
  204. if not self.expires or time.time() > when + self.expires:
  205. try:
  206. self.pop_value(value)
  207. except TypeError: # pragma: no cover
  208. continue
  209. break
  210. def __contains__(self, value):
  211. return value in self._data
  212. def update(self, other):
  213. if isinstance(other, self.__class__):
  214. self._data.update(other._data)
  215. else:
  216. self._data.update(other)
  217. def as_dict(self):
  218. return self._data
  219. def __iter__(self):
  220. return iter(self._data.keys())
  221. def __len__(self):
  222. return len(self._data.keys())
  223. def __repr__(self):
  224. return "LimitedSet([%s])" % (repr(self._data.keys()))
  225. @property
  226. def chronologically(self):
  227. return sorted(self._data.items(), key=lambda (value, when): when)
  228. @property
  229. def first(self):
  230. """Get the oldest member."""
  231. return self.chronologically[0]
  232. class LRUCache(UserDict):
  233. """LRU Cache implementation using a doubly linked list to track access.
  234. :keyword limit: The maximum number of keys to keep in the cache.
  235. When a new key is inserted and the limit has been exceeded,
  236. the *Least Recently Used* key will be discarded from the
  237. cache.
  238. """
  239. def __init__(self, limit=None):
  240. self.limit = limit
  241. self.mutex = RLock()
  242. self.data = OrderedDict()
  243. def __getitem__(self, key):
  244. with self.mutex:
  245. value = self[key] = self.data.pop(key)
  246. return value
  247. def keys(self):
  248. # userdict.keys in py3k calls __getitem__
  249. return self.data.keys()
  250. def values(self):
  251. return list(self._iterate_values())
  252. def items(self):
  253. return list(self._iterate_items())
  254. def __setitem__(self, key, value):
  255. # remove least recently used key.
  256. with self.mutex:
  257. if self.limit and len(self.data) >= self.limit:
  258. self.data.pop(iter(self.data).next())
  259. self.data[key] = value
  260. def __iter__(self):
  261. return self.data.iterkeys()
  262. def _iterate_items(self):
  263. for k in self.data:
  264. try:
  265. yield (k, self.data[k])
  266. except KeyError:
  267. pass
  268. iteritems = _iterate_items
  269. def _iterate_values(self):
  270. for k in self.data:
  271. try:
  272. yield self.data[k]
  273. except KeyError:
  274. pass
  275. itervalues = _iterate_values
  276. class TokenBucket(object):
  277. """Token Bucket Algorithm.
  278. See http://en.wikipedia.org/wiki/Token_Bucket
  279. Most of this code was stolen from an entry in the ASPN Python Cookbook:
  280. http://code.activestate.com/recipes/511490/
  281. .. admonition:: Thread safety
  282. This implementation may not be thread safe.
  283. """
  284. #: The rate in tokens/second that the bucket will be refilled
  285. fill_rate = None
  286. #: Maximum number of tokensin the bucket.
  287. capacity = 1
  288. #: Timestamp of the last time a token was taken out of the bucket.
  289. timestamp = None
  290. def __init__(self, fill_rate, capacity=1):
  291. self.capacity = float(capacity)
  292. self._tokens = capacity
  293. self.fill_rate = float(fill_rate)
  294. self.timestamp = time.time()
  295. def can_consume(self, tokens=1):
  296. """Returns :const:`True` if `tokens` number of tokens can be consumed
  297. from the bucket."""
  298. if tokens <= self._get_tokens():
  299. self._tokens -= tokens
  300. return True
  301. return False
  302. def expected_time(self, tokens=1):
  303. """Returns the expected time in seconds when a new token should be
  304. available.
  305. .. admonition:: Warning
  306. This consumes a token from the bucket.
  307. """
  308. _tokens = self._get_tokens()
  309. tokens = max(tokens, _tokens)
  310. return (tokens - _tokens) / self.fill_rate
  311. def _get_tokens(self):
  312. if self._tokens < self.capacity:
  313. now = time.time()
  314. delta = self.fill_rate * (now - self.timestamp)
  315. self._tokens = min(self.capacity, self._tokens + delta)
  316. self.timestamp = now
  317. return self._tokens