123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- from __future__ import absolute_import
- from celery.datastructures import (
- ExceptionInfo,
- LimitedSet,
- AttributeDict,
- DictAttribute,
- ConfigurationView,
- DependencyGraph,
- )
- from celery.five import items
- from celery.tests.case import Case, WhateverIO
class Object(object):
    """Empty attribute holder wrapped by ``DictAttribute`` in the tests."""
class test_DictAttribute(Case):
    """Tests for the dict-style interface ``DictAttribute`` puts on an object."""

    def test_get_set(self):
        """A stored item is visible via indexing, ``.obj`` and ``get``."""
        phrase = 'The quick brown fox'
        wrapper = DictAttribute(Object())
        wrapper['foo'] = phrase

        self.assertEqual(wrapper['foo'], phrase)
        # The value must also be readable on the wrapped object itself.
        self.assertEqual(wrapper['foo'], wrapper.obj.foo)
        self.assertEqual(wrapper.get('foo'), phrase)

        # Unknown keys: ``get`` yields None, indexing raises KeyError.
        self.assertIsNone(wrapper.get('bar'))
        with self.assertRaises(KeyError):
            wrapper['bar']

    def test_setdefault(self):
        """``setdefault`` stores on first use and keeps that value afterwards."""
        wrapper = DictAttribute(Object())
        first = wrapper.setdefault('foo', 'NEW')
        self.assertEqual(first, 'NEW')
        second = wrapper.setdefault('foo', 'XYZ')
        self.assertEqual(second, 'NEW')

    def test_contains(self):
        """``in`` reports only keys that have been set."""
        wrapper = DictAttribute(Object())
        wrapper['foo'] = 1
        self.assertIn('foo', wrapper)
        self.assertNotIn('bar', wrapper)

    def test_items(self):
        """Attributes set on the object and via the wrapper are both indexable."""
        target = Object()
        target.attr1 = 1
        wrapper = DictAttribute(target)
        wrapper['attr2'] = 2
        for key, expected in (('attr1', 1), ('attr2', 2)):
            self.assertEqual(wrapper[key], expected)
class test_ConfigurationView(Case):
    """Tests for ``ConfigurationView``.

    The fixture is built from one mapping of changed values and a list with
    one mapping of defaults; the key ``'both'`` appears in each so the tests
    can check which one is returned.
    """

    def setUp(self):
        """Create a fresh view with one changed key, one default key and one
        key (``'both'``) present in both mappings."""
        self.view = ConfigurationView({'changed_key': 1,
                                       'both': 2},
                                      [{'default_key': 1,
                                        'both': 1}])

    def test_setdefault(self):
        """``setdefault`` returns the existing value, or stores the default."""
        # 'both' already resolves to 2 (the value from the first mapping).
        self.assertEqual(self.view.setdefault('both', 36), 2)
        self.assertEqual(self.view.setdefault('new', 36), 36)

    def test_get(self):
        """``get`` returns the stored value, or the supplied fallback."""
        self.assertEqual(self.view.get('both'), 2)
        # A unique sentinel proves the fallback object itself is returned.
        sp = object()
        self.assertIs(self.view.get('nonexisting', sp), sp)

    def test_update(self):
        """``update`` merges keyword arguments into ``view.changes``."""
        # Snapshot first so the expectation is independent of the mutation.
        changes = dict(self.view.changes)
        self.view.update(a=1, b=2, c=3)
        self.assertDictEqual(self.view.changes,
                             dict(changes, a=1, b=2, c=3))

    def test_contains(self):
        """Membership covers changed and default keys, not unknown ones."""
        self.assertIn('changed_key', self.view)
        self.assertIn('default_key', self.view)
        self.assertNotIn('new', self.view)

    def test_repr(self):
        """``repr`` mentions keys from both the changes and the defaults."""
        self.assertIn('changed_key', repr(self.view))
        self.assertIn('default_key', repr(self.view))

    def test_iter(self):
        """Iteration, ``keys``, ``values`` and ``items`` expose the merged
        view, with ``'both'`` resolving to 2."""
        expected = {'changed_key': 1,
                    'default_key': 1,
                    'both': 2}
        self.assertDictEqual(dict(items(self.view)), expected)
        self.assertItemsEqual(list(iter(self.view)),
                              list(expected.keys()))
        self.assertItemsEqual(list(self.view.keys()), list(expected.keys()))
        self.assertItemsEqual(
            list(self.view.values()),
            list(expected.values()),
        )

    def test_isa_mapping(self):
        """``ConfigurationView`` is a (virtual or real) ``Mapping`` subclass."""
        # The ABC aliases were removed from ``collections`` in Python 3.10;
        # prefer ``collections.abc`` and fall back for Python 2.
        try:
            from collections.abc import Mapping
        except ImportError:
            from collections import Mapping
        self.assertTrue(issubclass(ConfigurationView, Mapping))

    def test_isa_mutable_mapping(self):
        """``ConfigurationView`` is a (virtual or real) ``MutableMapping``
        subclass."""
        # Same import fallback as in test_isa_mapping: ``collections.abc``
        # first, plain ``collections`` on Python 2.
        try:
            from collections.abc import MutableMapping
        except ImportError:
            from collections import MutableMapping
        self.assertTrue(issubclass(ConfigurationView, MutableMapping))
class test_ExceptionInfo(Case):
    """Tests for ``ExceptionInfo`` built while an exception is being handled."""

    def test_exception_info(self):
        """The captured info exposes the exception, its args and a traceback
        string that ``str()`` returns."""
        message = 'The quick brown fox jumps...'
        try:
            raise LookupError(message)
        except Exception:
            # Constructed with no arguments while the exception is active.
            einfo = ExceptionInfo()
            captured = einfo.exception

            self.assertEqual(str(einfo), einfo.traceback)
            self.assertIsInstance(captured, LookupError)
            self.assertTupleEqual(captured.args, (message, ))
            self.assertTrue(einfo.traceback)

            # repr() only has to produce something non-empty.
            self.assertTrue(repr(einfo))
class test_LimitedSet(Case):
    """Tests for ``LimitedSet`` using a capacity of two throughout."""

    def test_add(self):
        """Adding a third member drops the first one added."""
        bounded = LimitedSet(maxlen=2)
        bounded.add('foo')
        bounded.add('bar')
        for member in ('foo', 'bar'):
            self.assertIn(member, bounded)

        # One more than maxlen: 'foo' is expected to be gone afterwards.
        bounded.add('baz')
        for member in ('bar', 'baz'):
            self.assertIn(member, bounded)
        self.assertNotIn('foo', bounded)

    def test_iter(self):
        """Iterating yields every member that was added."""
        bounded = LimitedSet(maxlen=2)
        members = ('foo', 'bar')
        for member in members:
            bounded.add(member)
        listed = list(iter(bounded))
        for member in members:
            self.assertIn(member, listed)

    def test_repr(self):
        """``repr`` contains the ``LimitedSet(`` prefix."""
        bounded = LimitedSet(maxlen=2)
        for member in ('foo', 'bar'):
            bounded.add(member)
        text = repr(bounded)
        self.assertIn('LimitedSet(', text)

    def test_clear(self):
        """``clear`` leaves a falsy (empty) set behind."""
        bounded = LimitedSet(maxlen=2)
        for member in ('foo', 'bar'):
            bounded.add(member)
        self.assertEqual(len(bounded), 2)
        bounded.clear()
        self.assertFalse(bounded)

    def test_update(self):
        """``update`` accepts another ``LimitedSet`` or a list and still
        respects the capacity."""
        source = LimitedSet(maxlen=2)
        source.add('foo')
        source.add('bar')

        target = LimitedSet(maxlen=2)
        target.update(source)
        self.assertItemsEqual(list(target), ['foo', 'bar'])

        # A single new member replaces 'foo'.
        target.update(['bla'])
        self.assertItemsEqual(list(target), ['bla', 'bar'])

        # Two new members replace everything that was there.
        target.update(['do', 're'])
        self.assertItemsEqual(list(target), ['do', 're'])

    def test_as_dict(self):
        """``as_dict`` returns a plain ``dict``."""
        bounded = LimitedSet(maxlen=2)
        bounded.add('foo')
        exported = bounded.as_dict()
        self.assertIsInstance(exported, dict)
class test_AttributeDict(Case):
    """Tests for attribute-style access on ``AttributeDict``."""

    def test_getattr__setattr(self):
        """Missing attributes raise AttributeError; assigned attributes
        become dictionary items."""
        mapping = AttributeDict({'foo': 'bar'})
        self.assertEqual(mapping['foo'], 'bar')

        # 'bar' has not been set yet, so attribute access must fail.
        with self.assertRaises(AttributeError):
            mapping.bar

        mapping.bar = 'foo'
        self.assertEqual(mapping['bar'], 'foo')
class test_DependencyGraph(Case):
    """Tests for ``DependencyGraph`` on a small four-node fixture."""

    def graph1(self):
        """Return a new graph with nodes A, B, C and D.

        A and B are paired with empty lists, C with ``['A']`` and D with
        ``['C', 'B']``.
        """
        pairs = [
            ('A', []),
            ('B', []),
            ('C', ['A']),
            ('D', ['C', 'B']),
        ]
        return DependencyGraph(pairs)

    def test_repr(self):
        """``repr`` of the graph is non-empty."""
        graph = self.graph1()
        self.assertTrue(repr(graph))

    def test_topsort(self):
        """``topsort`` places each node after the nodes listed for it."""
        order = self.graph1().topsort()
        # (earlier, later): C before D, B before D, A before C.
        for earlier, later in (('C', 'D'), ('B', 'D'), ('A', 'C')):
            self.assertLess(order.index(earlier), order.index(later))

    def test_edges(self):
        """``edges`` yields exactly C and D for this fixture."""
        found = list(self.graph1().edges())
        self.assertItemsEqual(found, ['C', 'D'])

    def test_items(self):
        """``items`` round-trips the pairs the graph was built from."""
        expected = {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']}
        self.assertDictEqual(dict(items(self.graph1())), expected)

    def test_to_dot(self):
        """``to_dot`` writes some output to the given file-like object."""
        sink = WhateverIO()
        self.graph1().to_dot(sink)
        self.assertTrue(sink.getvalue())
|