datastructures.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. """
  2. celery.datastructures
  3. =====================
  4. Custom data structures.
  5. :copyright: (c) 2009 - 2010 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. from __future__ import generators
  9. import time
  10. import traceback
  11. from itertools import chain
  12. from Queue import Queue, Empty as QueueEmpty
  13. from celery.utils.compat import OrderedDict
  class AttributeDictMixin(object):
      """Mixin that lets a mapping be used through attribute syntax.

      Reading `d.key` is forwarded to `d[key]` and assigning
      `d.key = value` is forwarded to `d[key] = value`.

      """

      def __getattr__(self, key):
          """`d.key -> d[key]`

          :raises AttributeError: if the item lookup raises
              :exc:`KeyError`.

          """
          try:
              value = self[key]
          except KeyError:
              owner = self.__class__.__name__
              raise AttributeError(
                  "'%s' object has no attribute '%s'" % (owner, key))
          return value

      def __setattr__(self, key, value):
          """`d.key = value -> d[key] = value`"""
          self[key] = value
  class AttributeDict(dict, AttributeDictMixin):
      """A :class:`dict` combined with :class:`AttributeDictMixin`, so its
      items can also be accessed as attributes."""
  class DictAttribute(object):
      """Mapping-style facade over the attributes of an object.

      `proxy[k]` reads `obj.k` and `proxy[k] = v` assigns `obj.k = v`.

      :param obj: The object whose attributes are exposed.

      """

      def __init__(self, obj):
          self.obj = obj

      def get(self, key, default=None):
          """Return the attribute named `key`, or `default` when the
          lookup raises :exc:`KeyError`."""
          try:
              found = self[key]
          except KeyError:
              found = default
          return found

      def setdefault(self, key, default):
          """Return the attribute named `key`; when it is missing, set it
          to `default` and return `default`."""
          try:
              current = self[key]
          except KeyError:
              self[key] = default
              current = default
          return current

      def __getitem__(self, key):
          """`proxy[key] -> obj.key`

          :raises KeyError: if the attribute lookup raises
              :exc:`AttributeError`.

          """
          try:
              value = getattr(self.obj, key)
          except AttributeError:
              raise KeyError(key)
          return value

      def __setitem__(self, key, value):
          """`proxy[key] = value -> obj.key = value`"""
          setattr(self.obj, key, value)

      def __contains__(self, key):
          """`key in proxy -> hasattr(obj, key)`"""
          return hasattr(self.obj, key)

      def iteritems(self):
          """Iterate over the `(name, value)` pairs of `vars(obj)`."""
          attributes = vars(self.obj)
          return attributes.iteritems()
  class ConfigurationView(AttributeDictMixin):
      """Layered view over a `changes` mapping and a list of `defaults`.

      Lookups try `changes` first and then every mapping in `defaults`,
      in order.  All writes go to `changes`.

      :param changes: Mapping that receives every modification.
      :param defaults: List of fallback mappings (it is concatenated to
          `[changes]`, so it must be a list).

      """
      changes = None
      defaults = None

      def __init__(self, changes, defaults):
          # Store through ``__dict__``: plain attribute assignment is
          # redirected to ``__setitem__`` by the mixin.
          state = self.__dict__
          state["changes"] = changes
          state["defaults"] = defaults
          state["_order"] = [changes] + defaults

      def __getitem__(self, key):
          """Return the value of `key` from the first layer that has it.

          :raises KeyError: if no layer contains `key`.

          """
          for layer in self.__dict__["_order"]:
              try:
                  return layer[key]
              except KeyError:
                  continue
          raise KeyError(key)

      def __setitem__(self, key, value):
          """Store `value` under `key` in `changes`."""
          overrides = self.__dict__["changes"]
          overrides[key] = value

      def get(self, key, default=None):
          """Return the value of `key`, or `default` if no layer has it."""
          try:
              found = self[key]
          except KeyError:
              found = default
          return found

      def setdefault(self, key, default):
          """Return the value of `key`; if no layer has it, store
          `default` in `changes` and return `default`."""
          try:
              current = self[key]
          except KeyError:
              self[key] = default
              current = default
          return current

      def update(self, *args, **kwargs):
          """Forward to the `update` method of `changes`."""
          overrides = self.__dict__["changes"]
          return overrides.update(*args, **kwargs)

      def __contains__(self, key):
          """Return :const:`True` if any layer contains `key`."""
          for layer in self.__dict__["_order"]:
              if key in layer:
                  return True
          return False

      def __repr__(self):
          merged = dict(iter(self))
          return repr(merged)

      def __iter__(self):
          """Iterate over `(key, value)` pairs of every layer.

          Note that this yields pairs, not keys, and that a key present
          in several layers is yielded once per layer.

          """
          # Lowest-priority layer goes first so that, when the stream is
          # fed to ``dict()``, values from `changes` win.
          layers = reversed(self.__dict__["_order"])
          return chain(*[layer.iteritems() for layer in layers])

      def iteritems(self):
          """Same stream as :meth:`__iter__`."""
          return iter(self)

      def items(self):
          """Return the stream of :meth:`iteritems` as a tuple."""
          return tuple(self.iteritems())
  class ExceptionInfo(object):
      """Container for an exception and its formatted traceback.

      :param exc_info: A `(type, value, traceback)` tuple; it is passed
          unpacked to :func:`traceback.format_exception`.

      """

      #: The original exception.
      exception = None

      #: The formatted traceback, as a single string, built when the
      #: instance was created.
      traceback = None

      def __init__(self, exc_info):
          # Only the value is kept; the unpacking also enforces that
          # `exc_info` has exactly three members.
          _exc_type, exc_value, _exc_tb = exc_info
          self.exception = exc_value
          formatted_lines = traceback.format_exception(*exc_info)
          self.traceback = ''.join(formatted_lines)

      def __str__(self):
          return self.traceback

      def __repr__(self):
          cls = self.__class__
          parts = (cls.__module__, cls.__name__, str(self.exception))
          return "<%s.%s: %s>" % parts
  def consume_queue(queue):
      """Generator draining the items currently available in a
      :class:`Queue.Queue`.

      Items are fetched with `get_nowait()`; iteration ends the first
      time the queue raises :exc:`Queue.Empty`.

      Example

          >>> q = Queue()
          >>> for i in range(4):
          ...     q.put(i)
          >>> list(consume_queue(q))
          [0, 1, 2, 3]
          >>> list(consume_queue(q))
          []

      """
      while True:
          try:
              item = queue.get_nowait()
          except QueueEmpty:
              return
          yield item
  class SharedCounter(object):
      """Counter meant to be shared between threads.

      Increments and decrements are not applied directly: each one is put
      on an internal :class:`Queue.Queue` and only folded into the value
      when the counter is read with `int(counter)`.

      Please note that the final value is not synchronized, so you should
      not derive a new value from a previously read one; the only
      reliable operations are increment and decrement.

      Example::

          >>> max_clients = SharedCounter(initial_value=10)

          # Thread one
          >>> max_clients += 1 # OK (safe)

          # Thread two
          >>> max_clients -= 3 # OK (safe)

          # Main thread
          >>> if client >= int(max_clients): # Max clients now at 8
          ...    wait()

          >>> max_clients = max_clients + 10 # NOT OK (unsafe)

      :param initial_value: Starting value of the counter.

      """

      def __init__(self, initial_value):
          self._value = initial_value
          # Pending deltas, applied by `_update_value`.
          self._modify_queue = Queue()

      def increment(self, n=1):
          """Increment the value by `n` and return the updated value."""
          self.__iadd__(n)
          return int(self)

      def decrement(self, n=1):
          """Decrement the value by `n` and return the updated value."""
          self.__isub__(n)
          return int(self)

      def _update_value(self):
          """Apply every pending delta and return the resulting value."""
          pending = sum(consume_queue(self._modify_queue))
          self._value += pending
          return self._value

      def __iadd__(self, y):
          """`self += y`"""
          delta = y * 1
          self._modify_queue.put(delta)
          return self

      def __isub__(self, y):
          """`self -= y`"""
          delta = y * -1
          self._modify_queue.put(delta)
          return self

      def __int__(self):
          """`int(self) -> int`"""
          return self._update_value()

      def __repr__(self):
          current = str(int(self))
          return "<SharedCounter: int(%s)>" % current
  class LimitedSet(object):
      """Kind-of Set with limitations.

      Good for when you need to test for membership (`a in set`),
      but the collection might become too big, so you want to limit it
      so it doesn't consume too many resources.

      Eviction only happens in :meth:`add`, and removes at most one
      member per call, so :meth:`update` (or unexpired members) can make
      the set grow past `maxlen`.

      :keyword maxlen: Number of members at which :meth:`add` starts
          evicting the oldest member.  A false value disables eviction.
      :keyword expires: Time in seconds before a membership expires.
          When set, the oldest member is only evicted once it is older
          than this; when unset, it is evicted regardless of age.

      """

      def __init__(self, maxlen=None, expires=None):
          self.maxlen = maxlen
          self.expires = expires
          # Maps member -> `time.time()` timestamp of when it was added.
          self._data = {}

      def add(self, value):
          """Add a new member (evicting the oldest one first if full)."""
          self._expire_item()
          self._data[value] = time.time()

      def clear(self):
          """Remove all members"""
          self._data.clear()

      def pop_value(self, value):
          """Remove membership by finding value.

          Removing a value that is not a member is a no-op.

          """
          self._data.pop(value, None)

      def _expire_item(self):
          """Hunt down and remove an expired item.

          Does nothing unless `maxlen` is set and has been reached.

          """
          if not self.maxlen or len(self) < self.maxlen:
              return
          value, when = self.first
          if not self.expires or time.time() > when + self.expires:
              self.pop_value(value)

      def __contains__(self, value):
          return value in self._data

      def update(self, other):
          """Merge in the members of `other`.

          :param other: Another instance of this class, or anything
              accepted by :meth:`dict.update` that maps members to
              timestamps.

          """
          if isinstance(other, self.__class__):
              self._data.update(other._data)
          else:
              self._data.update(other)

      def as_dict(self):
          """Return the underlying member -> timestamp dict (not a copy)."""
          return self._data

      def __iter__(self):
          # Iterate over a snapshot so members can be added or removed
          # while looping.
          return iter(list(self._data))

      def __len__(self):
          return len(self._data)

      def __repr__(self):
          return "LimitedSet([%s])" % (repr(list(self._data)))

      @property
      def chronologically(self):
          """List of `(member, timestamp)` pairs, oldest first."""
          # Index-based key: tuple parameters in a lambda are Python 2
          # only syntax.
          return sorted(self._data.items(), key=lambda item: item[1])

      @property
      def first(self):
          """Get the oldest member, as a `(member, timestamp)` pair.

          :raises IndexError: if the set is empty.

          """
          if not self._data:
              raise IndexError("LimitedSet is empty")
          # `min` returns the same pair as `chronologically[0]` without
          # sorting the whole set.
          return min(self._data.items(), key=lambda item: item[1])
  class LocalCache(OrderedDict):
      """Dictionary with a finite number of keys.

      Older items expire first: when an item is stored while the cache
      already holds `limit` items, the items at the front of the
      ordering are removed until there is room.

      :keyword limit: Maximum number of items.  :const:`None` (the
          default) means the cache is unbounded.

      """

      def __init__(self, limit=None):
          super(LocalCache, self).__init__()
          self.limit = limit

      def __setitem__(self, key, value):
          limit = self.limit
          # Comparing against ``None`` is always true on Python 2 (and a
          # TypeError on Python 3), so an unset limit must skip eviction.
          if limit is not None:
              while len(self) >= limit:
                  self.popitem(last=False)
          super(LocalCache, self).__setitem__(key, value)
  class TokenBucket(object):
      """Token Bucket Algorithm.

      See http://en.wikipedia.org/wiki/Token_Bucket
      Most of this code was stolen from an entry in the ASPN Python Cookbook:
      http://code.activestate.com/recipes/511490/

      .. admonition:: Thread safety

          This implementation is not thread safe.

      :param fill_rate: Refill rate in tokens/second.
      :keyword capacity: Max number of tokens.  Default is 1.

      """

      #: The rate in tokens/second that the bucket will be refilled
      fill_rate = None

      #: Maximum number of tokens in the bucket.
      capacity = 1

      #: Timestamp of the last time the token count was brought up to date.
      timestamp = None

      def __init__(self, fill_rate, capacity=1):
          self.capacity = float(capacity)
          # The bucket starts full; kept as a float like `capacity`.
          self._tokens = self.capacity
          self.fill_rate = float(fill_rate)
          self.timestamp = time.time()

      def can_consume(self, tokens=1):
          """Returns :const:`True` if `tokens` number of tokens can be consumed
          from the bucket.

          When the answer is :const:`True` the tokens are also taken out
          of the bucket.

          """
          if tokens <= self._get_tokens():
              self._tokens -= tokens
              return True
          return False

      def expected_time(self, tokens=1):
          """Returns the expected time in seconds until `tokens` tokens
          are available (`0.0` if they already are).

          No tokens are consumed, but the bucket is refilled up to the
          current time as a side effect.

          :raises ZeroDivisionError: if `fill_rate` is zero.

          """
          available = self._get_tokens()
          missing = max(tokens, available) - available
          return missing / self.fill_rate

      def _get_tokens(self):
          """Refill the bucket for the time elapsed since `timestamp` and
          return the number of tokens now available."""
          now = time.time()
          if self._tokens < self.capacity:
              refill = self.fill_rate * (now - self.timestamp)
              self._tokens = min(self.capacity, self._tokens + refill)
          # Always move the timestamp forward: time spent idle with a full
          # bucket must not be credited again after the next consume.
          self.timestamp = now
          return self._tokens