datastructures.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. from __future__ import generators
  2. """
  3. Custom Datastructures
  4. """
  5. import time
  6. import traceback
  7. from UserList import UserList
  8. from Queue import Queue, Empty as QueueEmpty
  9. from celery.utils.compat import OrderedDict
  10. class PositionQueue(UserList):
  11. """A positional queue of a specific length, with slots that are either
  12. filled or unfilled. When all of the positions are filled, the queue
  13. is considered :meth:`full`.
  14. :param length: see :attr:`length`.
  15. .. attribute:: length
  16. The number of items required for the queue to be considered full.
  17. """
  18. class UnfilledPosition(object):
  19. """Describes an unfilled slot."""
  20. def __init__(self, position):
  21. # This is not used, but is an argument from xrange
  22. # so why not.
  23. self.position = position
  24. def __init__(self, length):
  25. self.length = length
  26. self.data = map(self.UnfilledPosition, xrange(length))
  27. def full(self):
  28. """Returns ``True`` if all of the slots has been filled."""
  29. return len(self) >= self.length
  30. def __len__(self):
  31. """``len(self)`` -> number of slots filled with real values."""
  32. return len(self.filled)
  33. @property
  34. def filled(self):
  35. """Returns the filled slots as a list."""
  36. return filter(lambda v: not isinstance(v, self.UnfilledPosition),
  37. self.data)
  38. class ExceptionInfo(object):
  39. """Exception wrapping an exception and its traceback.
  40. :param exc_info: The exception tuple info as returned by
  41. :func:`traceback.format_exception`.
  42. .. attribute:: exception
  43. The original exception.
  44. .. attribute:: traceback
  45. A traceback from the point when :attr:`exception` was raised.
  46. """
  47. def __init__(self, exc_info):
  48. type_, exception, tb = exc_info
  49. self.exception = exception
  50. self.traceback = ''.join(traceback.format_exception(*exc_info))
  51. def __str__(self):
  52. return self.traceback
  53. def __repr__(self):
  54. return "<%s.%s: %s>" % (
  55. self.__class__.__module__,
  56. self.__class__.__name__,
  57. str(self.exception))
  58. def consume_queue(queue):
  59. """Iterator yielding all immediately available items in a
  60. :class:`Queue.Queue`.
  61. The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
  62. Example
  63. >>> q = Queue()
  64. >>> map(q.put, range(4))
  65. >>> list(consume_queue(q))
  66. [0, 1, 2, 3]
  67. >>> list(consume_queue(q))
  68. []
  69. """
  70. while 1:
  71. try:
  72. yield queue.get_nowait()
  73. except QueueEmpty:
  74. break
  75. class SharedCounter(object):
  76. """Thread-safe counter.
  77. Please note that the final value is not synchronized, this means
  78. that you should not update the value by using a previous value, the only
  79. reliable operations are increment and decrement.
  80. Example
  81. >>> max_clients = SharedCounter(initial_value=10)
  82. # Thread one
  83. >>> max_clients += 1 # OK (safe)
  84. # Thread two
  85. >>> max_clients -= 3 # OK (safe)
  86. # Main thread
  87. >>> if client >= int(max_clients): # Max clients now at 8
  88. ... wait()
  89. >>> max_client = max_clients + 10 # NOT OK (unsafe)
  90. """
  91. def __init__(self, initial_value):
  92. self._value = initial_value
  93. self._modify_queue = Queue()
  94. def increment(self, n=1):
  95. """Increment value."""
  96. self += n
  97. def decrement(self, n=1):
  98. """Decrement value."""
  99. self -= n
  100. def _update_value(self):
  101. self._value += sum(consume_queue(self._modify_queue))
  102. return self._value
  103. def __iadd__(self, y):
  104. """``self += y``"""
  105. self._modify_queue.put(y * +1)
  106. return self
  107. def __isub__(self, y):
  108. """``self -= y``"""
  109. self._modify_queue.put(y * -1)
  110. return self
  111. def __int__(self):
  112. """``int(self) -> int``"""
  113. return self._update_value()
  114. def __repr__(self):
  115. return "<SharedCounter: int(%s)>" % str(int(self))
  116. class LimitedSet(object):
  117. """Kind-of Set with limitations.
  118. Good for when you need to test for membership (``a in set``),
  119. but the list might become to big, so you want to limit it so it doesn't
  120. consume too much resources.
  121. :keyword maxlen: Maximum number of members before we start
  122. deleting expired members.
  123. :keyword expires: Time in seconds, before a membership expires.
  124. """
  125. def __init__(self, maxlen=None, expires=None):
  126. self.maxlen = maxlen
  127. self.expires = expires
  128. self._data = {}
  129. def add(self, value):
  130. """Add a new member."""
  131. self._expire_item()
  132. self._data[value] = time.time()
  133. def pop_value(self, value):
  134. """Remove membership by finding value."""
  135. self._data.pop(value, None)
  136. def _expire_item(self):
  137. """Hunt down and remove an expired item."""
  138. while 1:
  139. if self.maxlen and len(self) >= self.maxlen:
  140. value, when = self.first
  141. if not self.expires or time.time() > when + self.expires:
  142. try:
  143. self.pop_value(value)
  144. except TypeError: # pragma: no cover
  145. continue
  146. break
  147. def __contains__(self, value):
  148. return value in self._data
  149. def __iter__(self):
  150. return iter(self._data.keys())
  151. def __len__(self):
  152. return len(self._data.keys())
  153. def __repr__(self):
  154. return "LimitedSet([%s])" % (repr(self._data.keys()))
  155. @property
  156. def chronologically(self):
  157. return sorted(self._data.items(), key=lambda (value, when): when)
  158. @property
  159. def first(self):
  160. """Get the oldest member."""
  161. return self.chronologically[0]
  162. class LocalCache(OrderedDict):
  163. """Dictionary with a finite number of keys.
  164. Older items expires first.
  165. """
  166. def __init__(self, limit=None):
  167. super(LocalCache, self).__init__()
  168. self.limit = limit
  169. def __setitem__(self, key, value):
  170. while len(self) >= self.limit:
  171. self.popitem(last=False)
  172. super(LocalCache, self).__setitem__(key, value)