test_result.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. from celery import states
  2. from celery.app import app_or_default
  3. from celery.utils import gen_unique_id
  4. from celery.utils.serialization import pickle
  5. from celery.result import AsyncResult, EagerResult, TaskSetResult
  6. from celery.exceptions import TimeoutError
  7. from celery.task.base import Task
  8. from celery.tests.utils import unittest
  9. from celery.tests.utils import skip_if_quick
  10. def mock_task(name, status, result):
  11. return dict(id=gen_unique_id(), name=name, status=status, result=result)
  12. def save_result(task):
  13. app = app_or_default()
  14. traceback = "Some traceback"
  15. if task["status"] == states.SUCCESS:
  16. app.backend.mark_as_done(task["id"], task["result"])
  17. elif task["status"] == states.RETRY:
  18. app.backend.mark_as_retry(task["id"], task["result"],
  19. traceback=traceback)
  20. else:
  21. app.backend.mark_as_failure(task["id"], task["result"],
  22. traceback=traceback)
  23. def make_mock_taskset(size=10):
  24. tasks = [mock_task("ts%d" % i, states.SUCCESS, i) for i in xrange(size)]
  25. [save_result(task) for task in tasks]
  26. return [AsyncResult(task["id"]) for task in tasks]
  27. class TestAsyncResult(unittest.TestCase):
  28. def setUp(self):
  29. self.task1 = mock_task("task1", states.SUCCESS, "the")
  30. self.task2 = mock_task("task2", states.SUCCESS, "quick")
  31. self.task3 = mock_task("task3", states.FAILURE, KeyError("brown"))
  32. self.task4 = mock_task("task3", states.RETRY, KeyError("red"))
  33. for task in (self.task1, self.task2, self.task3, self.task4):
  34. save_result(task)
  35. def test_reduce(self):
  36. a1 = AsyncResult("uuid", task_name="celery.ping")
  37. restored = pickle.loads(pickle.dumps(a1))
  38. self.assertEqual(restored.task_id, "uuid")
  39. self.assertEqual(restored.task_name, "celery.ping")
  40. a2 = AsyncResult("uuid")
  41. self.assertEqual(pickle.loads(pickle.dumps(a2)).task_id, "uuid")
  42. def test_successful(self):
  43. ok_res = AsyncResult(self.task1["id"])
  44. nok_res = AsyncResult(self.task3["id"])
  45. nok_res2 = AsyncResult(self.task4["id"])
  46. self.assertTrue(ok_res.successful())
  47. self.assertFalse(nok_res.successful())
  48. self.assertFalse(nok_res2.successful())
  49. pending_res = AsyncResult(gen_unique_id())
  50. self.assertFalse(pending_res.successful())
  51. def test_str(self):
  52. ok_res = AsyncResult(self.task1["id"])
  53. ok2_res = AsyncResult(self.task2["id"])
  54. nok_res = AsyncResult(self.task3["id"])
  55. self.assertEqual(str(ok_res), self.task1["id"])
  56. self.assertEqual(str(ok2_res), self.task2["id"])
  57. self.assertEqual(str(nok_res), self.task3["id"])
  58. pending_id = gen_unique_id()
  59. pending_res = AsyncResult(pending_id)
  60. self.assertEqual(str(pending_res), pending_id)
  61. def test_repr(self):
  62. ok_res = AsyncResult(self.task1["id"])
  63. ok2_res = AsyncResult(self.task2["id"])
  64. nok_res = AsyncResult(self.task3["id"])
  65. self.assertEqual(repr(ok_res), "<AsyncResult: %s>" % (
  66. self.task1["id"]))
  67. self.assertEqual(repr(ok2_res), "<AsyncResult: %s>" % (
  68. self.task2["id"]))
  69. self.assertEqual(repr(nok_res), "<AsyncResult: %s>" % (
  70. self.task3["id"]))
  71. pending_id = gen_unique_id()
  72. pending_res = AsyncResult(pending_id)
  73. self.assertEqual(repr(pending_res), "<AsyncResult: %s>" % (
  74. pending_id))
  75. def test_get_traceback(self):
  76. ok_res = AsyncResult(self.task1["id"])
  77. nok_res = AsyncResult(self.task3["id"])
  78. nok_res2 = AsyncResult(self.task4["id"])
  79. self.assertFalse(ok_res.traceback)
  80. self.assertTrue(nok_res.traceback)
  81. self.assertTrue(nok_res2.traceback)
  82. pending_res = AsyncResult(gen_unique_id())
  83. self.assertFalse(pending_res.traceback)
  84. def test_get(self):
  85. ok_res = AsyncResult(self.task1["id"])
  86. ok2_res = AsyncResult(self.task2["id"])
  87. nok_res = AsyncResult(self.task3["id"])
  88. nok2_res = AsyncResult(self.task4["id"])
  89. self.assertEqual(ok_res.get(), "the")
  90. self.assertEqual(ok2_res.get(), "quick")
  91. self.assertRaises(KeyError, nok_res.get)
  92. self.assertIsInstance(nok2_res.result, KeyError)
  93. self.assertEqual(ok_res.info, "the")
  94. def test_get_timeout(self):
  95. res = AsyncResult(self.task4["id"]) # has RETRY status
  96. self.assertRaises(TimeoutError, res.get, timeout=0.1)
  97. pending_res = AsyncResult(gen_unique_id())
  98. self.assertRaises(TimeoutError, pending_res.get, timeout=0.1)
  99. @skip_if_quick
  100. def test_get_timeout_longer(self):
  101. res = AsyncResult(self.task4["id"]) # has RETRY status
  102. self.assertRaises(TimeoutError, res.get, timeout=1)
  103. def test_ready(self):
  104. oks = (AsyncResult(self.task1["id"]),
  105. AsyncResult(self.task2["id"]),
  106. AsyncResult(self.task3["id"]))
  107. self.assertTrue(all(result.ready() for result in oks))
  108. self.assertFalse(AsyncResult(self.task4["id"]).ready())
  109. self.assertFalse(AsyncResult(gen_unique_id()).ready())
  110. class MockAsyncResultFailure(AsyncResult):
  111. @property
  112. def result(self):
  113. return KeyError("baz")
  114. @property
  115. def status(self):
  116. return states.FAILURE
  117. def get(self, propagate=True, **kwargs):
  118. if propagate:
  119. raise self.result
  120. return self.result
  121. class MockAsyncResultSuccess(AsyncResult):
  122. forgotten = False
  123. def forget(self):
  124. self.forgotten = True
  125. @property
  126. def result(self):
  127. return 42
  128. @property
  129. def status(self):
  130. return states.SUCCESS
  131. def get(self, **kwargs):
  132. return self.result
  133. class SimpleBackend(object):
  134. ids = []
  135. def __init__(self, ids=[]):
  136. self.ids = ids
  137. def get_many(self, *args, **kwargs):
  138. return ((id, {"result": i}) for i, id in enumerate(self.ids))
  139. class TestTaskSetResult(unittest.TestCase):
  140. def setUp(self):
  141. self.size = 10
  142. self.ts = TaskSetResult(gen_unique_id(), make_mock_taskset(self.size))
  143. def test_total(self):
  144. self.assertEqual(self.ts.total, self.size)
  145. def test_iterate_raises(self):
  146. ar = MockAsyncResultFailure(gen_unique_id())
  147. ts = TaskSetResult(gen_unique_id(), [ar])
  148. it = iter(ts)
  149. self.assertRaises(KeyError, it.next)
  150. def test_forget(self):
  151. subs = [MockAsyncResultSuccess(gen_unique_id()),
  152. MockAsyncResultSuccess(gen_unique_id())]
  153. ts = TaskSetResult(gen_unique_id(), subs)
  154. ts.forget()
  155. for sub in subs:
  156. self.assertTrue(sub.forgotten)
  157. def test_getitem(self):
  158. subs = [MockAsyncResultSuccess(gen_unique_id()),
  159. MockAsyncResultSuccess(gen_unique_id())]
  160. ts = TaskSetResult(gen_unique_id(), subs)
  161. self.assertIs(ts[0], subs[0])
  162. def test_save_restore(self):
  163. subs = [MockAsyncResultSuccess(gen_unique_id()),
  164. MockAsyncResultSuccess(gen_unique_id())]
  165. ts = TaskSetResult(gen_unique_id(), subs)
  166. ts.save()
  167. self.assertRaises(AttributeError, ts.save, backend=object())
  168. self.assertEqual(TaskSetResult.restore(ts.taskset_id).subtasks,
  169. ts.subtasks)
  170. ts.delete()
  171. self.assertIsNone(TaskSetResult.restore(ts.taskset_id))
  172. self.assertRaises(AttributeError,
  173. TaskSetResult.restore, ts.taskset_id,
  174. backend=object())
  175. def test_join_native(self):
  176. backend = SimpleBackend()
  177. subtasks = [AsyncResult(gen_unique_id(), backend=backend)
  178. for i in range(10)]
  179. ts = TaskSetResult(gen_unique_id(), subtasks)
  180. backend.ids = [subtask.task_id for subtask in subtasks]
  181. res = ts.join_native()
  182. self.assertEqual(res, range(10))
  183. def test_iter_native(self):
  184. backend = SimpleBackend()
  185. subtasks = [AsyncResult(gen_unique_id(), backend=backend)
  186. for i in range(10)]
  187. ts = TaskSetResult(gen_unique_id(), subtasks)
  188. backend.ids = [subtask.task_id for subtask in subtasks]
  189. self.assertEqual(len(list(ts.iter_native())), 10)
  190. def test_iterate_yields(self):
  191. ar = MockAsyncResultSuccess(gen_unique_id())
  192. ar2 = MockAsyncResultSuccess(gen_unique_id())
  193. ts = TaskSetResult(gen_unique_id(), [ar, ar2])
  194. it = iter(ts)
  195. self.assertEqual(it.next(), 42)
  196. self.assertEqual(it.next(), 42)
  197. def test_iterate_eager(self):
  198. ar1 = EagerResult(gen_unique_id(), 42, states.SUCCESS)
  199. ar2 = EagerResult(gen_unique_id(), 42, states.SUCCESS)
  200. ts = TaskSetResult(gen_unique_id(), [ar1, ar2])
  201. it = iter(ts)
  202. self.assertEqual(it.next(), 42)
  203. self.assertEqual(it.next(), 42)
  204. def test_join_timeout(self):
  205. ar = MockAsyncResultSuccess(gen_unique_id())
  206. ar2 = MockAsyncResultSuccess(gen_unique_id())
  207. ar3 = AsyncResult(gen_unique_id())
  208. ts = TaskSetResult(gen_unique_id(), [ar, ar2, ar3])
  209. self.assertRaises(TimeoutError, ts.join, timeout=0.0000001)
  210. def test_itersubtasks(self):
  211. it = self.ts.itersubtasks()
  212. for i, t in enumerate(it):
  213. self.assertEqual(t.get(), i)
  214. def test___iter__(self):
  215. it = iter(self.ts)
  216. results = sorted(list(it))
  217. self.assertListEqual(results, list(xrange(self.size)))
  218. def test_join(self):
  219. joined = self.ts.join()
  220. self.assertListEqual(joined, list(xrange(self.size)))
  221. def test_successful(self):
  222. self.assertTrue(self.ts.successful())
  223. def test_failed(self):
  224. self.assertFalse(self.ts.failed())
  225. def test_waiting(self):
  226. self.assertFalse(self.ts.waiting())
  227. def test_ready(self):
  228. self.assertTrue(self.ts.ready())
  229. def test_completed_count(self):
  230. self.assertEqual(self.ts.completed_count(), self.ts.total)
  231. class TestPendingAsyncResult(unittest.TestCase):
  232. def setUp(self):
  233. self.task = AsyncResult(gen_unique_id())
  234. def test_result(self):
  235. self.assertIsNone(self.task.result)
  236. class TestFailedTaskSetResult(TestTaskSetResult):
  237. def setUp(self):
  238. self.size = 11
  239. subtasks = make_mock_taskset(10)
  240. failed = mock_task("ts11", states.FAILURE, KeyError("Baz"))
  241. save_result(failed)
  242. failed_res = AsyncResult(failed["id"])
  243. self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
  244. def test_itersubtasks(self):
  245. it = self.ts.itersubtasks()
  246. for i in xrange(self.size - 1):
  247. t = it.next()
  248. self.assertEqual(t.get(), i)
  249. self.assertRaises(KeyError, it.next().get)
  250. def test_completed_count(self):
  251. self.assertEqual(self.ts.completed_count(), self.ts.total - 1)
  252. def test___iter__(self):
  253. it = iter(self.ts)
  254. def consume():
  255. return list(it)
  256. self.assertRaises(KeyError, consume)
  257. def test_join(self):
  258. self.assertRaises(KeyError, self.ts.join)
  259. def test_successful(self):
  260. self.assertFalse(self.ts.successful())
  261. def test_failed(self):
  262. self.assertTrue(self.ts.failed())
  263. class TestTaskSetPending(unittest.TestCase):
  264. def setUp(self):
  265. self.ts = TaskSetResult(gen_unique_id(), [
  266. AsyncResult(gen_unique_id()),
  267. AsyncResult(gen_unique_id())])
  268. def test_completed_count(self):
  269. self.assertEqual(self.ts.completed_count(), 0)
  270. def test_ready(self):
  271. self.assertFalse(self.ts.ready())
  272. def test_waiting(self):
  273. self.assertTrue(self.ts.waiting())
  274. def x_join(self):
  275. self.assertRaises(TimeoutError, self.ts.join, timeout=0.001)
  276. @skip_if_quick
  277. def x_join_longer(self):
  278. self.assertRaises(TimeoutError, self.ts.join, timeout=1)
  279. class RaisingTask(Task):
  280. def run(self, x, y):
  281. raise KeyError("xy")
  282. class TestEagerResult(unittest.TestCase):
  283. def test_wait_raises(self):
  284. res = RaisingTask.apply(args=[3, 3])
  285. self.assertRaises(KeyError, res.wait)
  286. def test_wait(self):
  287. res = EagerResult("x", "x", states.RETRY)
  288. res.wait()
  289. self.assertEqual(res.state, states.RETRY)
  290. self.assertEqual(res.status, states.RETRY)
  291. def test_revoke(self):
  292. res = RaisingTask.apply(args=[3, 3])
  293. self.assertFalse(res.revoke())