test_datastructures.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. from __future__ import absolute_import
  2. import pickle
  3. import sys
  4. from billiard.einfo import ExceptionInfo
  5. from time import time
  6. from celery.datastructures import (
  7. LimitedSet,
  8. AttributeDict,
  9. DictAttribute,
  10. ConfigurationView,
  11. DependencyGraph,
  12. )
  13. from celery.five import items
  14. from celery.tests.case import Case, Mock, WhateverIO, SkipTest, patch
  15. class Object(object):
  16. pass
  17. class test_DictAttribute(Case):
  18. def test_get_set_keys_values_items(self):
  19. x = DictAttribute(Object())
  20. x['foo'] = 'The quick brown fox'
  21. self.assertEqual(x['foo'], 'The quick brown fox')
  22. self.assertEqual(x['foo'], x.obj.foo)
  23. self.assertEqual(x.get('foo'), 'The quick brown fox')
  24. self.assertIsNone(x.get('bar'))
  25. with self.assertRaises(KeyError):
  26. x['bar']
  27. x.foo = 'The quick yellow fox'
  28. self.assertEqual(x['foo'], 'The quick yellow fox')
  29. self.assertIn(
  30. ('foo', 'The quick yellow fox'),
  31. list(x.items()),
  32. )
  33. self.assertIn('foo', list(x.keys()))
  34. self.assertIn('The quick yellow fox', list(x.values()))
  35. def test_setdefault(self):
  36. x = DictAttribute(Object())
  37. x.setdefault('foo', 'NEW')
  38. self.assertEqual(x['foo'], 'NEW')
  39. x.setdefault('foo', 'XYZ')
  40. self.assertEqual(x['foo'], 'NEW')
  41. def test_contains(self):
  42. x = DictAttribute(Object())
  43. x['foo'] = 1
  44. self.assertIn('foo', x)
  45. self.assertNotIn('bar', x)
  46. def test_items(self):
  47. obj = Object()
  48. obj.attr1 = 1
  49. x = DictAttribute(obj)
  50. x['attr2'] = 2
  51. self.assertEqual(x['attr1'], 1)
  52. self.assertEqual(x['attr2'], 2)
  53. class test_ConfigurationView(Case):
  54. def setUp(self):
  55. self.view = ConfigurationView({'changed_key': 1,
  56. 'both': 2},
  57. [{'default_key': 1,
  58. 'both': 1}])
  59. def test_setdefault(self):
  60. self.view.setdefault('both', 36)
  61. self.assertEqual(self.view['both'], 2)
  62. self.view.setdefault('new', 36)
  63. self.assertEqual(self.view['new'], 36)
  64. def test_get(self):
  65. self.assertEqual(self.view.get('both'), 2)
  66. sp = object()
  67. self.assertIs(self.view.get('nonexisting', sp), sp)
  68. def test_update(self):
  69. changes = dict(self.view.changes)
  70. self.view.update(a=1, b=2, c=3)
  71. self.assertDictEqual(self.view.changes,
  72. dict(changes, a=1, b=2, c=3))
  73. def test_contains(self):
  74. self.assertIn('changed_key', self.view)
  75. self.assertIn('default_key', self.view)
  76. self.assertNotIn('new', self.view)
  77. def test_repr(self):
  78. self.assertIn('changed_key', repr(self.view))
  79. self.assertIn('default_key', repr(self.view))
  80. def test_iter(self):
  81. expected = {'changed_key': 1,
  82. 'default_key': 1,
  83. 'both': 2}
  84. self.assertDictEqual(dict(items(self.view)), expected)
  85. self.assertItemsEqual(list(iter(self.view)),
  86. list(expected.keys()))
  87. self.assertItemsEqual(list(self.view.keys()), list(expected.keys()))
  88. self.assertItemsEqual(
  89. list(self.view.values()),
  90. list(expected.values()),
  91. )
  92. self.assertIn('changed_key', list(self.view.keys()))
  93. self.assertIn(2, list(self.view.values()))
  94. self.assertIn(('both', 2), list(self.view.items()))
  95. def test_add_defaults_dict(self):
  96. defaults = {'foo': 10}
  97. self.view.add_defaults(defaults)
  98. self.assertEqual(self.view.foo, 10)
  99. def test_add_defaults_object(self):
  100. defaults = Object()
  101. defaults.foo = 10
  102. self.view.add_defaults(defaults)
  103. self.assertEqual(self.view.foo, 10)
  104. def test_clear(self):
  105. self.view.clear()
  106. self.assertEqual(self.view.both, 1)
  107. self.assertNotIn('changed_key', self.view)
  108. def test_bool(self):
  109. self.assertTrue(bool(self.view))
  110. self.view._order[:] = []
  111. self.assertFalse(bool(self.view))
  112. def test_len(self):
  113. self.assertEqual(len(self.view), 3)
  114. self.view.KEY = 33
  115. self.assertEqual(len(self.view), 4)
  116. self.view.clear()
  117. self.assertEqual(len(self.view), 2)
  118. def test_isa_mapping(self):
  119. from collections import Mapping
  120. self.assertTrue(issubclass(ConfigurationView, Mapping))
  121. def test_isa_mutable_mapping(self):
  122. from collections import MutableMapping
  123. self.assertTrue(issubclass(ConfigurationView, MutableMapping))
  124. class test_ExceptionInfo(Case):
  125. def test_exception_info(self):
  126. try:
  127. raise LookupError('The quick brown fox jumps...')
  128. except Exception:
  129. einfo = ExceptionInfo()
  130. self.assertEqual(str(einfo), einfo.traceback)
  131. self.assertIsInstance(einfo.exception, LookupError)
  132. self.assertTupleEqual(
  133. einfo.exception.args, ('The quick brown fox jumps...',),
  134. )
  135. self.assertTrue(einfo.traceback)
  136. r = repr(einfo)
  137. self.assertTrue(r)
  138. class test_LimitedSet(Case):
  139. def setUp(self):
  140. if sys.platform == 'win32':
  141. raise SkipTest('Not working on Windows')
  142. def test_add(self):
  143. if sys.platform == 'win32':
  144. raise SkipTest('Not working properly on Windows')
  145. s = LimitedSet(maxlen=2)
  146. s.add('foo')
  147. s.add('bar')
  148. for n in 'foo', 'bar':
  149. self.assertIn(n, s)
  150. s.add('baz')
  151. for n in 'bar', 'baz':
  152. self.assertIn(n, s)
  153. self.assertNotIn('foo', s)
  154. def test_purge(self):
  155. s = LimitedSet(maxlen=None)
  156. [s.add(i) for i in range(10)]
  157. s.maxlen = 2
  158. s.purge(1)
  159. self.assertEqual(len(s), 9)
  160. s.purge(None)
  161. self.assertEqual(len(s), 2)
  162. # expired
  163. s = LimitedSet(maxlen=None, expires=1)
  164. [s.add(i) for i in range(10)]
  165. s.maxlen = 2
  166. s.purge(1, now=lambda: time() + 100)
  167. self.assertEqual(len(s), 9)
  168. s.purge(None, now=lambda: time() + 100)
  169. self.assertEqual(len(s), 2)
  170. # not expired
  171. s = LimitedSet(maxlen=None, expires=1)
  172. [s.add(i) for i in range(10)]
  173. s.maxlen = 2
  174. s.purge(1, now=lambda: time() - 100)
  175. self.assertEqual(len(s), 10)
  176. s.purge(None, now=lambda: time() - 100)
  177. self.assertEqual(len(s), 10)
  178. s = LimitedSet(maxlen=None)
  179. [s.add(i) for i in range(10)]
  180. s.maxlen = 2
  181. with patch('celery.datastructures.heappop') as hp:
  182. hp.side_effect = IndexError()
  183. s.purge()
  184. hp.assert_called_with(s._heap)
  185. with patch('celery.datastructures.heappop') as hp:
  186. s._data = {i * 2: i * 2 for i in range(10)}
  187. s.purge()
  188. self.assertEqual(hp.call_count, 10)
  189. def test_pickleable(self):
  190. s = LimitedSet(maxlen=2)
  191. s.add('foo')
  192. s.add('bar')
  193. self.assertEqual(pickle.loads(pickle.dumps(s)), s)
  194. def test_iter(self):
  195. if sys.platform == 'win32':
  196. raise SkipTest('Not working on Windows')
  197. s = LimitedSet(maxlen=3)
  198. items = ['foo', 'bar', 'baz', 'xaz']
  199. for item in items:
  200. s.add(item)
  201. l = list(iter(s))
  202. for item in items[1:]:
  203. self.assertIn(item, l)
  204. self.assertNotIn('foo', l)
  205. self.assertListEqual(l, items[1:], 'order by insertion time')
  206. def test_repr(self):
  207. s = LimitedSet(maxlen=2)
  208. items = 'foo', 'bar'
  209. for item in items:
  210. s.add(item)
  211. self.assertIn('LimitedSet(', repr(s))
  212. def test_discard(self):
  213. s = LimitedSet(maxlen=2)
  214. s.add('foo')
  215. s.discard('foo')
  216. self.assertNotIn('foo', s)
  217. self.assertEqual(len(s._data), 0)
  218. self.assertEqual(len(s._heap), 0)
  219. s.discard('foo')
  220. def test_clear(self):
  221. s = LimitedSet(maxlen=2)
  222. s.add('foo')
  223. s.add('bar')
  224. self.assertEqual(len(s), 2)
  225. s.clear()
  226. self.assertFalse(s)
  227. def test_update(self):
  228. s1 = LimitedSet(maxlen=2)
  229. s1.add('foo')
  230. s1.add('bar')
  231. s2 = LimitedSet(maxlen=2)
  232. s2.update(s1)
  233. self.assertItemsEqual(list(s2), ['foo', 'bar'])
  234. s2.update(['bla'])
  235. self.assertItemsEqual(list(s2), ['bla', 'bar'])
  236. s2.update(['do', 're'])
  237. self.assertItemsEqual(list(s2), ['do', 're'])
  238. def test_as_dict(self):
  239. s = LimitedSet(maxlen=2)
  240. s.add('foo')
  241. self.assertIsInstance(s.as_dict(), dict)
  242. class test_AttributeDict(Case):
  243. def test_getattr__setattr(self):
  244. x = AttributeDict({'foo': 'bar'})
  245. self.assertEqual(x['foo'], 'bar')
  246. with self.assertRaises(AttributeError):
  247. x.bar
  248. x.bar = 'foo'
  249. self.assertEqual(x['bar'], 'foo')
  250. class test_DependencyGraph(Case):
  251. def graph1(self):
  252. return DependencyGraph([
  253. ('A', []),
  254. ('B', []),
  255. ('C', ['A']),
  256. ('D', ['C', 'B']),
  257. ])
  258. def test_repr(self):
  259. self.assertTrue(repr(self.graph1()))
  260. def test_topsort(self):
  261. order = self.graph1().topsort()
  262. # C must start before D
  263. self.assertLess(order.index('C'), order.index('D'))
  264. # and B must start before D
  265. self.assertLess(order.index('B'), order.index('D'))
  266. # and A must start before C
  267. self.assertLess(order.index('A'), order.index('C'))
  268. def test_edges(self):
  269. self.assertItemsEqual(
  270. list(self.graph1().edges()),
  271. ['C', 'D'],
  272. )
  273. def test_connect(self):
  274. x, y = self.graph1(), self.graph1()
  275. x.connect(y)
  276. def test_valency_of_when_missing(self):
  277. x = self.graph1()
  278. self.assertEqual(x.valency_of('foobarbaz'), 0)
  279. def test_format(self):
  280. x = self.graph1()
  281. x.formatter = Mock()
  282. obj = Mock()
  283. self.assertTrue(x.format(obj))
  284. x.formatter.assert_called_with(obj)
  285. x.formatter = None
  286. self.assertIs(x.format(obj), obj)
  287. def test_items(self):
  288. self.assertDictEqual(
  289. dict(items(self.graph1())),
  290. {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']},
  291. )
  292. def test_repr_node(self):
  293. x = self.graph1()
  294. self.assertTrue(x.repr_node('fasdswewqewq'))
  295. def test_to_dot(self):
  296. s = WhateverIO()
  297. self.graph1().to_dot(s)
  298. self.assertTrue(s.getvalue())