collections.py 22 KB


  1. # -*- coding: utf-8 -*-
  2. """Custom maps, sets, sequences and other data structures."""
  3. import time
  4. from collections import (
  5. Mapping, MutableMapping, MutableSet, Sequence,
  6. OrderedDict as _OrderedDict, deque,
  7. )
  8. from heapq import heapify, heappush, heappop
  9. from itertools import chain, count
  10. from queue import Empty
  11. from typing import (
  12. Any, Callable, Dict, Iterable, Iterator, Tuple, Optional, Union,
  13. )
  14. from .functional import first, uniq
  15. from .text import match_case
  16. from .typing import DictArgument
try:
    # pypy: dicts are ordered in recent versions
    from __pypy__ import reversed_dict as _dict_is_ordered
except ImportError:
    # ``__pypy__.reversed_dict`` could not be imported: leave the marker unset.
    _dict_is_ordered = None

try:
    from django.utils.functional import LazyObject, LazySettings
except ImportError:
    # Placeholder types so that ``isinstance`` checks against these names
    # keep working when the import above fails.
    # NOTE(review): ``LazySettings`` normally lives in ``django.conf`` --
    # confirm this import can actually succeed when Django is installed.
    class LazyObject:  # noqa
        pass
    LazySettings = LazyObject  # noqa
#: Names exported by ``from <this module> import *``.
__all__ = [
    'AttributeDictMixin', 'AttributeDict', 'BufferMap', 'ChainMap',
    'ConfigurationView', 'DictAttribute', 'Evictable',
    'LimitedSet', 'Messagebuffer', 'OrderedDict',
    'force_mapping', 'lpmerge',
]

#: Template for ``LimitedSet.__repr__``: formatted with the set itself as
#: positional argument 0, plus ``name`` and ``size`` keyword arguments.
REPR_LIMITED_SET = """\
<{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
"""

#: Dictionary key type (key: str) -> str
KeyCallback = Callable[[str], str]
  39. def force_mapping(m: Any) -> Mapping:
  40. if isinstance(m, (LazyObject, LazySettings)):
  41. m = m._wrapped
  42. return DictAttribute(m) if not isinstance(m, Mapping) else m
  43. def lpmerge(L: Mapping, R: Mapping) -> Mapping:
  44. """In place left precedent dictionary merge.
  45. Keeps values from `L`, if the value in `R` is :const:`None`.
  46. """
  47. setitem = L.__setitem__
  48. [setitem(k, v) for k, v in R.items() if v is not None]
  49. return L
  50. class OrderedDict(_OrderedDict):
  51. def _LRUkey(self) -> Any:
  52. # return value of od.keys does not support __next__,
  53. # but this version will also not create a copy of the list.
  54. return next(iter(self.keys()))
  55. class AttributeDictMixin:
  56. """Augment classes with a Mapping interface by adding attribute access.
  57. I.e. `d.key -> d[key]`.
  58. """
  59. def __getattr__(self, k: str) -> Any:
  60. """`d.key -> d[key]`"""
  61. try:
  62. return self[k]
  63. except KeyError:
  64. raise AttributeError(
  65. '{0!r} object has no attribute {1!r}'.format(
  66. type(self).__name__, k))
  67. def __setattr__(self, key: str, value: Any) -> None:
  68. """`d[key] = value -> d.key = value`"""
  69. self[key] = value
  70. class AttributeDict(dict, AttributeDictMixin):
  71. """Dict subclass with attribute access."""
  72. pass
  73. class DictAttribute:
  74. """Dict interface to attributes.
  75. `obj[k] -> obj.k`
  76. `obj[k] = val -> obj.k = val`
  77. """
  78. obj = None # type: Mapping[Any, Any]
  79. def __init__(self, obj: Mapping) -> None:
  80. object.__setattr__(self, 'obj', obj)
  81. def __getattr__(self, key: str) -> Any:
  82. return getattr(self.obj, key)
  83. def __setattr__(self, key: str, value: Any) -> None:
  84. setattr(self.obj, key, value)
  85. def get(self, key: str, default: Any=None) -> Any:
  86. try:
  87. return self[key]
  88. except KeyError:
  89. return default
  90. def setdefault(self, key: str, default: Any) -> None:
  91. if key not in self:
  92. self[key] = default
  93. def __getitem__(self, key: str) -> Any:
  94. try:
  95. return getattr(self.obj, key)
  96. except AttributeError:
  97. raise KeyError(key)
  98. def __setitem__(self, key: str, value: Any) -> None:
  99. setattr(self.obj, key, value)
  100. def __contains__(self, key: str) -> bool:
  101. return hasattr(self.obj, key)
  102. def keys(self) -> Iterator[str]:
  103. return iter(dir(self.obj))
  104. def __iter__(self) -> Iterator[str]:
  105. return self.keys()
  106. def items(self) -> Iterator[Tuple[str, Any]]:
  107. for key in self.keys():
  108. yield key, getattr(self.obj, key)
  109. def values(self) -> Iterator[Any]:
  110. for key in self.keys():
  111. yield getattr(self.obj, key)
  112. MutableMapping.register(DictAttribute)
  113. class ChainMap(MutableMapping):
  114. key_t = None # type: Optional[KeyCallback]
  115. changes = None # type: Mapping
  116. defaults = None # type: Sequence[Mapping]
  117. maps = None # type: Sequence[Mapping]
  118. def __init__(self, *maps: Sequence[Mapping],
  119. key_t: Optional[KeyCallback], **kwargs) -> None:
  120. maps = list(maps or [{}])
  121. self.__dict__.update(
  122. key_t=key_t,
  123. maps=maps,
  124. changes=maps[0],
  125. defaults=maps[1:],
  126. )
  127. def add_defaults(self, d: Mapping) -> None:
  128. d = force_mapping(d)
  129. self.defaults.insert(0, d)
  130. self.maps.insert(1, d)
  131. def pop(self, key: str, *default: Tuple[Any]) -> Any:
  132. try:
  133. return self.maps[0].pop(key, *default)
  134. except KeyError:
  135. raise KeyError(
  136. 'Key not found in the first mapping: {!r}'.format(key))
  137. def __missing__(self, key: str) -> Any:
  138. raise KeyError(key)
  139. def _key(self, key: str) -> str:
  140. return self.key_t(key) if self.key_t is not None else key
  141. def __getitem__(self, key: str) -> Any:
  142. _key = self._key(key)
  143. for mapping in self.maps:
  144. try:
  145. return mapping[_key]
  146. except KeyError:
  147. pass
  148. return self.__missing__(key)
  149. def __setitem__(self, key: str, value: Any) -> None:
  150. self.changes[self._key(key)] = value
  151. def __delitem__(self, key: str) -> None:
  152. try:
  153. del self.changes[self._key(key)]
  154. except KeyError:
  155. raise KeyError('Key not found in first mapping: {0!r}'.format(key))
  156. def clear(self) -> None:
  157. self.changes.clear()
  158. def get(self, key: str, default: Optional[Any]=None) -> Any:
  159. try:
  160. return self[self._key(key)]
  161. except KeyError:
  162. return default
  163. def __len__(self) -> int:
  164. return len(set().union(*self.maps))
  165. def __iter__(self) -> Iterator[str]:
  166. return self.keys()
  167. def __contains__(self, key: str) -> bool:
  168. key = self._key(key)
  169. return any(key in m for m in self.maps)
  170. def __bool__(self) -> bool:
  171. return any(self.maps)
  172. def setdefault(self, key: str, default: Any) -> None:
  173. key = self._key(key)
  174. if key not in self:
  175. self[key] = default
  176. def update(self, *args: Tuple[Mapping], **kwargs: Dict[Any, Any]) -> None:
  177. self.changes.update(*args, **kwargs)
  178. def __repr__(self) -> str:
  179. return '{0.__class__.__name__}({1})'.format(
  180. self, ', '.join(map(repr, self.maps)))
  181. @classmethod
  182. def fromkeys(cls, iterable: Union[Sequence[str], Iterator[str]],
  183. *args: Tuple[Any]) -> 'ChainMap':
  184. """Create a ChainMap with a single dict created from the iterable."""
  185. return cls(dict.fromkeys(iterable, *args))
  186. def copy(self) -> 'ChainMap':
  187. """New ChainMap or subclass with a new copy of maps[0] and
  188. refs to maps[1:]."""
  189. return self.__class__(self.maps[0].copy(), *self.maps[1:])
  190. def _iter(self, op: Callable[[Mapping], Iterator]) -> Iterator:
  191. # defaults must be first in the stream, so values in
  192. # changes take precedence.
  193. return chain(*[op(d) for d in reversed(self.maps)])
  194. def keys(self) -> Iterator[str]:
  195. return uniq(self._iter(lambda d: d.keys()))
  196. def items(self) -> Iterator[Tuple[str, Any]]:
  197. return ((key, self[key]) for key in self)
  198. def values(self) -> Iterator[Any]:
  199. return (self[key] for key in self)
  200. class ConfigurationView(ChainMap, AttributeDictMixin):
  201. """A view over an applications configuration dictionaries.
  202. Custom (but older) version of :class:`collections.ChainMap`.
  203. If the key does not exist in ``changes``, the ``defaults``
  204. dictionaries are consulted.
  205. Arguments:
  206. changes (Mapping): Map of configuration changes.
  207. defaults (List[Mapping]): List of dictionaries containing
  208. the default configuration.
  209. """
  210. def __init__(self, changes: Optional[Mapping],
  211. defaults: Sequence[Mapping] = None,
  212. key_t: KeyCallback = None,
  213. prefix: str = None) -> None:
  214. defaults = [] if defaults is None else defaults
  215. super().__init__(changes, *defaults, **{'key_t': key_t})
  216. self.__dict__.update(
  217. prefix=prefix.rstrip('_') + '_' if prefix else prefix,
  218. )
  219. def _to_keys(self, key: str) -> Tuple[str]:
  220. prefix = self.prefix
  221. if prefix:
  222. pkey = prefix + key if not key.startswith(prefix) else key
  223. return match_case(pkey, prefix), key
  224. return key,
  225. def __getitem__(self, key: str) -> Any:
  226. keys = self._to_keys(key)
  227. getitem = super().__getitem__
  228. for k in keys:
  229. try:
  230. return getitem(k)
  231. except KeyError:
  232. pass
  233. try:
  234. # support subclasses implementing __missing__
  235. return self.__missing__(key)
  236. except KeyError:
  237. if len(keys) > 1:
  238. raise KeyError(
  239. 'Key not found: {0!r} (with prefix: {0!r})'.format(*keys))
  240. raise
  241. def __setitem__(self, key: str, value: Any) -> None:
  242. self.changes[self._key(key)] = value
  243. def first(self, *keys: Tuple[str]) -> Any:
  244. return first(None, (self.get(key) for key in keys))
  245. def get(self, key: str, default: Optional[Any]=None) -> Any:
  246. try:
  247. return self[key]
  248. except KeyError:
  249. return default
  250. def clear(self) -> None:
  251. """Remove all changes, but keep defaults."""
  252. self.changes.clear()
  253. def __contains__(self, key: str) -> bool:
  254. keys = self._to_keys(key)
  255. return any(any(k in m for k in keys) for m in self.maps)
  256. def swap_with(self, other: 'ConfigurationView') -> None:
  257. changes = other.__dict__['changes']
  258. defaults = other.__dict__['defaults']
  259. self.__dict__.update(
  260. changes=changes,
  261. defaults=defaults,
  262. key_t=other.__dict__['key_t'],
  263. prefix=other.__dict__['prefix'],
  264. maps=[changes] + defaults
  265. )
  266. class LimitedSet:
  267. """Kind-of Set (or priority queue) with limitations.
  268. Good for when you need to test for membership (`a in set`),
  269. but the set should not grow unbounded.
  270. ``maxlen`` is enforced at all times, so if the limit is reached
  271. we will also remove non-expired items.
  272. You can also configure ``minlen``, which is the minimal residual size
  273. of the set.
  274. All arguments are optional, and no limits are enabled by default.
  275. Arguments:
  276. maxlen (int): Optional max number of items.
  277. Adding more items than ``maxlen`` will result in immediate
  278. removal of items sorted by oldest insertion time.
  279. expires (float): TTL for all items.
  280. Expired items are purged as keys are inserted.
  281. minlen (int): Minimal residual size of this set.
  282. .. versionadded:: 4.0
  283. Value must be less than ``maxlen`` if both are configured.
  284. Older expired items will be deleted, only after the set
  285. exceeds ``minlen`` number of items.
  286. data (Sequence): Initial data to initialize set with.
  287. Can be an iterable of ``(key, value)`` pairs,
  288. a dict (``{key: insertion_time}``), or another instance
  289. of :class:`LimitedSet`.
  290. Example:
  291. >>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
  292. >>> for i in range(60000):
  293. ... s.add(i)
  294. ... s.add(str(i))
  295. ...
  296. >>> 57000 in s # last 50k inserted values are kept
  297. True
  298. >>> '10' in s # '10' did expire and was purged from set.
  299. False
  300. >>> len(s) # maxlen is reached
  301. 50000
  302. >>> s.purge(now=time.time() + 7200) # clock + 2 hours
  303. >>> len(s) # now only minlen items are cached
  304. 4000
  305. >>>> 57000 in s # even this item is gone now
  306. False
  307. """
  308. max_heap_percent_overload = 15
  309. def __init__(self,
  310. maxlen: Optional[int]=0,
  311. minlen: Optional[int]=0,
  312. expires: Optional[int]=0,
  313. data: Optional[DictArgument]=None) -> None:
  314. self.maxlen = 0 if maxlen is None else maxlen
  315. self.minlen = 0 if minlen is None else minlen
  316. self.expires = 0 if expires is None else expires
  317. # type: Mapping[str, Any]
  318. self._data = {}
  319. # type: Sequence[Tuple[float, Any]]
  320. self._heap = []
  321. if data:
  322. # import items from data
  323. self.update(data)
  324. if not self.maxlen >= self.minlen >= 0:
  325. raise ValueError(
  326. 'minlen must be a positive number, less or equal to maxlen.')
  327. if self.expires < 0:
  328. raise ValueError('expires cannot be negative!')
  329. def _refresh_heap(self) -> None:
  330. """Time consuming recreating of heap. Do not run this too often."""
  331. self._heap[:] = [entry for entry in self._data.values()]
  332. heapify(self._heap)
  333. def _maybe_refresh_heap(self) -> None:
  334. if self._heap_overload >= self.max_heap_percent_overload:
  335. self._refresh_heap()
  336. def clear(self) -> None:
  337. """Clear all data, start from scratch again."""
  338. self._data.clear()
  339. self._heap[:] = []
  340. def add(self, item: Any, now: Optional[float]=None) -> None:
  341. """Add a new item, or reset the expiry time of an existing item."""
  342. now = now or time.time()
  343. if item in self._data:
  344. self.discard(item)
  345. entry = (now, item)
  346. self._data[item] = entry
  347. heappush(self._heap, entry)
  348. if self.maxlen and len(self._data) >= self.maxlen:
  349. self.purge()
  350. def update(self, other: Optional[DictArgument]) -> None:
  351. """Update this set from other LimitedSet, dict or iterable."""
  352. if not other:
  353. return
  354. if isinstance(other, LimitedSet):
  355. self._data.update(other._data)
  356. self._refresh_heap()
  357. self.purge()
  358. elif isinstance(other, dict):
  359. # revokes are sent as a dict
  360. for key, inserted in other.items():
  361. if isinstance(inserted, (tuple, list)):
  362. # in case someone uses ._data directly for sending update
  363. inserted = inserted[0]
  364. if not isinstance(inserted, float):
  365. raise ValueError(
  366. 'Expecting float timestamp, got type '
  367. '{0!r} with value: {1}'.format(
  368. type(inserted), inserted))
  369. self.add(key, inserted)
  370. else:
  371. # XXX AVOID THIS, it could keep old data if more parties
  372. # exchange them all over and over again
  373. for obj in other:
  374. self.add(obj)
  375. def discard(self, item: Any) -> None:
  376. # mark an existing item as removed. If KeyError is not found, pass.
  377. self._data.pop(item, None)
  378. self._maybe_refresh_heap()
  379. pop_value = discard
  380. def purge(self, now: Optional[float]=None) -> None:
  381. """Check oldest items and remove them if needed.
  382. Arguments:
  383. now (float): Time of purging -- by default right now.
  384. This can be useful for unit testing.
  385. """
  386. now = now or time.time()
  387. now = now() if isinstance(now, Callable) else now
  388. if self.maxlen:
  389. while len(self._data) > self.maxlen:
  390. self.pop()
  391. # time based expiring:
  392. if self.expires:
  393. while len(self._data) > self.minlen >= 0:
  394. inserted_time, _ = self._heap[0]
  395. if inserted_time + self.expires > now:
  396. break # oldest item has not expired yet
  397. self.pop()
  398. def pop(self, default: Optional[Any]=None) -> None:
  399. """Remove and return the oldest item, or :const:`None` when empty."""
  400. while self._heap:
  401. _, item = heappop(self._heap)
  402. try:
  403. self._data.pop(item)
  404. except KeyError:
  405. pass
  406. else:
  407. return item
  408. return default
  409. def as_dict(self) -> Dict:
  410. """Whole set as serializable dictionary.
  411. Example:
  412. >>> s = LimitedSet(maxlen=200)
  413. >>> r = LimitedSet(maxlen=200)
  414. >>> for i in range(500):
  415. ... s.add(i)
  416. ...
  417. >>> r.update(s.as_dict())
  418. >>> r == s
  419. True
  420. """
  421. return {key: inserted for inserted, key in self._data.values()}
  422. def __eq__(self, other: 'LimitedSet') -> bool:
  423. return self._data == other._data
  424. def __ne__(self, other: 'LimitedSet') -> bool:
  425. return not self.__eq__(other)
  426. def __repr__(self) -> str:
  427. return REPR_LIMITED_SET.format(
  428. self, name=type(self).__name__, size=len(self),
  429. )
  430. def __iter__(self) -> Iterable[Any]:
  431. return (i for _, i in sorted(self._data.values()))
  432. def __len__(self) -> int:
  433. return len(self._data)
  434. def __contains__(self, key: str) -> bool:
  435. return key in self._data
  436. def __reduce__(self) -> Tuple:
  437. return self.__class__, (
  438. self.maxlen, self.expires, self.as_dict(), self.minlen)
  439. def __bool__(self) -> bool:
  440. return bool(self._data)
  441. @property
  442. def _heap_overload(self) -> float:
  443. """Compute how much is heap bigger than data [percents]."""
  444. return len(self._heap) * 100 / max(len(self._data), 1) - 100
  445. MutableSet.register(LimitedSet)
  446. class Evictable:
  447. Empty = Empty # type: Exception
  448. def evict(self) -> None:
  449. """Force evict until maxsize is enforced."""
  450. self._evict(range=count)
  451. def _evict(self, limit: int=100, range: Callable=range) -> None:
  452. try:
  453. [self._evict1() for _ in range(limit)]
  454. except IndexError:
  455. pass
  456. def _evict1(self) -> None:
  457. if self._evictcount <= self.maxsize:
  458. raise IndexError()
  459. try:
  460. self._pop_to_evict()
  461. except self.Empty:
  462. raise IndexError()
  463. class Messagebuffer(Evictable):
  464. def __init__(self,
  465. maxsize: Optional[int],
  466. iterable: Optional[Iterable]=None, deque: Any=deque) -> None:
  467. self.maxsize = maxsize
  468. self.data = deque(iterable or []) # type: deque
  469. self._append = self.data.append # type: Callable[[Any], None]
  470. self._pop = self.data.popleft # type: Callable[[], Any]
  471. self._len = self.data.__len__ # type: Callable[[], int]
  472. self._extend = self.data.extend # type: Callable[[Iterable], None]
  473. def put(self, item: Any) -> None:
  474. self._append(item)
  475. self.maxsize and self._evict()
  476. def extend(self, it: Iterable) -> None:
  477. self._extend(it)
  478. self.maxsize and self._evict()
  479. def take(self, *default: Tuple[Any]) -> Any:
  480. try:
  481. return self._pop()
  482. except IndexError:
  483. if default:
  484. return default[0]
  485. raise self.Empty()
  486. def _pop_to_evict(self) -> Any:
  487. return self.take()
  488. def __repr__(self) -> str:
  489. return '<{0}: {1}/{2}>'.format(
  490. type(self).__name__, len(self), self.maxsize,
  491. )
  492. def __iter__(self) -> Iterator:
  493. while 1:
  494. try:
  495. yield self._pop()
  496. except IndexError:
  497. break
  498. def __len__(self) -> int:
  499. return self._len()
  500. def __contains__(self, item: Any) -> bool:
  501. return item in self.data
  502. def __reversed__(self) -> Iterable:
  503. return reversed(self.data)
  504. def __getitem__(self, index: int) -> Any:
  505. return self.data[index]
  506. @property
  507. def _evictcount(self) -> int:
  508. return len(self)
  509. Sequence.register(Messagebuffer)
  510. class BufferMap(OrderedDict, Evictable):
  511. Buffer = Messagebuffer
  512. maxsize = None
  513. total = 0
  514. bufmaxsize = None
  515. def __init__(self,
  516. maxsize: Optional[int],
  517. iterable: Optional[Iterable]=None,
  518. bufmaxsize: Optional[int]=1000) -> None:
  519. super().__init__()
  520. self.maxsize = maxsize
  521. self.bufmaxsize = bufmaxsize
  522. if iterable:
  523. self.update(iterable)
  524. # type: int
  525. self.total = sum(len(buf) for buf in self.items())
  526. def put(self, key: Any, item: Any) -> None:
  527. self._get_or_create_buffer(key).put(item)
  528. self.total += 1
  529. self.move_to_end(key) # least recently used.
  530. self.maxsize and self._evict()
  531. def extend(self, key: Any, it: Iterable) -> None:
  532. self._get_or_create_buffer(key).extend(it)
  533. self.total += len(it)
  534. self.maxsize and self._evict()
  535. def take(self, key: Any, *default: Tuple[Any]) -> Any:
  536. item, throw = None, False
  537. try:
  538. buf = self[key]
  539. except KeyError:
  540. throw = True
  541. else:
  542. try:
  543. item = buf.take()
  544. self.total -= 1
  545. except self.Empty:
  546. throw = True
  547. else:
  548. self.move_to_end(key) # mark as LRU
  549. if throw:
  550. if default:
  551. return default[0]
  552. raise self.Empty()
  553. return item
  554. def _get_or_create_buffer(self, key: Any) -> Messagebuffer:
  555. try:
  556. return self[key]
  557. except KeyError:
  558. buf = self[key] = self._new_buffer()
  559. return buf
  560. def _new_buffer(self) -> Messagebuffer:
  561. return self.Buffer(maxsize=self.bufmaxsize)
  562. def _LRUpop(self, *default: Tuple[Any]) -> Any:
  563. return self[self._LRUkey()].take(*default)
  564. def _pop_to_evict(self) -> None:
  565. for i in range(100):
  566. key = self._LRUkey()
  567. buf = self[key]
  568. try:
  569. buf.take()
  570. except (IndexError, self.Empty):
  571. # buffer empty, remove it from mapping.
  572. self.pop(key)
  573. else:
  574. # we removed one item
  575. self.total -= 1
  576. # if buffer is empty now, remove it from mapping.
  577. if not len(buf):
  578. self.pop(key)
  579. else:
  580. # move to least recently used.
  581. self.move_to_end(key)
  582. break
  583. def __repr__(self) -> str:
  584. return '<{0}: {1}/{2}>'.format(
  585. type(self).__name__, self.total, self.maxsize,
  586. )
  587. @property
  588. def _evictcount(self) -> int:
  589. return self.total