test_result.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. from __future__ import generators
  2. import unittest2 as unittest
  3. from celery import states
  4. from celery.utils import gen_unique_id
  5. from celery.utils.compat import all
  6. from celery.result import AsyncResult, TaskSetResult
  7. from celery.backends import default_backend
  8. from celery.exceptions import TimeoutError
  9. from celery.task.base import Task
  10. from celery.tests.utils import skip_if_quick
  11. def mock_task(name, status, result):
  12. return dict(id=gen_unique_id(), name=name, status=status, result=result)
  13. def save_result(task):
  14. traceback = "Some traceback"
  15. if task["status"] == states.SUCCESS:
  16. default_backend.mark_as_done(task["id"], task["result"])
  17. elif task["status"] == states.RETRY:
  18. default_backend.mark_as_retry(task["id"], task["result"],
  19. traceback=traceback)
  20. else:
  21. default_backend.mark_as_failure(task["id"], task["result"],
  22. traceback=traceback)
  23. def make_mock_taskset(size=10):
  24. tasks = [mock_task("ts%d" % i, states.SUCCESS, i) for i in xrange(size)]
  25. [save_result(task) for task in tasks]
  26. return [AsyncResult(task["id"]) for task in tasks]
  27. class TestAsyncResult(unittest.TestCase):
  28. def setUp(self):
  29. self.task1 = mock_task("task1", states.SUCCESS, "the")
  30. self.task2 = mock_task("task2", states.SUCCESS, "quick")
  31. self.task3 = mock_task("task3", states.FAILURE, KeyError("brown"))
  32. self.task4 = mock_task("task3", states.RETRY, KeyError("red"))
  33. for task in (self.task1, self.task2, self.task3, self.task4):
  34. save_result(task)
  35. def test_successful(self):
  36. ok_res = AsyncResult(self.task1["id"])
  37. nok_res = AsyncResult(self.task3["id"])
  38. nok_res2 = AsyncResult(self.task4["id"])
  39. self.assertTrue(ok_res.successful())
  40. self.assertFalse(nok_res.successful())
  41. self.assertFalse(nok_res2.successful())
  42. pending_res = AsyncResult(gen_unique_id())
  43. self.assertFalse(pending_res.successful())
  44. def test_str(self):
  45. ok_res = AsyncResult(self.task1["id"])
  46. ok2_res = AsyncResult(self.task2["id"])
  47. nok_res = AsyncResult(self.task3["id"])
  48. self.assertEqual(str(ok_res), self.task1["id"])
  49. self.assertEqual(str(ok2_res), self.task2["id"])
  50. self.assertEqual(str(nok_res), self.task3["id"])
  51. pending_id = gen_unique_id()
  52. pending_res = AsyncResult(pending_id)
  53. self.assertEqual(str(pending_res), pending_id)
  54. def test_repr(self):
  55. ok_res = AsyncResult(self.task1["id"])
  56. ok2_res = AsyncResult(self.task2["id"])
  57. nok_res = AsyncResult(self.task3["id"])
  58. self.assertEqual(repr(ok_res), "<AsyncResult: %s>" % (
  59. self.task1["id"]))
  60. self.assertEqual(repr(ok2_res), "<AsyncResult: %s>" % (
  61. self.task2["id"]))
  62. self.assertEqual(repr(nok_res), "<AsyncResult: %s>" % (
  63. self.task3["id"]))
  64. pending_id = gen_unique_id()
  65. pending_res = AsyncResult(pending_id)
  66. self.assertEqual(repr(pending_res), "<AsyncResult: %s>" % (
  67. pending_id))
  68. def test_get_traceback(self):
  69. ok_res = AsyncResult(self.task1["id"])
  70. nok_res = AsyncResult(self.task3["id"])
  71. nok_res2 = AsyncResult(self.task4["id"])
  72. self.assertFalse(ok_res.traceback)
  73. self.assertTrue(nok_res.traceback)
  74. self.assertTrue(nok_res2.traceback)
  75. pending_res = AsyncResult(gen_unique_id())
  76. self.assertFalse(pending_res.traceback)
  77. def test_get(self):
  78. ok_res = AsyncResult(self.task1["id"])
  79. ok2_res = AsyncResult(self.task2["id"])
  80. nok_res = AsyncResult(self.task3["id"])
  81. nok2_res = AsyncResult(self.task4["id"])
  82. self.assertEqual(ok_res.get(), "the")
  83. self.assertEqual(ok2_res.get(), "quick")
  84. self.assertRaises(KeyError, nok_res.get)
  85. self.assertIsInstance(nok2_res.result, KeyError)
  86. def test_get_timeout(self):
  87. res = AsyncResult(self.task4["id"]) # has RETRY status
  88. self.assertRaises(TimeoutError, res.get, timeout=0.1)
  89. pending_res = AsyncResult(gen_unique_id())
  90. self.assertRaises(TimeoutError, pending_res.get, timeout=0.1)
  91. @skip_if_quick
  92. def test_get_timeout_longer(self):
  93. res = AsyncResult(self.task4["id"]) # has RETRY status
  94. self.assertRaises(TimeoutError, res.get, timeout=1)
  95. def test_ready(self):
  96. oks = (AsyncResult(self.task1["id"]),
  97. AsyncResult(self.task2["id"]),
  98. AsyncResult(self.task3["id"]))
  99. self.assertTrue(all(result.ready() for result in oks))
  100. self.assertFalse(AsyncResult(self.task4["id"]).ready())
  101. self.assertFalse(AsyncResult(gen_unique_id()).ready())
  102. class MockAsyncResultFailure(AsyncResult):
  103. @property
  104. def result(self):
  105. return KeyError("baz")
  106. @property
  107. def status(self):
  108. return states.FAILURE
  109. class MockAsyncResultSuccess(AsyncResult):
  110. @property
  111. def result(self):
  112. return 42
  113. @property
  114. def status(self):
  115. return states.SUCCESS
  116. class TestTaskSetResult(unittest.TestCase):
  117. def setUp(self):
  118. self.size = 10
  119. self.ts = TaskSetResult(gen_unique_id(), make_mock_taskset(self.size))
  120. def test_total(self):
  121. self.assertEqual(self.ts.total, self.size)
  122. def test_iterate_raises(self):
  123. ar = MockAsyncResultFailure(gen_unique_id())
  124. ts = TaskSetResult(gen_unique_id(), [ar])
  125. it = iter(ts)
  126. self.assertRaises(KeyError, it.next)
  127. def test_iterate_yields(self):
  128. ar = MockAsyncResultSuccess(gen_unique_id())
  129. ar2 = MockAsyncResultSuccess(gen_unique_id())
  130. ts = TaskSetResult(gen_unique_id(), [ar, ar2])
  131. it = iter(ts)
  132. self.assertEqual(it.next(), 42)
  133. self.assertEqual(it.next(), 42)
  134. def test_join_timeout(self):
  135. ar = MockAsyncResultSuccess(gen_unique_id())
  136. ar2 = MockAsyncResultSuccess(gen_unique_id())
  137. ar3 = AsyncResult(gen_unique_id())
  138. ts = TaskSetResult(gen_unique_id(), [ar, ar2, ar3])
  139. self.assertRaises(TimeoutError, ts.join, timeout=0.0000001)
  140. def test_itersubtasks(self):
  141. it = self.ts.itersubtasks()
  142. for i, t in enumerate(it):
  143. self.assertEqual(t.get(), i)
  144. def test___iter__(self):
  145. it = iter(self.ts)
  146. results = sorted(list(it))
  147. self.assertListEqual(results, list(xrange(self.size)))
  148. def test_join(self):
  149. joined = self.ts.join()
  150. self.assertListEqual(joined, list(xrange(self.size)))
  151. def test_successful(self):
  152. self.assertTrue(self.ts.successful())
  153. def test_failed(self):
  154. self.assertFalse(self.ts.failed())
  155. def test_waiting(self):
  156. self.assertFalse(self.ts.waiting())
  157. def test_ready(self):
  158. self.assertTrue(self.ts.ready())
  159. def test_completed_count(self):
  160. self.assertEqual(self.ts.completed_count(), self.ts.total)
  161. class TestPendingAsyncResult(unittest.TestCase):
  162. def setUp(self):
  163. self.task = AsyncResult(gen_unique_id())
  164. def test_result(self):
  165. self.assertIsNone(self.task.result)
  166. class TestFailedTaskSetResult(TestTaskSetResult):
  167. def setUp(self):
  168. self.size = 11
  169. subtasks = make_mock_taskset(10)
  170. failed = mock_task("ts11", states.FAILURE, KeyError("Baz"))
  171. save_result(failed)
  172. failed_res = AsyncResult(failed["id"])
  173. self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
  174. def test_itersubtasks(self):
  175. it = self.ts.itersubtasks()
  176. for i in xrange(self.size - 1):
  177. t = it.next()
  178. self.assertEqual(t.get(), i)
  179. self.assertRaises(KeyError, it.next().get)
  180. def test_completed_count(self):
  181. self.assertEqual(self.ts.completed_count(), self.ts.total - 1)
  182. def test___iter__(self):
  183. it = iter(self.ts)
  184. def consume():
  185. return list(it)
  186. self.assertRaises(KeyError, consume)
  187. def test_join(self):
  188. self.assertRaises(KeyError, self.ts.join)
  189. def test_successful(self):
  190. self.assertFalse(self.ts.successful())
  191. def test_failed(self):
  192. self.assertTrue(self.ts.failed())
  193. class TestTaskSetPending(unittest.TestCase):
  194. def setUp(self):
  195. self.ts = TaskSetResult(gen_unique_id(), [
  196. AsyncResult(gen_unique_id()),
  197. AsyncResult(gen_unique_id())])
  198. def test_completed_count(self):
  199. self.assertEqual(self.ts.completed_count(), 0)
  200. def test_ready(self):
  201. self.assertFalse(self.ts.ready())
  202. def test_waiting(self):
  203. self.assertTrue(self.ts.waiting())
  204. def x_join(self):
  205. self.assertRaises(TimeoutError, self.ts.join, timeout=0.001)
  206. @skip_if_quick
  207. def x_join_longer(self):
  208. self.assertRaises(TimeoutError, self.ts.join, timeout=1)
  209. class RaisingTask(Task):
  210. def run(self, x, y):
  211. raise KeyError("xy")
  212. class TestEagerResult(unittest.TestCase):
  213. def test_wait_raises(self):
  214. res = RaisingTask.apply(args=[3, 3])
  215. self.assertRaises(KeyError, res.wait)
  216. def test_revoke(self):
  217. res = RaisingTask.apply(args=[3, 3])
  218. self.assertFalse(res.revoke())