test_functional.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import skip
  4. from kombu.utils.functional import lazy
  5. from celery.five import nextfun, range
  6. from celery.utils.functional import (DummyContext, first, firstmethod,
  7. fun_accepts_kwargs, fun_takes_argument,
  8. head_from_fun, maybe_list, mlazy,
  9. padlist, regen, seq_concat_item,
  10. seq_concat_seq)
  def test_DummyContext():
      """DummyContext can be entered cleanly and does not swallow errors."""
      # Plain enter/exit must not raise anything.
      with DummyContext():
          pass

      # An exception raised inside the block must reach the caller.
      with pytest.raises(KeyError), DummyContext():
          raise KeyError()
  @pytest.mark.parametrize('items,n,default,expected', [
      (['George', 'Costanza', 'NYC'], 3, None,
       ['George', 'Costanza', 'NYC']),
      (['George', 'Costanza'], 3, None,
       ['George', 'Costanza', None]),
      (['George', 'Costanza', 'NYC'], 4, 'Earth',
       ['George', 'Costanza', 'NYC', 'Earth']),
  ])
  def test_padlist(items, n, default, expected):
      """padlist() extends ``items`` to ``n`` entries using ``default``."""
      padded = padlist(items, n, default=default)
      assert padded == expected
  class test_firstmethod:
      """Tests for firstmethod()."""

      def test_AttributeError(self):
          # A plain object() has no 'foo' method; the result must be None.
          lookup = firstmethod('foo')
          assert lookup([object()]) is None

      def test_handles_lazy(self):
          class A(object):

              def __init__(self, value=None):
                  self.value = value

              def m(self):
                  return self.value

          # The first object whose m() returns a non-None value wins.
          plain = [A(), A(), A(), A('four'), A('five')]
          assert 'four' == firstmethod('m')(plain)

          # Same expectation when the winning object is wrapped in lazy().
          with_lazy = [A(), A(), A(), lazy(lambda: A('four')), A('five')]
          assert 'four' == firstmethod('m')(with_lazy)
  def test_first():
      """first() returns the first match and stops, or None if none match."""
      # One-item list so the nested function can mutate the counter.
      calls = [0]

      def predicate(value):
          calls[0] += 1
          return value == 5

      match = first(predicate, range(10))
      assert match == 5
      # Values 0..5 were inspected; nothing after the match.
      assert calls[0] == 6

      calls[0] = 0
      no_match = first(predicate, range(10, 20))
      assert no_match is None
      # Every one of the ten values was tried.
      assert calls[0] == 10
  def test_maybe_list():
      """maybe_list() wraps a scalar, keeps a list, and passes None through."""
      wrapped_scalar = maybe_list(1)
      assert wrapped_scalar == [1]

      kept_list = maybe_list([1])
      assert kept_list == [1]

      assert maybe_list(None) is None
  def test_mlazy():
      """mlazy reports being evaluated and keeps returning the first value."""
      numbers = iter(range(20, 30))
      memoized = mlazy(nextfun(numbers))

      assert memoized() == 20
      assert memoized.evaluated
      # A second call yields the same value rather than the next item.
      assert memoized() == 20
      assert repr(memoized) == '20'
  class test_regen:
      """Tests for regen() applied to lists and to one-shot iterators."""

      def test_list(self):
          """A list is returned as-is; a wrapped iterator compares repeatedly."""
          items = [1, 2]
          wrapped = regen(iter(items))
          # A real list is handed back unchanged (same object).
          assert regen(items) is items
          assert wrapped == items
          assert wrapped == items  # again
          assert wrapped.__length_hint__() == 0
          # The pickling recipe must rebuild something equal to the list.
          fun, args = wrapped.__reduce__()
          assert fun(*args) == items

      def test_gen(self):
          """Indexing (forward, backward, out of range) and ``.data``."""
          g = regen(iter(list(range(10))))
          # Highest index first, then walk down through earlier items.
          for index in range(7, -1, -1):
              assert g[index] == index
          # Fixed: this was ``assert g.data, list(range(10))``, which only
          # checked truthiness and used the list as the failure message.
          assert g.data == list(range(10))
          assert g[8] == 8
          assert g[0] == 0

          g = regen(iter(list(range(10))))
          assert g[0] == 0
          assert g[1] == 1
          assert g.data == list(range(10))

          g = regen(iter([1]))
          assert g[0] == 1
          with pytest.raises(IndexError):
              g[1]
          assert g.data == [1]

          g = regen(iter(list(range(10))))
          # Negative indexes count from the end: -1 -> 9 ... -5 -> 5.
          for offset in range(1, 6):
              assert g[-offset] == 10 - offset
          assert g[5] == 5
          assert g.data == list(range(10))

          assert list(iter(g)) == list(range(10))
  class test_head_from_fun:
      """Tests that head_from_fun() results accept/reject call arguments."""

      def test_from_cls(self):
          class X(object):
              def __call__(x, y, kwarg=1):  # noqa
                  pass

          head = head_from_fun(X())
          # A single positional argument is rejected.
          with pytest.raises(TypeError):
              head(1)
          head(1, 2)
          head(1, 2, kwarg=3)

      def test_from_fun(self):
          def f(x, y, kwarg=1):
              pass

          head = head_from_fun(f)
          # A single positional argument is rejected.
          with pytest.raises(TypeError):
              head(1)
          head(1, 2)
          head(1, 2, kwarg=3)

      @skip.unless_python3()
      def test_regression_3678(self):
          # Python 3-only syntax, so the function is built from source text.
          namespace = {}
          source = ('def f(foo, *args, bar="", **kwargs):'
                    ' return foo, args, bar')
          exec(source, {}, namespace)

          head = head_from_fun(namespace['f'])
          head(1)
          head(1, 2, 3, 4, bar=100)
          # Calling without the positional ``foo`` is rejected.
          with pytest.raises(TypeError):
              head(bar=100)

      @skip.unless_python3()
      def test_from_fun_with_hints(self):
          # Annotations are Python 3-only, so build the function from text.
          namespace = {}
          source = ('def f_hints(x: int, y: int, kwarg: int=1):'
                    ' pass')
          exec(source, {}, namespace)
          f_hints = namespace['f_hints']

          head = head_from_fun(f_hints)
          with pytest.raises(TypeError):
              head(1)
          head(1, 2)
          head(1, 2, kwarg=3)

      @skip.unless_python3()
      def test_from_fun_forced_kwargs(self):
          # Keyword-only arguments are Python 3-only syntax.
          namespace = {}
          source = ('def f_kwargs(*, a, b="b", c=None):'
                    ' return')
          exec(source, {}, namespace)
          f_kwargs = namespace['f_kwargs']

          head = head_from_fun(f_kwargs)
          # A positional argument is rejected.
          with pytest.raises(TypeError):
              head(1)
          head(a=1)
          head(a=1, b=2)
          head(a=1, b=2, c=3)

      def test_classmethod(self):
          class A(object):

              @classmethod
              def f(cls, x):
                  return x

          # bound=False: the class is passed explicitly as first argument.
          unbound_head = head_from_fun(A.f, bound=False)
          assert unbound_head(A, 1) == 1

          # bound=True: only the remaining argument is passed.
          bound_head = head_from_fun(A.f, bound=True)
          assert bound_head(1) == 1
  class test_fun_takes_argument:
      """Tests for fun_takes_argument() across several signature shapes."""

      def test_starkwargs(self):
          takes_foo = fun_takes_argument('foo', lambda **kw: 1)
          assert takes_foo

      def test_named(self):
          # 'foo' is one of the named parameters.
          assert fun_takes_argument('foo', lambda a, foo, bar: 1)

          def fun(a, b, c, d):
              return 1

          # No parameter is called 'foo', but the call passes position=4.
          assert fun_takes_argument('foo', fun, position=4)

      def test_starargs(self):
          takes_foo = fun_takes_argument('foo', lambda a, *args: 1)
          assert takes_foo

      def test_does_not(self):
          assert not fun_takes_argument('foo', lambda a, bar, baz: 1)
          assert not fun_takes_argument('foo', lambda: 1)

          def fun(a, b, foo):
              return 1

          # 'foo' is a parameter name here, yet position=4 is rejected.
          assert not fun_takes_argument('foo', fun, position=4)
  @pytest.mark.parametrize('a,b,expected', [
      ((1, 2, 3), [4, 5], (1, 2, 3, 4, 5)),
      ((1, 2), [3, 4, 5], [1, 2, 3, 4, 5]),
      ([1, 2, 3], (4, 5), [1, 2, 3, 4, 5]),
      ([1, 2], (3, 4, 5), (1, 2, 3, 4, 5)),
  ])
  def test_seq_concat_seq(a, b, expected):
      """seq_concat_seq() must match the expected value and exact type.

      In each case above the expected type is that of the longer operand.
      """
      combined = seq_concat_seq(a, b)
      # Exact type identity is intended here, not an isinstance() check.
      assert type(combined) is type(expected)  # noqa
      assert combined == expected
  @pytest.mark.parametrize('a,b,expected', [
      ((1, 2, 3), 4, (1, 2, 3, 4)),
      ([1, 2, 3], 4, [1, 2, 3, 4]),
  ])
  def test_seq_concat_item(a, b, expected):
      """seq_concat_item() appends ``b`` and keeps the sequence's type."""
      combined = seq_concat_item(a, b)
      # Exact type identity is intended here, not an isinstance() check.
      assert type(combined) is type(expected)  # noqa
      assert combined == expected
  class StarKwargsCallable(object):
      """Fixture: instance callable with ``**kwargs`` only."""

      def __call__(self, **kwargs):
          # The return value is a fixed placeholder.
          return 1
  class StarArgsStarKwargsCallable(object):
      """Fixture: instance callable with both ``*args`` and ``**kwargs``."""

      def __call__(self, *args, **kwargs):
          # The return value is a fixed placeholder.
          return 1
  class StarArgsCallable(object):
      """Fixture: instance callable with ``*args`` and no ``**kwargs``."""

      def __call__(self, *args):
          # The return value is a fixed placeholder.
          return 1
  class ArgsCallable(object):
      """Fixture: instance callable with two named parameters, ``a``, ``b``."""

      def __call__(self, a, b):
          # The return value is a fixed placeholder.
          return 1
  class ArgsStarKwargsCallable(object):
      """Fixture: instance callable with ``a``, ``b`` and ``**kwargs``."""

      def __call__(self, a, b, **kwargs):
          # The return value is a fixed placeholder.
          return 1
  class test_fun_accepts_kwargs:
      """Tests for fun_accepts_kwargs() on lambdas and callable instances."""

      # Every callable in this list declares a ``**kwargs`` parameter.
      @pytest.mark.parametrize('fun', [
          lambda a, b, **kwargs: 1,
          lambda *args, **kwargs: 1,
          lambda foo=1, **kwargs: 1,
          StarKwargsCallable(),
          StarArgsStarKwargsCallable(),
          ArgsStarKwargsCallable(),
      ])
      def test_accepts(self, fun):
          accepted = fun_accepts_kwargs(fun)
          assert accepted

      # None of the callables in this list declares ``**kwargs``.
      @pytest.mark.parametrize('fun', [
          lambda a: 1,
          lambda a, b: 1,
          lambda *args: 1,
          lambda a, kw1=1, kw2=2: 1,
          StarArgsCallable(),
          ArgsCallable(),
      ])
      def test_rejects(self, fun):
          accepted = fun_accepts_kwargs(fun)
          assert not accepted