12 KB

  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.datastructures
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Custom types and data structures.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import sys
  10. import time
  11. from collections import defaultdict
  12. from itertools import chain
  13. from billiard.einfo import ExceptionInfo # noqa
  14. from kombu.utils.limits import TokenBucket # noqa
  15. from .utils.functional import LRUCache, first, uniq # noqa
  16. class CycleError(Exception):
  17. """A cycle was detected in an acyclic graph."""
  18. class DependencyGraph(object):
  19. """A directed acyclic graph of objects and their dependencies.
  20. Supports a robust topological sort
  21. to detect the order in which they must be handled.
  22. Takes an optional iterator of ``(obj, dependencies)``
  23. tuples to build the graph from.
  24. .. warning::
  25. Does not support cycle detection.
  26. """
  27. def __init__(self, it=None):
  28. self.adjacent = {}
  29. if it is not None:
  30. self.update(it)
  31. def add_arc(self, obj):
  32. """Add an object to the graph."""
  33. self.adjacent.setdefault(obj, [])
  34. def add_edge(self, A, B):
  35. """Add an edge from object ``A`` to object ``B``
  36. (``A`` depends on ``B``)."""
  37. self[A].append(B)
  38. def topsort(self):
  39. """Sort the graph topologically.
  40. :returns: a list of objects in the order
  41. in which they must be handled.
  42. """
  43. graph = DependencyGraph()
  44. components = self._tarjan72()
  45. NC = dict((node, component)
  46. for component in components
  47. for node in component)
  48. for component in components:
  49. graph.add_arc(component)
  50. for node in self:
  51. node_c = NC[node]
  52. for successor in self[node]:
  53. successor_c = NC[successor]
  54. if node_c != successor_c:
  55. graph.add_edge(node_c, successor_c)
  56. return [t[0] for t in graph._khan62()]
  57. def valency_of(self, obj):
  58. """Returns the velency (degree) of a vertex in the graph."""
  59. try:
  60. l = [len(self[obj])]
  61. except KeyError:
  62. return 0
  63. for node in self[obj]:
  64. l.append(self.valency_of(node))
  65. return sum(l)
  66. def update(self, it):
  67. """Update the graph with data from a list
  68. of ``(obj, dependencies)`` tuples."""
  69. tups = list(it)
  70. for obj, _ in tups:
  71. self.add_arc(obj)
  72. for obj, deps in tups:
  73. for dep in deps:
  74. self.add_edge(obj, dep)
  75. def edges(self):
  76. """Returns generator that yields for all edges in the graph."""
  77. return (obj for obj, adj in self.iteritems() if adj)
  78. def _khan62(self):
  79. """Khans simple topological sort algorithm from '62
  80. See
  81. """
  82. count = defaultdict(lambda: 0)
  83. result = []
  84. for node in self:
  85. for successor in self[node]:
  86. count[successor] += 1
  87. ready = [node for node in self if not count[node]]
  88. while ready:
  89. node = ready.pop()
  90. result.append(node)
  91. for successor in self[node]:
  92. count[successor] -= 1
  93. if count[successor] == 0:
  94. ready.append(successor)
  95. result.reverse()
  96. return result
  97. def _tarjan72(self):
  98. """Tarjan's algorithm to find strongly connected components.
  99. See
  100. """
  101. result, stack, low = [], [], {}
  102. def visit(node):
  103. if node in low:
  104. return
  105. num = len(low)
  106. low[node] = num
  107. stack_pos = len(stack)
  108. stack.append(node)
  109. for successor in self[node]:
  110. visit(successor)
  111. low[node] = min(low[node], low[successor])
  112. if num == low[node]:
  113. component = tuple(stack[stack_pos:])
  114. stack[stack_pos:] = []
  115. result.append(component)
  116. for item in component:
  117. low[item] = len(self)
  118. for node in self:
  119. visit(node)
  120. return result
  121. def to_dot(self, fh, ws=' ' * 4):
  122. """Convert the graph to DOT format.
  123. :param fh: A file, or a file-like object to write the graph to.
  124. """
  125. fh.write('digraph dependencies {\n')
  126. for obj, adjacent in self.iteritems():
  127. if not adjacent:
  128. fh.write(ws + '"%s"\n' % (obj, ))
  129. for req in adjacent:
  130. fh.write(ws + '"%s" -> "%s"\n' % (obj, req))
  131. fh.write('}\n')
  132. def __iter__(self):
  133. return iter(self.adjacent)
  134. def __getitem__(self, node):
  135. return self.adjacent[node]
  136. def __len__(self):
  137. return len(self.adjacent)
  138. def __contains__(self, obj):
  139. return obj in self.adjacent
  140. def _iterate_items(self):
  141. return self.adjacent.iteritems()
  142. items = iteritems = _iterate_items
  143. def __repr__(self):
  144. return '\n'.join(self.repr_node(N) for N in self)
  145. def repr_node(self, obj, level=1):
  146. output = ['%s(%s)' % (obj, self.valency_of(obj))]
  147. if obj in self:
  148. for other in self[obj]:
  149. d = '%s(%s)' % (other, self.valency_of(other))
  150. output.append(' ' * level + d)
  151. output.extend(self.repr_node(other, level + 1).split('\n')[1:])
  152. return '\n'.join(output)
  153. class AttributeDictMixin(object):
  154. """Adds attribute access to mappings.
  155. `d.key -> d[key]`
  156. """
  157. def __getattr__(self, k):
  158. """`d.key -> d[key]`"""
  159. try:
  160. return self[k]
  161. except KeyError:
  162. raise AttributeError(
  163. "'%s' object has no attribute '%s'" % (type(self).__name__, k))
  164. def __setattr__(self, key, value):
  165. """`d[key] = value -> d.key = value`"""
  166. self[key] = value
  167. class AttributeDict(dict, AttributeDictMixin):
  168. """Dict subclass with attribute access."""
  169. pass
  170. class DictAttribute(object):
  171. """Dict interface to attributes.
  172. `obj[k] -> obj.k`
  173. """
  174. obj = None
  175. def __init__(self, obj):
  176. object.__setattr__(self, 'obj', obj)
  177. def __getattr__(self, key):
  178. return getattr(self.obj, key)
  179. def __setattr__(self, key, value):
  180. return setattr(self.obj, key, value)
  181. def get(self, key, default=None):
  182. try:
  183. return self[key]
  184. except KeyError:
  185. return default
  186. def setdefault(self, key, default):
  187. try:
  188. return self[key]
  189. except KeyError:
  190. self[key] = default
  191. return default
  192. def __getitem__(self, key):
  193. try:
  194. return getattr(self.obj, key)
  195. except AttributeError:
  196. raise KeyError(key)
  197. def __setitem__(self, key, value):
  198. setattr(self.obj, key, value)
  199. def __contains__(self, key):
  200. return hasattr(self.obj, key)
  201. def _iterate_keys(self):
  202. return iter(dir(self.obj))
  203. iterkeys = _iterate_keys
  204. def __iter__(self):
  205. return self._iterate_keys()
  206. def _iterate_items(self):
  207. for key in self._iterate_keys():
  208. yield getattr(self.obj, key)
  209. iteritems = _iterate_items
  210. if sys.version_info[0] == 3: # pragma: no cover
  211. items = _iterate_items
  212. keys = _iterate_keys
  213. else:
  214. def keys(self):
  215. return list(self)
  216. def items(self):
  217. return list(self._iterate_items())
  218. class ConfigurationView(AttributeDictMixin):
  219. """A view over an applications configuration dicts.
  220. If the key does not exist in ``changes``, the ``defaults`` dict
  221. is consulted.
  222. :param changes: Dict containing changes to the configuration.
  223. :param defaults: Dict containing the default configuration.
  224. """
  225. changes = None
  226. defaults = None
  227. _order = None
  228. def __init__(self, changes, defaults):
  229. self.__dict__.update(changes=changes, defaults=defaults,
  230. _order=[changes] + defaults)
  231. def add_defaults(self, d):
  232. self.defaults.insert(0, d)
  233. self._order.insert(1, d)
  234. def __getitem__(self, key):
  235. for d in self._order:
  236. try:
  237. return d[key]
  238. except KeyError:
  239. pass
  240. raise KeyError(key)
  241. def __setitem__(self, key, value):
  242. self.changes[key] = value
  243. def first(self, *keys):
  244. return first(None, (self.get(key) for key in keys))
  245. def get(self, key, default=None):
  246. try:
  247. return self[key]
  248. except KeyError:
  249. return default
  250. def setdefault(self, key, default):
  251. try:
  252. return self[key]
  253. except KeyError:
  254. self[key] = default
  255. return default
  256. def update(self, *args, **kwargs):
  257. return self.changes.update(*args, **kwargs)
  258. def __contains__(self, key):
  259. for d in self._order:
  260. if key in d:
  261. return True
  262. return False
  263. def __repr__(self):
  264. return repr(dict(self.iteritems()))
  265. def __iter__(self):
  266. return self._iterate_keys()
  267. def _iter(self, op):
  268. # defaults must be first in the stream, so values in
  269. # changes takes precedence.
  270. return chain(*[op(d) for d in reversed(self._order)])
  271. def _iterate_keys(self):
  272. return uniq(self._iter(lambda d: d))
  273. iterkeys = _iterate_keys
  274. def _iterate_items(self):
  275. return ((key, self[key]) for key in self)
  276. iteritems = _iterate_items
  277. def _iterate_values(self):
  278. return (self[key] for key in self)
  279. itervalues = _iterate_values
  280. def keys(self):
  281. return list(self._iterate_keys())
  282. def items(self):
  283. return list(self._iterate_items())
  284. def values(self):
  285. return list(self._iterate_values())
  286. class LimitedSet(object):
  287. """Kind-of Set with limitations.
  288. Good for when you need to test for membership (`a in set`),
  289. but the list might become to big, so you want to limit it so it doesn't
  290. consume too much resources.
  291. :keyword maxlen: Maximum number of members before we start
  292. evicting expired members.
  293. :keyword expires: Time in seconds, before a membership expires.
  294. """
  295. __slots__ = ('maxlen', 'expires', '_data', '__len__')
  296. def __init__(self, maxlen=None, expires=None):
  297. self.maxlen = maxlen
  298. self.expires = expires
  299. self._data = {}
  300. self.__len__ = self._data.__len__
  301. def add(self, value):
  302. """Add a new member."""
  303. self._expire_item()
  304. self._data[value] = time.time()
  305. def clear(self):
  306. """Remove all members"""
  307. self._data.clear()
  308. def pop_value(self, value):
  309. """Remove membership by finding value."""
  310. self._data.pop(value, None)
  311. def _expire_item(self):
  312. """Hunt down and remove an expired item."""
  313. while 1:
  314. if self.maxlen and len(self) >= self.maxlen:
  315. value, when = self.first
  316. if not self.expires or time.time() > when + self.expires:
  317. try:
  318. self.pop_value(value)
  319. except TypeError: # pragma: no cover
  320. continue
  321. break
  322. def __contains__(self, value):
  323. return value in self._data
  324. def update(self, other):
  325. if isinstance(other, self.__class__):
  326. self._data.update(other._data)
  327. else:
  328. for obj in other:
  329. self.add(obj)
  330. def as_dict(self):
  331. return self._data
  332. def __iter__(self):
  333. return iter(self._data)
  334. def __repr__(self):
  335. return 'LimitedSet(%r)' % (list(self._data), )
  336. @property
  337. def chronologically(self):
  338. return sorted(self._data.items(), key=lambda (value, when): when)
  339. @property
  340. def first(self):
  341. """Get the oldest member."""
  342. return self.chronologically[0]