test_functional.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from kombu.utils.functional import lazy
  4. from celery.five import range, nextfun
  5. from celery.utils.functional import (
  6. DummyContext,
  7. fun_accepts_kwargs,
  8. fun_takes_argument,
  9. head_from_fun,
  10. firstmethod,
  11. first,
  12. maybe_list,
  13. mlazy,
  14. padlist,
  15. regen,
  16. seq_concat_seq,
  17. seq_concat_item,
  18. )
  19. def test_DummyContext():
  20. with DummyContext():
  21. pass
  22. with pytest.raises(KeyError):
  23. with DummyContext():
  24. raise KeyError()
  25. @pytest.mark.parametrize('items,n,default,expected', [
  26. (['George', 'Costanza', 'NYC'], 3, None,
  27. ['George', 'Costanza', 'NYC']),
  28. (['George', 'Costanza'], 3, None,
  29. ['George', 'Costanza', None]),
  30. (['George', 'Costanza', 'NYC'], 4, 'Earth',
  31. ['George', 'Costanza', 'NYC', 'Earth']),
  32. ])
  33. def test_padlist(items, n, default, expected):
  34. assert padlist(items, n, default=default) == expected
  35. class test_firstmethod:
  36. def test_AttributeError(self):
  37. assert firstmethod('foo')([object()]) is None
  38. def test_handles_lazy(self):
  39. class A(object):
  40. def __init__(self, value=None):
  41. self.value = value
  42. def m(self):
  43. return self.value
  44. assert 'four' == firstmethod('m')([
  45. A(), A(), A(), A('four'), A('five')])
  46. assert 'four' == firstmethod('m')([
  47. A(), A(), A(), lazy(lambda: A('four')), A('five')])
  48. def test_first():
  49. iterations = [0]
  50. def predicate(value):
  51. iterations[0] += 1
  52. if value == 5:
  53. return True
  54. return False
  55. assert first(predicate, range(10)) == 5
  56. assert iterations[0] == 6
  57. iterations[0] = 0
  58. assert first(predicate, range(10, 20)) is None
  59. assert iterations[0] == 10
  60. def test_maybe_list():
  61. assert maybe_list(1) == [1]
  62. assert maybe_list([1]) == [1]
  63. assert maybe_list(None) is None
  64. def test_mlazy():
  65. it = iter(range(20, 30))
  66. p = mlazy(nextfun(it))
  67. assert p() == 20
  68. assert p.evaluated
  69. assert p() == 20
  70. assert repr(p) == '20'
  71. class test_regen:
  72. def test_list(self):
  73. l = [1, 2]
  74. r = regen(iter(l))
  75. assert regen(l) is l
  76. assert r == l
  77. assert r == l # again
  78. assert r.__length_hint__() == 0
  79. fun, args = r.__reduce__()
  80. assert fun(*args) == l
  81. def test_gen(self):
  82. g = regen(iter(list(range(10))))
  83. assert g[7] == 7
  84. assert g[6] == 6
  85. assert g[5] == 5
  86. assert g[4] == 4
  87. assert g[3] == 3
  88. assert g[2] == 2
  89. assert g[1] == 1
  90. assert g[0] == 0
  91. assert g.data, list(range(10))
  92. assert g[8] == 8
  93. assert g[0] == 0
  94. g = regen(iter(list(range(10))))
  95. assert g[0] == 0
  96. assert g[1] == 1
  97. assert g.data == list(range(10))
  98. g = regen(iter([1]))
  99. assert g[0] == 1
  100. with pytest.raises(IndexError):
  101. g[1]
  102. assert g.data == [1]
  103. g = regen(iter(list(range(10))))
  104. assert g[-1] == 9
  105. assert g[-2] == 8
  106. assert g[-3] == 7
  107. assert g[-4] == 6
  108. assert g[-5] == 5
  109. assert g[5] == 5
  110. assert g.data == list(range(10))
  111. assert list(iter(g)) == list(range(10))
  112. class test_head_from_fun:
  113. def test_from_cls(self):
  114. class X(object):
  115. def __call__(x, y, kwarg=1): # noqa
  116. pass
  117. g = head_from_fun(X())
  118. with pytest.raises(TypeError):
  119. g(1)
  120. g(1, 2)
  121. g(1, 2, kwarg=3)
  122. def test_from_fun(self):
  123. def f(x, y, kwarg=1):
  124. pass
  125. g = head_from_fun(f)
  126. with pytest.raises(TypeError):
  127. g(1)
  128. g(1, 2)
  129. g(1, 2, kwarg=3)
  130. def test_from_fun_with_hints(self):
  131. local = {}
  132. fun = ('def f_hints(x: int, y: int, kwarg: int=1):'
  133. ' pass')
  134. try:
  135. exec(fun, {}, local)
  136. except SyntaxError:
  137. # py2
  138. return
  139. f_hints = local['f_hints']
  140. g = head_from_fun(f_hints)
  141. with pytest.raises(TypeError):
  142. g(1)
  143. g(1, 2)
  144. g(1, 2, kwarg=3)
  145. class test_fun_takes_argument:
  146. def test_starkwargs(self):
  147. assert fun_takes_argument('foo', lambda **kw: 1)
  148. def test_named(self):
  149. assert fun_takes_argument('foo', lambda a, foo, bar: 1)
  150. def fun(a, b, c, d):
  151. return 1
  152. assert fun_takes_argument('foo', fun, position=4)
  153. def test_starargs(self):
  154. assert fun_takes_argument('foo', lambda a, *args: 1)
  155. def test_does_not(self):
  156. assert not fun_takes_argument('foo', lambda a, bar, baz: 1)
  157. assert not fun_takes_argument('foo', lambda: 1)
  158. def fun(a, b, foo):
  159. return 1
  160. assert not fun_takes_argument('foo', fun, position=4)
  161. @pytest.mark.parametrize('a,b,expected', [
  162. ((1, 2, 3), [4, 5], (1, 2, 3, 4, 5)),
  163. ((1, 2), [3, 4, 5], [1, 2, 3, 4, 5]),
  164. ([1, 2, 3], (4, 5), [1, 2, 3, 4, 5]),
  165. ([1, 2], (3, 4, 5), (1, 2, 3, 4, 5)),
  166. ])
  167. def test_seq_concat_seq(a, b, expected):
  168. res = seq_concat_seq(a, b)
  169. assert type(res) is type(expected) # noqa
  170. assert res == expected
  171. @pytest.mark.parametrize('a,b,expected', [
  172. ((1, 2, 3), 4, (1, 2, 3, 4)),
  173. ([1, 2, 3], 4, [1, 2, 3, 4]),
  174. ])
  175. def test_seq_concat_item(a, b, expected):
  176. res = seq_concat_item(a, b)
  177. assert type(res) is type(expected) # noqa
  178. assert res == expected
  179. class StarKwargsCallable(object):
  180. def __call__(self, **kwargs):
  181. return 1
  182. class StarArgsStarKwargsCallable(object):
  183. def __call__(self, *args, **kwargs):
  184. return 1
  185. class StarArgsCallable(object):
  186. def __call__(self, *args):
  187. return 1
  188. class ArgsCallable(object):
  189. def __call__(self, a, b):
  190. return 1
  191. class ArgsStarKwargsCallable(object):
  192. def __call__(self, a, b, **kwargs):
  193. return 1
  194. class test_fun_accepts_kwargs:
  195. @pytest.mark.parametrize('fun', [
  196. lambda a, b, **kwargs: 1,
  197. lambda *args, **kwargs: 1,
  198. lambda foo=1, **kwargs: 1,
  199. StarKwargsCallable,
  200. StarArgsStarKwargsCallable,
  201. ArgsStarKwargsCallable,
  202. ])
  203. def test_accepts(self, fun):
  204. assert fun_accepts_kwargs(fun)
  205. @pytest.mark.parametrize('fun', [
  206. lambda a: 1,
  207. lambda a, b: 1,
  208. lambda *args: 1,
  209. lambda a, kw1=1, kw2=2: 1,
  210. StarArgsCallable,
  211. ArgsCallable,
  212. ])
  213. def test_rejects(self, fun):
  214. assert not fun_accepts_kwargs(fun)