datastructures.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import sys
  9. import time
  10. from collections import defaultdict
  11. from functools import partial
  12. from itertools import chain
  13. from billiard.einfo import ExceptionInfo # noqa
  14. from kombu.utils.limits import TokenBucket # noqa
  15. from .utils.functional import LRUCache, first, uniq # noqa
  16. class CycleError(Exception):
  17. """A cycle was detected in an acyclic graph."""
  18. class DependencyGraph(object):
  19. """A directed acyclic graph of objects and their dependencies.
  20. Supports a robust topological sort
  21. to detect the order in which they must be handled.
  22. Takes an optional iterator of ``(obj, dependencies)``
  23. tuples to build the graph from.
  24. .. warning::
  25. Does not support cycle detection.
  26. """
  27. def __init__(self, it=None):
  28. self.adjacent = {}
  29. if it is not None:
  30. self.update(it)
  31. def add_arc(self, obj):
  32. """Add an object to the graph."""
  33. self.adjacent.setdefault(obj, [])
  34. def add_edge(self, A, B):
  35. """Add an edge from object ``A`` to object ``B``
  36. (``A`` depends on ``B``)."""
  37. self[A].append(B)
  38. def topsort(self):
  39. """Sort the graph topologically.
  40. :returns: a list of objects in the order
  41. in which they must be handled.
  42. """
  43. graph = DependencyGraph()
  44. components = self._tarjan72()
  45. NC = dict((node, component)
  46. for component in components
  47. for node in component)
  48. for component in components:
  49. graph.add_arc(component)
  50. for node in self:
  51. node_c = NC[node]
  52. for successor in self[node]:
  53. successor_c = NC[successor]
  54. if node_c != successor_c:
  55. graph.add_edge(node_c, successor_c)
  56. return [t[0] for t in graph._khan62()]
  57. def valency_of(self, obj):
  58. """Returns the velency (degree) of a vertex in the graph."""
  59. try:
  60. l = [len(self[obj])]
  61. except KeyError:
  62. return 0
  63. for node in self[obj]:
  64. l.append(self.valency_of(node))
  65. return sum(l)
  66. def update(self, it):
  67. """Update the graph with data from a list
  68. of ``(obj, dependencies)`` tuples."""
  69. tups = list(it)
  70. for obj, _ in tups:
  71. self.add_arc(obj)
  72. for obj, deps in tups:
  73. for dep in deps:
  74. self.add_edge(obj, dep)
  75. def edges(self):
  76. """Returns generator that yields for all edges in the graph."""
  77. return (obj for obj, adj in self.iteritems() if adj)
  78. def _khan62(self):
  79. """Khans simple topological sort algorithm from '62
  80. See http://en.wikipedia.org/wiki/Topological_sorting
  81. """
  82. count = defaultdict(lambda: 0)
  83. result = []
  84. for node in self:
  85. for successor in self[node]:
  86. count[successor] += 1
  87. ready = [node for node in self if not count[node]]
  88. while ready:
  89. node = ready.pop()
  90. result.append(node)
  91. for successor in self[node]:
  92. count[successor] -= 1
  93. if count[successor] == 0:
  94. ready.append(successor)
  95. result.reverse()
  96. return result
  97. def _tarjan72(self):
  98. """Tarjan's algorithm to find strongly connected components.
  99. See http://bit.ly/vIMv3h.
  100. """
  101. result, stack, low = [], [], {}
  102. def visit(node):
  103. if node in low:
  104. return
  105. num = len(low)
  106. low[node] = num
  107. stack_pos = len(stack)
  108. stack.append(node)
  109. for successor in self[node]:
  110. visit(successor)
  111. low[node] = min(low[node], low[successor])
  112. if num == low[node]:
  113. component = tuple(stack[stack_pos:])
  114. stack[stack_pos:] = []
  115. result.append(component)
  116. for item in component:
  117. low[item] = len(self)
  118. for node in self:
  119. visit(node)
  120. return result
  121. def to_dot(self, fh, ws=' ' * 4):
  122. """Convert the graph to DOT format.
  123. :param fh: A file, or a file-like object to write the graph to.
  124. """
  125. P = partial(print, file=fh)
  126. P('digraph dependencies {')
  127. for obj, adjacent in self.iteritems():
  128. if not adjacent:
  129. P(ws + '"{0}"'.format(obj))
  130. for req in adjacent:
  131. P(ws + '"{0}" -> "{1}"'.format(obj, req))
  132. P('}')
  133. def __iter__(self):
  134. return self.adjacent.iterkeys()
  135. def __getitem__(self, node):
  136. return self.adjacent[node]
  137. def __len__(self):
  138. return len(self.adjacent)
  139. def __contains__(self, obj):
  140. return obj in self.adjacent
  141. def _iterate_items(self):
  142. return self.adjacent.iteritems()
  143. items = iteritems = _iterate_items
  144. def __repr__(self):
  145. return '\n'.join(self.repr_node(N) for N in self)
  146. def repr_node(self, obj, level=1, fmt='{0}({1})'):
  147. output = [fmt.format(obj, self.valency_of(obj))]
  148. if obj in self:
  149. for other in self[obj]:
  150. d = fmt.format(other, self.valency_of(other))
  151. output.append(' ' * level + d)
  152. output.extend(self.repr_node(other, level + 1).split('\n')[1:])
  153. return '\n'.join(output)
  154. class AttributeDictMixin(object):
  155. """Adds attribute access to mappings.
  156. `d.key -> d[key]`
  157. """
  158. def __getattr__(self, k):
  159. """`d.key -> d[key]`"""
  160. try:
  161. return self[k]
  162. except KeyError:
  163. raise AttributeError(
  164. "{0!r} object has no attribute {1!r}".format(
  165. type(self).__name__, k))
  166. def __setattr__(self, key, value):
  167. """`d[key] = value -> d.key = value`"""
  168. self[key] = value
  169. class AttributeDict(dict, AttributeDictMixin):
  170. """Dict subclass with attribute access."""
  171. pass
  172. class DictAttribute(object):
  173. """Dict interface to attributes.
  174. `obj[k] -> obj.k`
  175. """
  176. def __init__(self, obj):
  177. self.obj = obj
  178. def get(self, key, default=None):
  179. try:
  180. return self[key]
  181. except KeyError:
  182. return default
  183. def setdefault(self, key, default):
  184. try:
  185. return self[key]
  186. except KeyError:
  187. self[key] = default
  188. return default
  189. def __getitem__(self, key):
  190. try:
  191. return getattr(self.obj, key)
  192. except AttributeError:
  193. raise KeyError(key)
  194. def __setitem__(self, key, value):
  195. setattr(self.obj, key, value)
  196. def __contains__(self, key):
  197. return hasattr(self.obj, key)
  198. def _iterate_keys(self):
  199. return vars(self.obj).iterkeys()
  200. iterkeys = _iterate_keys
  201. def __iter__(self):
  202. return self.iterkeys()
  203. def _iterate_items(self):
  204. return vars(self.obj).iteritems()
  205. iteritems = _iterate_items
  206. if sys.version_info[0] == 3: # pragma: no cover
  207. items = _iterate_items
  208. keys = _iterate_keys
  209. else:
  210. def keys(self):
  211. return list(self._iterate_keys())
  212. def items(self):
  213. return list(self._iterate_items())
  214. class ConfigurationView(AttributeDictMixin):
  215. """A view over an applications configuration dicts.
  216. If the key does not exist in ``changes``, the ``defaults`` dict
  217. is consulted.
  218. :param changes: Dict containing changes to the configuration.
  219. :param defaults: Dict containing the default configuration.
  220. """
  221. changes = None
  222. defaults = None
  223. _order = None
  224. def __init__(self, changes, defaults):
  225. self.__dict__.update(changes=changes, defaults=defaults,
  226. _order=[changes] + defaults)
  227. def __getitem__(self, key):
  228. for d in self._order:
  229. try:
  230. return d[key]
  231. except KeyError:
  232. pass
  233. raise KeyError(key)
  234. def __setitem__(self, key, value):
  235. self.changes[key] = value
  236. def first(self, *keys):
  237. return first(None, (self.get(key) for key in keys))
  238. def get(self, key, default=None):
  239. try:
  240. return self[key]
  241. except KeyError:
  242. return default
  243. def setdefault(self, key, default):
  244. try:
  245. return self[key]
  246. except KeyError:
  247. self[key] = default
  248. return default
  249. def update(self, *args, **kwargs):
  250. return self.changes.update(*args, **kwargs)
  251. def __contains__(self, key):
  252. for d in self._order:
  253. if key in d:
  254. return True
  255. return False
  256. def __repr__(self):
  257. return repr(dict(self.iteritems()))
  258. def __iter__(self):
  259. return self.iterkeys()
  260. def _iter(self, op):
  261. # defaults must be first in the stream, so values in
  262. # changes takes precedence.
  263. return chain(*[op(d) for d in reversed(self._order)])
  264. def _iterate_keys(self):
  265. return uniq(self._iter(lambda d: d.iterkeys()))
  266. iterkeys = _iterate_keys
  267. def _iterate_items(self):
  268. return ((key, self[key]) for key in self)
  269. iteritems = _iterate_items
  270. def _iterate_values(self):
  271. return (self[key] for key in self)
  272. itervalues = _iterate_values
  273. def keys(self):
  274. return list(self._iterate_keys())
  275. def items(self):
  276. return list(self._iterate_items())
  277. def values(self):
  278. return list(self._iterate_values())
  279. class LimitedSet(object):
  280. """Kind-of Set with limitations.
  281. Good for when you need to test for membership (`a in set`),
  282. but the list might become to big, so you want to limit it so it doesn't
  283. consume too much resources.
  284. :keyword maxlen: Maximum number of members before we start
  285. evicting expired members.
  286. :keyword expires: Time in seconds, before a membership expires.
  287. """
  288. __slots__ = ('maxlen', 'expires', '_data', '__len__')
  289. def __init__(self, maxlen=None, expires=None):
  290. self.maxlen = maxlen
  291. self.expires = expires
  292. self._data = {}
  293. self.__len__ = self._data.__len__
  294. def add(self, value):
  295. """Add a new member."""
  296. self._expire_item()
  297. self._data[value] = time.time()
  298. def clear(self):
  299. """Remove all members"""
  300. self._data.clear()
  301. def pop_value(self, value):
  302. """Remove membership by finding value."""
  303. self._data.pop(value, None)
  304. def _expire_item(self):
  305. """Hunt down and remove an expired item."""
  306. while 1:
  307. if self.maxlen and len(self) >= self.maxlen:
  308. value, when = self.first
  309. if not self.expires or time.time() > when + self.expires:
  310. try:
  311. self.pop_value(value)
  312. except TypeError: # pragma: no cover
  313. continue
  314. break
  315. def __contains__(self, value):
  316. return value in self._data
  317. def update(self, other):
  318. if isinstance(other, self.__class__):
  319. self._data.update(other._data)
  320. else:
  321. for obj in other:
  322. self.add(obj)
  323. def as_dict(self):
  324. return self._data
  325. def __iter__(self):
  326. return iter(self._data)
  327. def __repr__(self):
  328. return 'LimitedSet({0!r})'.format(self._data.keys())
  329. @property
  330. def chronologically(self):
  331. return sorted(self._data.items(), key=lambda (value, when): when)
  332. @property
  333. def first(self):
  334. """Get the oldest member."""
  335. return self.chronologically[0]