datastructures.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import sys
  9. import time
  10. from collections import defaultdict, Mapping, MutableMapping, MutableSet
  11. from heapq import heapify, heappush, heappop
  12. from itertools import chain
  13. from billiard.einfo import ExceptionInfo # noqa
  14. from kombu.utils.encoding import safe_str, bytes_to_str
  15. from kombu.utils.limits import TokenBucket # noqa
  16. from celery.five import items
  17. from celery.utils.functional import LRUCache, first, uniq # noqa
  18. from celery.utils.text import match_case
# Optional Django integration: ``force_mapping`` checks objects against
# these two classes.  When the import fails, an empty placeholder class is
# defined (and aliased as ``LazySettings``) so that the ``isinstance`` test
# can still be evaluated.
# NOTE(review): confirm that ``LazySettings`` is really exported by
# ``django.utils.functional``; if it is not, this import always raises
# ImportError and the placeholder is used even when Django is installed.
try:
    from django.utils.functional import LazyObject, LazySettings
except ImportError:
    class LazyObject(object):  # noqa
        pass
    LazySettings = LazyObject  # noqa
# Format-string templates used by GraphFormatter to emit DOT output.
# ``{IN}`` and ``{INp}`` are supplied by ``GraphFormatter.FMT`` (current and
# nested indentation); doubled braces produce literal braces.
DOT_HEAD = """
{IN}{type} {id} {{
{INp}graph [{attrs}]
"""
DOT_ATTR = '{name}={value}'
DOT_NODE = '{INp}"{0}" [{attrs}]'
DOT_EDGE = '{INp}"{0}" {dir} "{1}" [{attrs}]'
# Separator placed between rendered attributes.
DOT_ATTRSEP = ', '
# Edge operator for each supported graph type.
DOT_DIRS = {'graph': '--', 'digraph': '->'}
DOT_TAIL = '{IN}}}'

# Public names exported by this module.
__all__ = ['GraphFormatter', 'CycleError', 'DependencyGraph',
           'AttributeDictMixin', 'AttributeDict', 'DictAttribute',
           'ConfigurationView', 'LimitedSet']
  38. def force_mapping(m):
  39. if isinstance(m, (LazyObject, LazySettings)):
  40. m = m._wrapped
  41. return DictAttribute(m) if not isinstance(m, Mapping) else m
  42. class GraphFormatter(object):
  43. _attr = DOT_ATTR.strip()
  44. _node = DOT_NODE.strip()
  45. _edge = DOT_EDGE.strip()
  46. _head = DOT_HEAD.strip()
  47. _tail = DOT_TAIL.strip()
  48. _attrsep = DOT_ATTRSEP
  49. _dirs = dict(DOT_DIRS)
  50. scheme = {
  51. 'shape': 'box',
  52. 'arrowhead': 'vee',
  53. 'style': 'filled',
  54. 'fontname': 'HelveticaNeue',
  55. }
  56. edge_scheme = {
  57. 'color': 'darkseagreen4',
  58. 'arrowcolor': 'black',
  59. 'arrowsize': 0.7,
  60. }
  61. node_scheme = {'fillcolor': 'palegreen3', 'color': 'palegreen4'}
  62. term_scheme = {'fillcolor': 'palegreen1', 'color': 'palegreen2'}
  63. graph_scheme = {'bgcolor': 'mintcream'}
  64. def __init__(self, root=None, type=None, id=None,
  65. indent=0, inw=' ' * 4, **scheme):
  66. self.id = id or 'dependencies'
  67. self.root = root
  68. self.type = type or 'digraph'
  69. self.direction = self._dirs[self.type]
  70. self.IN = inw * (indent or 0)
  71. self.INp = self.IN + inw
  72. self.scheme = dict(self.scheme, **scheme)
  73. self.graph_scheme = dict(self.graph_scheme, root=self.label(self.root))
  74. def attr(self, name, value):
  75. value = '"{0}"'.format(value)
  76. return self.FMT(self._attr, name=name, value=value)
  77. def attrs(self, d, scheme=None):
  78. d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
  79. return self._attrsep.join(
  80. safe_str(self.attr(k, v)) for k, v in items(d)
  81. )
  82. def head(self, **attrs):
  83. return self.FMT(
  84. self._head, id=self.id, type=self.type,
  85. attrs=self.attrs(attrs, self.graph_scheme),
  86. )
  87. def tail(self):
  88. return self.FMT(self._tail)
  89. def label(self, obj):
  90. return obj
  91. def node(self, obj, **attrs):
  92. return self.draw_node(obj, self.node_scheme, attrs)
  93. def terminal_node(self, obj, **attrs):
  94. return self.draw_node(obj, self.term_scheme, attrs)
  95. def edge(self, a, b, **attrs):
  96. return self.draw_edge(a, b, **attrs)
  97. def _enc(self, s):
  98. return s.encode('utf-8', 'ignore')
  99. def FMT(self, fmt, *args, **kwargs):
  100. return self._enc(fmt.format(
  101. *args, **dict(kwargs, IN=self.IN, INp=self.INp)
  102. ))
  103. def draw_edge(self, a, b, scheme=None, attrs=None):
  104. return self.FMT(
  105. self._edge, self.label(a), self.label(b),
  106. dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
  107. )
  108. def draw_node(self, obj, scheme=None, attrs=None):
  109. return self.FMT(
  110. self._node, self.label(obj), attrs=self.attrs(attrs, scheme),
  111. )
class CycleError(Exception):
    """Raised to signal that a cycle was found in a graph that must be
    acyclic."""
  114. class DependencyGraph(object):
  115. """A directed acyclic graph of objects and their dependencies.
  116. Supports a robust topological sort
  117. to detect the order in which they must be handled.
  118. Takes an optional iterator of ``(obj, dependencies)``
  119. tuples to build the graph from.
  120. .. warning::
  121. Does not support cycle detection.
  122. """
  123. def __init__(self, it=None, formatter=None):
  124. self.formatter = formatter or GraphFormatter()
  125. self.adjacent = {}
  126. if it is not None:
  127. self.update(it)
  128. def add_arc(self, obj):
  129. """Add an object to the graph."""
  130. self.adjacent.setdefault(obj, [])
  131. def add_edge(self, A, B):
  132. """Add an edge from object ``A`` to object ``B``
  133. (``A`` depends on ``B``)."""
  134. self[A].append(B)
  135. def connect(self, graph):
  136. """Add nodes from another graph."""
  137. self.adjacent.update(graph.adjacent)
  138. def topsort(self):
  139. """Sort the graph topologically.
  140. :returns: a list of objects in the order
  141. in which they must be handled.
  142. """
  143. graph = DependencyGraph()
  144. components = self._tarjan72()
  145. NC = {
  146. node: component for component in components for node in component
  147. }
  148. for component in components:
  149. graph.add_arc(component)
  150. for node in self:
  151. node_c = NC[node]
  152. for successor in self[node]:
  153. successor_c = NC[successor]
  154. if node_c != successor_c:
  155. graph.add_edge(node_c, successor_c)
  156. return [t[0] for t in graph._khan62()]
  157. def valency_of(self, obj):
  158. """Return the valency (degree) of a vertex in the graph."""
  159. try:
  160. l = [len(self[obj])]
  161. except KeyError:
  162. return 0
  163. for node in self[obj]:
  164. l.append(self.valency_of(node))
  165. return sum(l)
  166. def update(self, it):
  167. """Update the graph with data from a list
  168. of ``(obj, dependencies)`` tuples."""
  169. tups = list(it)
  170. for obj, _ in tups:
  171. self.add_arc(obj)
  172. for obj, deps in tups:
  173. for dep in deps:
  174. self.add_edge(obj, dep)
  175. def edges(self):
  176. """Return generator that yields for all edges in the graph."""
  177. return (obj for obj, adj in items(self) if adj)
  178. def _khan62(self):
  179. """Khans simple topological sort algorithm from '62
  180. See http://en.wikipedia.org/wiki/Topological_sorting
  181. """
  182. count = defaultdict(lambda: 0)
  183. result = []
  184. for node in self:
  185. for successor in self[node]:
  186. count[successor] += 1
  187. ready = [node for node in self if not count[node]]
  188. while ready:
  189. node = ready.pop()
  190. result.append(node)
  191. for successor in self[node]:
  192. count[successor] -= 1
  193. if count[successor] == 0:
  194. ready.append(successor)
  195. result.reverse()
  196. return result
  197. def _tarjan72(self):
  198. """Tarjan's algorithm to find strongly connected components.
  199. See http://bit.ly/vIMv3h.
  200. """
  201. result, stack, low = [], [], {}
  202. def visit(node):
  203. if node in low:
  204. return
  205. num = len(low)
  206. low[node] = num
  207. stack_pos = len(stack)
  208. stack.append(node)
  209. for successor in self[node]:
  210. visit(successor)
  211. low[node] = min(low[node], low[successor])
  212. if num == low[node]:
  213. component = tuple(stack[stack_pos:])
  214. stack[stack_pos:] = []
  215. result.append(component)
  216. for item in component:
  217. low[item] = len(self)
  218. for node in self:
  219. visit(node)
  220. return result
  221. def to_dot(self, fh, formatter=None):
  222. """Convert the graph to DOT format.
  223. :param fh: A file, or a file-like object to write the graph to.
  224. """
  225. seen = set()
  226. draw = formatter or self.formatter
  227. def P(s):
  228. print(bytes_to_str(s), file=fh)
  229. def if_not_seen(fun, obj):
  230. if draw.label(obj) not in seen:
  231. P(fun(obj))
  232. seen.add(draw.label(obj))
  233. P(draw.head())
  234. for obj, adjacent in items(self):
  235. if not adjacent:
  236. if_not_seen(draw.terminal_node, obj)
  237. for req in adjacent:
  238. if_not_seen(draw.node, obj)
  239. P(draw.edge(obj, req))
  240. P(draw.tail())
  241. def format(self, obj):
  242. return self.formatter(obj) if self.formatter else obj
  243. def __iter__(self):
  244. return iter(self.adjacent)
  245. def __getitem__(self, node):
  246. return self.adjacent[node]
  247. def __len__(self):
  248. return len(self.adjacent)
  249. def __contains__(self, obj):
  250. return obj in self.adjacent
  251. def _iterate_items(self):
  252. return items(self.adjacent)
  253. items = iteritems = _iterate_items
  254. def __repr__(self):
  255. return '\n'.join(self.repr_node(N) for N in self)
  256. def repr_node(self, obj, level=1, fmt='{0}({1})'):
  257. output = [fmt.format(obj, self.valency_of(obj))]
  258. if obj in self:
  259. for other in self[obj]:
  260. d = fmt.format(other, self.valency_of(other))
  261. output.append(' ' * level + d)
  262. output.extend(self.repr_node(other, level + 1).split('\n')[1:])
  263. return '\n'.join(output)
  264. class AttributeDictMixin(object):
  265. """Augment classes with a Mapping interface by adding attribute access.
  266. I.e. `d.key -> d[key]`.
  267. """
  268. def __getattr__(self, k):
  269. """`d.key -> d[key]`"""
  270. try:
  271. return self[k]
  272. except KeyError:
  273. raise AttributeError(
  274. '{0!r} object has no attribute {1!r}'.format(
  275. type(self).__name__, k))
  276. def __setattr__(self, key, value):
  277. """`d[key] = value -> d.key = value`"""
  278. self[key] = value
  279. class AttributeDict(dict, AttributeDictMixin):
  280. """Dict subclass with attribute access."""
  281. pass
  282. class DictAttribute(object):
  283. """Dict interface to attributes.
  284. `obj[k] -> obj.k`
  285. `obj[k] = val -> obj.k = val`
  286. """
  287. obj = None
  288. def __init__(self, obj):
  289. object.__setattr__(self, 'obj', obj)
  290. def __getattr__(self, key):
  291. return getattr(self.obj, key)
  292. def __setattr__(self, key, value):
  293. return setattr(self.obj, key, value)
  294. def get(self, key, default=None):
  295. try:
  296. return self[key]
  297. except KeyError:
  298. return default
  299. def setdefault(self, key, default):
  300. if key not in self:
  301. self[key] = default
  302. def __getitem__(self, key):
  303. try:
  304. return getattr(self.obj, key)
  305. except AttributeError:
  306. raise KeyError(key)
  307. def __setitem__(self, key, value):
  308. setattr(self.obj, key, value)
  309. def __contains__(self, key):
  310. return hasattr(self.obj, key)
  311. def _iterate_keys(self):
  312. return iter(dir(self.obj))
  313. iterkeys = _iterate_keys
  314. def __iter__(self):
  315. return self._iterate_keys()
  316. def _iterate_items(self):
  317. for key in self._iterate_keys():
  318. yield key, getattr(self.obj, key)
  319. iteritems = _iterate_items
  320. def _iterate_values(self):
  321. for key in self._iterate_keys():
  322. yield getattr(self.obj, key)
  323. itervalues = _iterate_values
  324. if sys.version_info[0] == 3: # pragma: no cover
  325. items = _iterate_items
  326. keys = _iterate_keys
  327. values = _iterate_values
  328. else:
  329. def keys(self):
  330. return list(self)
  331. def items(self):
  332. return list(self._iterate_items())
  333. def values(self):
  334. return list(self._iterate_values())
  335. MutableMapping.register(DictAttribute)
  336. class ConfigurationView(AttributeDictMixin):
  337. """A view over an applications configuration dicts.
  338. Custom (but older) version of :class:`collections.ChainMap`.
  339. If the key does not exist in ``changes``, the ``defaults`` dicts
  340. are consulted.
  341. :param changes: Dict containing changes to the configuration.
  342. :param defaults: List of dicts containing the default configuration.
  343. """
  344. key_t = None
  345. changes = None
  346. defaults = None
  347. _order = None
  348. def __init__(self, changes, defaults=None, key_t=None, prefix=None):
  349. defaults = [] if defaults is None else defaults
  350. self.__dict__.update(
  351. changes=changes,
  352. defaults=defaults,
  353. key_t=key_t,
  354. _order=[changes] + defaults,
  355. prefix=prefix.rstrip('_') + '_' if prefix else prefix,
  356. )
  357. def _to_keys(self, key):
  358. prefix = self.prefix
  359. if prefix:
  360. pkey = prefix + key if not key.startswith(prefix) else key
  361. return match_case(pkey, prefix), self._key(key)
  362. return self._key(key),
  363. def _key(self, key):
  364. return self.key_t(key) if self.key_t is not None else key
  365. def add_defaults(self, d):
  366. d = force_mapping(d)
  367. self.defaults.insert(0, d)
  368. self._order.insert(1, d)
  369. def __getitem__(self, key):
  370. keys = self._to_keys(key)
  371. for k in keys:
  372. for d in self._order:
  373. try:
  374. return d[k]
  375. except KeyError:
  376. pass
  377. if len(keys) > 1:
  378. raise KeyError(
  379. 'Key not found: {0!r} (with prefix: {0!r})'.format(*keys))
  380. raise KeyError(key)
  381. def __setitem__(self, key, value):
  382. self.changes[self._key(key)] = value
  383. def first(self, *keys):
  384. return first(None, (self.get(key) for key in keys))
  385. def get(self, key, default=None):
  386. try:
  387. return self[key]
  388. except KeyError:
  389. return default
  390. def clear(self):
  391. """Remove all changes, but keep defaults."""
  392. self.changes.clear()
  393. def setdefault(self, key, default):
  394. key = self._key(key)
  395. if key not in self:
  396. self[key] = default
  397. def update(self, *args, **kwargs):
  398. return self.changes.update(*args, **kwargs)
  399. def __contains__(self, key):
  400. keys = self._to_keys(key)
  401. return any(any(k in m for k in keys) for m in self._order)
  402. def __bool__(self):
  403. return any(self._order)
  404. __nonzero__ = __bool__ # Py2
  405. def __repr__(self):
  406. return repr(dict(items(self)))
  407. def __iter__(self):
  408. return self._iterate_keys()
  409. def __len__(self):
  410. # The logic for iterating keys includes uniq(),
  411. # so to be safe we count by explicitly iterating
  412. return len(set().union(*self._order))
  413. def _iter(self, op):
  414. # defaults must be first in the stream, so values in
  415. # changes takes precedence.
  416. return chain(*[op(d) for d in reversed(self._order)])
  417. def swap_with(self, other):
  418. changes = other.__dict__['changes']
  419. defaults = other.__dict__['defaults']
  420. self.__dict__.update(
  421. changes=changes,
  422. defaults=defaults,
  423. key_t=other.__dict__['key_t'],
  424. prefix=other.__dict__['prefix'],
  425. _order=[changes] + defaults
  426. )
  427. def _iterate_keys(self):
  428. return uniq(self._iter(lambda d: d.keys()))
  429. iterkeys = _iterate_keys
  430. def _iterate_items(self):
  431. return ((key, self[key]) for key in self)
  432. iteritems = _iterate_items
  433. def _iterate_values(self):
  434. return (self[key] for key in self)
  435. itervalues = _iterate_values
  436. if sys.version_info[0] == 3: # pragma: no cover
  437. keys = _iterate_keys
  438. items = _iterate_items
  439. values = _iterate_values
  440. else: # noqa
  441. def keys(self):
  442. return list(self._iterate_keys())
  443. def items(self):
  444. return list(self._iterate_items())
  445. def values(self):
  446. return list(self._iterate_values())
  447. MutableMapping.register(ConfigurationView)
  448. class LimitedSet(object):
  449. """Kind-of Set with limitations.
  450. Good for when you need to test for membership (`a in set`),
  451. but the set should not grow unbounded.
  452. :keyword maxlen: Maximum number of members before we start
  453. evicting expired members.
  454. :keyword expires: Time in seconds, before a membership expires.
  455. """
  456. def __init__(self, maxlen=None, expires=None, data=None, heap=None):
  457. # heap is ignored
  458. self.maxlen = maxlen
  459. self.expires = expires
  460. self._data = {} if data is None else data
  461. self._heap = []
  462. # make shortcuts
  463. self.__len__ = self._heap.__len__
  464. self.__contains__ = self._data.__contains__
  465. self._refresh_heap()
  466. def _refresh_heap(self):
  467. self._heap[:] = [(t, key) for key, t in items(self._data)]
  468. heapify(self._heap)
  469. def add(self, key, now=time.time, heappush=heappush):
  470. """Add a new member."""
  471. # offset is there to modify the length of the list,
  472. # this way we can expire an item before inserting the value,
  473. # and it will end up in the correct order.
  474. self.purge(1, offset=1)
  475. inserted = now()
  476. self._data[key] = inserted
  477. heappush(self._heap, (inserted, key))
  478. def clear(self):
  479. """Remove all members"""
  480. self._data.clear()
  481. self._heap[:] = []
  482. def discard(self, value):
  483. """Remove membership by finding value."""
  484. try:
  485. itime = self._data[value]
  486. except KeyError:
  487. return
  488. try:
  489. self._heap.remove((itime, value))
  490. except ValueError:
  491. pass
  492. self._data.pop(value, None)
  493. pop_value = discard # XXX compat
  494. def purge(self, limit=None, offset=0, now=time.time):
  495. """Purge expired items."""
  496. H, maxlen = self._heap, self.maxlen
  497. if not maxlen:
  498. return
  499. # If the data/heap gets corrupted and limit is None
  500. # this will go into an infinite loop, so limit must
  501. # have a value to guard the loop.
  502. limit = len(self) + offset if limit is None else limit
  503. i = 0
  504. while len(self) + offset > maxlen:
  505. if i >= limit:
  506. break
  507. try:
  508. item = heappop(H)
  509. except IndexError:
  510. break
  511. if self.expires:
  512. if now() < item[0] + self.expires:
  513. heappush(H, item)
  514. break
  515. try:
  516. self._data.pop(item[1])
  517. except KeyError: # out of sync with heap
  518. pass
  519. i += 1
  520. def update(self, other):
  521. if isinstance(other, LimitedSet):
  522. self._data.update(other._data)
  523. self._refresh_heap()
  524. else:
  525. for obj in other:
  526. self.add(obj)
  527. def as_dict(self):
  528. return self._data
  529. def __eq__(self, other):
  530. return self._heap == other._heap
  531. def __ne__(self, other):
  532. return not self.__eq__(other)
  533. def __repr__(self):
  534. return 'LimitedSet({0})'.format(len(self))
  535. def __iter__(self):
  536. return (item[1] for item in self._heap)
  537. def __len__(self):
  538. return len(self._heap)
  539. def __contains__(self, key):
  540. return key in self._data
  541. def __reduce__(self):
  542. return self.__class__, (self.maxlen, self.expires, self._data)
  543. MutableSet.register(LimitedSet)