datastructures.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. from __future__ import generators
  2. import time
  3. import traceback
  4. from UserList import UserList
  5. from Queue import Queue, Empty as QueueEmpty
  6. from celery.utils.compat import OrderedDict
  7. class AttributeDict(dict):
  8. """Dict subclass with attribute access."""
  9. def __getattr__(self, key):
  10. try:
  11. return self[key]
  12. except KeyError:
  13. raise AttributeError("'%s' object has no attribute '%s'" % (
  14. self.__class__.__name__, key))
  15. def __setattr__(self, key, value):
  16. self[key] = value
  17. class PositionQueue(UserList):
  18. """A positional queue of a specific length, with slots that are either
  19. filled or unfilled. When all of the positions are filled, the queue
  20. is considered :meth:`full`.
  21. :param length: see :attr:`length`.
  22. .. attribute:: length
  23. The number of items required for the queue to be considered full.
  24. """
  25. class UnfilledPosition(object):
  26. """Describes an unfilled slot."""
  27. def __init__(self, position):
  28. # This is not used, but is an argument from xrange
  29. # so why not.
  30. self.position = position
  31. def __init__(self, length):
  32. self.length = length
  33. self.data = map(self.UnfilledPosition, xrange(length))
  34. def full(self):
  35. """Returns ``True`` if all of the slots has been filled."""
  36. return len(self) >= self.length
  37. def __len__(self):
  38. """``len(self)`` -> number of slots filled with real values."""
  39. return len(self.filled)
  40. @property
  41. def filled(self):
  42. """Returns the filled slots as a list."""
  43. return filter(lambda v: not isinstance(v, self.UnfilledPosition),
  44. self.data)
  45. class ExceptionInfo(object):
  46. """Exception wrapping an exception and its traceback.
  47. :param exc_info: The exception tuple info as returned by
  48. :func:`traceback.format_exception`.
  49. .. attribute:: exception
  50. The original exception.
  51. .. attribute:: traceback
  52. A traceback from the point when :attr:`exception` was raised.
  53. """
  54. def __init__(self, exc_info):
  55. type_, exception, tb = exc_info
  56. self.exception = exception
  57. self.traceback = ''.join(traceback.format_exception(*exc_info))
  58. def __str__(self):
  59. return self.traceback
  60. def __repr__(self):
  61. return "<%s.%s: %s>" % (
  62. self.__class__.__module__,
  63. self.__class__.__name__,
  64. str(self.exception))
  65. def consume_queue(queue):
  66. """Iterator yielding all immediately available items in a
  67. :class:`Queue.Queue`.
  68. The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
  69. Example
  70. >>> q = Queue()
  71. >>> map(q.put, range(4))
  72. >>> list(consume_queue(q))
  73. [0, 1, 2, 3]
  74. >>> list(consume_queue(q))
  75. []
  76. """
  77. while 1:
  78. try:
  79. yield queue.get_nowait()
  80. except QueueEmpty:
  81. break
  82. class SharedCounter(object):
  83. """Thread-safe counter.
  84. Please note that the final value is not synchronized, this means
  85. that you should not update the value by using a previous value, the only
  86. reliable operations are increment and decrement.
  87. Example
  88. >>> max_clients = SharedCounter(initial_value=10)
  89. # Thread one
  90. >>> max_clients += 1 # OK (safe)
  91. # Thread two
  92. >>> max_clients -= 3 # OK (safe)
  93. # Main thread
  94. >>> if client >= int(max_clients): # Max clients now at 8
  95. ... wait()
  96. >>> max_client = max_clients + 10 # NOT OK (unsafe)
  97. """
  98. def __init__(self, initial_value):
  99. self._value = initial_value
  100. self._modify_queue = Queue()
  101. def increment(self, n=1):
  102. """Increment value."""
  103. self += n
  104. return int(self)
  105. def decrement(self, n=1):
  106. """Decrement value."""
  107. self -= n
  108. return int(self)
  109. def _update_value(self):
  110. self._value += sum(consume_queue(self._modify_queue))
  111. return self._value
  112. def __iadd__(self, y):
  113. """``self += y``"""
  114. self._modify_queue.put(y * +1)
  115. return self
  116. def __isub__(self, y):
  117. """``self -= y``"""
  118. self._modify_queue.put(y * -1)
  119. return self
  120. def __int__(self):
  121. """``int(self) -> int``"""
  122. return self._update_value()
  123. def __repr__(self):
  124. return "<SharedCounter: int(%s)>" % str(int(self))
  125. class LimitedSet(object):
  126. """Kind-of Set with limitations.
  127. Good for when you need to test for membership (``a in set``),
  128. but the list might become to big, so you want to limit it so it doesn't
  129. consume too much resources.
  130. :keyword maxlen: Maximum number of members before we start
  131. deleting expired members.
  132. :keyword expires: Time in seconds, before a membership expires.
  133. """
  134. def __init__(self, maxlen=None, expires=None):
  135. self.maxlen = maxlen
  136. self.expires = expires
  137. self._data = {}
  138. def add(self, value):
  139. """Add a new member."""
  140. self._expire_item()
  141. self._data[value] = time.time()
  142. def pop_value(self, value):
  143. """Remove membership by finding value."""
  144. self._data.pop(value, None)
  145. def _expire_item(self):
  146. """Hunt down and remove an expired item."""
  147. while 1:
  148. if self.maxlen and len(self) >= self.maxlen:
  149. value, when = self.first
  150. if not self.expires or time.time() > when + self.expires:
  151. try:
  152. self.pop_value(value)
  153. except TypeError: # pragma: no cover
  154. continue
  155. break
  156. def __contains__(self, value):
  157. return value in self._data
  158. def __iter__(self):
  159. return iter(self._data.keys())
  160. def __len__(self):
  161. return len(self._data.keys())
  162. def __repr__(self):
  163. return "LimitedSet([%s])" % (repr(self._data.keys()))
  164. @property
  165. def chronologically(self):
  166. return sorted(self._data.items(), key=lambda (value, when): when)
  167. @property
  168. def first(self):
  169. """Get the oldest member."""
  170. return self.chronologically[0]
  171. class LocalCache(OrderedDict):
  172. """Dictionary with a finite number of keys.
  173. Older items expires first.
  174. """
  175. def __init__(self, limit=None):
  176. super(LocalCache, self).__init__()
  177. self.limit = limit
  178. def __setitem__(self, key, value):
  179. while len(self) >= self.limit:
  180. self.popitem(last=False)
  181. super(LocalCache, self).__setitem__(key, value)