test_functional.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from __future__ import absolute_import
  2. import pickle
  3. from kombu.utils.functional import lazy
  4. from celery.five import THREAD_TIMEOUT_MAX, items, range, nextfun
  5. from celery.utils.functional import (
  6. LRUCache,
  7. firstmethod,
  8. first,
  9. mlazy,
  10. padlist,
  11. maybe_list,
  12. )
  13. from celery.tests.case import Case
  14. class test_LRUCache(Case):
  15. def test_expires(self):
  16. limit = 100
  17. x = LRUCache(limit=limit)
  18. slots = list(range(limit * 2))
  19. for i in slots:
  20. x[i] = i
  21. self.assertListEqual(list(x.keys()), list(slots[limit:]))
  22. self.assertTrue(x.items())
  23. self.assertTrue(x.values())
  24. def test_is_pickleable(self):
  25. x = LRUCache(limit=10)
  26. x.update(luke=1, leia=2)
  27. y = pickle.loads(pickle.dumps(x))
  28. self.assertEqual(y.limit, y.limit)
  29. self.assertEqual(y, x)
  30. def test_update_expires(self):
  31. limit = 100
  32. x = LRUCache(limit=limit)
  33. slots = list(range(limit * 2))
  34. for i in slots:
  35. x.update({i: i})
  36. self.assertListEqual(list(x.keys()), list(slots[limit:]))
  37. def test_least_recently_used(self):
  38. x = LRUCache(3)
  39. x[1], x[2], x[3] = 1, 2, 3
  40. self.assertEqual(list(x.keys()), [1, 2, 3])
  41. x[4], x[5] = 4, 5
  42. self.assertEqual(list(x.keys()), [3, 4, 5])
  43. # access 3, which makes it the last used key.
  44. x[3]
  45. x[6] = 6
  46. self.assertEqual(list(x.keys()), [5, 3, 6])
  47. x[7] = 7
  48. self.assertEqual(list(x.keys()), [3, 6, 7])
  49. def test_update_larger_than_cache_size(self):
  50. x = LRUCache(2)
  51. x.update(dict((x, x) for x in range(100)))
  52. self.assertEqual(list(x.keys()), [98, 99])
  53. def assertSafeIter(self, method, interval=0.01, size=10000):
  54. from threading import Thread, Event
  55. from time import sleep
  56. x = LRUCache(size)
  57. x.update(zip(range(size), range(size)))
  58. class Burglar(Thread):
  59. def __init__(self, cache):
  60. self.cache = cache
  61. self.__is_shutdown = Event()
  62. self.__is_stopped = Event()
  63. Thread.__init__(self)
  64. def run(self):
  65. while not self.__is_shutdown.isSet():
  66. try:
  67. self.cache.popitem(last=False)
  68. except KeyError:
  69. break
  70. self.__is_stopped.set()
  71. def stop(self):
  72. self.__is_shutdown.set()
  73. self.__is_stopped.wait()
  74. self.join(THREAD_TIMEOUT_MAX)
  75. burglar = Burglar(x)
  76. burglar.start()
  77. try:
  78. for _ in getattr(x, method)():
  79. sleep(0.0001)
  80. finally:
  81. burglar.stop()
  82. def test_safe_to_remove_while_iteritems(self):
  83. self.assertSafeIter('iteritems')
  84. def test_safe_to_remove_while_keys(self):
  85. self.assertSafeIter('keys')
  86. def test_safe_to_remove_while_itervalues(self):
  87. self.assertSafeIter('itervalues')
  88. def test_items(self):
  89. c = LRUCache()
  90. c.update(a=1, b=2, c=3)
  91. self.assertTrue(list(items(c)))
  92. class test_utils(Case):
  93. def test_padlist(self):
  94. self.assertListEqual(
  95. padlist(['George', 'Costanza', 'NYC'], 3),
  96. ['George', 'Costanza', 'NYC'],
  97. )
  98. self.assertListEqual(
  99. padlist(['George', 'Costanza'], 3),
  100. ['George', 'Costanza', None],
  101. )
  102. self.assertListEqual(
  103. padlist(['George', 'Costanza', 'NYC'], 4, default='Earth'),
  104. ['George', 'Costanza', 'NYC', 'Earth'],
  105. )
  106. def test_firstmethod_AttributeError(self):
  107. self.assertIsNone(firstmethod('foo')([object()]))
  108. def test_firstmethod_handles_lazy(self):
  109. class A(object):
  110. def __init__(self, value=None):
  111. self.value = value
  112. def m(self):
  113. return self.value
  114. self.assertEqual('four', firstmethod('m')([
  115. A(), A(), A(), A('four'), A('five')]))
  116. self.assertEqual('four', firstmethod('m')([
  117. A(), A(), A(), lazy(lambda: A('four')), A('five')]))
  118. def test_first(self):
  119. iterations = [0]
  120. def predicate(value):
  121. iterations[0] += 1
  122. if value == 5:
  123. return True
  124. return False
  125. self.assertEqual(5, first(predicate, range(10)))
  126. self.assertEqual(iterations[0], 6)
  127. iterations[0] = 0
  128. self.assertIsNone(first(predicate, range(10, 20)))
  129. self.assertEqual(iterations[0], 10)
  130. def test_maybe_list(self):
  131. self.assertEqual(maybe_list(1), [1])
  132. self.assertEqual(maybe_list([1]), [1])
  133. self.assertIsNone(maybe_list(None))
  134. class test_mlazy(Case):
  135. def test_is_memoized(self):
  136. it = iter(range(20, 30))
  137. p = mlazy(nextfun(it))
  138. self.assertEqual(p(), 20)
  139. self.assertTrue(p.evaluated)
  140. self.assertEqual(p(), 20)
  141. self.assertEqual(repr(p), '20')