test_functional.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. from __future__ import absolute_import
  2. import pickle
  3. from kombu.utils.functional import lazy
  4. from celery.five import THREAD_TIMEOUT_MAX, items, range, nextfun
  5. from celery.utils.functional import (
  6. LRUCache,
  7. firstmethod,
  8. first,
  9. mlazy,
  10. padlist,
  11. maybe_list,
  12. )
  13. from celery.tests.case import Case
  14. class test_LRUCache(Case):
  15. def test_expires(self):
  16. limit = 100
  17. x = LRUCache(limit=limit)
  18. slots = list(range(limit * 2))
  19. for i in slots:
  20. x[i] = i
  21. self.assertListEqual(list(x.keys()), list(slots[limit:]))
  22. self.assertTrue(x.items())
  23. self.assertTrue(x.values())
  24. def test_is_pickleable(self):
  25. x = LRUCache(limit=10)
  26. x.update(luke=1, leia=2)
  27. y = pickle.loads(pickle.dumps(x))
  28. self.assertEqual(y.limit, y.limit)
  29. self.assertEqual(y, x)
  30. def test_update_expires(self):
  31. limit = 100
  32. x = LRUCache(limit=limit)
  33. slots = list(range(limit * 2))
  34. for i in slots:
  35. x.update({i: i})
  36. self.assertListEqual(list(x.keys()), list(slots[limit:]))
  37. def test_least_recently_used(self):
  38. x = LRUCache(3)
  39. x[1], x[2], x[3] = 1, 2, 3
  40. self.assertEqual(list(x.keys()), [1, 2, 3])
  41. x[4], x[5] = 4, 5
  42. self.assertEqual(list(x.keys()), [3, 4, 5])
  43. # access 3, which makes it the last used key.
  44. x[3]
  45. x[6] = 6
  46. self.assertEqual(list(x.keys()), [5, 3, 6])
  47. x[7] = 7
  48. self.assertEqual(list(x.keys()), [3, 6, 7])
  49. def assertSafeIter(self, method, interval=0.01, size=10000):
  50. from threading import Thread, Event
  51. from time import sleep
  52. x = LRUCache(size)
  53. x.update(zip(range(size), range(size)))
  54. class Burglar(Thread):
  55. def __init__(self, cache):
  56. self.cache = cache
  57. self.__is_shutdown = Event()
  58. self.__is_stopped = Event()
  59. Thread.__init__(self)
  60. def run(self):
  61. while not self.__is_shutdown.isSet():
  62. try:
  63. self.cache.data.popitem(last=False)
  64. except KeyError:
  65. break
  66. self.__is_stopped.set()
  67. def stop(self):
  68. self.__is_shutdown.set()
  69. self.__is_stopped.wait()
  70. self.join(THREAD_TIMEOUT_MAX)
  71. burglar = Burglar(x)
  72. burglar.start()
  73. try:
  74. for _ in getattr(x, method)():
  75. sleep(0.0001)
  76. finally:
  77. burglar.stop()
  78. def test_safe_to_remove_while_iteritems(self):
  79. self.assertSafeIter('iteritems')
  80. def test_safe_to_remove_while_keys(self):
  81. self.assertSafeIter('keys')
  82. def test_safe_to_remove_while_itervalues(self):
  83. self.assertSafeIter('itervalues')
  84. def test_items(self):
  85. c = LRUCache()
  86. c.update(a=1, b=2, c=3)
  87. self.assertTrue(list(items(c)))
  88. class test_utils(Case):
  89. def test_padlist(self):
  90. self.assertListEqual(
  91. padlist(['George', 'Costanza', 'NYC'], 3),
  92. ['George', 'Costanza', 'NYC'],
  93. )
  94. self.assertListEqual(
  95. padlist(['George', 'Costanza'], 3),
  96. ['George', 'Costanza', None],
  97. )
  98. self.assertListEqual(
  99. padlist(['George', 'Costanza', 'NYC'], 4, default='Earth'),
  100. ['George', 'Costanza', 'NYC', 'Earth'],
  101. )
  102. def test_firstmethod_AttributeError(self):
  103. self.assertIsNone(firstmethod('foo')([object()]))
  104. def test_firstmethod_handles_lazy(self):
  105. class A(object):
  106. def __init__(self, value=None):
  107. self.value = value
  108. def m(self):
  109. return self.value
  110. self.assertEqual('four', firstmethod('m')([
  111. A(), A(), A(), A('four'), A('five')]))
  112. self.assertEqual('four', firstmethod('m')([
  113. A(), A(), A(), lazy(lambda: A('four')), A('five')]))
  114. def test_first(self):
  115. iterations = [0]
  116. def predicate(value):
  117. iterations[0] += 1
  118. if value == 5:
  119. return True
  120. return False
  121. self.assertEqual(5, first(predicate, range(10)))
  122. self.assertEqual(iterations[0], 6)
  123. iterations[0] = 0
  124. self.assertIsNone(first(predicate, range(10, 20)))
  125. self.assertEqual(iterations[0], 10)
  126. def test_maybe_list(self):
  127. self.assertEqual(maybe_list(1), [1])
  128. self.assertEqual(maybe_list([1]), [1])
  129. self.assertIsNone(maybe_list(None))
  130. class test_mlazy(Case):
  131. def test_is_memoized(self):
  132. it = iter(range(20, 30))
  133. p = mlazy(nextfun(it))
  134. self.assertEqual(p(), 20)
  135. self.assertTrue(p.evaluated)
  136. self.assertEqual(p(), 20)
  137. self.assertEqual(repr(p), '20')