datastructures.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import sys
  10. import time
  11. from collections import defaultdict
  12. from heapq import heapify, heappush, heappop
  13. from itertools import chain
  14. try:
  15. from collections import Mapping, MutableMapping
  16. except ImportError: # pragma: no cover
  17. MutableMapping = None # noqa
  18. Mapping = dict # noqa
  19. from billiard.einfo import ExceptionInfo # noqa
  20. from kombu.utils.limits import TokenBucket # noqa
  21. from .utils.functional import LRUCache, first, uniq # noqa
  22. class CycleError(Exception):
  23. """A cycle was detected in an acyclic graph."""
  24. class DependencyGraph(object):
  25. """A directed acyclic graph of objects and their dependencies.
  26. Supports a robust topological sort
  27. to detect the order in which they must be handled.
  28. Takes an optional iterator of ``(obj, dependencies)``
  29. tuples to build the graph from.
  30. .. warning::
  31. Does not support cycle detection.
  32. """
  33. def __init__(self, it=None):
  34. self.adjacent = {}
  35. if it is not None:
  36. self.update(it)
  37. def add_arc(self, obj):
  38. """Add an object to the graph."""
  39. self.adjacent.setdefault(obj, [])
  40. def add_edge(self, A, B):
  41. """Add an edge from object ``A`` to object ``B``
  42. (``A`` depends on ``B``)."""
  43. self[A].append(B)
  44. def topsort(self):
  45. """Sort the graph topologically.
  46. :returns: a list of objects in the order
  47. in which they must be handled.
  48. """
  49. graph = DependencyGraph()
  50. components = self._tarjan72()
  51. NC = dict((node, component)
  52. for component in components
  53. for node in component)
  54. for component in components:
  55. graph.add_arc(component)
  56. for node in self:
  57. node_c = NC[node]
  58. for successor in self[node]:
  59. successor_c = NC[successor]
  60. if node_c != successor_c:
  61. graph.add_edge(node_c, successor_c)
  62. return [t[0] for t in graph._khan62()]
  63. def valency_of(self, obj):
  64. """Returns the velency (degree) of a vertex in the graph."""
  65. try:
  66. l = [len(self[obj])]
  67. except KeyError:
  68. return 0
  69. for node in self[obj]:
  70. l.append(self.valency_of(node))
  71. return sum(l)
  72. def update(self, it):
  73. """Update the graph with data from a list
  74. of ``(obj, dependencies)`` tuples."""
  75. tups = list(it)
  76. for obj, _ in tups:
  77. self.add_arc(obj)
  78. for obj, deps in tups:
  79. for dep in deps:
  80. self.add_edge(obj, dep)
  81. def edges(self):
  82. """Returns generator that yields for all edges in the graph."""
  83. return (obj for obj, adj in self.iteritems() if adj)
  84. def _khan62(self):
  85. """Khans simple topological sort algorithm from '62
  86. See http://en.wikipedia.org/wiki/Topological_sorting
  87. """
  88. count = defaultdict(lambda: 0)
  89. result = []
  90. for node in self:
  91. for successor in self[node]:
  92. count[successor] += 1
  93. ready = [node for node in self if not count[node]]
  94. while ready:
  95. node = ready.pop()
  96. result.append(node)
  97. for successor in self[node]:
  98. count[successor] -= 1
  99. if count[successor] == 0:
  100. ready.append(successor)
  101. result.reverse()
  102. return result
  103. def _tarjan72(self):
  104. """Tarjan's algorithm to find strongly connected components.
  105. See http://bit.ly/vIMv3h.
  106. """
  107. result, stack, low = [], [], {}
  108. def visit(node):
  109. if node in low:
  110. return
  111. num = len(low)
  112. low[node] = num
  113. stack_pos = len(stack)
  114. stack.append(node)
  115. for successor in self[node]:
  116. visit(successor)
  117. low[node] = min(low[node], low[successor])
  118. if num == low[node]:
  119. component = tuple(stack[stack_pos:])
  120. stack[stack_pos:] = []
  121. result.append(component)
  122. for item in component:
  123. low[item] = len(self)
  124. for node in self:
  125. visit(node)
  126. return result
  127. def to_dot(self, fh, ws=' ' * 4):
  128. """Convert the graph to DOT format.
  129. :param fh: A file, or a file-like object to write the graph to.
  130. """
  131. fh.write('digraph dependencies {\n')
  132. for obj, adjacent in self.iteritems():
  133. if not adjacent:
  134. fh.write(ws + '"%s"\n' % (obj, ))
  135. for req in adjacent:
  136. fh.write(ws + '"%s" -> "%s"\n' % (obj, req))
  137. fh.write('}\n')
  138. def __iter__(self):
  139. return iter(self.adjacent)
  140. def __getitem__(self, node):
  141. return self.adjacent[node]
  142. def __len__(self):
  143. return len(self.adjacent)
  144. def __contains__(self, obj):
  145. return obj in self.adjacent
  146. def _iterate_items(self):
  147. return self.adjacent.iteritems()
  148. items = iteritems = _iterate_items
  149. def __repr__(self):
  150. return '\n'.join(self.repr_node(N) for N in self)
  151. def repr_node(self, obj, level=1):
  152. output = ['%s(%s)' % (obj, self.valency_of(obj))]
  153. if obj in self:
  154. for other in self[obj]:
  155. d = '%s(%s)' % (other, self.valency_of(other))
  156. output.append(' ' * level + d)
  157. output.extend(self.repr_node(other, level + 1).split('\n')[1:])
  158. return '\n'.join(output)
  159. class AttributeDictMixin(object):
  160. """Adds attribute access to mappings.
  161. `d.key -> d[key]`
  162. """
  163. def __getattr__(self, k):
  164. """`d.key -> d[key]`"""
  165. try:
  166. return self[k]
  167. except KeyError:
  168. raise AttributeError(
  169. "'%s' object has no attribute '%s'" % (type(self).__name__, k))
  170. def __setattr__(self, key, value):
  171. """`d[key] = value -> d.key = value`"""
  172. self[key] = value
  173. class AttributeDict(dict, AttributeDictMixin):
  174. """Dict subclass with attribute access."""
  175. pass
  176. class DictAttribute(object):
  177. """Dict interface to attributes.
  178. `obj[k] -> obj.k`
  179. """
  180. obj = None
  181. def __init__(self, obj):
  182. object.__setattr__(self, 'obj', obj)
  183. def __getattr__(self, key):
  184. return getattr(self.obj, key)
  185. def __setattr__(self, key, value):
  186. return setattr(self.obj, key, value)
  187. def get(self, key, default=None):
  188. try:
  189. return self[key]
  190. except KeyError:
  191. return default
  192. def setdefault(self, key, default):
  193. try:
  194. return self[key]
  195. except KeyError:
  196. self[key] = default
  197. return default
  198. def __getitem__(self, key):
  199. try:
  200. return getattr(self.obj, key)
  201. except AttributeError:
  202. raise KeyError(key)
  203. def __setitem__(self, key, value):
  204. setattr(self.obj, key, value)
  205. def __contains__(self, key):
  206. return hasattr(self.obj, key)
  207. def _iterate_keys(self):
  208. return iter(dir(self.obj))
  209. iterkeys = _iterate_keys
  210. def __iter__(self):
  211. return self._iterate_keys()
  212. def _iterate_items(self):
  213. for key in self._iterate_keys():
  214. yield key, getattr(self.obj, key)
  215. iteritems = _iterate_items
  216. if sys.version_info[0] == 3: # pragma: no cover
  217. items = _iterate_items
  218. keys = _iterate_keys
  219. else:
  220. def keys(self):
  221. return list(self)
  222. def items(self):
  223. return list(self._iterate_items())
  224. class ConfigurationView(AttributeDictMixin):
  225. """A view over an applications configuration dicts.
  226. If the key does not exist in ``changes``, the ``defaults`` dicts
  227. are consulted.
  228. :param changes: Dict containing changes to the configuration.
  229. :param defaults: List of dicts containing the default configuration.
  230. """
  231. changes = None
  232. defaults = None
  233. _order = None
  234. def __init__(self, changes, defaults):
  235. self.__dict__.update(changes=changes, defaults=defaults,
  236. _order=[changes] + defaults)
  237. def add_defaults(self, d):
  238. if not isinstance(d, Mapping):
  239. d = DictAttribute(d)
  240. self.defaults.insert(0, d)
  241. self._order.insert(1, d)
  242. def __getitem__(self, key):
  243. for d in self._order:
  244. try:
  245. return d[key]
  246. except KeyError:
  247. pass
  248. raise KeyError(key)
  249. def __setitem__(self, key, value):
  250. self.changes[key] = value
  251. def first(self, *keys):
  252. return first(None, (self.get(key) for key in keys))
  253. def get(self, key, default=None):
  254. try:
  255. return self[key]
  256. except KeyError:
  257. return default
  258. def setdefault(self, key, default):
  259. try:
  260. return self[key]
  261. except KeyError:
  262. self[key] = default
  263. return default
  264. def update(self, *args, **kwargs):
  265. return self.changes.update(*args, **kwargs)
  266. def __contains__(self, key):
  267. for d in self._order:
  268. if key in d:
  269. return True
  270. return False
  271. def __repr__(self):
  272. return repr(dict(self.iteritems()))
  273. def __iter__(self):
  274. return self._iterate_keys()
  275. def __len__(self):
  276. # The logic for iterating keys includes uniq(),
  277. # so to be safe we count by explicitly iterating
  278. return len(self.keys())
  279. def _iter(self, op):
  280. # defaults must be first in the stream, so values in
  281. # changes takes precedence.
  282. return chain(*[op(d) for d in reversed(self._order)])
  283. def _iterate_keys(self):
  284. return uniq(self._iter(lambda d: d))
  285. iterkeys = _iterate_keys
  286. def _iterate_items(self):
  287. return ((key, self[key]) for key in self)
  288. iteritems = _iterate_items
  289. def _iterate_values(self):
  290. return (self[key] for key in self)
  291. itervalues = _iterate_values
  292. def keys(self):
  293. return list(self._iterate_keys())
  294. def items(self):
  295. return list(self._iterate_items())
  296. def values(self):
  297. return list(self._iterate_values())
  298. if MutableMapping:
  299. MutableMapping.register(ConfigurationView)
  300. class LimitedSet(object):
  301. """Kind-of Set with limitations.
  302. Good for when you need to test for membership (`a in set`),
  303. but the list might become to big, so you want to limit it so it doesn't
  304. consume too much resources.
  305. :keyword maxlen: Maximum number of members before we start
  306. evicting expired members.
  307. :keyword expires: Time in seconds, before a membership expires.
  308. """
  309. __slots__ = ('maxlen', 'expires', '_data', '__len__', '_heap')
  310. def __init__(self, maxlen=None, expires=None, data=None, heap=None):
  311. self.maxlen = maxlen
  312. self.expires = expires
  313. self._data = data or {}
  314. self._heap = heap or []
  315. self.__len__ = self._data.__len__
  316. def add(self, value):
  317. """Add a new member."""
  318. self.purge(1)
  319. now = time.time()
  320. self._data[value] = now
  321. heappush(self._heap, (now, value))
  322. def __reduce__(self):
  323. return self.__class__, (
  324. self.maxlen, self.expires, self._data, self._heap,
  325. )
  326. def clear(self):
  327. """Remove all members"""
  328. self._data.clear()
  329. self._heap[:] = []
  330. def pop_value(self, value):
  331. """Remove membership by finding value."""
  332. try:
  333. itime = self._data[value]
  334. except KeyError:
  335. return
  336. try:
  337. self._heap.remove((value, itime))
  338. except ValueError:
  339. pass
  340. self._data.pop(value, None)
  341. def _expire_item(self):
  342. """Hunt down and remove an expired item."""
  343. self.purge(1)
  344. def __contains__(self, value):
  345. return value in self._data
  346. def purge(self, limit=None):
  347. H, maxlen = self._heap, self.maxlen
  348. if not maxlen:
  349. return
  350. i = 0
  351. while len(self) >= maxlen:
  352. if limit and i > limit:
  353. break
  354. try:
  355. item = heappop(H)
  356. except IndexError:
  357. break
  358. if self.expires:
  359. if time.time() < item[0] + self.expires:
  360. heappush(H, item)
  361. break
  362. self._data.pop(item[1])
  363. i += 1
  364. def update(self, other, heappush=heappush):
  365. if isinstance(other, self.__class__):
  366. self._data.update(other._data)
  367. self._heap.extend(other._heap)
  368. heapify(self._heap)
  369. else:
  370. for obj in other:
  371. self.add(obj)
  372. def as_dict(self):
  373. return self._data
  374. def __iter__(self):
  375. return iter(self._data)
  376. def __repr__(self):
  377. return 'LimitedSet(%s)' % (repr(list(self._data))[:100], )
  378. @property
  379. def chronologically(self):
  380. return [value for _, value in self._heap]
  381. @property
  382. def first(self):
  383. """Get the oldest member."""
  384. return self._heap[0][1]