123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- # -*- coding: utf-8 -*-
- """
- celery.datastructures
- ~~~~~~~~~~~~~~~~~~~~~
- Custom types and data structures.
- """
- from __future__ import absolute_import
- from __future__ import with_statement
- import sys
- import time
- from collections import defaultdict
- from itertools import chain
- from billiard.einfo import ExceptionInfo # noqa
- from kombu.utils.limits import TokenBucket # noqa
- from .utils.functional import LRUCache, first, uniq # noqa
class CycleError(Exception):
    """Raised when a graph that must be acyclic turns out to contain a cycle."""
class DependencyGraph(object):
    """A directed acyclic graph of objects and their dependencies.

    Supports a robust topological sort
    to detect the order in which they must be handled.

    Takes an optional iterator of ``(obj, dependencies)``
    tuples to build the graph from.

    .. warning::

        Does not support cycle detection.

    """

    def __init__(self, it=None):
        # Maps every node to the list of nodes it depends on.
        self.adjacent = {}
        if it is not None:
            self.update(it)

    def add_arc(self, obj):
        """Add an object to the graph (no-op if it is already present)."""
        self.adjacent.setdefault(obj, [])

    def add_edge(self, A, B):
        """Add an edge from object ``A`` to object ``B``
        (``A`` depends on ``B``).

        :raises KeyError: if ``A`` has not been added to the graph.

        """
        self[A].append(B)

    def topsort(self):
        """Sort the graph topologically.

        The nodes are first grouped into strongly connected components,
        the components are sorted, and the first member of every
        component is returned.

        :returns: a list of objects in the order
            in which they must be handled.

        """
        graph = DependencyGraph()
        components = self._tarjan72()

        # Node -> the component (tuple of nodes) it belongs to.
        NC = dict((node, component)
                  for component in components
                  for node in component)
        for component in components:
            graph.add_arc(component)
        for node in self:
            node_c = NC[node]
            for successor in self[node]:
                successor_c = NC[successor]
                # Edges inside a single component are dropped.
                if node_c != successor_c:
                    graph.add_edge(node_c, successor_c)
        return [t[0] for t in graph._khan62()]

    def valency_of(self, obj):
        """Returns the valency (degree) of a vertex in the graph.

        This is the number of direct dependencies of ``obj`` plus,
        recursively, the valency of each of them.  Objects that are
        not in the graph have a valency of 0.

        """
        try:
            successors = self[obj]
        except KeyError:
            return 0
        return len(successors) + sum(self.valency_of(node)
                                     for node in successors)

    def update(self, it):
        """Update the graph with data from a list
        of ``(obj, dependencies)`` tuples."""
        tups = list(it)
        # Register every object first, so edges can be added in any order.
        for obj, _ in tups:
            self.add_arc(obj)
        for obj, deps in tups:
            for dep in deps:
                self.add_edge(obj, dep)

    def edges(self):
        """Returns generator that yields every object in the graph
        that has at least one dependency."""
        return (obj for obj, adj in self.iteritems() if adj)

    def _khan62(self):
        """Kahn's simple topological sort algorithm from '62

        See http://en.wikipedia.org/wiki/Topological_sorting

        """
        # Number of incoming edges for every node.
        count = defaultdict(int)
        result = []

        for node in self:
            for successor in self[node]:
                count[successor] += 1
        ready = [node for node in self if not count[node]]

        while ready:
            node = ready.pop()
            result.append(node)

            for successor in self[node]:
                count[successor] -= 1
                if count[successor] == 0:
                    ready.append(successor)
        # Dependants were collected before their dependencies; flip
        # the list so that dependencies come first.
        result.reverse()
        return result

    def _tarjan72(self):
        """Tarjan's algorithm to find strongly connected components.

        See http://bit.ly/vIMv3h.

        :returns: a list of components, each a tuple of nodes.

        """
        result, stack, low = [], [], {}

        def visit(node):
            if node in low:
                return
            num = len(low)
            low[node] = num
            stack_pos = len(stack)
            stack.append(node)

            for successor in self[node]:
                visit(successor)
                low[node] = min(low[node], low[successor])

            if num == low[node]:
                component = tuple(stack[stack_pos:])
                stack[stack_pos:] = []
                result.append(component)
                # A finished component gets a value no visit number can
                # reach, so it never lowers the ``low`` of a later node.
                for item in component:
                    low[item] = len(self)

        for node in self:
            visit(node)

        return result

    def to_dot(self, fh, ws=' ' * 4):
        """Convert the graph to DOT format.

        :param fh: A file, or a file-like object to write the graph to.
        :param ws: Whitespace used to indent every statement.

        """
        fh.write('digraph dependencies {\n')
        for obj, adjacent in self.iteritems():
            if not adjacent:
                fh.write(ws + '"%s"\n' % (obj, ))
            for req in adjacent:
                fh.write(ws + '"%s" -> "%s"\n' % (obj, req))
        fh.write('}\n')

    def __iter__(self):
        return iter(self.adjacent)

    def __getitem__(self, node):
        return self.adjacent[node]

    def __len__(self):
        return len(self.adjacent)

    def __contains__(self, obj):
        return obj in self.adjacent

    def _iterate_items(self):
        """Return an iterator over ``(obj, dependencies)`` pairs."""
        try:
            return self.adjacent.iteritems()
        except AttributeError:
            # Python 3 dicts have no iteritems().
            return iter(self.adjacent.items())
    items = iteritems = _iterate_items

    def __repr__(self):
        return '\n'.join(self.repr_node(N) for N in self)

    def repr_node(self, obj, level=1):
        """Return ``obj`` and, indented below it, its dependency tree;
        every entry is rendered as ``name(valency)``."""
        output = ['%s(%s)' % (obj, self.valency_of(obj))]
        if obj in self:
            for other in self[obj]:
                d = '%s(%s)' % (other, self.valency_of(other))
                output.append('     ' * level + d)
                # The first line of the nested repr is ``other`` itself,
                # which was already emitted above.
                output.extend(self.repr_node(other, level + 1).split('\n')[1:])
        return '\n'.join(output)
class AttributeDictMixin(object):
    """Mixin that lets a mapping be used through attribute syntax.

    Reading ``d.key`` returns ``d[key]`` and assigning
    ``d.key = value`` stores ``d[key] = value``.

    """

    def __getattr__(self, k):
        """Return ``self[k]``; a missing key becomes
        :exc:`AttributeError`."""
        try:
            found = self[k]
        except KeyError:
            owner = type(self).__name__
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (owner, k))
        return found

    def __setattr__(self, key, value):
        """Store ``value`` under ``key`` in the mapping
        (``d.key = value -> d[key] = value``)."""
        self[key] = value
class AttributeDict(dict, AttributeDictMixin):
    """A ``dict`` whose keys can also be read and written as attributes."""
class DictAttribute(object):
    """Dict interface to attributes.

    `obj[k] -> obj.k`

    Attribute access on the wrapper itself is forwarded to the
    wrapped object as well.

    """
    # Class-level default: ``__getattr__`` below reads ``self.obj``, so the
    # name must always resolve through normal attribute lookup.
    obj = None

    def __init__(self, obj):
        # Bypass our own __setattr__, which would set the attribute
        # on the wrapped object instead of on this wrapper.
        object.__setattr__(self, 'obj', obj)

    def __getattr__(self, key):
        return getattr(self.obj, key)

    def __setattr__(self, key, value):
        return setattr(self.obj, key, value)

    def get(self, key, default=None):
        """Return ``obj.key``, or ``default`` if there is no such
        attribute."""
        try:
            return self[key]
        except KeyError:
            return default

    def setdefault(self, key, default):
        """Return ``obj.key``; if the attribute is missing, set it to
        ``default`` first and return ``default``."""
        try:
            return self[key]
        except KeyError:
            self[key] = default
            return default

    def __getitem__(self, key):
        try:
            return getattr(self.obj, key)
        except AttributeError:
            # Honour the mapping protocol: missing keys raise KeyError.
            raise KeyError(key)

    def __setitem__(self, key, value):
        setattr(self.obj, key, value)

    def __contains__(self, key):
        return hasattr(self.obj, key)

    def _iterate_keys(self):
        """Iterate over the attribute names reported by ``dir(obj)``."""
        return iter(dir(self.obj))
    iterkeys = _iterate_keys

    def __iter__(self):
        return self._iterate_keys()

    def _iterate_items(self):
        """Iterate over ``(name, value)`` pairs for every attribute
        reported by ``dir(obj)``."""
        for key in self._iterate_keys():
            # Yield pairs, as a mapping's items() must; yielding only
            # the value would break e.g. ``dict(d.items())``.
            yield key, getattr(self.obj, key)
    iteritems = _iterate_items

    if sys.version_info[0] == 3:  # pragma: no cover
        # Python 3 convention: items() and keys() are lazy.
        items = _iterate_items
        keys = _iterate_keys
    else:

        def keys(self):
            return list(self)

        def items(self):
            return list(self._iterate_items())
class ConfigurationView(AttributeDictMixin):
    """Layered, dict-like view over an application's configuration.

    A lookup tries ``changes`` first and then every mapping in
    ``defaults``, in order.  Writes always go to ``changes``.

    :param changes: Dict containing changes to the configuration.
    :param defaults: List of dicts containing the default configuration.

    """
    changes = None
    defaults = None
    _order = None

    def __init__(self, changes, defaults):
        # Stored straight into __dict__: plain attribute assignment is
        # turned into item assignment by the mixin.
        state = {'changes': changes,
                 'defaults': defaults,
                 '_order': [changes] + defaults}
        self.__dict__.update(state)

    def add_defaults(self, d):
        """Add ``d`` as the highest-priority defaults mapping
        (it is still consulted after ``changes``)."""
        self._order.insert(1, d)
        self.defaults.insert(0, d)

    def __getitem__(self, key):
        for mapping in self._order:
            try:
                value = mapping[key]
            except KeyError:
                continue
            return value
        raise KeyError(key)

    def __setitem__(self, key, value):
        self.changes[key] = value

    def first(self, *keys):
        """Look up every key with :meth:`get` and hand the resulting
        values to the ``first`` helper with ``None`` as predicate."""
        candidates = (self.get(key) for key in keys)
        return first(None, candidates)

    def get(self, key, default=None):
        """Return the value for ``key``, or ``default`` if no mapping
        has it."""
        try:
            value = self[key]
        except KeyError:
            value = default
        return value

    def setdefault(self, key, default):
        """Return the value for ``key``; if no mapping has it, store
        ``default`` in ``changes`` and return ``default``."""
        try:
            return self[key]
        except KeyError:
            pass
        self[key] = default
        return default

    def update(self, *args, **kwargs):
        """Update ``changes``; arguments are as for ``dict.update``."""
        changes = self.changes
        return changes.update(*args, **kwargs)

    def __contains__(self, key):
        return any(key in mapping for mapping in self._order)

    def __repr__(self):
        merged = dict(self.iteritems())
        return repr(merged)

    def __iter__(self):
        return self._iterate_keys()

    def _iter(self, op):
        # defaults must be first in the stream, so values in
        # changes takes precedence.
        streams = [op(mapping) for mapping in reversed(self._order)]
        return chain(*streams)

    def _iterate_keys(self):
        all_keys = self._iter(lambda mapping: mapping)
        return uniq(all_keys)
    iterkeys = _iterate_keys

    def _iterate_items(self):
        return ((name, self[name]) for name in self)
    iteritems = _iterate_items

    def _iterate_values(self):
        return (self[name] for name in self)
    itervalues = _iterate_values

    def keys(self):
        return list(self._iterate_keys())

    def items(self):
        return list(self._iterate_items())

    def values(self):
        return list(self._iterate_values())
class LimitedSet(object):
    """Kind-of Set with limitations.

    Good for when you need to test for membership (`a in set`),
    but the list might become too big, so you want to limit it so it doesn't
    consume too much resources.

    :keyword maxlen: Maximum number of members before we start
                     evicting expired members.
    :keyword expires: Time in seconds, before a membership expires.

    """
    __slots__ = ('maxlen', 'expires', '_data')

    def __init__(self, maxlen=None, expires=None):
        self.maxlen = maxlen
        self.expires = expires
        # Maps every member to the ``time.time()`` stamp of its insertion.
        self._data = {}

    def __len__(self):
        # A real method rather than a bound method stored in an instance
        # slot, so ``len()`` does not depend on how the interpreter
        # resolves special methods through slot descriptors.
        return len(self._data)

    def add(self, value):
        """Add a new member.

        Adding an existing member refreshes its timestamp.

        """
        self._expire_item()
        self._data[value] = time.time()

    def clear(self):
        """Remove all members"""
        self._data.clear()

    def pop_value(self, value):
        """Remove membership by finding value.

        Values that are not members are silently ignored.

        """
        self._data.pop(value, None)

    def _expire_item(self):
        """Hunt down and remove an expired item.

        Only acts once ``maxlen`` has been reached, and then only the
        oldest member is considered: it is removed if ``expires`` is
        not set, or if it is older than ``expires`` seconds.

        """
        while 1:
            if self.maxlen and len(self) >= self.maxlen:
                value, when = self.first
                if not self.expires or time.time() > when + self.expires:
                    try:
                        self.pop_value(value)
                    except TypeError:  # pragma: no cover
                        # Retry if removing the member raised TypeError.
                        continue
            break

    def __contains__(self, value):
        return value in self._data

    def update(self, other):
        """Add all members of ``other``.

        Another :class:`LimitedSet` is merged together with its
        timestamps, without evicting anything; any other iterable is
        added member by member using :meth:`add`.

        """
        if isinstance(other, self.__class__):
            self._data.update(other._data)
        else:
            for obj in other:
                self.add(obj)

    def as_dict(self):
        """Return the underlying ``{member: timestamp}`` dict
        (the internal dict itself, not a copy)."""
        return self._data

    def __iter__(self):
        return iter(self._data)

    def __repr__(self):
        return 'LimitedSet(%r)' % (list(self._data), )

    @property
    def chronologically(self):
        """List of ``(member, timestamp)`` tuples, oldest first."""
        # Index instead of ``lambda (value, when): when``: tuple
        # parameters are a syntax error on Python 3.
        return sorted(self._data.items(), key=lambda item: item[1])

    @property
    def first(self):
        """Get the oldest member.

        :returns: a ``(member, timestamp)`` tuple.
        :raises IndexError: if the set is empty.

        """
        return self.chronologically[0]
|