from __future__ import generators """ Custom Datastructures """ import time import traceback from UserList import UserList from Queue import Queue, Empty as QueueEmpty from celery.utils.compat import OrderedDict class PositionQueue(UserList): """A positional queue of a specific length, with slots that are either filled or unfilled. When all of the positions are filled, the queue is considered :meth:`full`. :param length: see :attr:`length`. .. attribute:: length The number of items required for the queue to be considered full. """ class UnfilledPosition(object): """Describes an unfilled slot.""" def __init__(self, position): # This is not used, but is an argument from xrange # so why not. self.position = position def __init__(self, length): self.length = length self.data = map(self.UnfilledPosition, xrange(length)) def full(self): """Returns ``True`` if all of the slots has been filled.""" return len(self) >= self.length def __len__(self): """``len(self)`` -> number of slots filled with real values.""" return len(self.filled) @property def filled(self): """Returns the filled slots as a list.""" return filter(lambda v: not isinstance(v, self.UnfilledPosition), self.data) class ExceptionInfo(object): """Exception wrapping an exception and its traceback. :param exc_info: The exception tuple info as returned by :func:`traceback.format_exception`. .. attribute:: exception The original exception. .. attribute:: traceback A traceback from the point when :attr:`exception` was raised. """ def __init__(self, exc_info): type_, exception, tb = exc_info self.exception = exception self.traceback = ''.join(traceback.format_exception(*exc_info)) def __str__(self): return self.traceback def __repr__(self): return "<%s.%s: %s>" % ( self.__class__.__module__, self.__class__.__name__, str(self.exception)) def consume_queue(queue): """Iterator yielding all immediately available items in a :class:`Queue.Queue`. The iterator stops as soon as the queue raises :exc:`Queue.Empty`. Example >>> q = Queue() >>> map(q.put, range(4)) >>> list(consume_queue(q)) [0, 1, 2, 3] >>> list(consume_queue(q)) [] """ while 1: try: yield queue.get_nowait() except QueueEmpty: break class SharedCounter(object): """Thread-safe counter. Please note that the final value is not synchronized, this means that you should not update the value by using a previous value, the only reliable operations are increment and decrement. Example >>> max_clients = SharedCounter(initial_value=10) # Thread one >>> max_clients += 1 # OK (safe) # Thread two >>> max_clients -= 3 # OK (safe) # Main thread >>> if client >= int(max_clients): # Max clients now at 8 ... wait() >>> max_client = max_clients + 10 # NOT OK (unsafe) """ def __init__(self, initial_value): self._value = initial_value self._modify_queue = Queue() def increment(self, n=1): """Increment value.""" self += n def decrement(self, n=1): """Decrement value.""" self -= n def _update_value(self): self._value += sum(consume_queue(self._modify_queue)) return self._value def __iadd__(self, y): """``self += y``""" self._modify_queue.put(y * +1) return self def __isub__(self, y): """``self -= y``""" self._modify_queue.put(y * -1) return self def __int__(self): """``int(self) -> int``""" return self._update_value() def __repr__(self): return "" % str(int(self)) class LimitedSet(object): """Kind-of Set with limitations. Good for when you need to test for membership (``a in set``), but the list might become to big, so you want to limit it so it doesn't consume too much resources. :keyword maxlen: Maximum number of members before we start deleting expired members. :keyword expires: Time in seconds, before a membership expires. """ def __init__(self, maxlen=None, expires=None): self.maxlen = maxlen self.expires = expires self._data = {} def add(self, value): """Add a new member.""" self._expire_item() self._data[value] = time.time() def pop_value(self, value): """Remove membership by finding value.""" self._data.pop(value, None) def _expire_item(self): """Hunt down and remove an expired item.""" while 1: if self.maxlen and len(self) >= self.maxlen: value, when = self.first if not self.expires or time.time() > when + self.expires: try: self.pop_value(value) except TypeError: # pragma: no cover continue break def __contains__(self, value): return value in self._data def __iter__(self): return iter(self._data.keys()) def __len__(self): return len(self._data.keys()) def __repr__(self): return "LimitedSet([%s])" % (repr(self._data.keys())) @property def chronologically(self): return sorted(self._data.items(), key=lambda (value, when): when) @property def first(self): """Get the oldest member.""" return self.chronologically[0] class LocalCache(OrderedDict): """Dictionary with a finite number of keys. Older items expires first. """ def __init__(self, limit=None): super(LocalCache, self).__init__() self.limit = limit def __setitem__(self, key, value): while len(self) >= self.limit: self.popitem(last=False) super(LocalCache, self).__setitem__(key, value)