collections.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916
  1. # -*- coding: utf-8 -*-
  2. """Custom maps, sets, sequences, and other data structures."""
  3. from __future__ import absolute_import, unicode_literals
  4. import sys
  5. import time
  6. from collections import (
  7. Callable, Mapping, MutableMapping, MutableSet, Sequence,
  8. OrderedDict as _OrderedDict, deque,
  9. )
  10. from heapq import heapify, heappush, heappop
  11. from itertools import chain, count
  12. from celery.five import Empty, items, keys, python_2_unicode_compatible, values
  13. from .functional import first, uniq
  14. from .text import match_case
  15. try:
  16. # pypy: dicts are ordered in recent versions
  17. from __pypy__ import reversed_dict as _dict_is_ordered
  18. except ImportError:
  19. _dict_is_ordered = None
  20. try:
  21. from django.utils.functional import LazyObject, LazySettings
  22. except ImportError:
  23. class LazyObject(object): # noqa
  24. pass
  25. LazySettings = LazyObject # noqa
  26. __all__ = [
  27. 'AttributeDictMixin', 'AttributeDict', 'BufferMap', 'ChainMap',
  28. 'ConfigurationView', 'DictAttribute', 'Evictable',
  29. 'LimitedSet', 'Messagebuffer', 'OrderedDict',
  30. 'force_mapping', 'lpmerge',
  31. ]
  32. PY3 = sys.version_info[0] >= 3
  33. REPR_LIMITED_SET = """\
  34. <{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
  35. """
  36. def force_mapping(m):
  37. # type: (Any) -> Mapping
  38. """Wrap object into supporting the mapping interface if necessary."""
  39. if isinstance(m, (LazyObject, LazySettings)):
  40. m = m._wrapped
  41. return DictAttribute(m) if not isinstance(m, Mapping) else m
  42. def lpmerge(L, R):
  43. # type: (Mapping, Mapping) -> Mapping
  44. """In place left precedent dictionary merge.
  45. Keeps values from `L`, if the value in `R` is :const:`None`.
  46. """
  47. setitem = L.__setitem__
  48. [setitem(k, v) for k, v in items(R) if v is not None]
  49. return L
  50. class OrderedDict(_OrderedDict):
  51. """Dict where insertion order matters."""
  52. if PY3: # pragma: no cover
  53. def _LRUkey(self):
  54. # type: () -> Any
  55. # return value of od.keys does not support __next__,
  56. # but this version will also not create a copy of the list.
  57. return next(iter(keys(self)))
  58. else:
  59. if _dict_is_ordered: # pragma: no cover
  60. def _LRUkey(self):
  61. # type: () -> Any
  62. # iterkeys is iterable.
  63. return next(self.iterkeys())
  64. else:
  65. def _LRUkey(self):
  66. # type: () -> Any
  67. return self._OrderedDict__root[1][2]
  68. if not hasattr(_OrderedDict, 'move_to_end'):
  69. if _dict_is_ordered: # pragma: no cover
  70. def move_to_end(self, key, last=True):
  71. # type: (Any, bool) -> None
  72. if not last:
  73. # we don't use this argument, and the only way to
  74. # implement this on PyPy seems to be O(n): creating a
  75. # copy with the order changed, so we just raise.
  76. raise NotImplementedError('no last=True on PyPy')
  77. self[key] = self.pop(key)
  78. else:
  79. def move_to_end(self, key, last=True):
  80. # type: (Any, bool) -> None
  81. link = self._OrderedDict__map[key]
  82. link_prev = link[0]
  83. link_next = link[1]
  84. link_prev[1] = link_next
  85. link_next[0] = link_prev
  86. root = self._OrderedDict__root
  87. if last:
  88. last = root[0]
  89. link[0] = last
  90. link[1] = root
  91. last[1] = root[0] = link
  92. else:
  93. first_node = root[1]
  94. link[0] = root
  95. link[1] = first_node
  96. root[1] = first_node[0] = link
  97. class AttributeDictMixin(object):
  98. """Mixin for Mapping interface that adds attribute access.
  99. I.e., `d.key -> d[key]`).
  100. """
  101. def __getattr__(self, k):
  102. # type: (str) -> Any
  103. """`d.key -> d[key]`."""
  104. try:
  105. return self[k]
  106. except KeyError:
  107. raise AttributeError(
  108. '{0!r} object has no attribute {1!r}'.format(
  109. type(self).__name__, k))
  110. def __setattr__(self, key, value):
  111. # type: (str, Any) -> None
  112. """`d[key] = value -> d.key = value`."""
  113. self[key] = value
  114. class AttributeDict(dict, AttributeDictMixin):
  115. """Dict subclass with attribute access."""
  116. class DictAttribute(object):
  117. """Dict interface to attributes.
  118. `obj[k] -> obj.k`
  119. `obj[k] = val -> obj.k = val`
  120. """
  121. obj = None
  122. def __init__(self, obj):
  123. # type: (Any) -> None
  124. object.__setattr__(self, 'obj', obj)
  125. def __getattr__(self, key):
  126. # type: (Any) -> Any
  127. return getattr(self.obj, key)
  128. def __setattr__(self, key, value):
  129. # type: (Any, Any) -> None
  130. return setattr(self.obj, key, value)
  131. def get(self, key, default=None):
  132. # type: (Any, Any) -> Any
  133. try:
  134. return self[key]
  135. except KeyError:
  136. return default
  137. def setdefault(self, key, default=None):
  138. # type: (Any, Any) -> None
  139. if key not in self:
  140. self[key] = default
  141. def __getitem__(self, key):
  142. # type: (Any) -> Any
  143. try:
  144. return getattr(self.obj, key)
  145. except AttributeError:
  146. raise KeyError(key)
  147. def __setitem__(self, key, value):
  148. # type: (Any, Any) -> Any
  149. setattr(self.obj, key, value)
  150. def __contains__(self, key):
  151. # type: (Any) -> bool
  152. return hasattr(self.obj, key)
  153. def _iterate_keys(self):
  154. # type: () -> Iterable
  155. return iter(dir(self.obj))
  156. iterkeys = _iterate_keys
  157. def __iter__(self):
  158. # type: () -> Iterable
  159. return self._iterate_keys()
  160. def _iterate_items(self):
  161. # type: () -> Iterable
  162. for key in self._iterate_keys():
  163. yield key, getattr(self.obj, key)
  164. iteritems = _iterate_items
  165. def _iterate_values(self):
  166. # type: () -> Iterable
  167. for key in self._iterate_keys():
  168. yield getattr(self.obj, key)
  169. itervalues = _iterate_values
  170. if sys.version_info[0] == 3: # pragma: no cover
  171. items = _iterate_items
  172. keys = _iterate_keys
  173. values = _iterate_values
  174. else:
  175. def keys(self):
  176. # type: () -> List[Any]
  177. return list(self)
  178. def items(self):
  179. # type: () -> List[Tuple[Any, Any]]
  180. return list(self._iterate_items())
  181. def values(self):
  182. # type: () -> List[Any]
  183. return list(self._iterate_values())
  184. MutableMapping.register(DictAttribute) # noqa: E305
  185. class ChainMap(MutableMapping):
  186. """Key lookup on a sequence of maps."""
  187. key_t = None
  188. changes = None
  189. defaults = None
  190. maps = None
  191. def __init__(self, *maps, **kwargs):
  192. # type: (*Mapping, **Any) -> None
  193. maps = list(maps or [{}])
  194. self.__dict__.update(
  195. key_t=kwargs.get('key_t'),
  196. maps=maps,
  197. changes=maps[0],
  198. defaults=maps[1:],
  199. )
  200. def add_defaults(self, d):
  201. # type: (Mapping) -> None
  202. d = force_mapping(d)
  203. self.defaults.insert(0, d)
  204. self.maps.insert(1, d)
  205. def pop(self, key, *default):
  206. # type: (Any, *Any) -> Any
  207. try:
  208. return self.maps[0].pop(key, *default)
  209. except KeyError:
  210. raise KeyError(
  211. 'Key not found in the first mapping: {!r}'.format(key))
  212. def __missing__(self, key):
  213. # type: (Any) -> Any
  214. raise KeyError(key)
  215. def _key(self, key):
  216. # type: (Any) -> Any
  217. return self.key_t(key) if self.key_t is not None else key
  218. def __getitem__(self, key):
  219. # type: (Any) -> Any
  220. _key = self._key(key)
  221. for mapping in self.maps:
  222. try:
  223. return mapping[_key]
  224. except KeyError:
  225. pass
  226. return self.__missing__(key)
  227. def __setitem__(self, key, value):
  228. # type: (Any, Any) -> None
  229. self.changes[self._key(key)] = value
  230. def __delitem__(self, key):
  231. # type: (Any) -> None
  232. try:
  233. del self.changes[self._key(key)]
  234. except KeyError:
  235. raise KeyError('Key not found in first mapping: {0!r}'.format(key))
  236. def clear(self):
  237. # type: () -> None
  238. self.changes.clear()
  239. def get(self, key, default=None):
  240. # type: (Any, Any) -> Any
  241. try:
  242. return self[self._key(key)]
  243. except KeyError:
  244. return default
  245. def __len__(self):
  246. # type: () -> int
  247. return len(set().union(*self.maps))
  248. def __iter__(self):
  249. return self._iterate_keys()
  250. def __contains__(self, key):
  251. # type: (Any) -> bool
  252. key = self._key(key)
  253. return any(key in m for m in self.maps)
  254. def __bool__(self):
  255. # type: () -> bool
  256. return any(self.maps)
  257. __nonzero__ = __bool__ # Py2
  258. def setdefault(self, key, default=None):
  259. # type: (Any, Any) -> None
  260. key = self._key(key)
  261. if key not in self:
  262. self[key] = default
  263. def update(self, *args, **kwargs):
  264. # type: (*Any, **Any) -> Any
  265. return self.changes.update(*args, **kwargs)
  266. def __repr__(self):
  267. # type: () -> str
  268. return '{0.__class__.__name__}({1})'.format(
  269. self, ', '.join(map(repr, self.maps)))
  270. @classmethod
  271. def fromkeys(cls, iterable, *args):
  272. # type: (type, Iterable, *Any) -> 'ChainMap'
  273. """Create a ChainMap with a single dict created from the iterable."""
  274. return cls(dict.fromkeys(iterable, *args))
  275. def copy(self):
  276. # type: () -> 'ChainMap'
  277. return self.__class__(self.maps[0].copy(), *self.maps[1:])
  278. __copy__ = copy # Py2
  279. def _iter(self, op):
  280. # type: (Callable) -> Iterable
  281. # defaults must be first in the stream, so values in
  282. # changes take precedence.
  283. # pylint: disable=bad-reversed-sequence
  284. # Someone should teach pylint about properties.
  285. return chain(*[op(d) for d in reversed(self.maps)])
  286. def _iterate_keys(self):
  287. # type: () -> Iterable
  288. return uniq(self._iter(lambda d: d.keys()))
  289. iterkeys = _iterate_keys
  290. def _iterate_items(self):
  291. # type: () -> Iterable
  292. return ((key, self[key]) for key in self)
  293. iteritems = _iterate_items
  294. def _iterate_values(self):
  295. # type: () -> Iterable
  296. return (self[key] for key in self)
  297. itervalues = _iterate_values
  298. if sys.version_info[0] == 3: # pragma: no cover
  299. keys = _iterate_keys
  300. items = _iterate_items
  301. values = _iterate_values
  302. else: # noqa
  303. def keys(self):
  304. # type: () -> List[Any]
  305. return list(self._iterate_keys())
  306. def items(self):
  307. # type: () -> List[Tuple[Any, Any]]
  308. return list(self._iterate_items())
  309. def values(self):
  310. # type: () -> List[Any]
  311. return list(self._iterate_values())
  312. @python_2_unicode_compatible
  313. class ConfigurationView(ChainMap, AttributeDictMixin):
  314. """A view over an applications configuration dictionaries.
  315. Custom (but older) version of :class:`collections.ChainMap`.
  316. If the key does not exist in ``changes``, the ``defaults``
  317. dictionaries are consulted.
  318. Arguments:
  319. changes (Mapping): Map of configuration changes.
  320. defaults (List[Mapping]): List of dictionaries containing
  321. the default configuration.
  322. """
  323. def __init__(self, changes, defaults=None, keys=None, prefix=None):
  324. # type: (Mapping, Mapping, List[str], str) -> None
  325. defaults = [] if defaults is None else defaults
  326. super(ConfigurationView, self).__init__(changes, *defaults)
  327. self.__dict__.update(
  328. prefix=prefix.rstrip('_') + '_' if prefix else prefix,
  329. _keys=keys,
  330. )
  331. def _to_keys(self, key):
  332. # type: (str) -> Sequence[str]
  333. prefix = self.prefix
  334. if prefix:
  335. pkey = prefix + key if not key.startswith(prefix) else key
  336. return match_case(pkey, prefix), key
  337. return key,
  338. def __getitem__(self, key):
  339. # type: (str) -> Any
  340. keys = self._to_keys(key)
  341. getitem = super(ConfigurationView, self).__getitem__
  342. for k in keys + (
  343. tuple(f(key) for f in self._keys) if self._keys else ()):
  344. try:
  345. return getitem(k)
  346. except KeyError:
  347. pass
  348. try:
  349. # support subclasses implementing __missing__
  350. return self.__missing__(key)
  351. except KeyError:
  352. if len(keys) > 1:
  353. raise KeyError(
  354. 'Key not found: {0!r} (with prefix: {0!r})'.format(*keys))
  355. raise
  356. def __setitem__(self, key, value):
  357. # type: (str, Any) -> Any
  358. self.changes[self._key(key)] = value
  359. def first(self, *keys):
  360. # type: (*str) -> Any
  361. return first(None, (self.get(key) for key in keys))
  362. def get(self, key, default=None):
  363. # type: (str, Any) -> Any
  364. try:
  365. return self[key]
  366. except KeyError:
  367. return default
  368. def clear(self):
  369. # type: () -> None
  370. """Remove all changes, but keep defaults."""
  371. self.changes.clear()
  372. def __contains__(self, key):
  373. # type: (str) -> bool
  374. keys = self._to_keys(key)
  375. return any(any(k in m for k in keys) for m in self.maps)
  376. def swap_with(self, other):
  377. # type: (ConfigurationView) -> None
  378. changes = other.__dict__['changes']
  379. defaults = other.__dict__['defaults']
  380. self.__dict__.update(
  381. changes=changes,
  382. defaults=defaults,
  383. key_t=other.__dict__['key_t'],
  384. prefix=other.__dict__['prefix'],
  385. maps=[changes] + defaults
  386. )
  387. @python_2_unicode_compatible
  388. class LimitedSet(object):
  389. """Kind-of Set (or priority queue) with limitations.
  390. Good for when you need to test for membership (`a in set`),
  391. but the set should not grow unbounded.
  392. ``maxlen`` is enforced at all times, so if the limit is reached
  393. we'll also remove non-expired items.
  394. You can also configure ``minlen``: this is the minimal residual size
  395. of the set.
  396. All arguments are optional, and no limits are enabled by default.
  397. Arguments:
  398. maxlen (int): Optional max number of items.
  399. Adding more items than ``maxlen`` will result in immediate
  400. removal of items sorted by oldest insertion time.
  401. expires (float): TTL for all items.
  402. Expired items are purged as keys are inserted.
  403. minlen (int): Minimal residual size of this set.
  404. .. versionadded:: 4.0
  405. Value must be less than ``maxlen`` if both are configured.
  406. Older expired items will be deleted, only after the set
  407. exceeds ``minlen`` number of items.
  408. data (Sequence): Initial data to initialize set with.
  409. Can be an iterable of ``(key, value)`` pairs,
  410. a dict (``{key: insertion_time}``), or another instance
  411. of :class:`LimitedSet`.
  412. Example:
  413. >>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
  414. >>> for i in range(60000):
  415. ... s.add(i)
  416. ... s.add(str(i))
  417. ...
  418. >>> 57000 in s # last 50k inserted values are kept
  419. True
  420. >>> '10' in s # '10' did expire and was purged from set.
  421. False
  422. >>> len(s) # maxlen is reached
  423. 50000
  424. >>> s.purge(now=time.time() + 7200) # clock + 2 hours
  425. >>> len(s) # now only minlen items are cached
  426. 4000
  427. >>>> 57000 in s # even this item is gone now
  428. False
  429. """
  430. max_heap_percent_overload = 15
  431. def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
  432. # type: (int, float, Mapping, int) -> None
  433. self.maxlen = 0 if maxlen is None else maxlen
  434. self.minlen = 0 if minlen is None else minlen
  435. self.expires = 0 if expires is None else expires
  436. self._data = {}
  437. self._heap = []
  438. if data:
  439. # import items from data
  440. self.update(data)
  441. if not self.maxlen >= self.minlen >= 0:
  442. raise ValueError(
  443. 'minlen must be a positive number, less or equal to maxlen.')
  444. if self.expires < 0:
  445. raise ValueError('expires cannot be negative!')
  446. def _refresh_heap(self):
  447. # type: () -> None
  448. """Time consuming recreating of heap. Don't run this too often."""
  449. self._heap[:] = [entry for entry in values(self._data)]
  450. heapify(self._heap)
  451. def _maybe_refresh_heap(self):
  452. # type: () -> None
  453. if self._heap_overload >= self.max_heap_percent_overload:
  454. self._refresh_heap()
  455. def clear(self):
  456. # type: () -> None
  457. """Clear all data, start from scratch again."""
  458. self._data.clear()
  459. self._heap[:] = []
  460. def add(self, item, now=None):
  461. # type: (Any, float) -> None
  462. """Add a new item, or reset the expiry time of an existing item."""
  463. now = now or time.time()
  464. if item in self._data:
  465. self.discard(item)
  466. entry = (now, item)
  467. self._data[item] = entry
  468. heappush(self._heap, entry)
  469. if self.maxlen and len(self._data) >= self.maxlen:
  470. self.purge()
  471. def update(self, other):
  472. # type: (Iterable) -> None
  473. """Update this set from other LimitedSet, dict or iterable."""
  474. if not other:
  475. return
  476. if isinstance(other, LimitedSet):
  477. self._data.update(other._data)
  478. self._refresh_heap()
  479. self.purge()
  480. elif isinstance(other, dict):
  481. # revokes are sent as a dict
  482. for key, inserted in items(other):
  483. if isinstance(inserted, (tuple, list)):
  484. # in case someone uses ._data directly for sending update
  485. inserted = inserted[0]
  486. if not isinstance(inserted, float):
  487. raise ValueError(
  488. 'Expecting float timestamp, got type '
  489. '{0!r} with value: {1}'.format(
  490. type(inserted), inserted))
  491. self.add(key, inserted)
  492. else:
  493. # XXX AVOID THIS, it could keep old data if more parties
  494. # exchange them all over and over again
  495. for obj in other:
  496. self.add(obj)
  497. def discard(self, item):
  498. # type: (Any) -> None
  499. # mark an existing item as removed. If KeyError is not found, pass.
  500. self._data.pop(item, None)
  501. self._maybe_refresh_heap()
  502. pop_value = discard
  503. def purge(self, now=None):
  504. # type: (float) -> None
  505. """Check oldest items and remove them if needed.
  506. Arguments:
  507. now (float): Time of purging -- by default right now.
  508. This can be useful for unit testing.
  509. """
  510. now = now or time.time()
  511. now = now() if isinstance(now, Callable) else now
  512. if self.maxlen:
  513. while len(self._data) > self.maxlen:
  514. self.pop()
  515. # time based expiring:
  516. if self.expires:
  517. while len(self._data) > self.minlen >= 0:
  518. inserted_time, _ = self._heap[0]
  519. if inserted_time + self.expires > now:
  520. break # oldest item hasn't expired yet
  521. self.pop()
  522. def pop(self, default=None):
  523. # type: (Any) -> Any
  524. """Remove and return the oldest item, or :const:`None` when empty."""
  525. while self._heap:
  526. _, item = heappop(self._heap)
  527. try:
  528. self._data.pop(item)
  529. except KeyError:
  530. pass
  531. else:
  532. return item
  533. return default
  534. def as_dict(self):
  535. # type: () -> Dict
  536. """Whole set as serializable dictionary.
  537. Example:
  538. >>> s = LimitedSet(maxlen=200)
  539. >>> r = LimitedSet(maxlen=200)
  540. >>> for i in range(500):
  541. ... s.add(i)
  542. ...
  543. >>> r.update(s.as_dict())
  544. >>> r == s
  545. True
  546. """
  547. return {key: inserted for inserted, key in values(self._data)}
  548. def __eq__(self, other):
  549. # type: (Any) -> bool
  550. return self._data == other._data
  551. def __ne__(self, other):
  552. # type: (Any) -> bool
  553. return not self.__eq__(other)
  554. def __repr__(self):
  555. # type: () -> str
  556. return REPR_LIMITED_SET.format(
  557. self, name=type(self).__name__, size=len(self),
  558. )
  559. def __iter__(self):
  560. # type: () -> Iterable
  561. return (i for _, i in sorted(values(self._data)))
  562. def __len__(self):
  563. # type: () -> int
  564. return len(self._data)
  565. def __contains__(self, key):
  566. # type: (Any) -> bool
  567. return key in self._data
  568. def __reduce__(self):
  569. # type: () -> Any
  570. return self.__class__, (
  571. self.maxlen, self.expires, self.as_dict(), self.minlen)
  572. def __bool__(self):
  573. # type: () -> bool
  574. return bool(self._data)
  575. __nonzero__ = __bool__ # Py2
  576. @property
  577. def _heap_overload(self):
  578. # type: () -> float
  579. """Compute how much is heap bigger than data [percents]."""
  580. return len(self._heap) * 100 / max(len(self._data), 1) - 100
  581. MutableSet.register(LimitedSet) # noqa: E305
  582. class Evictable(object):
  583. """Mixin for classes supporting the ``evict`` method."""
  584. Empty = Empty
  585. def evict(self):
  586. # type: () -> None
  587. """Force evict until maxsize is enforced."""
  588. self._evict(range=count)
  589. def _evict(self, limit=100, range=range):
  590. # type: (int) -> None
  591. try:
  592. [self._evict1() for _ in range(limit)]
  593. except IndexError:
  594. pass
  595. def _evict1(self):
  596. # type: () -> None
  597. if self._evictcount <= self.maxsize:
  598. raise IndexError()
  599. try:
  600. self._pop_to_evict()
  601. except self.Empty:
  602. raise IndexError()
  603. @python_2_unicode_compatible
  604. class Messagebuffer(Evictable):
  605. """A buffer of pending messages."""
  606. Empty = Empty
  607. def __init__(self, maxsize, iterable=None, deque=deque):
  608. # type: (int, Iterable, Any) -> None
  609. self.maxsize = maxsize
  610. self.data = deque(iterable or [])
  611. self._append = self.data.append
  612. self._pop = self.data.popleft
  613. self._len = self.data.__len__
  614. self._extend = self.data.extend
  615. def put(self, item):
  616. # type: (Any) -> None
  617. self._append(item)
  618. self.maxsize and self._evict()
  619. def extend(self, it):
  620. # type: (Iterable) -> None
  621. self._extend(it)
  622. self.maxsize and self._evict()
  623. def take(self, *default):
  624. # type: (*Any) -> Any
  625. try:
  626. return self._pop()
  627. except IndexError:
  628. if default:
  629. return default[0]
  630. raise self.Empty()
  631. def _pop_to_evict(self):
  632. # type: () -> None
  633. return self.take()
  634. def __repr__(self):
  635. # type: () -> str
  636. return '<{0}: {1}/{2}>'.format(
  637. type(self).__name__, len(self), self.maxsize,
  638. )
  639. def __iter__(self):
  640. # type: () -> Iterable
  641. while 1:
  642. try:
  643. yield self._pop()
  644. except IndexError:
  645. break
  646. def __len__(self):
  647. # type: () -> int
  648. return self._len()
  649. def __contains__(self, item):
  650. # type: () -> bool
  651. return item in self.data
  652. def __reversed__(self):
  653. # type: () -> Iterable
  654. return reversed(self.data)
  655. def __getitem__(self, index):
  656. # type: (Any) -> Any
  657. return self.data[index]
  658. @property
  659. def _evictcount(self):
  660. # type: () -> int
  661. return len(self)
  662. Sequence.register(Messagebuffer) # noqa: E305
  663. @python_2_unicode_compatible
  664. class BufferMap(OrderedDict, Evictable):
  665. """Map of buffers."""
  666. Buffer = Messagebuffer
  667. Empty = Empty
  668. maxsize = None
  669. total = 0
  670. bufmaxsize = None
  671. def __init__(self, maxsize, iterable=None, bufmaxsize=1000):
  672. # type: (int, Iterable, int) -> None
  673. super(BufferMap, self).__init__()
  674. self.maxsize = maxsize
  675. self.bufmaxsize = 1000
  676. if iterable:
  677. self.update(iterable)
  678. self.total = sum(len(buf) for buf in items(self))
  679. def put(self, key, item):
  680. # type: (Any, Any) -> None
  681. self._get_or_create_buffer(key).put(item)
  682. self.total += 1
  683. self.move_to_end(key) # least recently used.
  684. self.maxsize and self._evict()
  685. def extend(self, key, it):
  686. # type: (Any, Iterable) -> None
  687. self._get_or_create_buffer(key).extend(it)
  688. self.total += len(it)
  689. self.maxsize and self._evict()
  690. def take(self, key, *default):
  691. # type: (Any, *Any) -> Any
  692. item, throw = None, False
  693. try:
  694. buf = self[key]
  695. except KeyError:
  696. throw = True
  697. else:
  698. try:
  699. item = buf.take()
  700. self.total -= 1
  701. except self.Empty:
  702. throw = True
  703. else:
  704. self.move_to_end(key) # mark as LRU
  705. if throw:
  706. if default:
  707. return default[0]
  708. raise self.Empty()
  709. return item
  710. def _get_or_create_buffer(self, key):
  711. # type: (Any) -> Messagebuffer
  712. try:
  713. return self[key]
  714. except KeyError:
  715. buf = self[key] = self._new_buffer()
  716. return buf
  717. def _new_buffer(self):
  718. # type: () -> Messagebuffer
  719. return self.Buffer(maxsize=self.bufmaxsize)
  720. def _LRUpop(self, *default):
  721. # type: (*Any) -> Any
  722. return self[self._LRUkey()].take(*default)
  723. def _pop_to_evict(self):
  724. # type: () -> None
  725. for _ in range(100):
  726. key = self._LRUkey()
  727. buf = self[key]
  728. try:
  729. buf.take()
  730. except (IndexError, self.Empty):
  731. # buffer empty, remove it from mapping.
  732. self.pop(key)
  733. else:
  734. # we removed one item
  735. self.total -= 1
  736. # if buffer is empty now, remove it from mapping.
  737. if not len(buf):
  738. self.pop(key)
  739. else:
  740. # move to least recently used.
  741. self.move_to_end(key)
  742. break
  743. def __repr__(self):
  744. # type: () -> str
  745. return '<{0}: {1}/{2}>'.format(
  746. type(self).__name__, self.total, self.maxsize,
  747. )
  748. @property
  749. def _evictcount(self):
  750. # type: () -> int
  751. return self.total