datastructures.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import sys
  9. import time
  10. from collections import defaultdict
  11. from functools import partial
  12. from itertools import chain
  13. from operator import itemgetter
  14. from billiard.einfo import ExceptionInfo # noqa
  15. from kombu.utils.encoding import safe_str
  16. from kombu.utils.limits import TokenBucket # noqa
  17. from .five import items
  18. from .utils.functional import LRUCache, first, uniq # noqa
  19. DOT_HEAD = """
  20. {IN}{type} {id} {{
  21. {INp}graph [{attrs}]
  22. """
  23. DOT_ATTR = '{name}={value}'
  24. DOT_NODE = '{INp}"{0}" [{attrs}]'
  25. DOT_EDGE = '{INp}"{0}" {dir} "{1}" [{attrs}]'
  26. DOT_ATTRSEP = ', '
  27. DOT_DIRS = {'graph': '--', 'digraph': '->'}
  28. DOT_TAIL = '{IN}}}'
  29. class GraphFormatter(object):
  30. _attr = DOT_ATTR.strip()
  31. _node = DOT_NODE.strip()
  32. _edge = DOT_EDGE.strip()
  33. _head = DOT_HEAD.strip()
  34. _tail = DOT_TAIL.strip()
  35. _attrsep = DOT_ATTRSEP
  36. _dirs = dict(DOT_DIRS)
  37. scheme = {
  38. 'shape': 'box',
  39. 'arrowhead': 'vee',
  40. 'style': 'filled',
  41. 'fontname': 'Helvetica Neue',
  42. }
  43. edge_scheme = {
  44. 'color': 'darkseagreen4',
  45. 'arrowcolor': 'black',
  46. 'arrowsize': 0.7,
  47. }
  48. node_scheme = {'fillcolor': 'palegreen3', 'color': 'palegreen4'}
  49. term_scheme = {'fillcolor': 'palegreen1', 'color': 'palegreen2'}
  50. graph_scheme = {'bgcolor': 'mintcream'}
  51. def __init__(self, root=None, type=None, id=None,
  52. indent=0, inw=' ' * 4, **scheme):
  53. self.id = id or 'dependencies'
  54. self.root = root
  55. self.type = type or 'digraph'
  56. self.direction = self._dirs[self.type]
  57. self.IN = inw * (indent or 0)
  58. self.INp = self.IN + inw
  59. self.scheme = dict(self.scheme, **scheme)
  60. self.graph_scheme = dict(self.graph_scheme, root=self.label(self.root))
  61. def attr(self, name, value):
  62. value = '"{0}"'.format(value)
  63. return self.FMT(self._attr, name=name, value=value)
  64. def attrs(self, d, scheme=None):
  65. d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
  66. return self._attrsep.join(
  67. safe_str(self.attr(k, v)) for k, v in items(d)
  68. )
  69. def head(self, **attrs):
  70. return self.FMT(self._head, id=self.id, type=self.type,
  71. attrs=self.attrs(attrs, self.graph_scheme),
  72. )
  73. def tail(self):
  74. return self.FMT(self._tail)
  75. def label(self, obj):
  76. return obj
  77. def node(self, obj, **attrs):
  78. return self.draw_node(obj, self.node_scheme, attrs)
  79. def terminal_node(self, obj, **attrs):
  80. return self.draw_node(obj, self.term_scheme, attrs)
  81. def edge(self, a, b, **attrs):
  82. return self.draw_edge(a, b, **attrs)
  83. def _enc(self, s):
  84. return s.encode('utf-8', 'ignore')
  85. def FMT(self, fmt, *args, **kwargs):
  86. return self._enc(fmt.format(
  87. *args, **dict(kwargs, IN=self.IN, INp=self.INp)
  88. ))
  89. def draw_edge(self, a, b, scheme=None, attrs=None):
  90. return self.FMT(self._edge, self.label(a), self.label(b),
  91. dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
  92. )
  93. def draw_node(self, obj, scheme=None, attrs=None):
  94. return self.FMT(self._node, self.label(obj),
  95. attrs=self.attrs(attrs, scheme),
  96. )
  97. class CycleError(Exception):
  98. """A cycle was detected in an acyclic graph."""
  99. class DependencyGraph(object):
  100. """A directed acyclic graph of objects and their dependencies.
  101. Supports a robust topological sort
  102. to detect the order in which they must be handled.
  103. Takes an optional iterator of ``(obj, dependencies)``
  104. tuples to build the graph from.
  105. .. warning::
  106. Does not support cycle detection.
  107. """
  108. def __init__(self, it=None, formatter=None):
  109. self.formatter = formatter or GraphFormatter()
  110. self.adjacent = {}
  111. if it is not None:
  112. self.update(it)
  113. def add_arc(self, obj):
  114. """Add an object to the graph."""
  115. self.adjacent.setdefault(obj, [])
  116. def add_edge(self, A, B):
  117. """Add an edge from object ``A`` to object ``B``
  118. (``A`` depends on ``B``)."""
  119. self[A].append(B)
  120. def find_last(self, g):
  121. for obj in g.adjacent:
  122. if obj.last:
  123. return obj
  124. def connect(self, graph):
  125. """Add nodes from another graph."""
  126. self.adjacent.update(graph.adjacent)
  127. def topsort(self):
  128. """Sort the graph topologically.
  129. :returns: a list of objects in the order
  130. in which they must be handled.
  131. """
  132. graph = DependencyGraph()
  133. components = self._tarjan72()
  134. NC = dict((node, component)
  135. for component in components
  136. for node in component)
  137. for component in components:
  138. graph.add_arc(component)
  139. for node in self:
  140. node_c = NC[node]
  141. for successor in self[node]:
  142. successor_c = NC[successor]
  143. if node_c != successor_c:
  144. graph.add_edge(node_c, successor_c)
  145. return [t[0] for t in graph._khan62()]
  146. def valency_of(self, obj):
  147. """Returns the velency (degree) of a vertex in the graph."""
  148. try:
  149. l = [len(self[obj])]
  150. except KeyError:
  151. return 0
  152. for node in self[obj]:
  153. l.append(self.valency_of(node))
  154. return sum(l)
  155. def update(self, it):
  156. """Update the graph with data from a list
  157. of ``(obj, dependencies)`` tuples."""
  158. tups = list(it)
  159. for obj, _ in tups:
  160. self.add_arc(obj)
  161. for obj, deps in tups:
  162. for dep in deps:
  163. self.add_edge(obj, dep)
  164. def edges(self):
  165. """Returns generator that yields for all edges in the graph."""
  166. return (obj for obj, adj in items(self) if adj)
  167. def _khan62(self):
  168. """Khans simple topological sort algorithm from '62
  169. See http://en.wikipedia.org/wiki/Topological_sorting
  170. """
  171. count = defaultdict(lambda: 0)
  172. result = []
  173. for node in self:
  174. for successor in self[node]:
  175. count[successor] += 1
  176. ready = [node for node in self if not count[node]]
  177. while ready:
  178. node = ready.pop()
  179. result.append(node)
  180. for successor in self[node]:
  181. count[successor] -= 1
  182. if count[successor] == 0:
  183. ready.append(successor)
  184. result.reverse()
  185. return result
  186. def _tarjan72(self):
  187. """Tarjan's algorithm to find strongly connected components.
  188. See http://bit.ly/vIMv3h.
  189. """
  190. result, stack, low = [], [], {}
  191. def visit(node):
  192. if node in low:
  193. return
  194. num = len(low)
  195. low[node] = num
  196. stack_pos = len(stack)
  197. stack.append(node)
  198. for successor in self[node]:
  199. visit(successor)
  200. low[node] = min(low[node], low[successor])
  201. if num == low[node]:
  202. component = tuple(stack[stack_pos:])
  203. stack[stack_pos:] = []
  204. result.append(component)
  205. for item in component:
  206. low[item] = len(self)
  207. for node in self:
  208. visit(node)
  209. return result
  210. def to_dot(self, fh, formatter=None):
  211. """Convert the graph to DOT format.
  212. :param fh: A file, or a file-like object to write the graph to.
  213. """
  214. seen = set()
  215. draw = formatter or self.formatter
  216. P = partial(print, file=fh)
  217. def if_not_seen(fun, obj):
  218. if draw.label(obj) not in seen:
  219. P(fun(obj))
  220. seen.add(draw.label(obj))
  221. P(draw.head())
  222. for obj, adjacent in items(self):
  223. if not adjacent:
  224. if_not_seen(draw.terminal_node, obj)
  225. for req in adjacent:
  226. if_not_seen(draw.node, obj)
  227. P(draw.edge(obj, req))
  228. P(draw.tail())
  229. def format(self, obj):
  230. return self.formatter(obj) if self.formatter else obj
  231. def __iter__(self):
  232. return iter(self.adjacent)
  233. def __getitem__(self, node):
  234. return self.adjacent[node]
  235. def __len__(self):
  236. return len(self.adjacent)
  237. def __contains__(self, obj):
  238. return obj in self.adjacent
  239. def _iterate_items(self):
  240. return items(self.adjacent)
  241. items = iteritems = _iterate_items
  242. def __repr__(self):
  243. return '\n'.join(self.repr_node(N) for N in self)
  244. def repr_node(self, obj, level=1, fmt='{0}({1})'):
  245. output = [fmt.format(obj, self.valency_of(obj))]
  246. if obj in self:
  247. for other in self[obj]:
  248. d = fmt.format(other, self.valency_of(other))
  249. output.append(' ' * level + d)
  250. output.extend(self.repr_node(other, level + 1).split('\n')[1:])
  251. return '\n'.join(output)
  252. class AttributeDictMixin(object):
  253. """Adds attribute access to mappings.
  254. `d.key -> d[key]`
  255. """
  256. def __getattr__(self, k):
  257. """`d.key -> d[key]`"""
  258. try:
  259. return self[k]
  260. except KeyError:
  261. raise AttributeError(
  262. '{0!r} object has no attribute {1!r}'.format(
  263. type(self).__name__, k))
  264. def __setattr__(self, key, value):
  265. """`d[key] = value -> d.key = value`"""
  266. self[key] = value
  267. class AttributeDict(dict, AttributeDictMixin):
  268. """Dict subclass with attribute access."""
  269. pass
  270. class DictAttribute(object):
  271. """Dict interface to attributes.
  272. `obj[k] -> obj.k`
  273. """
  274. obj = None
  275. def __init__(self, obj):
  276. object.__setattr__(self, 'obj', obj)
  277. def __getattr__(self, key):
  278. return getattr(self.obj, key)
  279. def __setattr__(self, key, value):
  280. return setattr(self.obj, key, value)
  281. def get(self, key, default=None):
  282. try:
  283. return self[key]
  284. except KeyError:
  285. return default
  286. def setdefault(self, key, default):
  287. try:
  288. return self[key]
  289. except KeyError:
  290. self[key] = default
  291. return default
  292. def __getitem__(self, key):
  293. try:
  294. return getattr(self.obj, key)
  295. except AttributeError:
  296. raise KeyError(key)
  297. def __setitem__(self, key, value):
  298. setattr(self.obj, key, value)
  299. def __contains__(self, key):
  300. return hasattr(self.obj, key)
  301. def _iterate_keys(self):
  302. return iter(dir(self.obj))
  303. iterkeys = _iterate_keys
  304. def __iter__(self):
  305. return self._iterate_keys()
  306. def _iterate_items(self):
  307. for key in self._iterate_keys():
  308. yield key, getattr(self.obj, key)
  309. iteritems = _iterate_items
  310. if sys.version_info[0] == 3: # pragma: no cover
  311. items = _iterate_items
  312. keys = _iterate_keys
  313. else:
  314. def keys(self):
  315. return list(self)
  316. def items(self):
  317. return list(self._iterate_items())
  318. class ConfigurationView(AttributeDictMixin):
  319. """A view over an applications configuration dicts.
  320. If the key does not exist in ``changes``, the ``defaults`` dict
  321. is consulted.
  322. :param changes: Dict containing changes to the configuration.
  323. :param defaults: Dict containing the default configuration.
  324. """
  325. changes = None
  326. defaults = None
  327. _order = None
  328. def __init__(self, changes, defaults):
  329. self.__dict__.update(changes=changes, defaults=defaults,
  330. _order=[changes] + defaults)
  331. def add_defaults(self, d):
  332. self.defaults.insert(0, d)
  333. self._order.insert(1, d)
  334. def __getitem__(self, key):
  335. for d in self._order:
  336. try:
  337. return d[key]
  338. except KeyError:
  339. pass
  340. raise KeyError(key)
  341. def __setitem__(self, key, value):
  342. self.changes[key] = value
  343. def first(self, *keys):
  344. return first(None, (self.get(key) for key in keys))
  345. def get(self, key, default=None):
  346. try:
  347. return self[key]
  348. except KeyError:
  349. return default
  350. def setdefault(self, key, default):
  351. try:
  352. return self[key]
  353. except KeyError:
  354. self[key] = default
  355. return default
  356. def update(self, *args, **kwargs):
  357. return self.changes.update(*args, **kwargs)
  358. def __contains__(self, key):
  359. for d in self._order:
  360. if key in d:
  361. return True
  362. return False
  363. def __repr__(self):
  364. return repr(dict(items(self)))
  365. def __iter__(self):
  366. return self._iterate_keys()
  367. def _iter(self, op):
  368. # defaults must be first in the stream, so values in
  369. # changes takes precedence.
  370. return chain(*[op(d) for d in reversed(self._order)])
  371. def _iterate_keys(self):
  372. return uniq(self._iter(lambda d: d))
  373. iterkeys = _iterate_keys
  374. def _iterate_items(self):
  375. return ((key, self[key]) for key in self)
  376. iteritems = _iterate_items
  377. def _iterate_values(self):
  378. return (self[key] for key in self)
  379. itervalues = _iterate_values
  380. def keys(self):
  381. return list(self._iterate_keys())
  382. def items(self):
  383. return list(self._iterate_items())
  384. def values(self):
  385. return list(self._iterate_values())
  386. class LimitedSet(object):
  387. """Kind-of Set with limitations.
  388. Good for when you need to test for membership (`a in set`),
  389. but the list might become to big, so you want to limit it so it doesn't
  390. consume too much resources.
  391. :keyword maxlen: Maximum number of members before we start
  392. evicting expired members.
  393. :keyword expires: Time in seconds, before a membership expires.
  394. """
  395. __slots__ = ('maxlen', 'expires', '_data', '__len__')
  396. def __init__(self, maxlen=None, expires=None, data=None):
  397. self.maxlen = maxlen
  398. self.expires = expires
  399. self._data = {} if data is None else data
  400. self.__len__ = self._data.__len__
  401. def add(self, value):
  402. """Add a new member."""
  403. self._expire_item()
  404. self._data[value] = time.time()
  405. def clear(self):
  406. """Remove all members"""
  407. self._data.clear()
  408. def pop_value(self, value):
  409. """Remove membership by finding value."""
  410. self._data.pop(value, None)
  411. def _expire_item(self):
  412. """Hunt down and remove an expired item."""
  413. while 1:
  414. if self.maxlen and len(self) >= self.maxlen:
  415. value, when = self.first
  416. if not self.expires or time.time() > when + self.expires:
  417. try:
  418. self.pop_value(value)
  419. except TypeError: # pragma: no cover
  420. continue
  421. break
  422. def __contains__(self, value):
  423. return value in self._data
  424. def update(self, other):
  425. if isinstance(other, dict):
  426. self._data.update(other)
  427. elif isinstance(other, self.__class__):
  428. self._data.update(other._data)
  429. else:
  430. for obj in other:
  431. self.add(obj)
  432. def as_dict(self):
  433. return self._data
  434. def __iter__(self):
  435. return iter(self._data)
  436. def __repr__(self):
  437. return 'LimitedSet({0!r})'.format(list(self._data))
  438. @property
  439. def chronologically(self):
  440. return sorted(self._data.items(), key=itemgetter(1))
  441. @property
  442. def first(self):
  443. """Get the oldest member."""
  444. return self.chronologically[0]