datastructures.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. from __future__ import generators
  2. import time
  3. import traceback
  4. from UserList import UserList
  5. from Queue import Queue, Empty as QueueEmpty
  6. from celery.utils.compat import OrderedDict
  7. class AttributeDictMixin(object):
  8. def __getattr__(self, key):
  9. try:
  10. return self[key]
  11. except KeyError:
  12. raise AttributeError("'%s' object has no attribute '%s'" % (
  13. self.__class__.__name__, key))
  14. def __setattr__(self, key, value):
  15. self[key] = value
  16. class AttributeDict(dict, AttributeDictMixin):
  17. """Dict subclass with attribute access."""
  18. pass
  19. class DictAttribute(object):
  20. def __init__(self, obj):
  21. self.obj = obj
  22. def get(self, key, default=None):
  23. try:
  24. return self[key]
  25. except KeyError:
  26. return default
  27. def setdefault(self, key, default):
  28. try:
  29. return self[key]
  30. except KeyError:
  31. self[key] = default
  32. return default
  33. def __getitem__(self, key):
  34. try:
  35. return getattr(self.obj, key)
  36. except AttributeError:
  37. raise KeyError(key)
  38. def __setitem__(self, key, value):
  39. setattr(self.obj, key, value)
  40. def __contains__(self, key):
  41. return hasattr(self.obj, key)
  42. def iteritems(self):
  43. return vars(self.obj).iteritems()
  44. class PositionQueue(UserList):
  45. """A positional queue of a specific length, with slots that are either
  46. filled or unfilled. When all of the positions are filled, the queue
  47. is considered :meth:`full`.
  48. :param length: see :attr:`length`.
  49. .. attribute:: length
  50. The number of items required for the queue to be considered full.
  51. """
  52. class UnfilledPosition(object):
  53. """Describes an unfilled slot."""
  54. def __init__(self, position):
  55. # This is not used, but is an argument from xrange
  56. # so why not.
  57. self.position = position
  58. def __init__(self, length):
  59. self.length = length
  60. self.data = map(self.UnfilledPosition, xrange(length))
  61. def full(self):
  62. """Returns ``True`` if all of the slots has been filled."""
  63. return len(self) >= self.length
  64. def __len__(self):
  65. """``len(self)`` -> number of slots filled with real values."""
  66. return len(self.filled)
  67. @property
  68. def filled(self):
  69. """Returns the filled slots as a list."""
  70. return filter(lambda v: not isinstance(v, self.UnfilledPosition),
  71. self.data)
  72. class ExceptionInfo(object):
  73. """Exception wrapping an exception and its traceback.
  74. :param exc_info: The exception tuple info as returned by
  75. :func:`traceback.format_exception`.
  76. .. attribute:: exception
  77. The original exception.
  78. .. attribute:: traceback
  79. A traceback from the point when :attr:`exception` was raised.
  80. """
  81. def __init__(self, exc_info):
  82. type_, exception, tb = exc_info
  83. self.exception = exception
  84. self.traceback = ''.join(traceback.format_exception(*exc_info))
  85. def __str__(self):
  86. return self.traceback
  87. def __repr__(self):
  88. return "<%s.%s: %s>" % (
  89. self.__class__.__module__,
  90. self.__class__.__name__,
  91. str(self.exception))
  92. def consume_queue(queue):
  93. """Iterator yielding all immediately available items in a
  94. :class:`Queue.Queue`.
  95. The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
  96. Example
  97. >>> q = Queue()
  98. >>> map(q.put, range(4))
  99. >>> list(consume_queue(q))
  100. [0, 1, 2, 3]
  101. >>> list(consume_queue(q))
  102. []
  103. """
  104. while 1:
  105. try:
  106. yield queue.get_nowait()
  107. except QueueEmpty:
  108. break
  109. class SharedCounter(object):
  110. """Thread-safe counter.
  111. Please note that the final value is not synchronized, this means
  112. that you should not update the value by using a previous value, the only
  113. reliable operations are increment and decrement.
  114. Example
  115. >>> max_clients = SharedCounter(initial_value=10)
  116. # Thread one
  117. >>> max_clients += 1 # OK (safe)
  118. # Thread two
  119. >>> max_clients -= 3 # OK (safe)
  120. # Main thread
  121. >>> if client >= int(max_clients): # Max clients now at 8
  122. ... wait()
  123. >>> max_client = max_clients + 10 # NOT OK (unsafe)
  124. """
  125. def __init__(self, initial_value):
  126. self._value = initial_value
  127. self._modify_queue = Queue()
  128. def increment(self, n=1):
  129. """Increment value."""
  130. self += n
  131. return int(self)
  132. def decrement(self, n=1):
  133. """Decrement value."""
  134. self -= n
  135. return int(self)
  136. def _update_value(self):
  137. self._value += sum(consume_queue(self._modify_queue))
  138. return self._value
  139. def __iadd__(self, y):
  140. """``self += y``"""
  141. self._modify_queue.put(y * +1)
  142. return self
  143. def __isub__(self, y):
  144. """``self -= y``"""
  145. self._modify_queue.put(y * -1)
  146. return self
  147. def __int__(self):
  148. """``int(self) -> int``"""
  149. return self._update_value()
  150. def __repr__(self):
  151. return "<SharedCounter: int(%s)>" % str(int(self))
  152. class LimitedSet(object):
  153. """Kind-of Set with limitations.
  154. Good for when you need to test for membership (``a in set``),
  155. but the list might become to big, so you want to limit it so it doesn't
  156. consume too much resources.
  157. :keyword maxlen: Maximum number of members before we start
  158. deleting expired members.
  159. :keyword expires: Time in seconds, before a membership expires.
  160. """
  161. def __init__(self, maxlen=None, expires=None):
  162. self.maxlen = maxlen
  163. self.expires = expires
  164. self._data = {}
  165. def add(self, value):
  166. """Add a new member."""
  167. self._expire_item()
  168. self._data[value] = time.time()
  169. def pop_value(self, value):
  170. """Remove membership by finding value."""
  171. self._data.pop(value, None)
  172. def _expire_item(self):
  173. """Hunt down and remove an expired item."""
  174. while 1:
  175. if self.maxlen and len(self) >= self.maxlen:
  176. value, when = self.first
  177. if not self.expires or time.time() > when + self.expires:
  178. try:
  179. self.pop_value(value)
  180. except TypeError: # pragma: no cover
  181. continue
  182. break
  183. def __contains__(self, value):
  184. return value in self._data
  185. def update(self, other):
  186. if isinstance(other, self.__class__):
  187. self._data.update(other._data)
  188. else:
  189. self._data.update(other)
  190. def as_dict(self):
  191. return self._data
  192. def __iter__(self):
  193. return iter(self._data.keys())
  194. def __len__(self):
  195. return len(self._data.keys())
  196. def __repr__(self):
  197. return "LimitedSet([%s])" % (repr(self._data.keys()))
  198. @property
  199. def chronologically(self):
  200. return sorted(self._data.items(), key=lambda (value, when): when)
  201. @property
  202. def first(self):
  203. """Get the oldest member."""
  204. return self.chronologically[0]
  205. class LocalCache(OrderedDict):
  206. """Dictionary with a finite number of keys.
  207. Older items expires first.
  208. """
  209. def __init__(self, limit=None):
  210. super(LocalCache, self).__init__()
  211. self.limit = limit
  212. def __setitem__(self, key, value):
  213. while len(self) >= self.limit:
  214. self.popitem(last=False)
  215. super(LocalCache, self).__setitem__(key, value)
  216. class TokenBucket(object):
  217. """Token Bucket Algorithm.
  218. See http://en.wikipedia.org/wiki/Token_Bucket
  219. Most of this code was stolen from an entry in the ASPN Python Cookbook:
  220. http://code.activestate.com/recipes/511490/
  221. :param fill_rate: see :attr:`fill_rate`.
  222. :keyword capacity: see :attr:`capacity`.
  223. .. attribute:: fill_rate
  224. The rate in tokens/second that the bucket will be refilled.
  225. .. attribute:: capacity
  226. Maximum number of tokens in the bucket. Default is ``1``.
  227. .. attribute:: timestamp
  228. Timestamp of the last time a token was taken out of the bucket.
  229. """
  230. def __init__(self, fill_rate, capacity=1):
  231. self.capacity = float(capacity)
  232. self._tokens = capacity
  233. self.fill_rate = float(fill_rate)
  234. self.timestamp = time.time()
  235. def can_consume(self, tokens=1):
  236. if tokens <= self._get_tokens():
  237. self._tokens -= tokens
  238. return True
  239. return False
  240. def expected_time(self, tokens=1):
  241. """Returns the expected time in seconds when a new token should be
  242. available. *Note: consumes a token from the bucket*"""
  243. _tokens = self._get_tokens()
  244. tokens = max(tokens, _tokens)
  245. return (tokens - _tokens) / self.fill_rate
  246. def _get_tokens(self):
  247. if self._tokens < self.capacity:
  248. now = time.time()
  249. delta = self.fill_rate * (now - self.timestamp)
  250. self._tokens = min(self.capacity, self._tokens + delta)
  251. self.timestamp = now
  252. return self._tokens