datastructures.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import sys
  9. import time
  10. from collections import defaultdict
  11. from functools import partial
  12. from itertools import chain
  13. from billiard.einfo import ExceptionInfo # noqa
  14. from kombu.utils.encoding import safe_str
  15. from kombu.utils.limits import TokenBucket # noqa
  16. from .utils.functional import LRUCache, first, uniq # noqa
  17. DOT_HEAD = """
  18. {IN}{type} {id} {{
  19. {INp}graph [{attrs}]
  20. """
  21. DOT_ATTR = '{name}={value}'
  22. DOT_NODE = '{INp}"{0}" [{attrs}]'
  23. DOT_EDGE = '{INp}"{0}" {dir} "{1}" [{attrs}]'
  24. DOT_ATTRSEP = ', '
  25. DOT_DIRS = {'graph': '--', 'digraph': '->'}
  26. DOT_TAIL = '{IN}}}'
  27. class GraphFormatter(object):
  28. _attr = DOT_ATTR.strip()
  29. _node = DOT_NODE.strip()
  30. _edge = DOT_EDGE.strip()
  31. _head = DOT_HEAD.strip()
  32. _tail = DOT_TAIL.strip()
  33. _attrsep = DOT_ATTRSEP
  34. _dirs = dict(DOT_DIRS)
  35. scheme = {
  36. 'shape': 'box',
  37. 'arrowhead': 'vee',
  38. 'style': 'filled',
  39. 'fontname': 'Helvetica Neue',
  40. }
  41. edge_scheme = {
  42. 'color': 'darkseagreen4',
  43. 'arrowcolor': 'black',
  44. 'arrowsize': 0.7,
  45. }
  46. node_scheme = {'fillcolor': 'palegreen3', 'color': 'palegreen4'}
  47. term_scheme = {'fillcolor': 'palegreen1', 'color': 'palegreen2'}
  48. graph_scheme = {'bgcolor': 'mintcream'}
  49. def __init__(self, root=None, type=None, id=None,
  50. indent=0, inw=' ' * 4, **scheme):
  51. self.id = id or 'dependencies'
  52. self.root = root
  53. self.type = type or 'digraph'
  54. self.direction = self._dirs[self.type]
  55. self.IN = inw * (indent or 0)
  56. self.INp = self.IN + inw
  57. self.scheme = dict(self.scheme, **scheme)
  58. self.graph_scheme = dict(self.graph_scheme, root=self.label(self.root))
  59. def attr(self, name, value):
  60. value = '"{0}"'.format(value)
  61. return self.FMT(self._attr, name=name, value=value)
  62. def attrs(self, d, scheme=None):
  63. d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
  64. return self._attrsep.join(
  65. safe_str(self.attr(k, v)) for k, v in d.iteritems()
  66. )
  67. def head(self, **attrs):
  68. return self.FMT(self._head, id=self.id, type=self.type,
  69. attrs=self.attrs(attrs, self.graph_scheme),
  70. )
  71. def tail(self):
  72. return self.FMT(self._tail)
  73. def label(self, obj):
  74. return obj
  75. def node(self, obj, **attrs):
  76. return self.draw_node(obj, self.node_scheme, attrs)
  77. def terminal_node(self, obj, **attrs):
  78. return self.draw_node(obj, self.term_scheme, attrs)
  79. def edge(self, a, b, **attrs):
  80. return self.draw_edge(a, b, **attrs)
  81. def _enc(self, s):
  82. return s.encode('utf-8', 'ignore')
  83. def FMT(self, fmt, *args, **kwargs):
  84. return self._enc(fmt.format(
  85. *args, **dict(kwargs, IN=self.IN, INp=self.INp)
  86. ))
  87. def draw_edge(self, a, b, scheme=None, attrs=None):
  88. return self.FMT(self._edge, self.label(a), self.label(b),
  89. dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
  90. )
  91. def draw_node(self, obj, scheme=None, attrs=None):
  92. return self.FMT(self._node, self.label(obj),
  93. attrs=self.attrs(attrs, scheme),
  94. )
  95. class CycleError(Exception):
  96. """A cycle was detected in an acyclic graph."""
  97. class DependencyGraph(object):
  98. """A directed acyclic graph of objects and their dependencies.
  99. Supports a robust topological sort
  100. to detect the order in which they must be handled.
  101. Takes an optional iterator of ``(obj, dependencies)``
  102. tuples to build the graph from.
  103. .. warning::
  104. Does not support cycle detection.
  105. """
  106. def __init__(self, it=None, formatter=None):
  107. self.formatter = formatter or GraphFormatter()
  108. self.adjacent = {}
  109. if it is not None:
  110. self.update(it)
  111. def add_arc(self, obj):
  112. """Add an object to the graph."""
  113. self.adjacent.setdefault(obj, [])
  114. def add_edge(self, A, B):
  115. """Add an edge from object ``A`` to object ``B``
  116. (``A`` depends on ``B``)."""
  117. self[A].append(B)
  118. def find_last(self, g):
  119. for obj in g.adjacent.keys():
  120. if obj.last:
  121. return obj
  122. def connect(self, graph):
  123. """Add nodes from another graph."""
  124. self.adjacent.update(graph.adjacent)
  125. def topsort(self):
  126. """Sort the graph topologically.
  127. :returns: a list of objects in the order
  128. in which they must be handled.
  129. """
  130. graph = DependencyGraph()
  131. components = self._tarjan72()
  132. NC = dict((node, component)
  133. for component in components
  134. for node in component)
  135. for component in components:
  136. graph.add_arc(component)
  137. for node in self:
  138. node_c = NC[node]
  139. for successor in self[node]:
  140. successor_c = NC[successor]
  141. if node_c != successor_c:
  142. graph.add_edge(node_c, successor_c)
  143. return [t[0] for t in graph._khan62()]
  144. def valency_of(self, obj):
  145. """Returns the velency (degree) of a vertex in the graph."""
  146. try:
  147. l = [len(self[obj])]
  148. except KeyError:
  149. return 0
  150. for node in self[obj]:
  151. l.append(self.valency_of(node))
  152. return sum(l)
  153. def update(self, it):
  154. """Update the graph with data from a list
  155. of ``(obj, dependencies)`` tuples."""
  156. tups = list(it)
  157. for obj, _ in tups:
  158. self.add_arc(obj)
  159. for obj, deps in tups:
  160. for dep in deps:
  161. self.add_edge(obj, dep)
  162. def edges(self):
  163. """Returns generator that yields for all edges in the graph."""
  164. return (obj for obj, adj in self.iteritems() if adj)
  165. def _khan62(self):
  166. """Khans simple topological sort algorithm from '62
  167. See http://en.wikipedia.org/wiki/Topological_sorting
  168. """
  169. count = defaultdict(lambda: 0)
  170. result = []
  171. for node in self:
  172. for successor in self[node]:
  173. count[successor] += 1
  174. ready = [node for node in self if not count[node]]
  175. while ready:
  176. node = ready.pop()
  177. result.append(node)
  178. for successor in self[node]:
  179. count[successor] -= 1
  180. if count[successor] == 0:
  181. ready.append(successor)
  182. result.reverse()
  183. return result
  184. def _tarjan72(self):
  185. """Tarjan's algorithm to find strongly connected components.
  186. See http://bit.ly/vIMv3h.
  187. """
  188. result, stack, low = [], [], {}
  189. def visit(node):
  190. if node in low:
  191. return
  192. num = len(low)
  193. low[node] = num
  194. stack_pos = len(stack)
  195. stack.append(node)
  196. for successor in self[node]:
  197. visit(successor)
  198. low[node] = min(low[node], low[successor])
  199. if num == low[node]:
  200. component = tuple(stack[stack_pos:])
  201. stack[stack_pos:] = []
  202. result.append(component)
  203. for item in component:
  204. low[item] = len(self)
  205. for node in self:
  206. visit(node)
  207. return result
  208. def to_dot(self, fh, formatter=None):
  209. """Convert the graph to DOT format.
  210. :param fh: A file, or a file-like object to write the graph to.
  211. """
  212. seen = set()
  213. draw = formatter or self.formatter
  214. P = partial(print, file=fh)
  215. def if_not_seen(fun, obj):
  216. if draw.label(obj) not in seen:
  217. P(fun(obj))
  218. seen.add(draw.label(obj))
  219. P(draw.head())
  220. for obj, adjacent in self.iteritems():
  221. if not adjacent:
  222. if_not_seen(draw.terminal_node, obj)
  223. for req in adjacent:
  224. if_not_seen(draw.node, obj)
  225. P(draw.edge(obj, req))
  226. P(draw.tail())
  227. def format(self, obj):
  228. return self.formatter(obj) if self.formatter else obj
  229. def __iter__(self):
  230. return iter(self.adjacent)
  231. def __getitem__(self, node):
  232. return self.adjacent[node]
  233. def __len__(self):
  234. return len(self.adjacent)
  235. def __contains__(self, obj):
  236. return obj in self.adjacent
  237. def _iterate_items(self):
  238. return self.adjacent.iteritems()
  239. items = iteritems = _iterate_items
  240. def __repr__(self):
  241. return '\n'.join(self.repr_node(N) for N in self)
  242. def repr_node(self, obj, level=1, fmt='{0}({1})'):
  243. output = [fmt.format(obj, self.valency_of(obj))]
  244. if obj in self:
  245. for other in self[obj]:
  246. d = fmt.format(other, self.valency_of(other))
  247. output.append(' ' * level + d)
  248. output.extend(self.repr_node(other, level + 1).split('\n')[1:])
  249. return '\n'.join(output)
  250. class AttributeDictMixin(object):
  251. """Adds attribute access to mappings.
  252. `d.key -> d[key]`
  253. """
  254. def __getattr__(self, k):
  255. """`d.key -> d[key]`"""
  256. try:
  257. return self[k]
  258. except KeyError:
  259. raise AttributeError(
  260. '{0!r} object has no attribute {1!r}'.format(
  261. type(self).__name__, k))
  262. def __setattr__(self, key, value):
  263. """`d[key] = value -> d.key = value`"""
  264. self[key] = value
  265. class AttributeDict(dict, AttributeDictMixin):
  266. """Dict subclass with attribute access."""
  267. pass
  268. class DictAttribute(object):
  269. """Dict interface to attributes.
  270. `obj[k] -> obj.k`
  271. """
  272. obj = None
  273. def __init__(self, obj):
  274. object.__setattr__(self, 'obj', obj)
  275. def __getattr__(self, key):
  276. return getattr(self.obj, key)
  277. def __setattr__(self, key, value):
  278. return setattr(self.obj, key, value)
  279. def get(self, key, default=None):
  280. try:
  281. return self[key]
  282. except KeyError:
  283. return default
  284. def setdefault(self, key, default):
  285. try:
  286. return self[key]
  287. except KeyError:
  288. self[key] = default
  289. return default
  290. def __getitem__(self, key):
  291. try:
  292. return getattr(self.obj, key)
  293. except AttributeError:
  294. raise KeyError(key)
  295. def __setitem__(self, key, value):
  296. setattr(self.obj, key, value)
  297. def __contains__(self, key):
  298. return hasattr(self.obj, key)
  299. def _iterate_keys(self):
  300. return iter(dir(self.obj))
  301. iterkeys = _iterate_keys
  302. def __iter__(self):
  303. return self._iterate_keys()
  304. def _iterate_items(self):
  305. for key in self._iterate_keys():
  306. yield key, getattr(self.obj, key)
  307. iteritems = _iterate_items
  308. if sys.version_info[0] == 3: # pragma: no cover
  309. items = _iterate_items
  310. keys = _iterate_keys
  311. else:
  312. def keys(self):
  313. return list(self)
  314. def items(self):
  315. return list(self._iterate_items())
  316. class ConfigurationView(AttributeDictMixin):
  317. """A view over an applications configuration dicts.
  318. If the key does not exist in ``changes``, the ``defaults`` dict
  319. is consulted.
  320. :param changes: Dict containing changes to the configuration.
  321. :param defaults: Dict containing the default configuration.
  322. """
  323. changes = None
  324. defaults = None
  325. _order = None
  326. def __init__(self, changes, defaults):
  327. self.__dict__.update(changes=changes, defaults=defaults,
  328. _order=[changes] + defaults)
  329. def add_defaults(self, d):
  330. self.defaults.insert(0, d)
  331. self._order.insert(1, d)
  332. def __getitem__(self, key):
  333. for d in self._order:
  334. try:
  335. return d[key]
  336. except KeyError:
  337. pass
  338. raise KeyError(key)
  339. def __setitem__(self, key, value):
  340. self.changes[key] = value
  341. def first(self, *keys):
  342. return first(None, (self.get(key) for key in keys))
  343. def get(self, key, default=None):
  344. try:
  345. return self[key]
  346. except KeyError:
  347. return default
  348. def setdefault(self, key, default):
  349. try:
  350. return self[key]
  351. except KeyError:
  352. self[key] = default
  353. return default
  354. def update(self, *args, **kwargs):
  355. return self.changes.update(*args, **kwargs)
  356. def __contains__(self, key):
  357. for d in self._order:
  358. if key in d:
  359. return True
  360. return False
  361. def __repr__(self):
  362. return repr(dict(self.iteritems()))
  363. def __iter__(self):
  364. return self._iterate_keys()
  365. def _iter(self, op):
  366. # defaults must be first in the stream, so values in
  367. # changes takes precedence.
  368. return chain(*[op(d) for d in reversed(self._order)])
  369. def _iterate_keys(self):
  370. return uniq(self._iter(lambda d: d))
  371. iterkeys = _iterate_keys
  372. def _iterate_items(self):
  373. return ((key, self[key]) for key in self)
  374. iteritems = _iterate_items
  375. def _iterate_values(self):
  376. return (self[key] for key in self)
  377. itervalues = _iterate_values
  378. def keys(self):
  379. return list(self._iterate_keys())
  380. def items(self):
  381. return list(self._iterate_items())
  382. def values(self):
  383. return list(self._iterate_values())
  384. class LimitedSet(object):
  385. """Kind-of Set with limitations.
  386. Good for when you need to test for membership (`a in set`),
  387. but the list might become to big, so you want to limit it so it doesn't
  388. consume too much resources.
  389. :keyword maxlen: Maximum number of members before we start
  390. evicting expired members.
  391. :keyword expires: Time in seconds, before a membership expires.
  392. """
  393. __slots__ = ('maxlen', 'expires', '_data', '__len__')
  394. def __init__(self, maxlen=None, expires=None):
  395. self.maxlen = maxlen
  396. self.expires = expires
  397. self._data = {}
  398. self.__len__ = self._data.__len__
  399. def add(self, value):
  400. """Add a new member."""
  401. self._expire_item()
  402. self._data[value] = time.time()
  403. def clear(self):
  404. """Remove all members"""
  405. self._data.clear()
  406. def pop_value(self, value):
  407. """Remove membership by finding value."""
  408. self._data.pop(value, None)
  409. def _expire_item(self):
  410. """Hunt down and remove an expired item."""
  411. while 1:
  412. if self.maxlen and len(self) >= self.maxlen:
  413. value, when = self.first
  414. if not self.expires or time.time() > when + self.expires:
  415. try:
  416. self.pop_value(value)
  417. except TypeError: # pragma: no cover
  418. continue
  419. break
  420. def __contains__(self, value):
  421. return value in self._data
  422. def update(self, other):
  423. if isinstance(other, self.__class__):
  424. self._data.update(other._data)
  425. else:
  426. for obj in other:
  427. self.add(obj)
  428. def as_dict(self):
  429. return self._data
  430. def __iter__(self):
  431. return iter(self._data)
  432. def __repr__(self):
  433. return 'LimitedSet({0!r})'.format(list(self._data))
  434. @property
  435. def chronologically(self):
  436. return sorted(self._data.items(), key=lambda (value, when): when)
  437. @property
  438. def first(self):
  439. """Get the oldest member."""
  440. return self.chronologically[0]