test_functional.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. import pytest
  2. from kombu.utils.functional import lazy
  3. from celery.utils.functional import (
  4. DummyContext,
  5. fun_accepts_kwargs,
  6. fun_takes_argument,
  7. head_from_fun,
  8. firstmethod,
  9. first,
  10. maybe_list,
  11. mlazy,
  12. padlist,
  13. regen,
  14. seq_concat_seq,
  15. seq_concat_item,
  16. )
  17. def test_DummyContext():
  18. with DummyContext():
  19. pass
  20. with pytest.raises(KeyError):
  21. with DummyContext():
  22. raise KeyError()
  23. @pytest.mark.parametrize('items,n,default,expected', [
  24. (['George', 'Costanza', 'NYC'], 3, None,
  25. ['George', 'Costanza', 'NYC']),
  26. (['George', 'Costanza'], 3, None,
  27. ['George', 'Costanza', None]),
  28. (['George', 'Costanza', 'NYC'], 4, 'Earth',
  29. ['George', 'Costanza', 'NYC', 'Earth']),
  30. ])
  31. def test_padlist(items, n, default, expected):
  32. assert padlist(items, n, default=default) == expected
  33. class test_firstmethod:
  34. def test_AttributeError(self):
  35. assert firstmethod('foo')([object()]) is None
  36. def test_handles_lazy(self):
  37. class A:
  38. def __init__(self, value=None):
  39. self.value = value
  40. def m(self):
  41. return self.value
  42. assert 'four' == firstmethod('m')([
  43. A(), A(), A(), A('four'), A('five')])
  44. assert 'four' == firstmethod('m')([
  45. A(), A(), A(), lazy(lambda: A('four')), A('five')])
  46. def test_first():
  47. iterations = [0]
  48. def predicate(value):
  49. iterations[0] += 1
  50. if value == 5:
  51. return True
  52. return False
  53. assert first(predicate, range(10)) == 5
  54. assert iterations[0] == 6
  55. iterations[0] = 0
  56. assert first(predicate, range(10, 20)) is None
  57. assert iterations[0] == 10
  58. def test_maybe_list():
  59. assert maybe_list(1) == [1]
  60. assert maybe_list([1]) == [1]
  61. assert maybe_list(None) is None
  62. def test_mlazy():
  63. it = iter(range(20, 30))
  64. p = mlazy(it.__next__)
  65. assert p() == 20
  66. assert p.evaluated
  67. assert p() == 20
  68. assert repr(p) == '20'
  69. class test_regen:
  70. def test_list(self):
  71. l = [1, 2]
  72. r = regen(iter(l))
  73. assert regen(l) is l
  74. assert r == l
  75. assert r == l # again
  76. assert r.__length_hint__() == 0
  77. fun, args = r.__reduce__()
  78. assert fun(*args) == l
  79. def test_gen(self):
  80. g = regen(iter(list(range(10))))
  81. assert g[7] == 7
  82. assert g[6] == 6
  83. assert g[5] == 5
  84. assert g[4] == 4
  85. assert g[3] == 3
  86. assert g[2] == 2
  87. assert g[1] == 1
  88. assert g[0] == 0
  89. assert g.data, list(range(10))
  90. assert g[8] == 8
  91. assert g[0] == 0
  92. g = regen(iter(list(range(10))))
  93. assert g[0] == 0
  94. assert g[1] == 1
  95. assert g.data == list(range(10))
  96. g = regen(iter([1]))
  97. assert g[0] == 1
  98. with pytest.raises(IndexError):
  99. g[1]
  100. assert g.data == [1]
  101. g = regen(iter(list(range(10))))
  102. assert g[-1] == 9
  103. assert g[-2] == 8
  104. assert g[-3] == 7
  105. assert g[-4] == 6
  106. assert g[-5] == 5
  107. assert g[5] == 5
  108. assert g.data == list(range(10))
  109. assert list(iter(g)) == list(range(10))
  110. class test_head_from_fun:
  111. def test_from_cls(self):
  112. class X:
  113. def __call__(x, y, kwarg=1): # noqa
  114. pass
  115. g = head_from_fun(X())
  116. with pytest.raises(TypeError):
  117. g(1)
  118. g(1, 2)
  119. g(1, 2, kwarg=3)
  120. def test_from_fun(self):
  121. def f(x, y, kwarg=1):
  122. pass
  123. g = head_from_fun(f)
  124. with pytest.raises(TypeError):
  125. g(1)
  126. g(1, 2)
  127. g(1, 2, kwarg=3)
  128. def test_regression_3678(self):
  129. local = {}
  130. fun = ('def f(foo, *args, bar=""):'
  131. ' return foo, args, bar')
  132. exec(fun, {}, local)
  133. g = head_from_fun(local['f'])
  134. g(1)
  135. g(1, 2, 3, 4, bar=100)
  136. with pytest.raises(TypeError):
  137. g(bar=100)
  138. def test_from_fun_with_hints(self):
  139. local = {}
  140. fun = ('def f_hints(x: int, y: int, kwarg: int=1):'
  141. ' pass')
  142. exec(fun, {}, local)
  143. f_hints = local['f_hints']
  144. g = head_from_fun(f_hints)
  145. with pytest.raises(TypeError):
  146. g(1)
  147. g(1, 2)
  148. g(1, 2, kwarg=3)
  149. def test_from_fun_forced_kwargs(self):
  150. local = {}
  151. fun = ('def f_kwargs(*, a, b="b", c=None):'
  152. ' return')
  153. exec(fun, {}, local)
  154. f_kwargs = local['f_kwargs']
  155. g = head_from_fun(f_kwargs)
  156. with pytest.raises(TypeError):
  157. g(1)
  158. g(a=1)
  159. g(a=1, b=2)
  160. g(a=1, b=2, c=3)
  161. class test_fun_takes_argument:
  162. def test_starkwargs(self):
  163. assert fun_takes_argument('foo', lambda **kw: 1)
  164. def test_named(self):
  165. assert fun_takes_argument('foo', lambda a, foo, bar: 1)
  166. def fun(a, b, c, d):
  167. return 1
  168. assert fun_takes_argument('foo', fun, position=4)
  169. def test_starargs(self):
  170. assert fun_takes_argument('foo', lambda a, *args: 1)
  171. def test_does_not(self):
  172. assert not fun_takes_argument('foo', lambda a, bar, baz: 1)
  173. assert not fun_takes_argument('foo', lambda: 1)
  174. def fun(a, b, foo):
  175. return 1
  176. assert not fun_takes_argument('foo', fun, position=4)
  177. @pytest.mark.parametrize('a,b,expected', [
  178. ((1, 2, 3), [4, 5], (1, 2, 3, 4, 5)),
  179. ((1, 2), [3, 4, 5], [1, 2, 3, 4, 5]),
  180. ([1, 2, 3], (4, 5), [1, 2, 3, 4, 5]),
  181. ([1, 2], (3, 4, 5), (1, 2, 3, 4, 5)),
  182. ])
  183. def test_seq_concat_seq(a, b, expected):
  184. res = seq_concat_seq(a, b)
  185. assert type(res) is type(expected) # noqa
  186. assert res == expected
  187. @pytest.mark.parametrize('a,b,expected', [
  188. ((1, 2, 3), 4, (1, 2, 3, 4)),
  189. ([1, 2, 3], 4, [1, 2, 3, 4]),
  190. ])
  191. def test_seq_concat_item(a, b, expected):
  192. res = seq_concat_item(a, b)
  193. assert type(res) is type(expected) # noqa
  194. assert res == expected
  195. class StarKwargsCallable(object):
  196. def __call__(self, **kwargs):
  197. return 1
  198. class StarArgsStarKwargsCallable(object):
  199. def __call__(self, *args, **kwargs):
  200. return 1
  201. class StarArgsCallable(object):
  202. def __call__(self, *args):
  203. return 1
  204. class ArgsCallable(object):
  205. def __call__(self, a, b):
  206. return 1
  207. class ArgsStarKwargsCallable(object):
  208. def __call__(self, a, b, **kwargs):
  209. return 1
  210. class test_fun_accepts_kwargs:
  211. @pytest.mark.parametrize('fun', [
  212. lambda a, b, **kwargs: 1,
  213. lambda *args, **kwargs: 1,
  214. lambda foo=1, **kwargs: 1,
  215. StarKwargsCallable(),
  216. StarArgsStarKwargsCallable(),
  217. ArgsStarKwargsCallable(),
  218. ])
  219. def test_accepts(self, fun):
  220. assert fun_accepts_kwargs(fun)
  221. @pytest.mark.parametrize('fun', [
  222. lambda a: 1,
  223. lambda a, b: 1,
  224. lambda *args: 1,
  225. lambda a, kw1=1, kw2=2: 1,
  226. StarArgsCallable(),
  227. ArgsCallable(),
  228. ])
  229. def test_rejects(self, fun):
  230. assert not fun_accepts_kwargs(fun)