123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- from __future__ import absolute_import
- import pickle
- import sys
- from billiard.einfo import ExceptionInfo
- from time import time
- from celery.datastructures import (
- LimitedSet,
- AttributeDict,
- DictAttribute,
- ConfigurationView,
- DependencyGraph,
- )
- from celery.five import items
- from celery.tests.case import Case, Mock, WhateverIO, SkipTest, patch
- class Object(object):
- pass
- class test_DictAttribute(Case):
- def test_get_set_keys_values_items(self):
- x = DictAttribute(Object())
- x['foo'] = 'The quick brown fox'
- self.assertEqual(x['foo'], 'The quick brown fox')
- self.assertEqual(x['foo'], x.obj.foo)
- self.assertEqual(x.get('foo'), 'The quick brown fox')
- self.assertIsNone(x.get('bar'))
- with self.assertRaises(KeyError):
- x['bar']
- x.foo = 'The quick yellow fox'
- self.assertEqual(x['foo'], 'The quick yellow fox')
- self.assertIn(
- ('foo', 'The quick yellow fox'),
- list(x.items()),
- )
- self.assertIn('foo', list(x.keys()))
- self.assertIn('The quick yellow fox', list(x.values()))
- def test_setdefault(self):
- x = DictAttribute(Object())
- self.assertEqual(x.setdefault('foo', 'NEW'), 'NEW')
- self.assertEqual(x.setdefault('foo', 'XYZ'), 'NEW')
- def test_contains(self):
- x = DictAttribute(Object())
- x['foo'] = 1
- self.assertIn('foo', x)
- self.assertNotIn('bar', x)
- def test_items(self):
- obj = Object()
- obj.attr1 = 1
- x = DictAttribute(obj)
- x['attr2'] = 2
- self.assertEqual(x['attr1'], 1)
- self.assertEqual(x['attr2'], 2)
- class test_ConfigurationView(Case):
- def setUp(self):
- self.view = ConfigurationView({'changed_key': 1,
- 'both': 2},
- [{'default_key': 1,
- 'both': 1}])
- def test_setdefault(self):
- self.assertEqual(self.view.setdefault('both', 36), 2)
- self.assertEqual(self.view.setdefault('new', 36), 36)
- def test_get(self):
- self.assertEqual(self.view.get('both'), 2)
- sp = object()
- self.assertIs(self.view.get('nonexisting', sp), sp)
- def test_update(self):
- changes = dict(self.view.changes)
- self.view.update(a=1, b=2, c=3)
- self.assertDictEqual(self.view.changes,
- dict(changes, a=1, b=2, c=3))
- def test_contains(self):
- self.assertIn('changed_key', self.view)
- self.assertIn('default_key', self.view)
- self.assertNotIn('new', self.view)
- def test_repr(self):
- self.assertIn('changed_key', repr(self.view))
- self.assertIn('default_key', repr(self.view))
- def test_iter(self):
- expected = {'changed_key': 1,
- 'default_key': 1,
- 'both': 2}
- self.assertDictEqual(dict(items(self.view)), expected)
- self.assertItemsEqual(list(iter(self.view)),
- list(expected.keys()))
- self.assertItemsEqual(list(self.view.keys()), list(expected.keys()))
- self.assertItemsEqual(
- list(self.view.values()),
- list(expected.values()),
- )
- self.assertIn('changed_key', list(self.view.keys()))
- self.assertIn(2, list(self.view.values()))
- self.assertIn(('both', 2), list(self.view.items()))
- def test_add_defaults_dict(self):
- defaults = {'foo': 10}
- self.view.add_defaults(defaults)
- self.assertEqual(self.view.foo, 10)
- def test_add_defaults_object(self):
- defaults = Object()
- defaults.foo = 10
- self.view.add_defaults(defaults)
- self.assertEqual(self.view.foo, 10)
- def test_clear(self):
- self.view.clear()
- self.assertEqual(self.view.both, 1)
- self.assertNotIn('changed_key', self.view)
- def test_bool(self):
- self.assertTrue(bool(self.view))
- self.view._order[:] = []
- self.assertFalse(bool(self.view))
- def test_len(self):
- self.assertEqual(len(self.view), 3)
- self.view.KEY = 33
- self.assertEqual(len(self.view), 4)
- self.view.clear()
- self.assertEqual(len(self.view), 2)
- def test_isa_mapping(self):
- from collections import Mapping
- self.assertTrue(issubclass(ConfigurationView, Mapping))
- def test_isa_mutable_mapping(self):
- from collections import MutableMapping
- self.assertTrue(issubclass(ConfigurationView, MutableMapping))
- class test_ExceptionInfo(Case):
- def test_exception_info(self):
- try:
- raise LookupError('The quick brown fox jumps...')
- except Exception:
- einfo = ExceptionInfo()
- self.assertEqual(str(einfo), einfo.traceback)
- self.assertIsInstance(einfo.exception, LookupError)
- self.assertTupleEqual(
- einfo.exception.args, ('The quick brown fox jumps...', ),
- )
- self.assertTrue(einfo.traceback)
- r = repr(einfo)
- self.assertTrue(r)
- class test_LimitedSet(Case):
- def setUp(self):
- if sys.platform == 'win32':
- raise SkipTest('Not working on Windows')
- def test_add(self):
- if sys.platform == 'win32':
- raise SkipTest('Not working properly on Windows')
- s = LimitedSet(maxlen=2)
- s.add('foo')
- s.add('bar')
- for n in 'foo', 'bar':
- self.assertIn(n, s)
- s.add('baz')
- for n in 'bar', 'baz':
- self.assertIn(n, s)
- self.assertNotIn('foo', s)
- def test_purge(self):
- s = LimitedSet(maxlen=None)
- [s.add(i) for i in range(10)]
- s.maxlen = 2
- s.purge(1)
- self.assertEqual(len(s), 9)
- s.purge(None)
- self.assertEqual(len(s), 2)
- # expired
- s = LimitedSet(maxlen=None, expires=1)
- [s.add(i) for i in range(10)]
- s.maxlen = 2
- s.purge(1, now=lambda: time() + 100)
- self.assertEqual(len(s), 9)
- s.purge(None, now=lambda: time() + 100)
- self.assertEqual(len(s), 2)
- # not expired
- s = LimitedSet(maxlen=None, expires=1)
- [s.add(i) for i in range(10)]
- s.maxlen = 2
- s.purge(1, now=lambda: time() - 100)
- self.assertEqual(len(s), 10)
- s.purge(None, now=lambda: time() - 100)
- self.assertEqual(len(s), 10)
- s = LimitedSet(maxlen=None)
- [s.add(i) for i in range(10)]
- s.maxlen = 2
- with patch('celery.datastructures.heappop') as hp:
- hp.side_effect = IndexError()
- s.purge()
- hp.assert_called_with(s._heap)
- with patch('celery.datastructures.heappop') as hp:
- s._data = dict((i * 2, i * 2) for i in range(10))
- s.purge()
- self.assertEqual(hp.call_count, 10)
- def test_pickleable(self):
- s = LimitedSet(maxlen=2)
- s.add('foo')
- s.add('bar')
- self.assertEqual(pickle.loads(pickle.dumps(s)), s)
- def test_iter(self):
- if sys.platform == 'win32':
- raise SkipTest('Not working on Windows')
- s = LimitedSet(maxlen=3)
- items = ['foo', 'bar', 'baz', 'xaz']
- for item in items:
- s.add(item)
- l = list(iter(s))
- for item in items[1:]:
- self.assertIn(item, l)
- self.assertNotIn('foo', l)
- self.assertListEqual(l, items[1:], 'order by insertion time')
- def test_repr(self):
- s = LimitedSet(maxlen=2)
- items = 'foo', 'bar'
- for item in items:
- s.add(item)
- self.assertIn('LimitedSet(', repr(s))
- def test_discard(self):
- s = LimitedSet(maxlen=2)
- s.add('foo')
- s.discard('foo')
- self.assertNotIn('foo', s)
- s.discard('foo')
- def test_clear(self):
- s = LimitedSet(maxlen=2)
- s.add('foo')
- s.add('bar')
- self.assertEqual(len(s), 2)
- s.clear()
- self.assertFalse(s)
- def test_update(self):
- s1 = LimitedSet(maxlen=2)
- s1.add('foo')
- s1.add('bar')
- s2 = LimitedSet(maxlen=2)
- s2.update(s1)
- self.assertItemsEqual(list(s2), ['foo', 'bar'])
- s2.update(['bla'])
- self.assertItemsEqual(list(s2), ['bla', 'bar'])
- s2.update(['do', 're'])
- self.assertItemsEqual(list(s2), ['do', 're'])
- def test_as_dict(self):
- s = LimitedSet(maxlen=2)
- s.add('foo')
- self.assertIsInstance(s.as_dict(), dict)
- class test_AttributeDict(Case):
- def test_getattr__setattr(self):
- x = AttributeDict({'foo': 'bar'})
- self.assertEqual(x['foo'], 'bar')
- with self.assertRaises(AttributeError):
- x.bar
- x.bar = 'foo'
- self.assertEqual(x['bar'], 'foo')
- class test_DependencyGraph(Case):
- def graph1(self):
- return DependencyGraph([
- ('A', []),
- ('B', []),
- ('C', ['A']),
- ('D', ['C', 'B']),
- ])
- def test_repr(self):
- self.assertTrue(repr(self.graph1()))
- def test_topsort(self):
- order = self.graph1().topsort()
- # C must start before D
- self.assertLess(order.index('C'), order.index('D'))
- # and B must start before D
- self.assertLess(order.index('B'), order.index('D'))
- # and A must start before C
- self.assertLess(order.index('A'), order.index('C'))
- def test_edges(self):
- self.assertItemsEqual(
- list(self.graph1().edges()),
- ['C', 'D'],
- )
- def test_connect(self):
- x, y = self.graph1(), self.graph1()
- x.connect(y)
- def test_valency_of_when_missing(self):
- x = self.graph1()
- self.assertEqual(x.valency_of('foobarbaz'), 0)
- def test_format(self):
- x = self.graph1()
- x.formatter = Mock()
- obj = Mock()
- self.assertTrue(x.format(obj))
- x.formatter.assert_called_with(obj)
- x.formatter = None
- self.assertIs(x.format(obj), obj)
- def test_items(self):
- self.assertDictEqual(
- dict(items(self.graph1())),
- {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']},
- )
- def test_repr_node(self):
- x = self.graph1()
- self.assertTrue(x.repr_node('fasdswewqewq'))
- def test_to_dot(self):
- s = WhateverIO()
- self.graph1().to_dot(s)
- self.assertTrue(s.getvalue())
|