| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pickle
 
- from collections import Mapping
 
- from itertools import count
 
- import pytest
 
- from billiard.einfo import ExceptionInfo
 
- from case import skip
 
- from celery.five import items, monotonic
 
- from celery.utils.collections import (AttributeDict, BufferMap,
 
-                                       ConfigurationView, DictAttribute,
 
-                                       LimitedSet, Messagebuffer)
 
- from celery.utils.objects import Bunch
 
- class test_DictAttribute:
 
-     def test_get_set_keys_values_items(self):
 
-         x = DictAttribute(Bunch())
 
-         x['foo'] = 'The quick brown fox'
 
-         assert x['foo'] == 'The quick brown fox'
 
-         assert x['foo'] == x.obj.foo
 
-         assert x.get('foo') == 'The quick brown fox'
 
-         assert x.get('bar') is None
 
-         with pytest.raises(KeyError):
 
-             x['bar']
 
-         x.foo = 'The quick yellow fox'
 
-         assert x['foo'] == 'The quick yellow fox'
 
-         assert ('foo', 'The quick yellow fox') in list(x.items())
 
-         assert 'foo' in list(x.keys())
 
-         assert 'The quick yellow fox' in list(x.values())
 
-     def test_setdefault(self):
 
-         x = DictAttribute(Bunch())
 
-         x.setdefault('foo', 'NEW')
 
-         assert x['foo'] == 'NEW'
 
-         x.setdefault('foo', 'XYZ')
 
-         assert x['foo'] == 'NEW'
 
-     def test_contains(self):
 
-         x = DictAttribute(Bunch())
 
-         x['foo'] = 1
 
-         assert 'foo' in x
 
-         assert 'bar' not in x
 
-     def test_items(self):
 
-         obj = Bunch(attr1=1)
 
-         x = DictAttribute(obj)
 
-         x['attr2'] = 2
 
-         assert x['attr1'] == 1
 
-         assert x['attr2'] == 2
 
- class test_ConfigurationView:
 
-     def setup(self):
 
-         self.view = ConfigurationView(
 
-             {'changed_key': 1, 'both': 2},
 
-             [
 
-                 {'default_key': 1, 'both': 1},
 
-             ],
 
-         )
 
-     def test_setdefault(self):
 
-         self.view.setdefault('both', 36)
 
-         assert self.view['both'] == 2
 
-         self.view.setdefault('new', 36)
 
-         assert self.view['new'] == 36
 
-     def test_get(self):
 
-         assert self.view.get('both') == 2
 
-         sp = object()
 
-         assert self.view.get('nonexisting', sp) is sp
 
-     def test_update(self):
 
-         changes = dict(self.view.changes)
 
-         self.view.update(a=1, b=2, c=3)
 
-         assert self.view.changes == dict(changes, a=1, b=2, c=3)
 
-     def test_contains(self):
 
-         assert 'changed_key' in self.view
 
-         assert 'default_key' in self.view
 
-         assert 'new' not in self.view
 
-     def test_repr(self):
 
-         assert 'changed_key' in repr(self.view)
 
-         assert 'default_key' in repr(self.view)
 
-     def test_iter(self):
 
-         expected = {
 
-             'changed_key': 1,
 
-             'default_key': 1,
 
-             'both': 2,
 
-         }
 
-         assert dict(items(self.view)) == expected
 
-         assert sorted(list(iter(self.view))) == sorted(list(expected.keys()))
 
-         assert sorted(list(self.view.keys())) == sorted(list(expected.keys()))
 
-         assert (sorted(list(self.view.values())) ==
 
-                 sorted(list(expected.values())))
 
-         assert 'changed_key' in list(self.view.keys())
 
-         assert 2 in list(self.view.values())
 
-         assert ('both', 2) in list(self.view.items())
 
-     def test_add_defaults_dict(self):
 
-         defaults = {'foo': 10}
 
-         self.view.add_defaults(defaults)
 
-         assert self.view.foo == 10
 
-     def test_add_defaults_object(self):
 
-         defaults = Bunch(foo=10)
 
-         self.view.add_defaults(defaults)
 
-         assert self.view.foo == 10
 
-     def test_clear(self):
 
-         self.view.clear()
 
-         assert self.view.both == 1
 
-         assert 'changed_key' not in self.view
 
-     def test_bool(self):
 
-         assert bool(self.view)
 
-         self.view.maps[:] = []
 
-         assert not bool(self.view)
 
-     def test_len(self):
 
-         assert len(self.view) == 3
 
-         self.view.KEY = 33
 
-         assert len(self.view) == 4
 
-         self.view.clear()
 
-         assert len(self.view) == 2
 
-     def test_isa_mapping(self):
 
-         from collections import Mapping
 
-         assert issubclass(ConfigurationView, Mapping)
 
-     def test_isa_mutable_mapping(self):
 
-         from collections import MutableMapping
 
-         assert issubclass(ConfigurationView, MutableMapping)
 
- class test_ExceptionInfo:
 
-     def test_exception_info(self):
 
-         try:
 
-             raise LookupError('The quick brown fox jumps...')
 
-         except Exception:
 
-             einfo = ExceptionInfo()
 
-             assert str(einfo) == einfo.traceback
 
-             assert isinstance(einfo.exception, LookupError)
 
-             assert einfo.exception.args == ('The quick brown fox jumps...',)
 
-             assert einfo.traceback
 
-             assert repr(einfo)
 
- @skip.if_win32()
 
- class test_LimitedSet:
 
-     def test_add(self):
 
-         s = LimitedSet(maxlen=2)
 
-         s.add('foo')
 
-         s.add('bar')
 
-         for n in 'foo', 'bar':
 
-             assert n in s
 
-         s.add('baz')
 
-         for n in 'bar', 'baz':
 
-             assert n in s
 
-         assert 'foo' not in s
 
-         s = LimitedSet(maxlen=10)
 
-         for i in range(150):
 
-             s.add(i)
 
-         assert len(s) <= 10
 
-         # make sure heap is not leaking:
 
-         assert len(s._heap) < len(s) * (
 
-             100. + s.max_heap_percent_overload) / 100
 
-     def test_purge(self):
 
-         # purge now enforces rules
 
-         # cant purge(1) now. but .purge(now=...) still works
 
-         s = LimitedSet(maxlen=10)
 
-         [s.add(i) for i in range(10)]
 
-         s.maxlen = 2
 
-         s.purge()
 
-         assert len(s) == 2
 
-         # expired
 
-         s = LimitedSet(maxlen=10, expires=1)
 
-         [s.add(i) for i in range(10)]
 
-         s.maxlen = 2
 
-         s.purge(now=monotonic() + 100)
 
-         assert len(s) == 0
 
-         # not expired
 
-         s = LimitedSet(maxlen=None, expires=1)
 
-         [s.add(i) for i in range(10)]
 
-         s.maxlen = 2
 
-         s.purge(now=lambda: monotonic() - 100)
 
-         assert len(s) == 2
 
-         # expired -> minsize
 
-         s = LimitedSet(maxlen=10, minlen=10, expires=1)
 
-         [s.add(i) for i in range(20)]
 
-         s.minlen = 3
 
-         s.purge(now=monotonic() + 3)
 
-         assert s.minlen == len(s)
 
-         assert len(s._heap) <= s.maxlen * (
 
-             100. + s.max_heap_percent_overload) / 100
 
-     def test_pickleable(self):
 
-         s = LimitedSet(maxlen=2)
 
-         s.add('foo')
 
-         s.add('bar')
 
-         assert pickle.loads(pickle.dumps(s)) == s
 
-     def test_iter(self):
 
-         s = LimitedSet(maxlen=3)
 
-         items = ['foo', 'bar', 'baz', 'xaz']
 
-         for item in items:
 
-             s.add(item)
 
-         l = list(iter(s))
 
-         for item in items[1:]:
 
-             assert item in l
 
-         assert 'foo' not in l
 
-         assert l == items[1:], 'order by insertion time'
 
-     def test_repr(self):
 
-         s = LimitedSet(maxlen=2)
 
-         items = 'foo', 'bar'
 
-         for item in items:
 
-             s.add(item)
 
-         assert 'LimitedSet(' in repr(s)
 
-     def test_discard(self):
 
-         s = LimitedSet(maxlen=2)
 
-         s.add('foo')
 
-         s.discard('foo')
 
-         assert 'foo' not in s
 
-         assert len(s._data) == 0
 
-         s.discard('foo')
 
-     def test_clear(self):
 
-         s = LimitedSet(maxlen=2)
 
-         s.add('foo')
 
-         s.add('bar')
 
-         assert len(s) == 2
 
-         s.clear()
 
-         assert not s
 
-     def test_update(self):
 
-         s1 = LimitedSet(maxlen=2)
 
-         s1.add('foo')
 
-         s1.add('bar')
 
-         s2 = LimitedSet(maxlen=2)
 
-         s2.update(s1)
 
-         assert sorted(list(s2)) == ['bar', 'foo']
 
-         s2.update(['bla'])
 
-         assert sorted(list(s2)) == ['bar', 'bla']
 
-         s2.update(['do', 're'])
 
-         assert sorted(list(s2)) == ['do', 're']
 
-         s1 = LimitedSet(maxlen=10, expires=None)
 
-         s2 = LimitedSet(maxlen=10, expires=None)
 
-         s3 = LimitedSet(maxlen=10, expires=None)
 
-         s4 = LimitedSet(maxlen=10, expires=None)
 
-         s5 = LimitedSet(maxlen=10, expires=None)
 
-         for i in range(12):
 
-             s1.add(i)
 
-             s2.add(i * i)
 
-         s3.update(s1)
 
-         s3.update(s2)
 
-         s4.update(s1.as_dict())
 
-         s4.update(s2.as_dict())
 
-         s5.update(s1._data)  # revoke is using this
 
-         s5.update(s2._data)
 
-         assert s3 == s4
 
-         assert s3 == s5
 
-         s2.update(s4)
 
-         s4.update(s2)
 
-         assert s2 == s4
 
-     def test_iterable_and_ordering(self):
 
-         s = LimitedSet(maxlen=35, expires=None)
 
-         clock = count(1)
 
-         for i in reversed(range(15)):
 
-             s.add(i, now=next(clock))
 
-         j = 40
 
-         for i in s:
 
-             assert i < j  # each item is smaller and smaller
 
-             j = i
 
-         assert i == 0  # last item is zero
 
-     def test_pop_and_ordering_again(self):
 
-         s = LimitedSet(maxlen=5)
 
-         for i in range(10):
 
-             s.add(i)
 
-         j = -1
 
-         for _ in range(5):
 
-             i = s.pop()
 
-             assert j < i
 
-         i = s.pop()
 
-         assert i is None
 
-     def test_as_dict(self):
 
-         s = LimitedSet(maxlen=2)
 
-         s.add('foo')
 
-         assert isinstance(s.as_dict(), Mapping)
 
-     def test_add_removes_duplicate_from_small_heap(self):
 
-         s = LimitedSet(maxlen=2)
 
-         s.add('foo')
 
-         s.add('foo')
 
-         s.add('foo')
 
-         assert len(s) == 1
 
-         assert len(s._data) == 1
 
-         assert len(s._heap) == 1
 
-     def test_add_removes_duplicate_from_big_heap(self):
 
-         s = LimitedSet(maxlen=1000)
 
-         [s.add(i) for i in range(2000)]
 
-         assert len(s) == 1000
 
-         [s.add('foo') for i in range(1000)]
 
-         # heap is refreshed when 15% larger than _data
 
-         assert len(s._heap) < 1150
 
-         [s.add('foo') for i in range(1000)]
 
-         assert len(s._heap) < 1150
 
- class test_AttributeDict:
 
-     def test_getattr__setattr(self):
 
-         x = AttributeDict({'foo': 'bar'})
 
-         assert x['foo'] == 'bar'
 
-         with pytest.raises(AttributeError):
 
-             x.bar
 
-         x.bar = 'foo'
 
-         assert x['bar'] == 'foo'
 
- class test_Messagebuffer:
 
-     def assert_size_and_first(self, buf, size, expected_first_item):
 
-         assert len(buf) == size
 
-         assert buf.take() == expected_first_item
 
-     def test_append_limited(self):
 
-         b = Messagebuffer(10)
 
-         for i in range(20):
 
-             b.put(i)
 
-         self.assert_size_and_first(b, 10, 10)
 
-     def test_append_unlimited(self):
 
-         b = Messagebuffer(None)
 
-         for i in range(20):
 
-             b.put(i)
 
-         self.assert_size_and_first(b, 20, 0)
 
-     def test_extend_limited(self):
 
-         b = Messagebuffer(10)
 
-         b.extend(list(range(20)))
 
-         self.assert_size_and_first(b, 10, 10)
 
-     def test_extend_unlimited(self):
 
-         b = Messagebuffer(None)
 
-         b.extend(list(range(20)))
 
-         self.assert_size_and_first(b, 20, 0)
 
-     def test_extend_eviction_time_limited(self):
 
-         b = Messagebuffer(3000)
 
-         b.extend(range(10000))
 
-         assert len(b) > 3000
 
-         b.evict()
 
-         assert len(b) == 3000
 
-     def test_pop_empty_with_default(self):
 
-         b = Messagebuffer(10)
 
-         sentinel = object()
 
-         assert b.take(sentinel) is sentinel
 
-     def test_pop_empty_no_default(self):
 
-         b = Messagebuffer(10)
 
-         with pytest.raises(b.Empty):
 
-             b.take()
 
-     def test_repr(self):
 
-         assert repr(Messagebuffer(10, [1, 2, 3]))
 
-     def test_iter(self):
 
-         b = Messagebuffer(10, list(range(10)))
 
-         assert len(b) == 10
 
-         for i, item in enumerate(b):
 
-             assert item == i
 
-         assert len(b) == 0
 
-     def test_contains(self):
 
-         b = Messagebuffer(10, list(range(10)))
 
-         assert 5 in b
 
-     def test_reversed(self):
 
-         assert (list(reversed(Messagebuffer(10, list(range(10))))) ==
 
-                 list(reversed(range(10))))
 
-     def test_getitem(self):
 
-         b = Messagebuffer(10, list(range(10)))
 
-         for i in range(10):
 
-             assert b[i] == i
 
- class test_BufferMap:
 
-     def test_append_limited(self):
 
-         b = BufferMap(10)
 
-         for i in range(20):
 
-             b.put(i, i)
 
-         self.assert_size_and_first(b, 10, 10)
 
-     def assert_size_and_first(self, buf, size, expected_first_item):
 
-         assert buf.total == size
 
-         assert buf._LRUpop() == expected_first_item
 
-     def test_append_unlimited(self):
 
-         b = BufferMap(None)
 
-         for i in range(20):
 
-             b.put(i, i)
 
-         self.assert_size_and_first(b, 20, 0)
 
-     def test_extend_limited(self):
 
-         b = BufferMap(10)
 
-         b.extend(1, list(range(20)))
 
-         self.assert_size_and_first(b, 10, 10)
 
-     def test_extend_unlimited(self):
 
-         b = BufferMap(None)
 
-         b.extend(1, list(range(20)))
 
-         self.assert_size_and_first(b, 20, 0)
 
-     def test_pop_empty_with_default(self):
 
-         b = BufferMap(10)
 
-         sentinel = object()
 
-         assert b.take(1, sentinel) is sentinel
 
-     def test_pop_empty_no_default(self):
 
-         b = BufferMap(10)
 
-         with pytest.raises(b.Empty):
 
-             b.take(1)
 
-     def test_repr(self):
 
-         assert repr(Messagebuffer(10, [1, 2, 3]))
 
 
  |