|
@@ -1,18 +1,17 @@
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
-from pickle import loads, dumps
|
|
|
-from mock import Mock
|
|
|
+from contextlib import contextmanager
|
|
|
+from mock import Mock, patch
|
|
|
|
|
|
from celery import states
|
|
|
-from celery.app import app_or_default
|
|
|
from celery.exceptions import IncompleteStream, TimeoutError
|
|
|
from celery.five import range
|
|
|
from celery.result import (
|
|
|
AsyncResult,
|
|
|
EagerResult,
|
|
|
- GroupResult,
|
|
|
TaskSetResult,
|
|
|
ResultSet,
|
|
|
+ GroupResult,
|
|
|
from_serializable,
|
|
|
)
|
|
|
from celery.task import task
|
|
@@ -33,8 +32,7 @@ def mock_task(name, state, result):
|
|
|
return dict(id=uuid(), name=name, state=state, result=result)
|
|
|
|
|
|
|
|
|
-def save_result(task):
|
|
|
- app = app_or_default()
|
|
|
+def save_result(app, task):
|
|
|
traceback = 'Some traceback'
|
|
|
if task['state'] == states.SUCCESS:
|
|
|
app.backend.mark_as_done(task['id'], task['result'])
|
|
@@ -48,10 +46,10 @@ def save_result(task):
|
|
|
)
|
|
|
|
|
|
|
|
|
-def make_mock_group(size=10):
|
|
|
+def make_mock_group(app, size=10):
|
|
|
tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
|
|
|
- [save_result(task) for task in tasks]
|
|
|
- return [AsyncResult(task['id']) for task in tasks]
|
|
|
+ [save_result(app, task) for task in tasks]
|
|
|
+ return [app.AsyncResult(task['id']) for task in tasks]
|
|
|
|
|
|
|
|
|
class test_AsyncResult(AppCase):
|
|
@@ -63,16 +61,16 @@ class test_AsyncResult(AppCase):
|
|
|
self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
|
|
|
|
|
|
for task in (self.task1, self.task2, self.task3, self.task4):
|
|
|
- save_result(task)
|
|
|
+ save_result(self.app, task)
|
|
|
|
|
|
def test_compat_properties(self):
|
|
|
- x = AsyncResult('1')
|
|
|
+ x = self.app.AsyncResult('1')
|
|
|
self.assertEqual(x.task_id, x.id)
|
|
|
x.task_id = '2'
|
|
|
self.assertEqual(x.id, '2')
|
|
|
|
|
|
def test_children(self):
|
|
|
- x = AsyncResult('1')
|
|
|
+ x = self.app.AsyncResult('1')
|
|
|
children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
|
|
|
x.backend = Mock()
|
|
|
x.backend.get_children.return_value = children
|
|
@@ -80,10 +78,23 @@ class test_AsyncResult(AppCase):
|
|
|
self.assertTrue(x.children)
|
|
|
self.assertEqual(len(x.children), 3)
|
|
|
|
|
|
+ def test_propagates_for_parent(self):
|
|
|
+ x = self.app.AsyncResult(uuid())
|
|
|
+ x.backend = Mock()
|
|
|
+ x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ x.get(propagate=True)
|
|
|
+ self.assertFalse(x.backend.wait_for.called)
|
|
|
+
|
|
|
+ x.parent = EagerResult(uuid(), 42, states.SUCCESS)
|
|
|
+ x.get(propagate=True)
|
|
|
+ self.assertTrue(x.backend.wait_for.called)
|
|
|
+
|
|
|
def test_get_children(self):
|
|
|
tid = uuid()
|
|
|
- x = AsyncResult(tid)
|
|
|
- child = [AsyncResult(uuid()).serializable() for i in range(10)]
|
|
|
+ x = self.app.AsyncResult(tid)
|
|
|
+ child = [self.app.AsyncResult(uuid()).serializable()
|
|
|
+ for i in range(10)]
|
|
|
x.backend._cache[tid] = {'children': child}
|
|
|
self.assertTrue(x.children)
|
|
|
self.assertEqual(len(x.children), 10)
|
|
@@ -92,7 +103,7 @@ class test_AsyncResult(AppCase):
|
|
|
self.assertIsNone(x.children)
|
|
|
|
|
|
def test_build_graph_get_leaf_collect(self):
|
|
|
- x = AsyncResult('1')
|
|
|
+ x = self.app.AsyncResult('1')
|
|
|
x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
|
|
|
c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
|
|
|
x.iterdeps = Mock()
|
|
@@ -116,7 +127,7 @@ class test_AsyncResult(AppCase):
|
|
|
])
|
|
|
|
|
|
def test_iterdeps(self):
|
|
|
- x = AsyncResult('1')
|
|
|
+ x = self.app.AsyncResult('1')
|
|
|
x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
|
|
|
c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
|
|
|
for child in c:
|
|
@@ -139,45 +150,45 @@ class test_AsyncResult(AppCase):
|
|
|
list(x.iterdeps(intermediate=True))
|
|
|
|
|
|
def test_eq_not_implemented(self):
|
|
|
- self.assertFalse(AsyncResult('1') == object())
|
|
|
+ self.assertFalse(self.app.AsyncResult('1') == object())
|
|
|
|
|
|
def test_reduce(self):
|
|
|
- a1 = AsyncResult('uuid', task_name=mytask.name)
|
|
|
+ a1 = self.app.AsyncResult('uuid', task_name=mytask.name)
|
|
|
restored = pickle.loads(pickle.dumps(a1))
|
|
|
self.assertEqual(restored.id, 'uuid')
|
|
|
self.assertEqual(restored.task_name, mytask.name)
|
|
|
|
|
|
- a2 = AsyncResult('uuid')
|
|
|
+ a2 = self.app.AsyncResult('uuid')
|
|
|
self.assertEqual(pickle.loads(pickle.dumps(a2)).id, 'uuid')
|
|
|
|
|
|
def test_successful(self):
|
|
|
- ok_res = AsyncResult(self.task1['id'])
|
|
|
- nok_res = AsyncResult(self.task3['id'])
|
|
|
- nok_res2 = AsyncResult(self.task4['id'])
|
|
|
+ ok_res = self.app.AsyncResult(self.task1['id'])
|
|
|
+ nok_res = self.app.AsyncResult(self.task3['id'])
|
|
|
+ nok_res2 = self.app.AsyncResult(self.task4['id'])
|
|
|
|
|
|
self.assertTrue(ok_res.successful())
|
|
|
self.assertFalse(nok_res.successful())
|
|
|
self.assertFalse(nok_res2.successful())
|
|
|
|
|
|
- pending_res = AsyncResult(uuid())
|
|
|
+ pending_res = self.app.AsyncResult(uuid())
|
|
|
self.assertFalse(pending_res.successful())
|
|
|
|
|
|
def test_str(self):
|
|
|
- ok_res = AsyncResult(self.task1['id'])
|
|
|
- ok2_res = AsyncResult(self.task2['id'])
|
|
|
- nok_res = AsyncResult(self.task3['id'])
|
|
|
+ ok_res = self.app.AsyncResult(self.task1['id'])
|
|
|
+ ok2_res = self.app.AsyncResult(self.task2['id'])
|
|
|
+ nok_res = self.app.AsyncResult(self.task3['id'])
|
|
|
self.assertEqual(str(ok_res), self.task1['id'])
|
|
|
self.assertEqual(str(ok2_res), self.task2['id'])
|
|
|
self.assertEqual(str(nok_res), self.task3['id'])
|
|
|
|
|
|
pending_id = uuid()
|
|
|
- pending_res = AsyncResult(pending_id)
|
|
|
+ pending_res = self.app.AsyncResult(pending_id)
|
|
|
self.assertEqual(str(pending_res), pending_id)
|
|
|
|
|
|
def test_repr(self):
|
|
|
- ok_res = AsyncResult(self.task1['id'])
|
|
|
- ok2_res = AsyncResult(self.task2['id'])
|
|
|
- nok_res = AsyncResult(self.task3['id'])
|
|
|
+ ok_res = self.app.AsyncResult(self.task1['id'])
|
|
|
+ ok2_res = self.app.AsyncResult(self.task2['id'])
|
|
|
+ nok_res = self.app.AsyncResult(self.task3['id'])
|
|
|
self.assertEqual(repr(ok_res), '<AsyncResult: %s>' % (
|
|
|
self.task1['id']))
|
|
|
self.assertEqual(repr(ok2_res), '<AsyncResult: %s>' % (
|
|
@@ -186,32 +197,32 @@ class test_AsyncResult(AppCase):
|
|
|
self.task3['id']))
|
|
|
|
|
|
pending_id = uuid()
|
|
|
- pending_res = AsyncResult(pending_id)
|
|
|
+ pending_res = self.app.AsyncResult(pending_id)
|
|
|
self.assertEqual(repr(pending_res), '<AsyncResult: %s>' % (
|
|
|
pending_id))
|
|
|
|
|
|
def test_hash(self):
|
|
|
- self.assertEqual(hash(AsyncResult('x0w991')),
|
|
|
- hash(AsyncResult('x0w991')))
|
|
|
- self.assertNotEqual(hash(AsyncResult('x0w991')),
|
|
|
- hash(AsyncResult('x1w991')))
|
|
|
+ self.assertEqual(hash(self.app.AsyncResult('x0w991')),
|
|
|
+ hash(self.app.AsyncResult('x0w991')))
|
|
|
+ self.assertNotEqual(hash(self.app.AsyncResult('x0w991')),
|
|
|
+ hash(self.app.AsyncResult('x1w991')))
|
|
|
|
|
|
def test_get_traceback(self):
|
|
|
- ok_res = AsyncResult(self.task1['id'])
|
|
|
- nok_res = AsyncResult(self.task3['id'])
|
|
|
- nok_res2 = AsyncResult(self.task4['id'])
|
|
|
+ ok_res = self.app.AsyncResult(self.task1['id'])
|
|
|
+ nok_res = self.app.AsyncResult(self.task3['id'])
|
|
|
+ nok_res2 = self.app.AsyncResult(self.task4['id'])
|
|
|
self.assertFalse(ok_res.traceback)
|
|
|
self.assertTrue(nok_res.traceback)
|
|
|
self.assertTrue(nok_res2.traceback)
|
|
|
|
|
|
- pending_res = AsyncResult(uuid())
|
|
|
+ pending_res = self.app.AsyncResult(uuid())
|
|
|
self.assertFalse(pending_res.traceback)
|
|
|
|
|
|
def test_get(self):
|
|
|
- ok_res = AsyncResult(self.task1['id'])
|
|
|
- ok2_res = AsyncResult(self.task2['id'])
|
|
|
- nok_res = AsyncResult(self.task3['id'])
|
|
|
- nok2_res = AsyncResult(self.task4['id'])
|
|
|
+ ok_res = self.app.AsyncResult(self.task1['id'])
|
|
|
+ ok2_res = self.app.AsyncResult(self.task2['id'])
|
|
|
+ nok_res = self.app.AsyncResult(self.task3['id'])
|
|
|
+ nok2_res = self.app.AsyncResult(self.task4['id'])
|
|
|
|
|
|
self.assertEqual(ok_res.get(), 'the')
|
|
|
self.assertEqual(ok2_res.get(), 'quick')
|
|
@@ -222,42 +233,45 @@ class test_AsyncResult(AppCase):
|
|
|
self.assertEqual(ok_res.info, 'the')
|
|
|
|
|
|
def test_get_timeout(self):
|
|
|
- res = AsyncResult(self.task4['id']) # has RETRY state
|
|
|
+ res = self.app.AsyncResult(self.task4['id']) # has RETRY state
|
|
|
with self.assertRaises(TimeoutError):
|
|
|
- res.get(timeout=0.1)
|
|
|
+ res.get(timeout=0.001)
|
|
|
|
|
|
- pending_res = AsyncResult(uuid())
|
|
|
- with self.assertRaises(TimeoutError):
|
|
|
- pending_res.get(timeout=0.1)
|
|
|
+ pending_res = self.app.AsyncResult(uuid())
|
|
|
+ with patch('celery.result.time') as _time:
|
|
|
+ with self.assertRaises(TimeoutError):
|
|
|
+ pending_res.get(timeout=0.001, interval=0.001)
|
|
|
+ _time.sleep.assert_called_with(0.001)
|
|
|
|
|
|
- @skip_if_quick
|
|
|
def test_get_timeout_longer(self):
|
|
|
- res = AsyncResult(self.task4['id']) # has RETRY state
|
|
|
- with self.assertRaises(TimeoutError):
|
|
|
- res.get(timeout=1)
|
|
|
+ res = self.app.AsyncResult(self.task4['id']) # has RETRY state
|
|
|
+ with patch('celery.result.time') as _time:
|
|
|
+ with self.assertRaises(TimeoutError):
|
|
|
+ res.get(timeout=1, interval=1)
|
|
|
+ _time.sleep.assert_called_with(1)
|
|
|
|
|
|
def test_ready(self):
|
|
|
- oks = (AsyncResult(self.task1['id']),
|
|
|
- AsyncResult(self.task2['id']),
|
|
|
- AsyncResult(self.task3['id']))
|
|
|
+ oks = (self.app.AsyncResult(self.task1['id']),
|
|
|
+ self.app.AsyncResult(self.task2['id']),
|
|
|
+ self.app.AsyncResult(self.task3['id']))
|
|
|
self.assertTrue(all(result.ready() for result in oks))
|
|
|
- self.assertFalse(AsyncResult(self.task4['id']).ready())
|
|
|
+ self.assertFalse(self.app.AsyncResult(self.task4['id']).ready())
|
|
|
|
|
|
- self.assertFalse(AsyncResult(uuid()).ready())
|
|
|
+ self.assertFalse(self.app.AsyncResult(uuid()).ready())
|
|
|
|
|
|
|
|
|
class test_ResultSet(AppCase):
|
|
|
|
|
|
def test_resultset_repr(self):
|
|
|
self.assertTrue(repr(ResultSet(
|
|
|
- [AsyncResult(t) for t in ['1', '2', '3']])))
|
|
|
+ [self.app.AsyncResult(t) for t in ['1', '2', '3']])))
|
|
|
|
|
|
def test_eq_other(self):
|
|
|
self.assertFalse(ResultSet([1, 3, 3]) == 1)
|
|
|
self.assertTrue(ResultSet([1]) == ResultSet([1]))
|
|
|
|
|
|
def test_get(self):
|
|
|
- x = ResultSet([AsyncResult(t) for t in [1, 2, 3]])
|
|
|
+ x = ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
|
|
|
b = x.results[0].backend = Mock()
|
|
|
b.supports_native_join = False
|
|
|
x.join_native = Mock()
|
|
@@ -275,16 +289,65 @@ class test_ResultSet(AppCase):
|
|
|
x.add(2)
|
|
|
self.assertEqual(len(x), 2)
|
|
|
|
|
|
+ @contextmanager
|
|
|
+ def dummy_copy(self):
|
|
|
+ with patch('celery.result.copy') as copy:
|
|
|
+
|
|
|
+ def passt(arg):
|
|
|
+ return arg
|
|
|
+ copy.side_effect = passt
|
|
|
+
|
|
|
+ yield
|
|
|
+
|
|
|
+ def test_iterate_respects_subpolling_interval(self):
|
|
|
+ r1 = self.app.AsyncResult(uuid())
|
|
|
+ r2 = self.app.AsyncResult(uuid())
|
|
|
+ backend = r1.backend = r2.backend = Mock()
|
|
|
+ backend.subpolling_interval = 10
|
|
|
+
|
|
|
+ ready = r1.ready = r2.ready = Mock()
|
|
|
+
|
|
|
+ def se(*args, **kwargs):
|
|
|
+ ready.side_effect = KeyError()
|
|
|
+ return False
|
|
|
+ ready.return_value = False
|
|
|
+ ready.side_effect = se
|
|
|
+
|
|
|
+ x = ResultSet([r1, r2])
|
|
|
+ with self.dummy_copy():
|
|
|
+ with patch('celery.result.time') as _time:
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ list(x.iterate())
|
|
|
+ _time.sleep.assert_called_with(10)
|
|
|
+
|
|
|
+ backend.subpolling_interval = 0
|
|
|
+ with patch('celery.result.time') as _time:
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ ready.return_value = False
|
|
|
+ ready.side_effect = se
|
|
|
+ list(x.iterate())
|
|
|
+ self.assertFalse(_time.sleep.called)
|
|
|
+
|
|
|
+ def test_times_out(self):
|
|
|
+ r1 = self.app.AsyncResult(uuid)
|
|
|
+ r1.ready = Mock()
|
|
|
+ r1.ready.return_value = False
|
|
|
+ x = ResultSet([r1])
|
|
|
+ with self.dummy_copy():
|
|
|
+ with patch('celery.result.time') as _time:
|
|
|
+ with self.assertRaises(TimeoutError):
|
|
|
+ list(x.iterate(timeout=1))
|
|
|
+
|
|
|
def test_add_discard(self):
|
|
|
x = ResultSet([])
|
|
|
- x.add(AsyncResult('1'))
|
|
|
- self.assertIn(AsyncResult('1'), x.results)
|
|
|
- x.discard(AsyncResult('1'))
|
|
|
- x.discard(AsyncResult('1'))
|
|
|
+ x.add(self.app.AsyncResult('1'))
|
|
|
+ self.assertIn(self.app.AsyncResult('1'), x.results)
|
|
|
+ x.discard(self.app.AsyncResult('1'))
|
|
|
+ x.discard(self.app.AsyncResult('1'))
|
|
|
x.discard('1')
|
|
|
- self.assertNotIn(AsyncResult('1'), x.results)
|
|
|
+ self.assertNotIn(self.app.AsyncResult('1'), x.results)
|
|
|
|
|
|
- x.update([AsyncResult('2')])
|
|
|
+ x.update([self.app.AsyncResult('2')])
|
|
|
|
|
|
def test_clear(self):
|
|
|
x = ResultSet([])
|
|
@@ -342,7 +405,7 @@ class test_TaskSetResult(AppCase):
|
|
|
|
|
|
def setup(self):
|
|
|
self.size = 10
|
|
|
- self.ts = TaskSetResult(uuid(), make_mock_group(self.size))
|
|
|
+ self.ts = TaskSetResult(uuid(), make_mock_group(self.app, self.size))
|
|
|
|
|
|
def test_total(self):
|
|
|
self.assertEqual(self.ts.total, self.size)
|
|
@@ -367,7 +430,15 @@ class test_GroupResult(AppCase):
|
|
|
|
|
|
def setup(self):
|
|
|
self.size = 10
|
|
|
- self.ts = GroupResult(uuid(), make_mock_group(self.size))
|
|
|
+ self.ts = self.app.GroupResult(
|
|
|
+ uuid(), make_mock_group(self.app, self.size),
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_is_pickleable(self):
|
|
|
+ ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
|
|
|
+ self.assertEqual(pickle.loads(pickle.dumps(ts)), ts)
|
|
|
+ ts2 = GroupResult(uuid(), [self.app.AsyncResult(uuid())])
|
|
|
+ self.assertEqual(pickle.loads(pickle.dumps(ts2)), ts2)
|
|
|
|
|
|
def test_len(self):
|
|
|
self.assertEqual(len(self.ts), self.size)
|
|
@@ -376,64 +447,92 @@ class test_GroupResult(AppCase):
|
|
|
self.assertFalse(self.ts == 1)
|
|
|
|
|
|
def test_reduce(self):
|
|
|
- self.assertTrue(loads(dumps(self.ts)))
|
|
|
+ self.assertTrue(pickle.loads(pickle.dumps(self.ts)))
|
|
|
|
|
|
def test_iterate_raises(self):
|
|
|
- ar = MockAsyncResultFailure(uuid())
|
|
|
- ts = GroupResult(uuid(), [ar])
|
|
|
+ ar = MockAsyncResultFailure(uuid(), app=self.app)
|
|
|
+ ts = self.app.GroupResult(uuid(), [ar])
|
|
|
it = ts.iterate()
|
|
|
with self.assertRaises(KeyError):
|
|
|
next(it)
|
|
|
|
|
|
def test_forget(self):
|
|
|
- subs = [MockAsyncResultSuccess(uuid()),
|
|
|
- MockAsyncResultSuccess(uuid())]
|
|
|
- ts = GroupResult(uuid(), subs)
|
|
|
+ subs = [MockAsyncResultSuccess(uuid(), app=self.app),
|
|
|
+ MockAsyncResultSuccess(uuid(), app=self.app)]
|
|
|
+ ts = self.app.GroupResult(uuid(), subs)
|
|
|
ts.forget()
|
|
|
for sub in subs:
|
|
|
self.assertTrue(sub.forgotten)
|
|
|
|
|
|
def test_getitem(self):
|
|
|
- subs = [MockAsyncResultSuccess(uuid()),
|
|
|
- MockAsyncResultSuccess(uuid())]
|
|
|
- ts = GroupResult(uuid(), subs)
|
|
|
+ subs = [MockAsyncResultSuccess(uuid(), app=self.app),
|
|
|
+ MockAsyncResultSuccess(uuid(), app=self.app)]
|
|
|
+ ts = self.app.GroupResult(uuid(), subs)
|
|
|
self.assertIs(ts[0], subs[0])
|
|
|
|
|
|
def test_save_restore(self):
|
|
|
- subs = [MockAsyncResultSuccess(uuid()),
|
|
|
- MockAsyncResultSuccess(uuid())]
|
|
|
- ts = GroupResult(uuid(), subs)
|
|
|
+ subs = [MockAsyncResultSuccess(uuid(), app=self.app),
|
|
|
+ MockAsyncResultSuccess(uuid(), app=self.app)]
|
|
|
+ ts = self.app.GroupResult(uuid(), subs)
|
|
|
ts.save()
|
|
|
with self.assertRaises(AttributeError):
|
|
|
ts.save(backend=object())
|
|
|
- self.assertEqual(GroupResult.restore(ts.id).subtasks,
|
|
|
+ self.assertEqual(self.app.GroupResult.restore(ts.id).subtasks,
|
|
|
ts.subtasks)
|
|
|
ts.delete()
|
|
|
- self.assertIsNone(GroupResult.restore(ts.id))
|
|
|
+ self.assertIsNone(self.app.GroupResult.restore(ts.id))
|
|
|
with self.assertRaises(AttributeError):
|
|
|
- GroupResult.restore(ts.id, backend=object())
|
|
|
+ self.app.GroupResult.restore(ts.id, backend=object())
|
|
|
|
|
|
def test_join_native(self):
|
|
|
backend = SimpleBackend()
|
|
|
- subtasks = [AsyncResult(uuid(), backend=backend)
|
|
|
+ subtasks = [self.app.AsyncResult(uuid(), backend=backend)
|
|
|
for i in range(10)]
|
|
|
- ts = GroupResult(uuid(), subtasks)
|
|
|
+ ts = self.app.GroupResult(uuid(), subtasks)
|
|
|
backend.ids = [subtask.id for subtask in subtasks]
|
|
|
res = ts.join_native()
|
|
|
self.assertEqual(res, list(range(10)))
|
|
|
|
|
|
+ def test_join_native_raises(self):
|
|
|
+ ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
|
|
|
+ ts.iter_native = Mock()
|
|
|
+ ts.iter_native.return_value = iter([
|
|
|
+ (uuid(), {'status': states.FAILURE, 'result': KeyError()})
|
|
|
+ ])
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ ts.join_native(propagate=True)
|
|
|
+
|
|
|
+ def test_failed_join_report(self):
|
|
|
+ res = Mock()
|
|
|
+ ts = self.app.GroupResult(uuid(), [res])
|
|
|
+ res.state = states.FAILURE
|
|
|
+ res.backend.is_cached.return_value = True
|
|
|
+ self.assertIs(next(ts._failed_join_report()), res)
|
|
|
+ res.backend.is_cached.return_value = False
|
|
|
+ with self.assertRaises(StopIteration):
|
|
|
+ next(ts._failed_join_report())
|
|
|
+
|
|
|
+ def test_repr(self):
|
|
|
+ self.assertTrue(repr(
|
|
|
+ self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
|
|
|
+ ))
|
|
|
+
|
|
|
+ def test_children_is_results(self):
|
|
|
+ ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
|
|
|
+ self.assertIs(ts.children, ts.results)
|
|
|
+
|
|
|
def test_iter_native(self):
|
|
|
backend = SimpleBackend()
|
|
|
- subtasks = [AsyncResult(uuid(), backend=backend)
|
|
|
+ subtasks = [self.app.AsyncResult(uuid(), backend=backend)
|
|
|
for i in range(10)]
|
|
|
- ts = GroupResult(uuid(), subtasks)
|
|
|
+ ts = self.app.GroupResult(uuid(), subtasks)
|
|
|
backend.ids = [subtask.id for subtask in subtasks]
|
|
|
self.assertEqual(len(list(ts.iter_native())), 10)
|
|
|
|
|
|
def test_iterate_yields(self):
|
|
|
- ar = MockAsyncResultSuccess(uuid())
|
|
|
- ar2 = MockAsyncResultSuccess(uuid())
|
|
|
- ts = GroupResult(uuid(), [ar, ar2])
|
|
|
+ ar = MockAsyncResultSuccess(uuid(), app=self.app)
|
|
|
+ ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
|
|
|
+ ts = self.app.GroupResult(uuid(), [ar, ar2])
|
|
|
it = ts.iterate()
|
|
|
self.assertEqual(next(it), 42)
|
|
|
self.assertEqual(next(it), 42)
|
|
@@ -441,19 +540,28 @@ class test_GroupResult(AppCase):
|
|
|
def test_iterate_eager(self):
|
|
|
ar1 = EagerResult(uuid(), 42, states.SUCCESS)
|
|
|
ar2 = EagerResult(uuid(), 42, states.SUCCESS)
|
|
|
- ts = GroupResult(uuid(), [ar1, ar2])
|
|
|
+ ts = self.app.GroupResult(uuid(), [ar1, ar2])
|
|
|
it = ts.iterate()
|
|
|
self.assertEqual(next(it), 42)
|
|
|
self.assertEqual(next(it), 42)
|
|
|
|
|
|
def test_join_timeout(self):
|
|
|
- ar = MockAsyncResultSuccess(uuid())
|
|
|
- ar2 = MockAsyncResultSuccess(uuid())
|
|
|
- ar3 = AsyncResult(uuid())
|
|
|
- ts = GroupResult(uuid(), [ar, ar2, ar3])
|
|
|
+ ar = MockAsyncResultSuccess(uuid(), app=self.app)
|
|
|
+ ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
|
|
|
+ ar3 = self.app.AsyncResult(uuid())
|
|
|
+ ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
|
|
|
with self.assertRaises(TimeoutError):
|
|
|
ts.join(timeout=0.0000001)
|
|
|
|
|
|
+ ar4 = self.app.AsyncResult(uuid())
|
|
|
+ ar4.get = Mock()
|
|
|
+ ts2 = self.app.GroupResult(uuid(), [ar4])
|
|
|
+ self.assertTrue(ts2.join(timeout=0.1))
|
|
|
+
|
|
|
+ def test_iter_native_when_empty_group(self):
|
|
|
+ ts = self.app.GroupResult(uuid(), [])
|
|
|
+ self.assertListEqual(list(ts.iter_native()), [])
|
|
|
+
|
|
|
def test_iterate_simple(self):
|
|
|
it = self.ts.iterate()
|
|
|
results = sorted(list(it))
|
|
@@ -485,7 +593,7 @@ class test_GroupResult(AppCase):
|
|
|
class test_pending_AsyncResult(AppCase):
|
|
|
|
|
|
def setup(self):
|
|
|
- self.task = AsyncResult(uuid())
|
|
|
+ self.task = self.app.AsyncResult(uuid())
|
|
|
|
|
|
def test_result(self):
|
|
|
self.assertIsNone(self.task.result)
|
|
@@ -495,11 +603,11 @@ class test_failed_AsyncResult(test_GroupResult):
|
|
|
|
|
|
def setup(self):
|
|
|
self.size = 11
|
|
|
- subtasks = make_mock_group(10)
|
|
|
+ subtasks = make_mock_group(self.app, 10)
|
|
|
failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
|
|
|
- save_result(failed)
|
|
|
- failed_res = AsyncResult(failed['id'])
|
|
|
- self.ts = GroupResult(uuid(), subtasks + [failed_res])
|
|
|
+ save_result(self.app, failed)
|
|
|
+ failed_res = self.app.AsyncResult(failed['id'])
|
|
|
+ self.ts = self.app.GroupResult(uuid(), subtasks + [failed_res])
|
|
|
|
|
|
def test_completed_count(self):
|
|
|
self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
|
|
@@ -527,8 +635,9 @@ class test_failed_AsyncResult(test_GroupResult):
|
|
|
class test_pending_Group(AppCase):
|
|
|
|
|
|
def setup(self):
|
|
|
- self.ts = GroupResult(uuid(), [AsyncResult(uuid()),
|
|
|
- AsyncResult(uuid())])
|
|
|
+ self.ts = self.app.GroupResult(
|
|
|
+ uuid(), [self.app.AsyncResult(uuid()),
|
|
|
+ self.app.AsyncResult(uuid())])
|
|
|
|
|
|
def test_completed_count(self):
|
|
|
self.assertEqual(self.ts.completed_count(), 0)
|
|
@@ -581,11 +690,26 @@ class test_EagerResult(AppCase):
|
|
|
class test_serializable(AppCase):
|
|
|
|
|
|
def test_AsyncResult(self):
|
|
|
- x = AsyncResult(uuid())
|
|
|
+ x = self.app.AsyncResult(uuid())
|
|
|
self.assertEqual(x, from_serializable(x.serializable(), self.app))
|
|
|
self.assertEqual(x, from_serializable(x, self.app))
|
|
|
|
|
|
+ def test_with_parent(self):
|
|
|
+ x = self.app.AsyncResult(uuid())
|
|
|
+ x.parent = self.app.AsyncResult(uuid())
|
|
|
+ y = from_serializable(x.serializable(), self.app)
|
|
|
+ self.assertEqual(y, x)
|
|
|
+ self.assertEqual(y.parent, x.parent)
|
|
|
+ self.assertIsInstance(y.parent, AsyncResult)
|
|
|
+
|
|
|
+ def test_compat(self):
|
|
|
+ uid = uuid()
|
|
|
+ x = from_serializable([uid, []])
|
|
|
+ self.assertEqual(x.id, uid)
|
|
|
+
|
|
|
def test_GroupResult(self):
|
|
|
- x = GroupResult(uuid(), [AsyncResult(uuid()) for _ in range(10)])
|
|
|
+ x = self.app.GroupResult(
|
|
|
+ uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
|
|
|
+ )
|
|
|
self.assertEqual(x, from_serializable(x.serializable(), self.app))
|
|
|
self.assertEqual(x, from_serializable(x, self.app))
|