test_datastructures.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import sys
  2. from celery.tests.utils import unittest
  3. from Queue import Queue
  4. from celery.datastructures import PositionQueue, ExceptionInfo, LocalCache
  5. from celery.datastructures import LimitedSet, SharedCounter, consume_queue
  6. from celery.datastructures import AttributeDict, DictAttribute
  7. from celery.datastructures import ConfigurationView
  8. class Object(object):
  9. pass
  10. class test_DictAttribute(unittest.TestCase):
  11. def test_get_set(self):
  12. x = DictAttribute(Object())
  13. x["foo"] = "The quick brown fox"
  14. self.assertEqual(x["foo"], "The quick brown fox")
  15. self.assertEqual(x["foo"], x.obj.foo)
  16. self.assertEqual(x.get("foo"), "The quick brown fox")
  17. self.assertIsNone(x.get("bar"))
  18. self.assertRaises(KeyError, x.__getitem__, "bar")
  19. def test_setdefault(self):
  20. x = DictAttribute(Object())
  21. self.assertEqual(x.setdefault("foo", "NEW"), "NEW")
  22. self.assertEqual(x.setdefault("foo", "XYZ"), "NEW")
  23. def test_contains(self):
  24. x = DictAttribute(Object())
  25. x["foo"] = 1
  26. self.assertIn("foo", x)
  27. self.assertNotIn("bar", x)
  28. def test_iteritems(self):
  29. obj = Object()
  30. obj.attr1 = 1
  31. x = DictAttribute(obj)
  32. x["attr2"] = 2
  33. self.assertDictEqual(dict(x.iteritems()),
  34. dict(attr1=1, attr2=2))
  35. class test_ConfigurationView(unittest.TestCase):
  36. def setUp(self):
  37. self.view = ConfigurationView({"changed_key": 1,
  38. "both": 2},
  39. {"default_key": 1,
  40. "both": 1})
  41. def test_setdefault(self):
  42. self.assertEqual(self.view.setdefault("both", 36), 2)
  43. self.assertEqual(self.view.setdefault("new", 36), 36)
  44. def test_contains(self):
  45. self.assertIn("changed_key", self.view)
  46. self.assertIn("default_key", self.view)
  47. self.assertNotIn("new", self.view)
  48. def test_repr(self):
  49. self.assertIn("changed_key", repr(self.view))
  50. self.assertIn("default_key", repr(self.view))
  51. def test_iter(self):
  52. expected = {"changed_key": 1,
  53. "default_key": 1,
  54. "both": 2}
  55. self.assertDictEqual(dict(self.view.items()), expected)
  56. class test_PositionQueue(unittest.TestCase):
  57. def test_position_queue_unfilled(self):
  58. q = PositionQueue(length=10)
  59. for position in q.data:
  60. self.assertIsInstance(position, q.UnfilledPosition)
  61. self.assertListEqual(q.filled, [])
  62. self.assertEqual(len(q), 0)
  63. self.assertFalse(q.full())
  64. def test_position_queue_almost(self):
  65. q = PositionQueue(length=10)
  66. q[3] = 3
  67. q[6] = 6
  68. q[9] = 9
  69. self.assertListEqual(q.filled, [3, 6, 9])
  70. self.assertEqual(len(q), 3)
  71. self.assertFalse(q.full())
  72. def test_position_queue_full(self):
  73. q = PositionQueue(length=10)
  74. for i in xrange(10):
  75. q[i] = i
  76. self.assertListEqual(q.filled, list(xrange(10)))
  77. self.assertEqual(len(q), 10)
  78. self.assertTrue(q.full())
  79. class test_ExceptionInfo(unittest.TestCase):
  80. def test_exception_info(self):
  81. try:
  82. raise LookupError("The quick brown fox jumps...")
  83. except LookupError:
  84. exc_info = sys.exc_info()
  85. einfo = ExceptionInfo(exc_info)
  86. self.assertEqual(str(einfo), einfo.traceback)
  87. self.assertIsInstance(einfo.exception, LookupError)
  88. self.assertTupleEqual(einfo.exception.args,
  89. ("The quick brown fox jumps...", ))
  90. self.assertTrue(einfo.traceback)
  91. r = repr(einfo)
  92. self.assertTrue(r)
  93. class test_utilities(unittest.TestCase):
  94. def test_consume_queue(self):
  95. x = Queue()
  96. it = consume_queue(x)
  97. self.assertRaises(StopIteration, it.next)
  98. x.put("foo")
  99. it = consume_queue(x)
  100. self.assertEqual(it.next(), "foo")
  101. self.assertRaises(StopIteration, it.next)
  102. class test_SharedCounter(unittest.TestCase):
  103. def test_initial_value(self):
  104. self.assertEqual(int(SharedCounter(10)), 10)
  105. def test_increment(self):
  106. c = SharedCounter(10)
  107. c.increment()
  108. self.assertEqual(int(c), 11)
  109. c.increment(2)
  110. self.assertEqual(int(c), 13)
  111. def test_decrement(self):
  112. c = SharedCounter(10)
  113. c.decrement()
  114. self.assertEqual(int(c), 9)
  115. c.decrement(2)
  116. self.assertEqual(int(c), 7)
  117. def test_iadd(self):
  118. c = SharedCounter(10)
  119. c += 10
  120. self.assertEqual(int(c), 20)
  121. def test_isub(self):
  122. c = SharedCounter(10)
  123. c -= 20
  124. self.assertEqual(int(c), -10)
  125. def test_repr(self):
  126. self.assertIn("<SharedCounter:", repr(SharedCounter(10)))
  127. class test_LimitedSet(unittest.TestCase):
  128. def test_add(self):
  129. s = LimitedSet(maxlen=2)
  130. s.add("foo")
  131. s.add("bar")
  132. for n in "foo", "bar":
  133. self.assertIn(n, s)
  134. s.add("baz")
  135. for n in "bar", "baz":
  136. self.assertIn(n, s)
  137. self.assertNotIn("foo", s)
  138. def test_iter(self):
  139. s = LimitedSet(maxlen=2)
  140. items = "foo", "bar"
  141. for item in items:
  142. s.add(item)
  143. l = list(iter(s))
  144. for item in items:
  145. self.assertIn(item, l)
  146. def test_repr(self):
  147. s = LimitedSet(maxlen=2)
  148. items = "foo", "bar"
  149. for item in items:
  150. s.add(item)
  151. self.assertIn("LimitedSet(", repr(s))
  152. class test_LocalCache(unittest.TestCase):
  153. def test_expires(self):
  154. limit = 100
  155. x = LocalCache(limit=limit)
  156. slots = list(range(limit * 2))
  157. for i in slots:
  158. x[i] = i
  159. self.assertListEqual(x.keys(), slots[limit:])
  160. class test_AttributeDict(unittest.TestCase):
  161. def test_getattr__setattr(self):
  162. x = AttributeDict({"foo": "bar"})
  163. self.assertEqual(x["foo"], "bar")
  164. self.assertRaises(AttributeError, getattr, x, "bar")
  165. x.bar = "foo"
  166. self.assertEqual(x["bar"], "foo")