datastructures.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. """
  2. celery.datastructures
  3. =====================
  4. Custom data structures.
  5. :copyright: (c) 2009 - 2011 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. from __future__ import absolute_import
  9. from __future__ import with_statement
  10. import time
  11. import traceback
  12. from itertools import chain
  13. from Queue import Empty
  14. from threading import RLock
  15. from .utils.compat import OrderedDict
  16. class AttributeDictMixin(object):
  17. """Adds attribute access to mappings.
  18. `d.key -> d[key]`
  19. """
  20. def __getattr__(self, key):
  21. """`d.key -> d[key]`"""
  22. try:
  23. return self[key]
  24. except KeyError:
  25. raise AttributeError("'%s' object has no attribute '%s'" % (
  26. self.__class__.__name__, key))
  27. def __setattr__(self, key, value):
  28. """`d[key] = value -> d.key = value`"""
  29. self[key] = value
  30. class AttributeDict(dict, AttributeDictMixin):
  31. """Dict subclass with attribute access."""
  32. pass
  33. class DictAttribute(object):
  34. """Dict interface to attributes.
  35. `obj[k] -> obj.k`
  36. """
  37. def __init__(self, obj):
  38. self.obj = obj
  39. def get(self, key, default=None):
  40. try:
  41. return self[key]
  42. except KeyError:
  43. return default
  44. def setdefault(self, key, default):
  45. try:
  46. return self[key]
  47. except KeyError:
  48. self[key] = default
  49. return default
  50. def __getitem__(self, key):
  51. try:
  52. return getattr(self.obj, key)
  53. except AttributeError:
  54. raise KeyError(key)
  55. def __setitem__(self, key, value):
  56. setattr(self.obj, key, value)
  57. def __contains__(self, key):
  58. return hasattr(self.obj, key)
  59. def iteritems(self):
  60. return vars(self.obj).iteritems()
  61. class ConfigurationView(AttributeDictMixin):
  62. """A view over an applications configuration dicts.
  63. If the key does not exist in ``changes``, the ``defaults`` dict
  64. is consulted.
  65. :param changes: Dict containing changes to the configuration.
  66. :param defaults: Dict containing the default configuration.
  67. """
  68. changes = None
  69. defaults = None
  70. _order = None
  71. def __init__(self, changes, defaults):
  72. self.__dict__.update(changes=changes, defaults=defaults,
  73. _order=[changes] + defaults)
  74. def __getitem__(self, key):
  75. for d in self._order:
  76. try:
  77. return d[key]
  78. except KeyError:
  79. pass
  80. raise KeyError(key)
  81. def __setitem__(self, key, value):
  82. self.changes[key] = value
  83. def get(self, key, default=None):
  84. try:
  85. return self[key]
  86. except KeyError:
  87. return default
  88. def setdefault(self, key, default):
  89. try:
  90. return self[key]
  91. except KeyError:
  92. self[key] = default
  93. return default
  94. def update(self, *args, **kwargs):
  95. return self.changes.update(*args, **kwargs)
  96. def __contains__(self, key):
  97. for d in self._order:
  98. if key in d:
  99. return True
  100. return False
  101. def __repr__(self):
  102. return repr(dict(self.iteritems()))
  103. def __iter__(self):
  104. return self.iterkeys()
  105. def _iter(self, op):
  106. # defaults must be first in the stream, so values in
  107. # changes takes precedence.
  108. return chain(*[op(d) for d in reversed(self._order)])
  109. def iterkeys(self):
  110. return self._iter(lambda d: d.iterkeys())
  111. def iteritems(self):
  112. return self._iter(lambda d: d.iteritems())
  113. def itervalues(self):
  114. return self._iter(lambda d: d.itervalues())
  115. def keys(self):
  116. return list(self.iterkeys())
  117. def items(self):
  118. return list(self.iteritems())
  119. def values(self):
  120. return list(self.itervalues())
  121. class ExceptionInfo(object):
  122. """Exception wrapping an exception and its traceback.
  123. :param exc_info: The exception info tuple as returned by
  124. :func:`sys.exc_info`.
  125. """
  126. #: The original exception.
  127. exception = None
  128. #: A traceback form the point when :attr:`exception` was raised.
  129. traceback = None
  130. def __init__(self, exc_info):
  131. _, exception, _ = exc_info
  132. self.exception = exception
  133. self.traceback = ''.join(traceback.format_exception(*exc_info))
  134. def __str__(self):
  135. return self.traceback
  136. def __repr__(self):
  137. return "<ExceptionInfo: %r>" % (self.exception, )
  138. def consume_queue(queue):
  139. """Iterator yielding all immediately available items in a
  140. :class:`Queue.Queue`.
  141. The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
  142. *Examples*
  143. >>> q = Queue()
  144. >>> map(q.put, range(4))
  145. >>> list(consume_queue(q))
  146. [0, 1, 2, 3]
  147. >>> list(consume_queue(q))
  148. []
  149. """
  150. get = queue.get_nowait
  151. while 1:
  152. try:
  153. yield get()
  154. except Empty:
  155. break
  156. class LimitedSet(object):
  157. """Kind-of Set with limitations.
  158. Good for when you need to test for membership (`a in set`),
  159. but the list might become to big, so you want to limit it so it doesn't
  160. consume too much resources.
  161. :keyword maxlen: Maximum number of members before we start
  162. evicting expired members.
  163. :keyword expires: Time in seconds, before a membership expires.
  164. """
  165. __slots__ = ("maxlen", "expires", "_data")
  166. def __init__(self, maxlen=None, expires=None):
  167. self.maxlen = maxlen
  168. self.expires = expires
  169. self._data = {}
  170. def add(self, value):
  171. """Add a new member."""
  172. self._expire_item()
  173. self._data[value] = time.time()
  174. def clear(self):
  175. """Remove all members"""
  176. self._data.clear()
  177. def pop_value(self, value):
  178. """Remove membership by finding value."""
  179. self._data.pop(value, None)
  180. def _expire_item(self):
  181. """Hunt down and remove an expired item."""
  182. while 1:
  183. if self.maxlen and len(self) >= self.maxlen:
  184. value, when = self.first
  185. if not self.expires or time.time() > when + self.expires:
  186. try:
  187. self.pop_value(value)
  188. except TypeError: # pragma: no cover
  189. continue
  190. break
  191. def __contains__(self, value):
  192. return value in self._data
  193. def update(self, other):
  194. if isinstance(other, self.__class__):
  195. self._data.update(other._data)
  196. else:
  197. self._data.update(other)
  198. def as_dict(self):
  199. return self._data
  200. def __iter__(self):
  201. return iter(self._data.keys())
  202. def __len__(self):
  203. return len(self._data.keys())
  204. def __repr__(self):
  205. return "LimitedSet([%s])" % (repr(self._data.keys()))
  206. @property
  207. def chronologically(self):
  208. return sorted(self._data.items(), key=lambda (value, when): when)
  209. @property
  210. def first(self):
  211. """Get the oldest member."""
  212. return self.chronologically[0]
  213. class LocalCache(OrderedDict):
  214. """Dictionary with a finite number of keys.
  215. Older items expires first.
  216. """
  217. def __init__(self, limit=None):
  218. super(LocalCache, self).__init__()
  219. self.limit = limit
  220. self.lock = RLock()
  221. def __setitem__(self, key, value):
  222. with self.lock:
  223. while len(self) >= self.limit:
  224. self.popitem(last=False)
  225. super(LocalCache, self).__setitem__(key, value)
  226. def pop(self, key, *args):
  227. with self.lock:
  228. super(LocalCache, self).pop(key, *args)
  229. class TokenBucket(object):
  230. """Token Bucket Algorithm.
  231. See http://en.wikipedia.org/wiki/Token_Bucket
  232. Most of this code was stolen from an entry in the ASPN Python Cookbook:
  233. http://code.activestate.com/recipes/511490/
  234. .. admonition:: Thread safety
  235. This implementation may not be thread safe.
  236. """
  237. #: The rate in tokens/second that the bucket will be refilled
  238. fill_rate = None
  239. #: Maximum number of tokensin the bucket.
  240. capacity = 1
  241. #: Timestamp of the last time a token was taken out of the bucket.
  242. timestamp = None
  243. def __init__(self, fill_rate, capacity=1):
  244. self.capacity = float(capacity)
  245. self._tokens = capacity
  246. self.fill_rate = float(fill_rate)
  247. self.timestamp = time.time()
  248. def can_consume(self, tokens=1):
  249. """Returns :const:`True` if `tokens` number of tokens can be consumed
  250. from the bucket."""
  251. if tokens <= self._get_tokens():
  252. self._tokens -= tokens
  253. return True
  254. return False
  255. def expected_time(self, tokens=1):
  256. """Returns the expected time in seconds when a new token should be
  257. available.
  258. .. admonition:: Warning
  259. This consumes a token from the bucket.
  260. """
  261. _tokens = self._get_tokens()
  262. tokens = max(tokens, _tokens)
  263. return (tokens - _tokens) / self.fill_rate
  264. def _get_tokens(self):
  265. if self._tokens < self.capacity:
  266. now = time.time()
  267. delta = self.fill_rate * (now - self.timestamp)
  268. self._tokens = min(self.capacity, self._tokens + delta)
  269. self.timestamp = now
  270. return self._tokens