123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- from __future__ import absolute_import, unicode_literals
- import pytest
- from case import skip
- from kombu.utils.functional import lazy
- from celery.five import nextfun, range
- from celery.utils.functional import (DummyContext, first, firstmethod,
- fun_accepts_kwargs, fun_takes_argument,
- head_from_fun, maybe_list, mlazy,
- padlist, regen, seq_concat_item,
- seq_concat_seq)
- def test_DummyContext():
- with DummyContext():
- pass
- with pytest.raises(KeyError):
- with DummyContext():
- raise KeyError()
- @pytest.mark.parametrize('items,n,default,expected', [
- (['George', 'Costanza', 'NYC'], 3, None,
- ['George', 'Costanza', 'NYC']),
- (['George', 'Costanza'], 3, None,
- ['George', 'Costanza', None]),
- (['George', 'Costanza', 'NYC'], 4, 'Earth',
- ['George', 'Costanza', 'NYC', 'Earth']),
- ])
- def test_padlist(items, n, default, expected):
- assert padlist(items, n, default=default) == expected
- class test_firstmethod:
- def test_AttributeError(self):
- assert firstmethod('foo')([object()]) is None
- def test_handles_lazy(self):
- class A(object):
- def __init__(self, value=None):
- self.value = value
- def m(self):
- return self.value
- assert 'four' == firstmethod('m')([
- A(), A(), A(), A('four'), A('five')])
- assert 'four' == firstmethod('m')([
- A(), A(), A(), lazy(lambda: A('four')), A('five')])
- def test_first():
- iterations = [0]
- def predicate(value):
- iterations[0] += 1
- if value == 5:
- return True
- return False
- assert first(predicate, range(10)) == 5
- assert iterations[0] == 6
- iterations[0] = 0
- assert first(predicate, range(10, 20)) is None
- assert iterations[0] == 10
- def test_maybe_list():
- assert maybe_list(1) == [1]
- assert maybe_list([1]) == [1]
- assert maybe_list(None) is None
- def test_mlazy():
- it = iter(range(20, 30))
- p = mlazy(nextfun(it))
- assert p() == 20
- assert p.evaluated
- assert p() == 20
- assert repr(p) == '20'
- class test_regen:
- def test_list(self):
- l = [1, 2]
- r = regen(iter(l))
- assert regen(l) is l
- assert r == l
- assert r == l # again
- assert r.__length_hint__() == 0
- fun, args = r.__reduce__()
- assert fun(*args) == l
- def test_gen(self):
- g = regen(iter(list(range(10))))
- assert g[7] == 7
- assert g[6] == 6
- assert g[5] == 5
- assert g[4] == 4
- assert g[3] == 3
- assert g[2] == 2
- assert g[1] == 1
- assert g[0] == 0
- assert g.data, list(range(10))
- assert g[8] == 8
- assert g[0] == 0
- g = regen(iter(list(range(10))))
- assert g[0] == 0
- assert g[1] == 1
- assert g.data == list(range(10))
- g = regen(iter([1]))
- assert g[0] == 1
- with pytest.raises(IndexError):
- g[1]
- assert g.data == [1]
- g = regen(iter(list(range(10))))
- assert g[-1] == 9
- assert g[-2] == 8
- assert g[-3] == 7
- assert g[-4] == 6
- assert g[-5] == 5
- assert g[5] == 5
- assert g.data == list(range(10))
- assert list(iter(g)) == list(range(10))
- class test_head_from_fun:
- def test_from_cls(self):
- class X(object):
- def __call__(x, y, kwarg=1): # noqa
- pass
- g = head_from_fun(X())
- with pytest.raises(TypeError):
- g(1)
- g(1, 2)
- g(1, 2, kwarg=3)
- def test_from_fun(self):
- def f(x, y, kwarg=1):
- pass
- g = head_from_fun(f)
- with pytest.raises(TypeError):
- g(1)
- g(1, 2)
- g(1, 2, kwarg=3)
- @skip.unless_python3()
- def test_regression_3678(self):
- local = {}
- fun = ('def f(foo, *args, bar="", **kwargs):'
- ' return foo, args, bar')
- exec(fun, {}, local)
- g = head_from_fun(local['f'])
- g(1)
- g(1, 2, 3, 4, bar=100)
- with pytest.raises(TypeError):
- g(bar=100)
- @skip.unless_python3()
- def test_from_fun_with_hints(self):
- local = {}
- fun = ('def f_hints(x: int, y: int, kwarg: int=1):'
- ' pass')
- exec(fun, {}, local)
- f_hints = local['f_hints']
- g = head_from_fun(f_hints)
- with pytest.raises(TypeError):
- g(1)
- g(1, 2)
- g(1, 2, kwarg=3)
- @skip.unless_python3()
- def test_from_fun_forced_kwargs(self):
- local = {}
- fun = ('def f_kwargs(*, a, b="b", c=None):'
- ' return')
- exec(fun, {}, local)
- f_kwargs = local['f_kwargs']
- g = head_from_fun(f_kwargs)
- with pytest.raises(TypeError):
- g(1)
- g(a=1)
- g(a=1, b=2)
- g(a=1, b=2, c=3)
- def test_classmethod(self):
- class A(object):
- @classmethod
- def f(cls, x):
- return x
- fun = head_from_fun(A.f, bound=False)
- assert fun(A, 1) == 1
- fun = head_from_fun(A.f, bound=True)
- assert fun(1) == 1
- class test_fun_takes_argument:
- def test_starkwargs(self):
- assert fun_takes_argument('foo', lambda **kw: 1)
- def test_named(self):
- assert fun_takes_argument('foo', lambda a, foo, bar: 1)
- def fun(a, b, c, d):
- return 1
- assert fun_takes_argument('foo', fun, position=4)
- def test_starargs(self):
- assert fun_takes_argument('foo', lambda a, *args: 1)
- def test_does_not(self):
- assert not fun_takes_argument('foo', lambda a, bar, baz: 1)
- assert not fun_takes_argument('foo', lambda: 1)
- def fun(a, b, foo):
- return 1
- assert not fun_takes_argument('foo', fun, position=4)
- @pytest.mark.parametrize('a,b,expected', [
- ((1, 2, 3), [4, 5], (1, 2, 3, 4, 5)),
- ((1, 2), [3, 4, 5], [1, 2, 3, 4, 5]),
- ([1, 2, 3], (4, 5), [1, 2, 3, 4, 5]),
- ([1, 2], (3, 4, 5), (1, 2, 3, 4, 5)),
- ])
- def test_seq_concat_seq(a, b, expected):
- res = seq_concat_seq(a, b)
- assert type(res) is type(expected) # noqa
- assert res == expected
- @pytest.mark.parametrize('a,b,expected', [
- ((1, 2, 3), 4, (1, 2, 3, 4)),
- ([1, 2, 3], 4, [1, 2, 3, 4]),
- ])
- def test_seq_concat_item(a, b, expected):
- res = seq_concat_item(a, b)
- assert type(res) is type(expected) # noqa
- assert res == expected
- class StarKwargsCallable(object):
- def __call__(self, **kwargs):
- return 1
- class StarArgsStarKwargsCallable(object):
- def __call__(self, *args, **kwargs):
- return 1
- class StarArgsCallable(object):
- def __call__(self, *args):
- return 1
- class ArgsCallable(object):
- def __call__(self, a, b):
- return 1
- class ArgsStarKwargsCallable(object):
- def __call__(self, a, b, **kwargs):
- return 1
- class test_fun_accepts_kwargs:
- @pytest.mark.parametrize('fun', [
- lambda a, b, **kwargs: 1,
- lambda *args, **kwargs: 1,
- lambda foo=1, **kwargs: 1,
- StarKwargsCallable(),
- StarArgsStarKwargsCallable(),
- ArgsStarKwargsCallable(),
- ])
- def test_accepts(self, fun):
- assert fun_accepts_kwargs(fun)
- @pytest.mark.parametrize('fun', [
- lambda a: 1,
- lambda a, b: 1,
- lambda *args: 1,
- lambda a, kw1=1, kw2=2: 1,
- StarArgsCallable(),
- ArgsCallable(),
- ])
- def test_rejects(self, fun):
- assert not fun_accepts_kwargs(fun)
|