test_datastructures.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import sys
  2. import unittest
  3. from Queue import Queue
  4. from celery.datastructures import PositionQueue, ExceptionInfo, LocalCache
  5. from celery.datastructures import LimitedSet, SharedCounter, consume_queue
  6. class TestPositionQueue(unittest.TestCase):
  7. def test_position_queue_unfilled(self):
  8. q = PositionQueue(length=10)
  9. for position in q.data:
  10. self.assertTrue(isinstance(position, q.UnfilledPosition))
  11. self.assertEquals(q.filled, [])
  12. self.assertEquals(len(q), 0)
  13. self.assertFalse(q.full())
  14. def test_position_queue_almost(self):
  15. q = PositionQueue(length=10)
  16. q[3] = 3
  17. q[6] = 6
  18. q[9] = 9
  19. self.assertEquals(q.filled, [3, 6, 9])
  20. self.assertEquals(len(q), 3)
  21. self.assertFalse(q.full())
  22. def test_position_queue_full(self):
  23. q = PositionQueue(length=10)
  24. for i in xrange(10):
  25. q[i] = i
  26. self.assertEquals(q.filled, list(xrange(10)))
  27. self.assertEquals(len(q), 10)
  28. self.assertTrue(q.full())
  29. class TestExceptionInfo(unittest.TestCase):
  30. def test_exception_info(self):
  31. try:
  32. raise LookupError("The quick brown fox jumps...")
  33. except LookupError:
  34. exc_info = sys.exc_info()
  35. einfo = ExceptionInfo(exc_info)
  36. self.assertEquals(str(einfo), einfo.traceback)
  37. self.assertTrue(isinstance(einfo.exception, LookupError))
  38. self.assertEquals(einfo.exception.args,
  39. ("The quick brown fox jumps...", ))
  40. self.assertTrue(einfo.traceback)
  41. r = repr(einfo)
  42. self.assertTrue(r)
  43. class TestUtilities(unittest.TestCase):
  44. def test_consume_queue(self):
  45. x = Queue()
  46. it = consume_queue(x)
  47. self.assertRaises(StopIteration, it.next)
  48. x.put("foo")
  49. it = consume_queue(x)
  50. self.assertEquals(it.next(), "foo")
  51. self.assertRaises(StopIteration, it.next)
  52. class TestSharedCounter(unittest.TestCase):
  53. def test_initial_value(self):
  54. self.assertEquals(int(SharedCounter(10)), 10)
  55. def test_increment(self):
  56. c = SharedCounter(10)
  57. c.increment()
  58. self.assertEquals(int(c), 11)
  59. c.increment(2)
  60. self.assertEquals(int(c), 13)
  61. def test_decrement(self):
  62. c = SharedCounter(10)
  63. c.decrement()
  64. self.assertEquals(int(c), 9)
  65. c.decrement(2)
  66. self.assertEquals(int(c), 7)
  67. def test_iadd(self):
  68. c = SharedCounter(10)
  69. c += 10
  70. self.assertEquals(int(c), 20)
  71. def test_isub(self):
  72. c = SharedCounter(10)
  73. c -= 20
  74. self.assertEquals(int(c), -10)
  75. def test_repr(self):
  76. self.assertTrue(repr(SharedCounter(10)).startswith("<SharedCounter:"))
  77. class TestLimitedSet(unittest.TestCase):
  78. def test_add(self):
  79. s = LimitedSet(maxlen=2)
  80. s.add("foo")
  81. s.add("bar")
  82. for n in "foo", "bar":
  83. self.assertTrue(n in s)
  84. s.add("baz")
  85. for n in "bar", "baz":
  86. self.assertTrue(n in s)
  87. self.assertTrue("foo" not in s)
  88. def test_iter(self):
  89. s = LimitedSet(maxlen=2)
  90. items = "foo", "bar"
  91. map(s.add, items)
  92. l = list(iter(items))
  93. for item in items:
  94. self.assertTrue(item in l)
  95. def test_repr(self):
  96. s = LimitedSet(maxlen=2)
  97. items = "foo", "bar"
  98. map(s.add, items)
  99. self.assertTrue(repr(s).startswith("LimitedSet("))
  100. class TestLocalCache(unittest.TestCase):
  101. def test_expires(self):
  102. limit = 100
  103. x = LocalCache(limit=limit)
  104. slots = list(range(limit * 2))
  105. for i in slots:
  106. x[i] = i
  107. self.assertEquals(x.keys(), slots[limit:])