123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
from __future__ import absolute_import, unicode_literals

import pickle
from itertools import count
from time import time

import pytest
from billiard.einfo import ExceptionInfo
from case import skip

from celery.five import items
from celery.utils.collections import (AttributeDict, BufferMap,
                                      ConfigurationView, DictAttribute,
                                      LimitedSet, Messagebuffer)
from celery.utils.objects import Bunch

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Mapping, MutableMapping
except ImportError:  # Python 2 fallback: ABCs only live in ``collections``.
    from collections import Mapping, MutableMapping
class test_DictAttribute:
    """Exercise the mapping-style interface of ``DictAttribute``."""

    def test_get_set_keys_values_items(self):
        """Values written by key or by attribute show up in every accessor."""
        brown = 'The quick brown fox'
        yellow = 'The quick yellow fox'
        proxy = DictAttribute(Bunch())

        # Written by key: readable by key, on the wrapped object, and via get().
        proxy['foo'] = brown
        assert proxy['foo'] == brown
        assert proxy['foo'] == proxy.obj.foo
        assert proxy.get('foo') == brown

        # Unknown key: get() gives None while subscripting raises KeyError.
        assert proxy.get('bar') is None
        with pytest.raises(KeyError):
            proxy['bar']

        # Written by attribute: readable by key and through the three views.
        proxy.foo = yellow
        assert proxy['foo'] == yellow
        assert ('foo', yellow) in list(proxy.items())
        assert 'foo' in list(proxy.keys())
        assert yellow in list(proxy.values())

    def test_setdefault(self):
        """setdefault() stores a missing key and leaves an existing one alone."""
        proxy = DictAttribute(Bunch())

        proxy.setdefault('foo', 'NEW')
        assert proxy['foo'] == 'NEW'

        # Second call must not overwrite the value stored by the first.
        proxy.setdefault('foo', 'XYZ')
        assert proxy['foo'] == 'NEW'

    def test_contains(self):
        """Membership reflects which keys have been set."""
        proxy = DictAttribute(Bunch())
        proxy['foo'] = 1

        assert 'foo' in proxy
        assert 'bar' not in proxy

    def test_items(self):
        """Pre-existing attributes and later item writes are both readable."""
        wrapped = Bunch(attr1=1)
        proxy = DictAttribute(wrapped)
        proxy['attr2'] = 2

        assert proxy['attr1'] == 1
        assert proxy['attr2'] == 2
class test_ConfigurationView:
    """Tests for ``ConfigurationView`` built from two layered mappings.

    The fixture passes one mapping as the first argument and a list holding
    one further mapping as the second.  The key ``'both'`` is present in each
    so the tests can check which layer wins.
    """

    def setup(self):
        """Create a fresh view for every test."""
        self.view = ConfigurationView(
            {'changed_key': 1, 'both': 2},
            [
                {'default_key': 1, 'both': 1},
            ],
        )

    def test_setdefault(self):
        """setdefault() keeps an existing value and stores a missing one."""
        self.view.setdefault('both', 36)
        assert self.view['both'] == 2
        self.view.setdefault('new', 36)
        assert self.view['new'] == 36

    def test_get(self):
        """get() returns the stored value, or the given default when missing."""
        assert self.view.get('both') == 2
        sp = object()
        assert self.view.get('nonexisting', sp) is sp

    def test_update(self):
        """update() with keyword arguments lands in ``view.changes``."""
        changes = dict(self.view.changes)
        self.view.update(a=1, b=2, c=3)
        assert self.view.changes == dict(changes, a=1, b=2, c=3)

    def test_contains(self):
        """Keys from either layer are members; unknown keys are not."""
        assert 'changed_key' in self.view
        assert 'default_key' in self.view
        assert 'new' not in self.view

    def test_repr(self):
        """repr() mentions keys from both layers."""
        assert 'changed_key' in repr(self.view)
        assert 'default_key' in repr(self.view)

    def test_iter(self):
        """Iteration and the key/value/item views expose the merged content."""
        expected = {
            'changed_key': 1,
            'default_key': 1,
            'both': 2,
        }
        assert dict(items(self.view)) == expected
        # sorted() accepts any iterable, so no intermediate list is needed.
        assert sorted(iter(self.view)) == sorted(expected)
        assert sorted(self.view.keys()) == sorted(expected)
        assert sorted(self.view.values()) == sorted(expected.values())
        assert 'changed_key' in list(self.view.keys())
        assert 2 in list(self.view.values())
        assert ('both', 2) in list(self.view.items())

    def test_add_defaults_dict(self):
        """A dict passed to add_defaults() becomes readable as attributes."""
        defaults = {'foo': 10}
        self.view.add_defaults(defaults)
        assert self.view.foo == 10

    def test_add_defaults_object(self):
        """A plain object passed to add_defaults() is readable the same way."""
        defaults = Bunch(foo=10)
        self.view.add_defaults(defaults)
        assert self.view.foo == 10

    def test_clear(self):
        """clear() drops the first layer so the second layer shows through."""
        self.view.clear()
        assert self.view.both == 1
        assert 'changed_key' not in self.view

    def test_bool(self):
        """The view is truthy with content and falsy once ``maps`` is emptied."""
        assert bool(self.view)
        self.view.maps[:] = []
        assert not bool(self.view)

    def test_len(self):
        """len() counts distinct keys across both layers."""
        assert len(self.view) == 3
        self.view.KEY = 33
        assert len(self.view) == 4
        self.view.clear()
        assert len(self.view) == 2

    def test_isa_mapping(self):
        """ConfigurationView is a ``Mapping`` subclass."""
        # Uses the module-level ABC import: ``from collections import
        # Mapping`` raises ImportError on Python 3.10+.
        assert issubclass(ConfigurationView, Mapping)

    def test_isa_mutable_mapping(self):
        """ConfigurationView is a ``MutableMapping`` subclass."""
        # Same reason as above: the alias is gone from ``collections``.
        assert issubclass(ConfigurationView, MutableMapping)
class test_ExceptionInfo:
    """Sanity checks for ``billiard.einfo.ExceptionInfo``."""

    def test_exception_info(self):
        """An ExceptionInfo built inside a handler describes that exception."""
        message = 'The quick brown fox jumps...'
        try:
            raise LookupError(message)
        except Exception:
            # Constructed with no arguments while the exception is being
            # handled; presumably it picks up the active exception itself.
            captured = ExceptionInfo()

            # Type and arguments of the original exception are preserved.
            assert isinstance(captured.exception, LookupError)
            assert captured.exception.args == (message,)

            # The traceback text is non-empty and doubles as str().
            assert captured.traceback
            assert str(captured) == captured.traceback

            assert repr(captured)
@skip.if_win32()
class test_LimitedSet:
    """Tests for ``LimitedSet`` size limits, expiry, ordering and heap upkeep."""

    def test_add(self):
        """Adding past ``maxlen`` drops the first-added item; heap stays small."""
        s = LimitedSet(maxlen=2)
        s.add('foo')
        s.add('bar')
        for n in 'foo', 'bar':
            assert n in s
        s.add('baz')
        for n in 'bar', 'baz':
            assert n in s
        assert 'foo' not in s

        s = LimitedSet(maxlen=10)
        for i in range(150):
            s.add(i)
        assert len(s) <= 10

        # make sure heap is not leaking:
        assert len(s._heap) < len(s) * (
            100. + s.max_heap_percent_overload) / 100

    def test_purge(self):
        """purge() enforces ``maxlen``, ``expires`` and ``minlen``."""
        # purge now enforces rules
        # can't purge(1) now. but .purge(now=...) still works
        s = LimitedSet(maxlen=10)
        for i in range(10):
            s.add(i)
        s.maxlen = 2
        s.purge()
        assert len(s) == 2

        # expired
        s = LimitedSet(maxlen=10, expires=1)
        for i in range(10):
            s.add(i)
        s.maxlen = 2
        s.purge(now=time() + 100)
        assert len(s) == 0

        # not expired
        s = LimitedSet(maxlen=None, expires=1)
        for i in range(10):
            s.add(i)
        s.maxlen = 2
        # ``now`` is passed as a callable here rather than a number.
        s.purge(now=lambda: time() - 100)
        assert len(s) == 2

        # expired -> minsize
        s = LimitedSet(maxlen=10, minlen=10, expires=1)
        for i in range(20):
            s.add(i)
        s.minlen = 3
        s.purge(now=time() + 3)
        assert s.minlen == len(s)
        assert len(s._heap) <= s.maxlen * (
            100. + s.max_heap_percent_overload) / 100

    def test_pickleable(self):
        """A set survives a pickle round trip and compares equal."""
        s = LimitedSet(maxlen=2)
        s.add('foo')
        s.add('bar')
        assert pickle.loads(pickle.dumps(s)) == s

    def test_iter(self):
        """Iteration yields the surviving items in insertion order."""
        s = LimitedSet(maxlen=3)
        # Named ``members`` so the module-level ``items`` import is not shadowed.
        members = ['foo', 'bar', 'baz', 'xaz']
        for member in members:
            s.add(member)
        listed = list(iter(s))
        for member in members[1:]:
            assert member in listed
        assert 'foo' not in listed
        assert listed == members[1:], 'order by insertion time'

    def test_repr(self):
        """repr() starts with the class name and an opening parenthesis."""
        s = LimitedSet(maxlen=2)
        for member in ('foo', 'bar'):
            s.add(member)
        assert 'LimitedSet(' in repr(s)

    def test_discard(self):
        """discard() removes an item and tolerates a missing one."""
        s = LimitedSet(maxlen=2)
        s.add('foo')
        s.discard('foo')
        assert 'foo' not in s
        assert len(s._data) == 0
        # Discarding again must not raise.
        s.discard('foo')

    def test_clear(self):
        """clear() empties the set."""
        s = LimitedSet(maxlen=2)
        s.add('foo')
        s.add('bar')
        assert len(s) == 2
        s.clear()
        assert not s

    def test_update(self):
        """update() accepts sets, lists, dicts and the internal ``_data``."""
        s1 = LimitedSet(maxlen=2)
        s1.add('foo')
        s1.add('bar')

        s2 = LimitedSet(maxlen=2)
        s2.update(s1)
        assert sorted(s2) == ['bar', 'foo']

        s2.update(['bla'])
        assert sorted(s2) == ['bar', 'bla']

        s2.update(['do', 're'])
        assert sorted(s2) == ['do', 're']

        s1 = LimitedSet(maxlen=10, expires=None)
        s2 = LimitedSet(maxlen=10, expires=None)
        s3 = LimitedSet(maxlen=10, expires=None)
        s4 = LimitedSet(maxlen=10, expires=None)
        s5 = LimitedSet(maxlen=10, expires=None)
        for i in range(12):
            s1.add(i)
            s2.add(i * i)
        s3.update(s1)
        s3.update(s2)
        s4.update(s1.as_dict())
        s4.update(s2.as_dict())
        s5.update(s1._data)  # revoke is using this
        s5.update(s2._data)
        assert s3 == s4
        assert s3 == s5
        s2.update(s4)
        s4.update(s2)
        assert s2 == s4

    def test_iterable_and_ordering(self):
        """Iteration follows insertion time, not item value."""
        s = LimitedSet(maxlen=35, expires=None)
        # we use a custom clock here, as time.time() does not have enough
        # precision when called quickly (can return the same value twice).
        clock = count(1)
        for i in reversed(range(15)):
            s.add(i, now=next(clock))
        j = 40
        for i in s:
            assert i < j  # each item is smaller and smaller
            j = i
        assert i == 0  # last item is zero

    def test_pop_and_ordering_again(self):
        """pop() returns the remaining items first-added first, then None."""
        s = LimitedSet(maxlen=5)
        # Explicit increasing timestamps keep the order deterministic, as in
        # test_iterable_and_ordering.
        clock = count(1)
        for i in range(10):
            s.add(i, now=next(clock))
        previous = -1
        for _ in range(5):
            popped = s.pop()
            assert previous < popped
            # Advance the bound; without this only "popped > -1" is checked
            # and the ordering is never verified.
            previous = popped
        assert s.pop() is None

    def test_as_dict(self):
        """as_dict() returns a ``Mapping``."""
        s = LimitedSet(maxlen=2)
        s.add('foo')
        assert isinstance(s.as_dict(), Mapping)

    def test_add_removes_duplicate_from_small_heap(self):
        """Re-adding the same item leaves one entry in the data and the heap."""
        s = LimitedSet(maxlen=2)
        s.add('foo')
        s.add('foo')
        s.add('foo')
        assert len(s) == 1
        assert len(s._data) == 1
        assert len(s._heap) == 1

    def test_add_removes_duplicate_from_big_heap(self):
        """Repeated duplicates must not grow the heap without bound."""
        s = LimitedSet(maxlen=1000)
        for i in range(2000):
            s.add(i)
        assert len(s) == 1000
        for _ in range(1000):
            s.add('foo')
        # heap is refreshed when 15% larger than _data
        assert len(s._heap) < 1150
        for _ in range(1000):
            s.add('foo')
        assert len(s._heap) < 1150
class test_AttributeDict:
    """Checks attribute-style access on ``AttributeDict``."""

    def test_getattr__setattr(self):
        """Missing attributes raise; assigned attributes become dict items."""
        mapping = AttributeDict({'foo': 'bar'})
        assert mapping['foo'] == 'bar'

        # Reading an attribute that has no matching key is an AttributeError.
        with pytest.raises(AttributeError):
            mapping.bar

        # Attribute assignment is visible through item access.
        mapping.bar = 'foo'
        assert mapping['bar'] == 'foo'
class test_Messagebuffer:
    """Tests for ``Messagebuffer``: bounded and unbounded put/extend/take."""

    def assert_size_and_first(self, buf, size, expected_first_item):
        """Assert *buf* holds *size* items and take() gives the expected one."""
        assert len(buf) == size
        assert buf.take() == expected_first_item

    def test_append_limited(self):
        """With maxsize 10, putting 0..19 leaves ten items starting at 10."""
        buf = Messagebuffer(10)
        for value in range(20):
            buf.put(value)
        self.assert_size_and_first(buf, 10, 10)

    def test_append_unlimited(self):
        """With maxsize None, putting 0..19 keeps all twenty items."""
        buf = Messagebuffer(None)
        for value in range(20):
            buf.put(value)
        self.assert_size_and_first(buf, 20, 0)

    def test_extend_limited(self):
        """extend() on a size-10 buffer leaves ten items starting at 10."""
        buf = Messagebuffer(10)
        buf.extend(list(range(20)))
        self.assert_size_and_first(buf, 10, 10)

    def test_extend_unlimited(self):
        """extend() on an unbounded buffer keeps every item."""
        buf = Messagebuffer(None)
        buf.extend(list(range(20)))
        self.assert_size_and_first(buf, 20, 0)

    def test_extend_eviction_time_limited(self):
        """A large extend() may overshoot until evict() trims to maxsize."""
        limit = 3000
        buf = Messagebuffer(limit)
        buf.extend(range(10000))
        assert len(buf) > limit
        buf.evict()
        assert len(buf) == limit

    def test_pop_empty_with_default(self):
        """take() on an empty buffer returns the supplied default."""
        buf = Messagebuffer(10)
        fallback = object()
        assert buf.take(fallback) is fallback

    def test_pop_empty_no_default(self):
        """take() on an empty buffer without a default raises ``Empty``."""
        buf = Messagebuffer(10)
        with pytest.raises(buf.Empty):
            buf.take()

    def test_repr(self):
        """repr() of a populated buffer is a non-empty string."""
        buf = Messagebuffer(10, [1, 2, 3])
        assert repr(buf)

    def test_iter(self):
        """Iterating yields items in order and leaves the buffer empty."""
        buf = Messagebuffer(10, list(range(10)))
        assert len(buf) == 10
        expected = 0
        for item in buf:
            assert item == expected
            expected += 1
        assert len(buf) == 0

    def test_contains(self):
        """Membership works for a stored item."""
        buf = Messagebuffer(10, list(range(10)))
        assert 5 in buf

    def test_reversed(self):
        """reversed() yields the items back to front."""
        backwards = list(reversed(Messagebuffer(10, list(range(10)))))
        wanted = list(reversed(range(10)))
        assert backwards == wanted

    def test_getitem(self):
        """Indexing returns the item at each position."""
        buf = Messagebuffer(10, list(range(10)))
        for position in range(10):
            assert buf[position] == position
class test_BufferMap:
    """Tests for ``BufferMap``: keyed buffers sharing a total size limit."""

    def assert_size_and_first(self, buf, size, expected_first_item):
        """Assert *buf* totals *size* items and _LRUpop() gives the expected one."""
        assert buf.total == size
        assert buf._LRUpop() == expected_first_item

    def test_append_limited(self):
        """With maxsize 10, putting 0..19 leaves a total of ten from 10 on."""
        b = BufferMap(10)
        for i in range(20):
            b.put(i, i)
        self.assert_size_and_first(b, 10, 10)

    def test_append_unlimited(self):
        """With maxsize None, putting 0..19 keeps all twenty items."""
        b = BufferMap(None)
        for i in range(20):
            b.put(i, i)
        self.assert_size_and_first(b, 20, 0)

    def test_extend_limited(self):
        """extend() under one key on a size-10 map leaves ten from 10 on."""
        b = BufferMap(10)
        b.extend(1, list(range(20)))
        self.assert_size_and_first(b, 10, 10)

    def test_extend_unlimited(self):
        """extend() under one key on an unbounded map keeps every item."""
        b = BufferMap(None)
        b.extend(1, list(range(20)))
        self.assert_size_and_first(b, 20, 0)

    def test_pop_empty_with_default(self):
        """take() for an unknown key returns the supplied default."""
        b = BufferMap(10)
        sentinel = object()
        assert b.take(1, sentinel) is sentinel

    def test_pop_empty_no_default(self):
        """take() for an unknown key without a default raises ``Empty``."""
        b = BufferMap(10)
        with pytest.raises(b.Empty):
            b.take(1)

    def test_repr(self):
        """repr() of a populated BufferMap is a non-empty string."""
        # The original built a Messagebuffer here (copy-paste from the
        # Messagebuffer tests), so BufferMap's repr was never exercised.
        b = BufferMap(10)
        b.put(1, 1)
        assert repr(b)
|