| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721 | 
							- from __future__ import absolute_import, unicode_literals
 
- import json
 
- import pytest
 
- from case import MagicMock, Mock
 
- from celery._state import _task_stack
 
- from celery.canvas import (Signature, _chain, _maybe_group, chain, chord,
 
-                            chunks, group, maybe_signature, maybe_unroll_group,
 
-                            signature, xmap, xstarmap)
 
- from celery.result import AsyncResult, EagerResult, GroupResult
 
# Module-level fixture: a Signature built from its plain-dict form with
# every field populated.  Tests below read its fields back and use it as a
# link / link_error target.
SIG = Signature(
    {
        'task': 'TASK',
        'args': ('A1',),
        'kwargs': {'K1': 'V1'},
        'options': {'task_id': 'TASK_ID'},
        'subtask_type': '',
    },
)
 
class test_maybe_unroll_group:
    """Checks ``maybe_unroll_group`` against a group whose tasks are unsized."""

    def test_when_no_len_and_no_length_hint(self):
        """The very same group object comes back when no size is available."""
        grp = MagicMock(name='group')
        # ``len(grp.tasks)`` is rigged to raise TypeError.
        grp.tasks.__len__.side_effect = TypeError()

        # First pass: ``__length_hint__`` exists and reports 0.
        grp.tasks.__length_hint__ = Mock(return_value=0)
        unrolled = maybe_unroll_group(grp)
        assert unrolled is grp

        # Second pass: calling ``__length_hint__`` raises AttributeError.
        grp.tasks.__length_hint__.side_effect = AttributeError()
        unrolled = maybe_unroll_group(grp)
        assert unrolled is grp
 
class CanvasCase:
    """Base class registering ``add``, ``mul`` and ``div`` tasks on the app."""

    def setup(self):
        """Create three non-shared tasks and expose them as attributes.

        After this runs, ``self.add``, ``self.mul`` and ``self.div`` are the
        task objects returned by ``self.app.task(shared=False)``.
        """
        def add(x, y):
            return x + y

        def mul(x, y):
            return x * y

        def div(x, y):
            return x / y

        # Register in the same order as the attribute names; each task is
        # stored under the name of the function it wraps.
        for fun in (add, mul, div):
            setattr(self, fun.__name__, self.app.task(shared=False)(fun))
 
class test_Signature(CanvasCase):
    """Tests for :class:`Signature`: fields, linking, chaining and merging."""

    def test_getitem_property_class(self):
        """Each field descriptor is reachable (and truthy) on the class."""
        assert Signature.task
        assert Signature.args
        assert Signature.kwargs
        assert Signature.options
        assert Signature.subtask_type

    def test_getitem_property(self):
        """Attribute access returns the values stored in the SIG fixture."""
        assert SIG.task == 'TASK'
        assert SIG.args == ('A1',)
        assert SIG.kwargs == {'K1': 'V1'}
        assert SIG.options == {'task_id': 'TASK_ID'}
        assert SIG.subtask_type == ''

    def test_call(self):
        """Calling a signature puts call-time args before the stored ones."""
        x = Signature('foo', (1, 2), {'arg1': 33}, app=self.app)
        x.type = Mock(name='type')
        x(3, 4, arg2=66)
        x.type.assert_called_with(3, 4, 1, 2, arg1=33, arg2=66)

    def test_link_on_scalar(self):
        """A single ``link`` option becomes a list once another is added."""
        x = Signature('TASK', link=Signature('B'))
        assert x.options['link']
        x.link(Signature('C'))
        assert isinstance(x.options['link'], list)
        assert Signature('B') in x.options['link']
        assert Signature('C') in x.options['link']

    def test_json(self):
        """``__json__`` returns the same mapping as ``dict(signature)``."""
        x = Signature('TASK', link=Signature('B', app=self.app), app=self.app)
        assert x.__json__() == dict(x)

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_reduce(self):
        """The ``__reduce__`` recipe rebuilds an equal signature."""
        x = Signature('TASK', (2, 4), app=self.app)
        fun, args = x.__reduce__()
        assert fun(*args) == x

    def test_replace(self):
        """``replace`` swaps args, kwargs or options wholesale."""
        # Was ``('A')``, which is just the string 'A'; a one-element tuple
        # is what the positional-args parameter is meant to receive.
        x = Signature('TASK', ('A',), {})
        assert x.replace(args=('B',)).args == ('B',)
        assert x.replace(kwargs={'FOO': 'BAR'}).kwargs == {
            'FOO': 'BAR',
        }
        assert x.replace(options={'task_id': '123'}).options == {
            'task_id': '123',
        }

    def test_set(self):
        """``set`` merges new options into the ones given at construction."""
        assert Signature('TASK', x=1).set(task_id='2').options == {
            'x': 1, 'task_id': '2',
        }

    def test_link(self):
        """Linking the same callback twice stores it only once."""
        x = signature(SIG)
        x.link(SIG)
        x.link(SIG)
        assert SIG in x.options['link']
        assert len(x.options['link']) == 1

    def test_link_error(self):
        """Linking the same errback twice stores it only once."""
        x = signature(SIG)
        x.link_error(SIG)
        x.link_error(SIG)
        assert SIG in x.options['link_error']
        assert len(x.options['link_error']) == 1

    def test_flatten_links(self):
        """``flatten_links`` walks a chain of links in order."""
        tasks = [self.add.s(2, 2), self.mul.s(4), self.div.s(2)]
        tasks[0].link(tasks[1])
        tasks[1].link(tasks[2])
        assert tasks[0].flatten_links() == tasks

    def test_OR(self):
        """The ``|`` operator builds chains and splices nested chains."""
        x = self.add.s(2, 2) | self.mul.s(4)
        assert isinstance(x, _chain)
        y = self.add.s(4, 4) | self.div.s(2)
        z = x | y
        assert isinstance(y, _chain)
        assert isinstance(z, _chain)
        assert len(z.tasks) == 4
        with pytest.raises(TypeError):
            x | 10
        ax = self.add.s(2, 2) | (self.add.s(4) | self.add.s(8))
        assert isinstance(ax, _chain)
        # Previously ``assert len(ax.tasks), 3 == '...'``, which only checked
        # truthiness and never compared the length against 3.
        assert len(ax.tasks) == 3, 'consolidates chain to chain'

    def test_INVERT(self):
        """``~sig`` calls ``apply_async`` and returns ``.get()`` of the result."""
        x = self.add.s(2, 2)
        x.apply_async = Mock()
        x.apply_async.return_value = Mock()
        x.apply_async.return_value.get = Mock()
        x.apply_async.return_value.get.return_value = 4
        assert ~x == 4
        x.apply_async.assert_called()

    def test_merge_immutable(self):
        """An immutable signature ignores extra args/kwargs but takes options."""
        x = self.add.si(2, 2, foo=1)
        args, kwargs, options = x._merge((4,), {'bar': 2}, {'task_id': 3})
        assert args == (2, 2)
        assert kwargs == {'foo': 1}
        assert options == {'task_id': 3}

    def test_set_immutable(self):
        """``set(immutable=...)`` toggles the ``immutable`` flag both ways."""
        x = self.add.s(2, 2)
        assert not x.immutable
        x.set(immutable=True)
        assert x.immutable
        x.set(immutable=False)
        assert not x.immutable

    def test_election(self):
        """``election`` goes through app.control and keeps the frozen id."""
        x = self.add.s(2, 2)
        x.freeze('foo')
        x.type.app.control = Mock()
        r = x.election()
        x.type.app.control.election.assert_called()
        assert r.id == 'foo'

    def test_AsyncResult_when_not_registered(self):
        """``AsyncResult`` is available for an unregistered task name."""
        s = signature('xxx.not.registered', app=self.app)
        assert s.AsyncResult

    def test_apply_async_when_not_registered(self):
        """``_apply_async`` is available for an unregistered task name."""
        s = signature('xxx.not.registered', app=self.app)
        assert s._apply_async

    def test_keeping_link_error_on_chaining(self):
        """Chaining moves a chain's errback onto a task of the new chain.

        In every combination below the new chain has no ``link_error``
        option of its own, and the original chain ``x`` is left unmodified.
        """
        x = self.add.s(2, 2) | self.mul.s(4)
        assert isinstance(x, _chain)
        x.link_error(SIG)
        assert SIG in x.options['link_error']

        # chain | single signature
        t = signature(SIG)
        z = x | t
        assert isinstance(z, _chain)
        assert t in z.tasks
        assert not z.options.get('link_error')
        assert SIG in z.tasks[0].options['link_error']
        assert not z.tasks[2].options.get('link_error')
        assert SIG in x.options['link_error']
        assert t not in x.tasks
        assert not x.tasks[0].options.get('link_error')

        # single signature | chain
        z = t | x
        assert isinstance(z, _chain)
        assert t in z.tasks
        assert not z.options.get('link_error')
        assert SIG in z.tasks[1].options['link_error']
        assert not z.tasks[0].options.get('link_error')
        assert SIG in x.options['link_error']
        assert t not in x.tasks
        assert not x.tasks[0].options.get('link_error')

        y = self.add.s(4, 4) | self.div.s(2)
        assert isinstance(y, _chain)

        # chain (with errback) | chain
        z = x | y
        assert isinstance(z, _chain)
        assert not z.options.get('link_error')
        assert SIG in z.tasks[0].options['link_error']
        assert not z.tasks[2].options.get('link_error')
        assert SIG in x.options['link_error']
        assert not x.tasks[0].options.get('link_error')

        # chain | chain (with errback)
        z = y | x
        assert isinstance(z, _chain)
        assert not z.options.get('link_error')
        assert SIG in z.tasks[3].options['link_error']
        assert not z.tasks[1].options.get('link_error')
        assert SIG in x.options['link_error']
        assert not x.tasks[0].options.get('link_error')
 
class test_xmap_xstarmap(CanvasCase):
    """Shared checks for the ``xmap`` and ``xstarmap`` signature types."""

    def test_apply(self):
        """Both variants forward to ``type.apply_async`` and round-trip."""
        variants = [(xmap, 'map'), (xstarmap, 'starmap')]
        for sig_cls, method_name in variants:
            pairs = [(n, n) for n in range(10)]
            make_sig = getattr(self.add, method_name)
            sig = make_sig(pairs)

            # Swap the task type for a mock so the delegated call is visible.
            sig.type = Mock()
            sig.apply_async(foo=1)
            sig.type.apply_async.assert_called_with(
                (),
                {'task': self.add.s(), 'it': pairs},
                foo=1,
                route_name=self.add.name,
            )

            rebuilt = sig_cls.from_dict(dict(sig))
            assert rebuilt == sig
            assert repr(sig)
 
class test_chunks(CanvasCase):
    """Tests for the ``chunks`` signature type."""

    def test_chunks(self):
        """Chunks round-trip via dict, split into groups and delegate calls."""
        sig = self.add.chunks(range(100), 10)

        rebuilt = chunks.from_dict(dict(sig), app=self.app)
        assert dict(rebuilt) == dict(sig)

        assert sig.group()
        assert len(sig.group().tasks) == 10

        # Replace ``group`` so both call styles can be observed on the fake.
        fake_group = Mock()
        sig.group = Mock(return_value=fake_group)

        sig.apply_async()
        fake_group.apply_async.assert_called_with(
            (), {}, route_name=self.add.name,
        )
        fake_group.apply_async.reset_mock()

        sig()
        fake_group.apply_async.assert_called_with(
            (), {}, route_name=self.add.name,
        )

        self.app.conf.task_always_eager = True
        chunks.apply_chunks(app=self.app, **sig['kwargs'])
 
class test_chain(CanvasCase):
    """Tests for ``chain``: construction, (de)serialisation and execution."""

    def test_clone_preserves_state(self):
        """A clone has equal tasks, kwargs and args."""
        x = chain(self.add.s(i, i) for i in range(10))
        assert x.clone().tasks == x.tasks
        assert x.clone().kwargs == x.kwargs
        assert x.clone().args == x.args

    def test_repr(self):
        """The repr joins the task reprs with `` | ``."""
        x = self.add.s(2, 2) | self.add.s(2)
        assert repr(x) == '%s(2, 2) | add(2)' % (self.add.name,)

    def test_apply_async(self):
        """The result of a three-task chain has exactly two ancestors."""
        c = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
        result = c.apply_async()
        assert result.parent
        assert result.parent.parent
        assert result.parent.parent.parent is None

    def test_splices_chains(self):
        """A chain nested inside a chain is flattened when frozen."""
        c = chain(
            self.add.s(5, 5),
            chain(self.add.s(6), self.add.s(7), self.add.s(8), app=self.app),
            app=self.app,
        )
        c.freeze()
        tasks, _ = c._frozen
        assert len(tasks) == 4

    def test_from_dict_no_tasks(self):
        """``from_dict`` accepts the dict form of an empty chain."""
        assert chain.from_dict(dict(chain(app=self.app)), app=self.app)

    def test_from_dict_full_subtasks(self):
        """After a JSON round-trip every task is a Signature again."""
        c = chain(self.add.si(1, 2), self.add.si(3, 4), self.add.si(5, 6))

        serialized = json.loads(json.dumps(c))

        deserialized = chain.from_dict(serialized)

        for task in deserialized.tasks:
            assert isinstance(task, Signature)

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_app_falls_back_to_default(self):
        """A chain built without an app uses ``current_app``."""
        from celery._state import current_app
        assert chain().app is current_app

    def test_handles_dicts(self):
        """Dict-form tasks become Signatures bound to the chain's app."""
        c = chain(
            self.add.s(5, 5), dict(self.add.s(8)), app=self.app,
        )
        c.freeze()
        tasks, _ = c._frozen
        for task in tasks:
            assert isinstance(task, Signature)
            assert task.app is self.app

    def test_group_to_chord(self):
        """With ``_use_link`` set, a group followed by tasks becomes a chord."""
        c = (
            self.add.s(5) |
            group([self.add.s(i, i) for i in range(5)], app=self.app) |
            self.add.s(10) |
            self.add.s(20) |
            self.add.s(30)
        )
        c._use_link = True
        tasks, results = c.prepare_steps((), c.tasks)

        # The assertions index from the end: add.s(5) is last, the chord
        # is second to last.
        assert tasks[-1].args[0] == 5
        assert isinstance(tasks[-2], chord)
        assert len(tasks[-2].tasks) == 5

        # The three tasks that followed the group form the chord body.
        body = tasks[-2].body
        assert len(body.tasks) == 3
        assert body.tasks[0].args[0] == 10
        assert body.tasks[1].args[0] == 20
        assert body.tasks[2].args[0] == 30

        # A group at the very end of the chain stays a group.
        c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
        c2._use_link = True
        tasks2, _ = c2.prepare_steps((), c2.tasks)
        assert isinstance(tasks2[0], group)

    def test_group_to_chord__protocol_2__or(self):
        """``group | task | ...`` built with ``|`` yields a chord."""
        c = (
            group([self.add.s(i, i) for i in range(5)], app=self.app) |
            self.add.s(10) |
            self.add.s(20) |
            self.add.s(30)
        )
        assert isinstance(c, chord)

    def test_group_to_chord__protocol_2(self):
        """``chain(group, task, ...)`` yields a chord with a chain body."""
        c = chain(
            group([self.add.s(i, i) for i in range(5)], app=self.app),
            self.add.s(10),
            self.add.s(20),
            self.add.s(30)
        )
        assert isinstance(c, chord)
        assert isinstance(c.body, _chain)
        assert len(c.body.tasks) == 3

        # A group at the very end of the chain stays a group.
        c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
        c2._use_link = False
        tasks2, _ = c2.prepare_steps((), c2.tasks)
        assert isinstance(tasks2[0], group)

    def test_apply_options(self):
        """Options given to ``apply_async`` end up on the chain's tasks."""

        class static(Signature):
            """Signature whose ``clone`` returns itself."""

            def clone(self, *args, **kwargs):
                return self

        def s(*args, **kwargs):
            return static(self.add, args, kwargs, type=self.add, app=self.app)

        c = s(2, 2) | s(4) | s(8)
        r1 = c.apply_async(task_id='some_id')
        assert r1.id == 'some_id'

        c.apply_async(group_id='some_group_id')
        assert c.tasks[-1].options['group_id'] == 'some_group_id'

        c.apply_async(chord='some_chord_id')
        assert c.tasks[-1].options['chord'] == 'some_chord_id'

        c.apply_async(link=[s(32)])
        assert c.tasks[-1].options['link'] == [s(32)]

        # link_error is checked on every task, not only the last one.
        c.apply_async(link_error=[s('error')])
        for task in c.tasks:
            assert task.options['link_error'] == [s('error')]

    def test_reverse(self):
        """``signature()`` rebuilds a chain from a chain or its dict form."""
        x = self.add.s(2, 2) | self.add.s(2)
        assert isinstance(signature(x), _chain)
        assert isinstance(signature(dict(x)), _chain)

    def test_always_eager(self):
        """With ``task_always_eager`` set, ``~chain`` returns the final value."""
        self.app.conf.task_always_eager = True
        assert ~(self.add.s(4, 4) | self.add.s(8)) == 16

    def test_apply(self):
        """``apply`` returns an EagerResult whose parents hold the partials."""
        x = chain(self.add.s(4, 4), self.add.s(8), self.add.s(10))
        res = x.apply()

        assert isinstance(res, EagerResult)
        assert res.get() == 26

        assert res.parent.get() == 16
        assert res.parent.parent.get() == 8
        assert res.parent.parent.parent is None

    def test_empty_chain_returns_none(self):
        """Calling or applying an empty chain returns ``None``."""
        assert chain(app=self.app)() is None
        assert chain(app=self.app).apply_async() is None

    def test_call_no_tasks(self):
        """Calling a chain with no tasks returns a falsy value."""
        x = chain()
        assert not x()

    def test_call_with_tasks(self):
        """Calling a chain forwards args and kwargs to ``apply_async``."""
        x = self.add.s(2, 2) | self.add.s(4)
        x.apply_async = Mock()
        x(2, 2, foo=1)
        x.apply_async.assert_called_with((2, 2), {'foo': 1})

    def test_from_dict_no_args__with_args(self):
        """``from_dict`` yields a chain whether ``args`` is None or a tuple."""
        x = dict(self.add.s(2, 2) | self.add.s(4))
        x['args'] = None
        assert isinstance(chain.from_dict(x), _chain)
        x['args'] = (2,)
        assert isinstance(chain.from_dict(x), _chain)

    def test_accepts_generator_argument(self):
        """A chain can be built from a generator of signatures."""
        x = chain(self.add.s(i) for i in range(10))
        # Previously ``assert x.tasks[0].type, self.add``, which passed
        # ``self.add`` as the assertion message and never compared anything.
        # ``==`` rather than ``is`` in case ``self.add`` is a proxy object.
        assert x.tasks[0].type == self.add
        assert x.type

    def test_chord_sets_result_parent(self):
        """Freezing task | group | task | task links results via ``parent``."""
        g = (self.add.s(0, 0) |
             group(self.add.s(i, i) for i in range(1, 10)) |
             self.add.s(2, 2) |
             self.add.s(4, 4))
        res = g.freeze()

        # Walking up: two plain results, then the group, then the first task.
        assert isinstance(res, AsyncResult)
        assert not isinstance(res, GroupResult)
        assert isinstance(res.parent, AsyncResult)
        assert not isinstance(res.parent, GroupResult)
        assert isinstance(res.parent.parent, GroupResult)
        assert isinstance(res.parent.parent.parent, AsyncResult)
        assert not isinstance(res.parent.parent.parent, GroupResult)
        assert res.parent.parent.parent.parent is None

        # No result id may appear twice along the parent chain.
        seen = set()
        node = res
        while node:
            assert node.id not in seen
            seen.add(node.id)
            node = node.parent
 
class test_group(CanvasCase):
    """Tests for ``group``: construction, linking, application and kwargs."""

    def test_repr(self):
        """A group has a non-empty repr."""
        x = group([self.add.s(2, 2), self.add.s(4, 4)])
        assert repr(x)

    def test_reverse(self):
        """``signature()`` rebuilds a group from a group or its dict form."""
        x = group([self.add.s(2, 2), self.add.s(4, 4)])
        assert isinstance(signature(x), group)
        assert isinstance(signature(dict(x)), group)

    def test_cannot_link_on_group(self):
        """``apply_async(link=...)`` on a group raises TypeError."""
        x = group([self.add.s(2, 2), self.add.s(4, 4)])
        with pytest.raises(TypeError):
            x.apply_async(link=self.add.s(2, 2))

    def test_cannot_link_error_on_group(self):
        """``apply_async(link_error=...)`` on a group raises TypeError."""
        x = group([self.add.s(2, 2), self.add.s(4, 4)])
        with pytest.raises(TypeError):
            x.apply_async(link_error=self.add.s(2, 2))

    def test_group_with_group_argument(self):
        """A group built from a single group shares that group's task list."""
        g1 = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
        g2 = group(g1, app=self.app)
        assert g2.tasks is g1.tasks

    def test_maybe_group_sig(self):
        """``_maybe_group`` wraps a single signature in a list."""
        assert _maybe_group(self.add.s(2, 2), self.app) == [self.add.s(2, 2)]

    def test_apply(self):
        """``apply`` runs every task and returns their results in order."""
        x = group([self.add.s(4, 4), self.add.s(8, 8)])
        res = x.apply()
        # Previously ``assert res.get(), [8 == 16]``, which only checked that
        # the result was truthy and used ``[False]`` as the message.
        assert res.get() == [8, 16]

    def test_apply_async(self):
        """``apply_async`` on a plain group runs without raising."""
        x = group([self.add.s(4, 4), self.add.s(8, 8)])
        x.apply_async()

    def test_prepare_with_dict(self):
        """A group may contain a task given in dict form."""
        x = group([self.add.s(4, 4), dict(self.add.s(8, 8))], app=self.app)
        x.apply_async()

    def test_group_in_group(self):
        """A group nested inside another group can be applied."""
        g1 = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
        g2 = group(self.add.s(8, 8), g1, self.add.s(16, 16), app=self.app)
        g2.apply_async()

    def test_set_immutable(self):
        """``set_immutable`` is forwarded to every task in the group."""
        g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
        g1.set_immutable(True)
        for task in g1.tasks:
            task.set_immutable.assert_called_with(True)

    def test_link(self):
        """``link`` attaches an immutable clone to the first task."""
        g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
        sig = Mock(name='sig')
        g1.link(sig)
        g1.tasks[0].link.assert_called_with(sig.clone().set(immutable=True))

    def test_link_error(self):
        """``link_error`` attaches an immutable clone to the first task."""
        g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
        sig = Mock(name='sig')
        g1.link_error(sig)
        g1.tasks[0].link_error.assert_called_with(
            sig.clone().set(immutable=True),
        )

    def test_apply_empty(self):
        """An empty group yields a truthy result object with no results."""
        x = group(app=self.app)
        x.apply()
        res = x.apply_async()
        assert res
        assert not res.results

    def test_apply_async_with_parent(self):
        """A group called inside a task is recorded as that task's child."""
        _task_stack.push(self.add)
        try:
            self.add.push_request(called_directly=False)
            try:
                assert not self.add.request.children
                x = group([self.add.s(4, 4), self.add.s(8, 8)])
                res = x()
                assert self.add.request.children
                assert res in self.add.request.children
                assert len(self.add.request.children) == 1
            finally:
                # Undo the pushes in reverse order even if an assert fails.
                self.add.pop_request()
        finally:
            _task_stack.pop()

    def test_from_dict(self):
        """``from_dict`` works whether ``args`` is a tuple or None."""
        x = group([self.add.s(2, 2), self.add.s(4, 4)])
        x['args'] = (2, 2)
        assert group.from_dict(dict(x))
        x['args'] = None
        assert group.from_dict(dict(x))

    def test_call_empty_group(self):
        """Every way of invoking an empty group succeeds; the call is empty."""
        x = group(app=self.app)
        assert not len(x())
        x.delay()
        x.apply_async()
        x()

    def test_skew(self):
        """``skew`` gives task *i* a countdown of ``i + 1`` for these args."""
        g = group([self.add.s(i, i) for i in range(10)])
        g.skew(start=1, stop=10, step=1)
        for i, task in enumerate(g.tasks):
            assert task.options['countdown'] == i + 1

    def test_iter(self):
        """Iterating a group yields the same items as ``keys()``."""
        g = group([self.add.s(i, i) for i in range(10)])
        assert list(iter(g)) == list(g.keys())

    def test_single_task(self):
        """One task, in a list or bare, still gives a one-task group."""
        g = group([self.add.s(1, 1)])
        assert isinstance(g, group)
        assert len(g.tasks) == 1
        g = group(self.add.s(1, 1))
        assert isinstance(g, group)
        assert len(g.tasks) == 1

    @staticmethod
    def helper_test_get_delay(result):
        """Wait for *result* to become ready and return ``result.get()``.

        Polls ``result.ready()`` every 10 ms.  Returns ``None`` if the
        result is still not ready once more than one second has passed.
        """
        import time
        t0 = time.time()
        while not result.ready():
            time.sleep(0.01)
            if time.time() - t0 > 1:
                return None
        return result.get()

    def test_kwargs_direct(self):
        """Calling the task directly with keyword arguments works."""
        res = [self.add(x=1, y=1), self.add(x=1, y=1)]
        assert res == [2, 2]

    def test_kwargs_apply(self):
        """``apply(kwargs=...)`` passes the kwargs to every task."""
        x = group([self.add.s(), self.add.s()])
        res = x.apply(kwargs={'x': 1, 'y': 1}).get()
        assert res == [2, 2]

    def test_kwargs_apply_async(self):
        """``apply_async(kwargs=...)`` passes the kwargs to every task."""
        self.app.conf.task_always_eager = True
        x = group([self.add.s(), self.add.s()])
        res = self.helper_test_get_delay(
            x.apply_async(kwargs={'x': 1, 'y': 1})
        )
        assert res == [2, 2]

    def test_kwargs_delay(self):
        """``delay(**kwargs)`` passes the kwargs to every task."""
        self.app.conf.task_always_eager = True
        x = group([self.add.s(), self.add.s()])
        res = self.helper_test_get_delay(x.delay(x=1, y=1))
        assert res == [2, 2]

    def test_kwargs_delay_partial(self):
        """``delay`` kwargs combine with each task's own partial arguments."""
        self.app.conf.task_always_eager = True
        x = group([self.add.s(1), self.add.s(x=1)])
        res = self.helper_test_get_delay(x.delay(y=1))
        assert res == [2, 2]
 
class test_chord(CanvasCase):
    """Tests for ``chord``: a header of tasks plus a body callback."""

    def _two_task_chord(self):
        """Build the chord several tests share: two adds, ``mul`` body."""
        header = [self.add.s(2, 2), self.add.s(4, 4)]
        return chord(header, body=self.mul.s(4))

    def test_reverse(self):
        """``signature()`` rebuilds a chord from a chord or its dict form."""
        original = self._two_task_chord()
        assert isinstance(signature(original), chord)
        assert isinstance(signature(dict(original)), chord)

    def test_clone_clones_body(self):
        """Cloning copies the body; a chord without a body clones cleanly."""
        original = self._two_task_chord()
        copy = original.clone()
        assert original.kwargs['body'] is not copy.kwargs['body']

        del copy.kwargs['body']
        bodiless = copy.clone()
        assert bodiless.kwargs.get('body') is None

    def test_argument_is_group(self):
        """A chord may take a ready-made group as its header."""
        header = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
        assert chord(header).tasks

    def test_app_when_app(self):
        """An explicitly passed app is the chord's app."""
        fake_app = Mock(name='app')
        built = chord([self.add.s(4, 4)], app=fake_app)
        assert built.app is fake_app

    def test_app_when_app_in_task(self):
        """Without an explicit app it comes from the header, then the body."""
        first = Mock(name='t1')
        body = Mock(name='t2')

        built = chord([first, self.add.s(4, 4)])
        assert built.app is built.tasks[0].app

        # With the header task's app cleared, the body's ``_app`` is used.
        first.app = None
        built = chord([first], body=body)
        assert built.app is body._app

    def test_app_when_header_is_empty(self):
        """With an empty header the app comes from the body."""
        built = chord([], self.add.s(4, 4))
        assert built.app is self.add.app

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_app_fallback_to_current(self):
        """When neither header nor body has an app, ``current_app`` is used."""
        from celery._state import current_app

        appless = Mock(name='t1')
        appless.app = appless._app = None
        built = chord([appless], body=appless)
        assert built.app is current_app

    def test_set_immutable(self):
        """``set_immutable`` runs on a chord of mock tasks without raising."""
        header = [Mock(name='t1'), Mock(name='t2')]
        chord(header, app=self.app).set_immutable(True)

    def test_links_to_body(self):
        """``link`` and ``link_error`` land on the body, not on the chord."""
        built = self._two_task_chord()

        built.link(self.div.s(2))
        assert not built.options.get('link')
        assert built.kwargs['body'].options['link']

        built.link_error(self.div.s(2))
        assert not built.options.get('link_error')
        assert built.kwargs['body'].options['link_error']

        assert built.tasks
        assert built.body

    def test_repr(self):
        """The repr is non-empty and flags a missing body."""
        built = self._two_task_chord()
        assert repr(built)

        built.kwargs['body'] = None
        assert 'without body' in repr(built)

    def test_freeze_tasks_is_not_group(self):
        """Freezing still works after ``tasks`` is replaced by a plain list."""
        built = chord([self.add.s(2, 2)], body=self.add.s(), app=self.app)
        built.freeze()

        built.tasks = [self.add.s(2, 2)]
        built.freeze()
 
class test_maybe_signature(CanvasCase):
    """Tests for the ``maybe_signature`` coercion helper."""

    def test_is_None(self):
        """``None`` passes straight through."""
        outcome = maybe_signature(None, app=self.app)
        assert outcome is None

    def test_is_dict(self):
        """A dict-form signature is turned into a Signature instance."""
        as_dict = dict(self.add.s())
        outcome = maybe_signature(as_dict, app=self.app)
        assert isinstance(outcome, Signature)

    def test_when_sig(self):
        """An existing signature is returned as the same object."""
        sig = self.add.s()
        outcome = maybe_signature(sig, app=self.app)
        assert outcome is sig
 
 
  |