test_functional.py 5.5 KB


  1. from __future__ import absolute_import, unicode_literals
  2. from kombu.utils.functional import lazy
  3. from celery.five import range, nextfun
  4. from celery.utils.functional import (
  5. DummyContext,
  6. fun_takes_argument,
  7. head_from_fun,
  8. firstmethod,
  9. first,
  10. maybe_list,
  11. mlazy,
  12. padlist,
  13. regen,
  14. )
  15. from celery.tests.case import Case
  class test_DummyContext(Case):
      """Exercise ``DummyContext`` as a context manager."""

      def test_context(self):
          """An empty block runs cleanly and a raised error still escapes."""
          # Entering and leaving with nothing inside must not raise.
          with DummyContext():
              pass

          # A KeyError raised inside the block has to reach the caller,
          # which is what the surrounding assertRaises verifies.
          with self.assertRaises(KeyError):
              with DummyContext():
                  raise KeyError()
  class test_utils(Case):
      """Checks for padlist, firstmethod, first and maybe_list."""

      def test_padlist(self):
          """Short lists are padded up to the requested size."""
          # Already at the requested size: returned unchanged.
          exact = padlist(['George', 'Costanza', 'NYC'], 3)
          self.assertListEqual(exact, ['George', 'Costanza', 'NYC'])

          # One element short: padded with None.
          short = padlist(['George', 'Costanza'], 3)
          self.assertListEqual(short, ['George', 'Costanza', None])

          # One element short with an explicit filler value.
          filled = padlist(['George', 'Costanza', 'NYC'], 4, default='Earth')
          self.assertListEqual(filled, ['George', 'Costanza', 'NYC', 'Earth'])

      def test_firstmethod_AttributeError(self):
          """An object lacking the method yields None instead of raising."""
          call_foo = firstmethod('foo')
          candidates = [object()]
          self.assertIsNone(call_foo(candidates))

      def test_firstmethod_handles_lazy(self):
          """The first non-None ``m()`` result wins, even behind ``lazy``."""

          class Holder(object):

              def __init__(self, value=None):
                  self.value = value

              def m(self):
                  return self.value

          # Three holders answer None before the one answering 'four'.
          call_m = firstmethod('m')
          plain = [
              Holder(), Holder(), Holder(), Holder('four'), Holder('five'),
          ]
          self.assertEqual('four', call_m(plain))

          # Same sequence, but the matching holder is wrapped in lazy().
          call_m = firstmethod('m')
          wrapped = [
              Holder(), Holder(), Holder(),
              lazy(lambda: Holder('four')),
              Holder('five'),
          ]
          self.assertEqual('four', call_m(wrapped))

      def test_first(self):
          """``first`` returns the first match and counts predicate calls."""
          # One-element list so the nested function can update the counter.
          calls = [0]

          def predicate(value):
              calls[0] += 1
              return value == 5

          # 0..5 are inspected, so the predicate runs six times.
          self.assertEqual(5, first(predicate, range(10)))
          self.assertEqual(calls[0], 6)

          # No value in 10..19 matches: all ten are inspected, result is None.
          calls[0] = 0
          self.assertIsNone(first(predicate, range(10, 20)))
          self.assertEqual(calls[0], 10)

      def test_maybe_list(self):
          """Scalars get wrapped, lists pass through, None stays None."""
          from_scalar = maybe_list(1)
          self.assertEqual(from_scalar, [1])
          from_list = maybe_list([1])
          self.assertEqual(from_list, [1])
          from_none = maybe_list(None)
          self.assertIsNone(from_none)
  class test_mlazy(Case):
      """Checks that ``mlazy`` remembers its first evaluation."""

      def test_is_memoized(self):
          """Calling twice yields the same value although the source advances."""
          source = iter(range(20, 30))
          memo = mlazy(nextfun(source))

          # First call pulls 20 from the iterator and marks it evaluated.
          self.assertEqual(memo(), 20)
          self.assertTrue(memo.evaluated)

          # Second call still reports 20, and repr shows that value.
          self.assertEqual(memo(), 20)
          self.assertEqual(repr(memo), '20')
  class test_regen(Case):
      """Checks for ``regen`` wrapping lists and one-shot iterators."""

      def test_regen_list(self):
          """A list passes through as-is; a wrapped iterator equals its list."""
          items = [1, 2]
          wrapped = regen(iter(items))

          # Handing regen a real list gives back that very object.
          self.assertIs(regen(items), items)

          # Compared twice on purpose: the second comparison must still hold.
          for _ in range(2):
              self.assertEqual(wrapped, items)
          self.assertEqual(wrapped.__length_hint__(), 0)

          # Rebuilding from the __reduce__ output gives the same content.
          rebuild, rebuild_args = wrapped.__reduce__()
          self.assertEqual(rebuild(*rebuild_args), items)

      def test_regen_gen(self):
          """Indexing works in any order, including negative indices."""

          def ten():
              return regen(iter(list(range(10))))

          # Walk the indices downwards, starting past the beginning.
          g = ten()
          for index in (7, 6, 5, 4, 3, 2, 1, 0):
              self.assertEqual(g[index], index)
          self.assertEqual(g.data, list(range(10)))
          self.assertEqual(g[8], 8)
          self.assertEqual(g[0], 0)

          # Walk the first indices upwards on a fresh instance.
          g = ten()
          for index in (0, 1):
              self.assertEqual(g[index], index)
          self.assertEqual(g.data, list(range(10)))

          # Indexing past the only element raises IndexError.
          g = regen(iter([1]))
          self.assertEqual(g[0], 1)
          with self.assertRaises(IndexError):
              g[1]
          self.assertEqual(g.data, [1])

          # Negative indices count from the end.
          g = ten()
          for index, expected in ((-1, 9), (-2, 8), (-3, 7), (-4, 6), (-5, 5)):
              self.assertEqual(g[index], expected)
          self.assertEqual(g[5], 5)
          self.assertEqual(g.data, list(range(10)))
          self.assertListEqual(list(iter(g)), list(range(10)))
  class test_head_from_fun(Case):
      """Checks that ``head_from_fun`` enforces the wrapped signature."""

      def _check_head(self, head):
          """Assert *head* rejects one argument and accepts two (+ kwarg)."""
          with self.assertRaises(TypeError):
              head(1)
          head(1, 2)
          head(1, 2, kwarg=3)

      def test_from_cls(self):
          """The head can be built from an instance defining ``__call__``."""

          class X(object):

              # The first parameter plays the role of ``self`` here.
              def __call__(x, y, kwarg=1):
                  pass

          self._check_head(head_from_fun(X()))

      def test_from_fun(self):
          """The head can be built from a plain function."""

          def f(x, y, kwarg=1):
              pass

          self._check_head(head_from_fun(f))

      def test_from_fun_with_hints(self):
          """The head can be built from a function carrying annotations."""
          namespace = {}
          source = ('def f_hints(x: int, y: int, kwarg: int=1):'
                    ' pass')
          try:
              exec(source, {}, namespace)
          except SyntaxError:
              # py2: the annotated definition does not parse, nothing to check.
              return

          annotated = namespace['f_hints']
          self._check_head(head_from_fun(annotated))
  class test_fun_takes_argument(Case):
      """Checks for ``fun_takes_argument`` across signature shapes."""

      def test_starkwargs(self):
          """A ``**kw`` catch-all counts as taking the argument."""
          catch_all = lambda **kw: 1
          self.assertTrue(fun_takes_argument('foo', catch_all))

      def test_named(self):
          """A parameter named ``foo`` counts, as does a matching position."""
          named = lambda a, foo, bar: 1
          self.assertTrue(fun_takes_argument('foo', named))

          def fun(a, b, c, d):
              return 1

          # No parameter is called foo, yet four positionals with
          # position=4 is reported as a match.
          self.assertTrue(fun_takes_argument('foo', fun, position=4))

      def test_starargs(self):
          """A ``*args`` catch-all counts as taking the argument."""
          variadic = lambda a, *args: 1
          self.assertTrue(fun_takes_argument('foo', variadic))

      def test_does_not(self):
          """Signatures without a suitable slot are reported as not taking it."""
          other_names = lambda a, bar, baz: 1
          self.assertFalse(fun_takes_argument('foo', other_names))
          no_params = lambda: 1
          self.assertFalse(fun_takes_argument('foo', no_params))

          def fun(a, b, foo):
              return 1

          # Only three positionals: position=4 is not satisfied even
          # though one of the parameters is literally named foo.
          self.assertFalse(fun_takes_argument('foo', fun, position=4))