# test_datastructures.py (6.3 KB)
  1. from __future__ import absolute_import
  2. from celery.datastructures import (
  3. ExceptionInfo,
  4. LimitedSet,
  5. AttributeDict,
  6. DictAttribute,
  7. ConfigurationView,
  8. DependencyGraph,
  9. )
  10. from celery.five import items
  11. from celery.tests.case import Case, WhateverIO
  12. class Object(object):
  13. pass
  14. class test_DictAttribute(Case):
  15. def test_get_set(self):
  16. x = DictAttribute(Object())
  17. x['foo'] = 'The quick brown fox'
  18. self.assertEqual(x['foo'], 'The quick brown fox')
  19. self.assertEqual(x['foo'], x.obj.foo)
  20. self.assertEqual(x.get('foo'), 'The quick brown fox')
  21. self.assertIsNone(x.get('bar'))
  22. with self.assertRaises(KeyError):
  23. x['bar']
  24. def test_setdefault(self):
  25. x = DictAttribute(Object())
  26. self.assertEqual(x.setdefault('foo', 'NEW'), 'NEW')
  27. self.assertEqual(x.setdefault('foo', 'XYZ'), 'NEW')
  28. def test_contains(self):
  29. x = DictAttribute(Object())
  30. x['foo'] = 1
  31. self.assertIn('foo', x)
  32. self.assertNotIn('bar', x)
  33. def test_items(self):
  34. obj = Object()
  35. obj.attr1 = 1
  36. x = DictAttribute(obj)
  37. x['attr2'] = 2
  38. self.assertEqual(x['attr1'], 1)
  39. self.assertEqual(x['attr2'], 2)
  40. class test_ConfigurationView(Case):
  41. def setUp(self):
  42. self.view = ConfigurationView({'changed_key': 1,
  43. 'both': 2},
  44. [{'default_key': 1,
  45. 'both': 1}])
  46. def test_setdefault(self):
  47. self.assertEqual(self.view.setdefault('both', 36), 2)
  48. self.assertEqual(self.view.setdefault('new', 36), 36)
  49. def test_get(self):
  50. self.assertEqual(self.view.get('both'), 2)
  51. sp = object()
  52. self.assertIs(self.view.get('nonexisting', sp), sp)
  53. def test_update(self):
  54. changes = dict(self.view.changes)
  55. self.view.update(a=1, b=2, c=3)
  56. self.assertDictEqual(self.view.changes,
  57. dict(changes, a=1, b=2, c=3))
  58. def test_contains(self):
  59. self.assertIn('changed_key', self.view)
  60. self.assertIn('default_key', self.view)
  61. self.assertNotIn('new', self.view)
  62. def test_repr(self):
  63. self.assertIn('changed_key', repr(self.view))
  64. self.assertIn('default_key', repr(self.view))
  65. def test_iter(self):
  66. expected = {'changed_key': 1,
  67. 'default_key': 1,
  68. 'both': 2}
  69. self.assertDictEqual(dict(items(self.view)), expected)
  70. self.assertItemsEqual(list(iter(self.view)),
  71. list(expected.keys()))
  72. self.assertItemsEqual(list(self.view.keys()), list(expected.keys()))
  73. self.assertItemsEqual(
  74. list(self.view.values()),
  75. list(expected.values()),
  76. )
  77. def test_isa_mapping(self):
  78. from collections import Mapping
  79. self.assertTrue(issubclass(ConfigurationView, Mapping))
  80. def test_isa_mutable_mapping(self):
  81. from collections import MutableMapping
  82. self.assertTrue(issubclass(ConfigurationView, MutableMapping))
  83. class test_ExceptionInfo(Case):
  84. def test_exception_info(self):
  85. try:
  86. raise LookupError('The quick brown fox jumps...')
  87. except Exception:
  88. einfo = ExceptionInfo()
  89. self.assertEqual(str(einfo), einfo.traceback)
  90. self.assertIsInstance(einfo.exception, LookupError)
  91. self.assertTupleEqual(
  92. einfo.exception.args, ('The quick brown fox jumps...', ),
  93. )
  94. self.assertTrue(einfo.traceback)
  95. r = repr(einfo)
  96. self.assertTrue(r)
  97. class test_LimitedSet(Case):
  98. def test_add(self):
  99. s = LimitedSet(maxlen=2)
  100. s.add('foo')
  101. s.add('bar')
  102. for n in 'foo', 'bar':
  103. self.assertIn(n, s)
  104. s.add('baz')
  105. for n in 'bar', 'baz':
  106. self.assertIn(n, s)
  107. self.assertNotIn('foo', s)
  108. def test_iter(self):
  109. s = LimitedSet(maxlen=2)
  110. items = 'foo', 'bar'
  111. for item in items:
  112. s.add(item)
  113. l = list(iter(s))
  114. for item in items:
  115. self.assertIn(item, l)
  116. def test_repr(self):
  117. s = LimitedSet(maxlen=2)
  118. items = 'foo', 'bar'
  119. for item in items:
  120. s.add(item)
  121. self.assertIn('LimitedSet(', repr(s))
  122. def test_clear(self):
  123. s = LimitedSet(maxlen=2)
  124. s.add('foo')
  125. s.add('bar')
  126. self.assertEqual(len(s), 2)
  127. s.clear()
  128. self.assertFalse(s)
  129. def test_update(self):
  130. s1 = LimitedSet(maxlen=2)
  131. s1.add('foo')
  132. s1.add('bar')
  133. s2 = LimitedSet(maxlen=2)
  134. s2.update(s1)
  135. self.assertItemsEqual(list(s2), ['foo', 'bar'])
  136. s2.update(['bla'])
  137. self.assertItemsEqual(list(s2), ['bla', 'bar'])
  138. s2.update(['do', 're'])
  139. self.assertItemsEqual(list(s2), ['do', 're'])
  140. def test_as_dict(self):
  141. s = LimitedSet(maxlen=2)
  142. s.add('foo')
  143. self.assertIsInstance(s.as_dict(), dict)
  144. class test_AttributeDict(Case):
  145. def test_getattr__setattr(self):
  146. x = AttributeDict({'foo': 'bar'})
  147. self.assertEqual(x['foo'], 'bar')
  148. with self.assertRaises(AttributeError):
  149. x.bar
  150. x.bar = 'foo'
  151. self.assertEqual(x['bar'], 'foo')
  152. class test_DependencyGraph(Case):
  153. def graph1(self):
  154. return DependencyGraph([
  155. ('A', []),
  156. ('B', []),
  157. ('C', ['A']),
  158. ('D', ['C', 'B']),
  159. ])
  160. def test_repr(self):
  161. self.assertTrue(repr(self.graph1()))
  162. def test_topsort(self):
  163. order = self.graph1().topsort()
  164. # C must start before D
  165. self.assertLess(order.index('C'), order.index('D'))
  166. # and B must start before D
  167. self.assertLess(order.index('B'), order.index('D'))
  168. # and A must start before C
  169. self.assertLess(order.index('A'), order.index('C'))
  170. def test_edges(self):
  171. self.assertItemsEqual(
  172. list(self.graph1().edges()),
  173. ['C', 'D'],
  174. )
  175. def test_items(self):
  176. self.assertDictEqual(
  177. dict(items(self.graph1())),
  178. {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']},
  179. )
  180. def test_to_dot(self):
  181. s = WhateverIO()
  182. self.graph1().to_dot(s)
  183. self.assertTrue(s.getvalue())