test_result.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. import unittest
  2. from celery.utils import gen_unique_id
  3. from celery.tests.utils import skip_if_quick
  4. from celery.result import AsyncResult, TaskSetResult
  5. from celery.backends import default_backend
  6. from celery.exceptions import TimeoutError
  7. from celery.task.base import Task
  8. def mock_task(name, status, result):
  9. return dict(id=gen_unique_id(), name=name, status=status, result=result)
  10. def save_result(task):
  11. traceback = "Some traceback"
  12. if task["status"] == "SUCCESS":
  13. default_backend.mark_as_done(task["id"], task["result"])
  14. elif task["status"] == "RETRY":
  15. default_backend.mark_as_retry(task["id"], task["result"],
  16. traceback=traceback)
  17. else:
  18. default_backend.mark_as_failure(task["id"], task["result"],
  19. traceback=traceback)
  20. def make_mock_taskset(size=10):
  21. tasks = [mock_task("ts%d" % i, "SUCCESS", i) for i in xrange(size)]
  22. [save_result(task) for task in tasks]
  23. return [AsyncResult(task["id"]) for task in tasks]
  24. class TestAsyncResult(unittest.TestCase):
  25. def setUp(self):
  26. self.task1 = mock_task("task1", "SUCCESS", "the")
  27. self.task2 = mock_task("task2", "SUCCESS", "quick")
  28. self.task3 = mock_task("task3", "FAILURE", KeyError("brown"))
  29. self.task4 = mock_task("task3", "RETRY", KeyError("red"))
  30. for task in (self.task1, self.task2, self.task3, self.task4):
  31. save_result(task)
  32. def test_successful(self):
  33. ok_res = AsyncResult(self.task1["id"])
  34. nok_res = AsyncResult(self.task3["id"])
  35. nok_res2 = AsyncResult(self.task4["id"])
  36. self.assertTrue(ok_res.successful())
  37. self.assertFalse(nok_res.successful())
  38. self.assertFalse(nok_res2.successful())
  39. def test_str(self):
  40. ok_res = AsyncResult(self.task1["id"])
  41. ok2_res = AsyncResult(self.task2["id"])
  42. nok_res = AsyncResult(self.task3["id"])
  43. self.assertEquals(str(ok_res), self.task1["id"])
  44. self.assertEquals(str(ok2_res), self.task2["id"])
  45. self.assertEquals(str(nok_res), self.task3["id"])
  46. def test_repr(self):
  47. ok_res = AsyncResult(self.task1["id"])
  48. ok2_res = AsyncResult(self.task2["id"])
  49. nok_res = AsyncResult(self.task3["id"])
  50. self.assertEquals(repr(ok_res), "<AsyncResult: %s>" % (
  51. self.task1["id"]))
  52. self.assertEquals(repr(ok2_res), "<AsyncResult: %s>" % (
  53. self.task2["id"]))
  54. self.assertEquals(repr(nok_res), "<AsyncResult: %s>" % (
  55. self.task3["id"]))
  56. def test_get_traceback(self):
  57. ok_res = AsyncResult(self.task1["id"])
  58. nok_res = AsyncResult(self.task3["id"])
  59. nok_res2 = AsyncResult(self.task4["id"])
  60. self.assertFalse(ok_res.traceback)
  61. self.assertTrue(nok_res.traceback)
  62. self.assertTrue(nok_res2.traceback)
  63. def test_get(self):
  64. ok_res = AsyncResult(self.task1["id"])
  65. ok2_res = AsyncResult(self.task2["id"])
  66. nok_res = AsyncResult(self.task3["id"])
  67. nok2_res = AsyncResult(self.task4["id"])
  68. self.assertEquals(ok_res.get(), "the")
  69. self.assertEquals(ok2_res.get(), "quick")
  70. self.assertRaises(KeyError, nok_res.get)
  71. self.assertTrue(isinstance(nok2_res.result, KeyError))
  72. def test_get_timeout(self):
  73. res = AsyncResult(self.task4["id"]) # has RETRY status
  74. self.assertRaises(TimeoutError, res.get, timeout=0.1)
  75. @skip_if_quick
  76. def test_get_timeout_longer(self):
  77. res = AsyncResult(self.task4["id"]) # has RETRY status
  78. self.assertRaises(TimeoutError, res.get, timeout=1)
  79. def test_ready(self):
  80. oks = (AsyncResult(self.task1["id"]),
  81. AsyncResult(self.task2["id"]),
  82. AsyncResult(self.task3["id"]))
  83. [self.assertTrue(ok.ready()) for ok in oks]
  84. self.assertFalse(AsyncResult(self.task4["id"]).ready())
  85. class MockAsyncResultFailure(AsyncResult):
  86. @property
  87. def result(self):
  88. return KeyError("baz")
  89. @property
  90. def status(self):
  91. return "FAILURE"
  92. class MockAsyncResultSuccess(AsyncResult):
  93. @property
  94. def result(self):
  95. return 42
  96. @property
  97. def status(self):
  98. return "SUCCESS"
  99. class TestTaskSetResult(unittest.TestCase):
  100. def setUp(self):
  101. self.size = 10
  102. self.ts = TaskSetResult(gen_unique_id(), make_mock_taskset(self.size))
  103. def test_total(self):
  104. self.assertEquals(self.ts.total, self.size)
  105. def test_iterate_raises(self):
  106. ar = MockAsyncResultFailure(gen_unique_id())
  107. ts = TaskSetResult(gen_unique_id(), [ar])
  108. it = iter(ts)
  109. self.assertRaises(KeyError, it.next)
  110. def test_iterate_yields(self):
  111. ar = MockAsyncResultSuccess(gen_unique_id())
  112. ar2 = MockAsyncResultSuccess(gen_unique_id())
  113. ts = TaskSetResult(gen_unique_id(), [ar, ar2])
  114. it = iter(ts)
  115. self.assertEquals(it.next(), 42)
  116. self.assertEquals(it.next(), 42)
  117. def test_join_timeout(self):
  118. ar = MockAsyncResultSuccess(gen_unique_id())
  119. ar2 = MockAsyncResultSuccess(gen_unique_id())
  120. ar3 = AsyncResult(gen_unique_id())
  121. ts = TaskSetResult(gen_unique_id(), [ar, ar2, ar3])
  122. self.assertRaises(TimeoutError, ts.join, timeout=0.0000001)
  123. def test_itersubtasks(self):
  124. it = self.ts.itersubtasks()
  125. for i, t in enumerate(it):
  126. self.assertEquals(t.get(), i)
  127. def test___iter__(self):
  128. it = iter(self.ts)
  129. results = sorted(list(it))
  130. self.assertEquals(results, list(xrange(self.size)))
  131. def test_join(self):
  132. joined = self.ts.join()
  133. self.assertEquals(joined, list(xrange(self.size)))
  134. def test_successful(self):
  135. self.assertTrue(self.ts.successful())
  136. def test_failed(self):
  137. self.assertFalse(self.ts.failed())
  138. def test_waiting(self):
  139. self.assertFalse(self.ts.waiting())
  140. def test_ready(self):
  141. self.assertTrue(self.ts.ready())
  142. def test_completed_count(self):
  143. self.assertEquals(self.ts.completed_count(), self.ts.total)
  144. class TestPendingAsyncResult(unittest.TestCase):
  145. def setUp(self):
  146. self.task = AsyncResult(gen_unique_id())
  147. def test_result(self):
  148. self.assertTrue(self.task.result is None)
  149. class TestFailedTaskSetResult(TestTaskSetResult):
  150. def setUp(self):
  151. self.size = 11
  152. subtasks = make_mock_taskset(10)
  153. failed = mock_task("ts11", "FAILED", KeyError("Baz"))
  154. save_result(failed)
  155. failed_res = AsyncResult(failed["id"])
  156. self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
  157. def test_itersubtasks(self):
  158. it = self.ts.itersubtasks()
  159. for i in xrange(self.size - 1):
  160. t = it.next()
  161. self.assertEquals(t.get(), i)
  162. self.assertRaises(KeyError, it.next().get)
  163. def test_completed_count(self):
  164. self.assertEquals(self.ts.completed_count(), self.ts.total - 1)
  165. def test___iter__(self):
  166. it = iter(self.ts)
  167. def consume():
  168. return list(it)
  169. self.assertRaises(KeyError, consume)
  170. def test_join(self):
  171. self.assertRaises(KeyError, self.ts.join)
  172. def test_successful(self):
  173. self.assertFalse(self.ts.successful())
  174. def test_failed(self):
  175. self.assertTrue(self.ts.failed())
  176. class TestTaskSetPending(unittest.TestCase):
  177. def setUp(self):
  178. self.ts = TaskSetResult(gen_unique_id(), [
  179. AsyncResult(gen_unique_id()),
  180. AsyncResult(gen_unique_id())])
  181. def test_completed_count(self):
  182. self.assertEquals(self.ts.completed_count(), 0)
  183. def test_ready(self):
  184. self.assertFalse(self.ts.ready())
  185. def test_waiting(self):
  186. self.assertTrue(self.ts.waiting())
  187. def x_join(self):
  188. self.assertRaises(TimeoutError, self.ts.join, timeout=0.001)
  189. @skip_if_quick
  190. def x_join_longer(self):
  191. self.assertRaises(TimeoutError, self.ts.join, timeout=1)
  192. class RaisingTask(Task):
  193. def run(self, x, y):
  194. raise KeyError("xy")
  195. class TestEagerResult(unittest.TestCase):
  196. def test_wait_raises(self):
  197. res = RaisingTask.apply(args=[3, 3])
  198. self.assertRaises(KeyError, res.wait)
  199. def test_revoke(self):
  200. res = RaisingTask.apply(args=[3, 3])
  201. self.assertFalse(res.revoke())