# test_result.py (12 KB)
  1. from __future__ import generators
  2. from celery.tests.utils import unittest
  3. from celery import states
  4. from celery.app import app_or_default
  5. from celery.utils import gen_unique_id
  6. from celery.utils.compat import all
  7. from celery.utils.serialization import pickle
  8. from celery.result import AsyncResult, EagerResult, TaskSetResult
  9. from celery.exceptions import TimeoutError
  10. from celery.task.base import Task
  11. from celery.tests.utils import skip_if_quick
  def mock_task(name, status, result):
      """Return a dict describing a task, keyed by a freshly generated id.

      The dict carries the keys ``id``, ``name``, ``status`` and ``result``.
      """
      return {
          "id": gen_unique_id(),
          "name": name,
          "status": status,
          "result": result,
      }
  def save_result(task):
      """Store ``task`` in the backend of the default app.

      A SUCCESS task is marked as done; a RETRY task is marked as retry;
      any other status is marked as failure.  The retry and failure
      markers are given a fixed placeholder traceback string.
      """
      backend = app_or_default().backend
      status = task["status"]

      if status == states.SUCCESS:
          backend.mark_as_done(task["id"], task["result"])
          return

      # RETRY and everything else differ only in which marker is called.
      if status == states.RETRY:
          mark = backend.mark_as_retry
      else:
          mark = backend.mark_as_failure
      mark(task["id"], task["result"], traceback="Some traceback")
  def make_mock_taskset(size=10):
      """Create ``size`` successful mock tasks and return their results.

      Task ``i`` is named ``"ts<i>"`` and has ``i`` as its result.  Each
      task is saved via ``save_result`` before an ``AsyncResult`` for its
      id is created.

      :param size: number of tasks to create (default 10).
      :returns: list of ``AsyncResult`` instances, in creation order.
      """
      tasks = [mock_task("ts%d" % i, states.SUCCESS, i) for i in xrange(size)]
      # Saving is a pure side effect, so use a plain loop instead of
      # building (and discarding) a list of return values.
      for task in tasks:
          save_result(task)
      return [AsyncResult(task["id"]) for task in tasks]
  class TestAsyncResult(unittest.TestCase):
      """Checks ``AsyncResult`` against results saved by ``save_result``.

      ``setUp`` saves two SUCCESS tasks, one FAILURE task and one RETRY
      task.  Several tests also look at an ``AsyncResult`` built from a
      fresh id for which nothing was saved.
      """

      def setUp(self):
          self.task1 = mock_task("task1", states.SUCCESS, "the")
          self.task2 = mock_task("task2", states.SUCCESS, "quick")
          self.task3 = mock_task("task3", states.FAILURE, KeyError("brown"))
          # Give the retry fixture its own name so it cannot be confused
          # with the failed task3 fixture.
          self.task4 = mock_task("task4", states.RETRY, KeyError("red"))
          for task in (self.task1, self.task2, self.task3, self.task4):
              save_result(task)

      def test_reduce(self):
          # A pickle round trip must keep both the id and the task name.
          a1 = AsyncResult("uuid", task_name="celery.ping")
          restored = pickle.loads(pickle.dumps(a1))
          self.assertEqual(restored.task_id, "uuid")
          self.assertEqual(restored.task_name, "celery.ping")

          # The round trip must also work when no task name was given.
          a2 = AsyncResult("uuid")
          self.assertEqual(pickle.loads(pickle.dumps(a2)).task_id, "uuid")

      def test_successful(self):
          ok_res = AsyncResult(self.task1["id"])
          nok_res = AsyncResult(self.task3["id"])
          nok_res2 = AsyncResult(self.task4["id"])

          self.assertTrue(ok_res.successful())
          self.assertFalse(nok_res.successful())
          self.assertFalse(nok_res2.successful())

          # Nothing was saved for this id.
          pending_res = AsyncResult(gen_unique_id())
          self.assertFalse(pending_res.successful())

      def test_str(self):
          # str() of a result is expected to equal its task id.
          ok_res = AsyncResult(self.task1["id"])
          ok2_res = AsyncResult(self.task2["id"])
          nok_res = AsyncResult(self.task3["id"])
          self.assertEqual(str(ok_res), self.task1["id"])
          self.assertEqual(str(ok2_res), self.task2["id"])
          self.assertEqual(str(nok_res), self.task3["id"])

          pending_id = gen_unique_id()
          pending_res = AsyncResult(pending_id)
          self.assertEqual(str(pending_res), pending_id)

      def test_repr(self):
          # repr() is expected to be "<AsyncResult: <task id>>".
          ok_res = AsyncResult(self.task1["id"])
          ok2_res = AsyncResult(self.task2["id"])
          nok_res = AsyncResult(self.task3["id"])
          self.assertEqual(repr(ok_res), "<AsyncResult: %s>" % (
                  self.task1["id"]))
          self.assertEqual(repr(ok2_res), "<AsyncResult: %s>" % (
                  self.task2["id"]))
          self.assertEqual(repr(nok_res), "<AsyncResult: %s>" % (
                  self.task3["id"]))

          pending_id = gen_unique_id()
          pending_res = AsyncResult(pending_id)
          self.assertEqual(repr(pending_res), "<AsyncResult: %s>" % (
                  pending_id))

      def test_get_traceback(self):
          # save_result() passes a traceback only for the RETRY and
          # FAILURE fixtures, so only those should expose one.
          ok_res = AsyncResult(self.task1["id"])
          nok_res = AsyncResult(self.task3["id"])
          nok_res2 = AsyncResult(self.task4["id"])
          self.assertFalse(ok_res.traceback)
          self.assertTrue(nok_res.traceback)
          self.assertTrue(nok_res2.traceback)

          pending_res = AsyncResult(gen_unique_id())
          self.assertFalse(pending_res.traceback)

      def test_get(self):
          ok_res = AsyncResult(self.task1["id"])
          ok2_res = AsyncResult(self.task2["id"])
          nok_res = AsyncResult(self.task3["id"])
          nok2_res = AsyncResult(self.task4["id"])

          self.assertEqual(ok_res.get(), "the")
          self.assertEqual(ok2_res.get(), "quick")
          # get() on the failed task is expected to raise the stored error.
          self.assertRaises(KeyError, nok_res.get)
          self.assertIsInstance(nok2_res.result, KeyError)
          self.assertEqual(ok_res.info, "the")

      def test_get_timeout(self):
          res = AsyncResult(self.task4["id"])  # has RETRY status
          self.assertRaises(TimeoutError, res.get, timeout=0.1)

          pending_res = AsyncResult(gen_unique_id())
          self.assertRaises(TimeoutError, pending_res.get, timeout=0.1)

      @skip_if_quick
      def test_get_timeout_longer(self):
          res = AsyncResult(self.task4["id"])  # has RETRY status
          self.assertRaises(TimeoutError, res.get, timeout=1)

      def test_ready(self):
          # The SUCCESS and FAILURE fixtures are expected to be ready; the
          # RETRY fixture and an unsaved id are not.
          oks = (AsyncResult(self.task1["id"]),
                 AsyncResult(self.task2["id"]),
                 AsyncResult(self.task3["id"]))
          self.assertTrue(all(result.ready() for result in oks))
          self.assertFalse(AsyncResult(self.task4["id"]).ready())

          self.assertFalse(AsyncResult(gen_unique_id()).ready())
  class MockAsyncResultFailure(AsyncResult):
      """``AsyncResult`` stub: status is FAILURE, result is a ``KeyError``."""

      @property
      def status(self):
          # Fixed value; this property does not look anything up.
          return states.FAILURE

      @property
      def result(self):
          # A new KeyError("baz") instance is returned (not raised) on
          # every access.
          return KeyError("baz")
  class MockAsyncResultSuccess(AsyncResult):
      """``AsyncResult`` stub: status is SUCCESS, result is ``42``.

      ``forget()`` only flips the ``forgotten`` flag so that tests can
      check it was called.
      """

      # Class-level default; forget() sets the flag on the instance.
      forgotten = False

      @property
      def status(self):
          return states.SUCCESS

      @property
      def result(self):
          return 42

      def forget(self):
          self.forgotten = True
  class SimpleBackend(object):
      """Minimal backend stub exposing only ``get_many``.

      ``ids`` holds the task ids that ``get_many`` reports on.
      """

      # Class-level fallback, kept so ``SimpleBackend.ids`` still resolves;
      # every instance gets its own ``ids`` attribute in ``__init__``.
      ids = []

      def __init__(self, ids=None):
          """Remember ``ids``.

          :param ids: list of task ids, stored by reference.  When omitted
              a fresh empty list is created for this instance, so
              instances no longer share one mutable default list.
          """
          self.ids = [] if ids is None else ids

      def get_many(self, *args, **kwargs):
          """Return a generator of ``(task_id, {"result": index})`` pairs.

          All positional and keyword arguments are accepted and ignored.
          ``index`` is the position of the id within ``self.ids``.
          """
          return ((task_id, {"result": index})
                  for index, task_id in enumerate(self.ids))
  class TestTaskSetResult(unittest.TestCase):
      """``TaskSetResult`` checks on a set built by ``make_mock_taskset``.

      ``setUp`` stores the set on ``self.ts`` and its length on
      ``self.size``.  Some tests build their own small sets from stub
      results instead.
      """

      def setUp(self):
          self.size = 10
          taskset_id = gen_unique_id()
          self.ts = TaskSetResult(taskset_id, make_mock_taskset(self.size))

      def _success_pair(self):
          # Two MockAsyncResultSuccess stubs, each with a fresh id.
          return [MockAsyncResultSuccess(gen_unique_id()),
                  MockAsyncResultSuccess(gen_unique_id())]

      def _native_taskset(self, count=10):
          # A taskset whose members share one SimpleBackend; the backend
          # is told the member ids only after the taskset is built.
          backend = SimpleBackend()
          members = [AsyncResult(gen_unique_id(), backend=backend)
                     for _ in range(count)]
          taskset = TaskSetResult(gen_unique_id(), members)
          backend.ids = [member.task_id for member in members]
          return taskset

      def test_total(self):
          self.assertEqual(self.ts.total, self.size)

      def test_iterate_raises(self):
          failing = MockAsyncResultFailure(gen_unique_id())
          taskset = TaskSetResult(gen_unique_id(), [failing])
          self.assertRaises(KeyError, iter(taskset).next)

      def test_forget(self):
          members = self._success_pair()
          TaskSetResult(gen_unique_id(), members).forget()
          for member in members:
              self.assertTrue(member.forgotten)

      def test_getitem(self):
          members = self._success_pair()
          taskset = TaskSetResult(gen_unique_id(), members)
          self.assertIs(taskset[0], members[0])

      def test_save_restore(self):
          members = self._success_pair()
          taskset = TaskSetResult(gen_unique_id(), members)
          taskset.save()
          # A bare object() as backend is expected to fail with
          # AttributeError on both save and restore.
          self.assertRaises(AttributeError, taskset.save, backend=object())
          restored = TaskSetResult.restore(taskset.taskset_id)
          self.assertEqual(restored.subtasks, taskset.subtasks)
          self.assertRaises(AttributeError,
                            TaskSetResult.restore, taskset.taskset_id,
                            backend=object())

      def test_join_native(self):
          joined = self._native_taskset().join_native()
          self.assertEqual(joined, range(10))

      def test_iter_native(self):
          taskset = self._native_taskset()
          self.assertEqual(len(list(taskset.iter_native())), 10)

      def test_iterate_yields(self):
          taskset = TaskSetResult(gen_unique_id(), self._success_pair())
          values = iter(taskset)
          self.assertEqual(values.next(), 42)
          self.assertEqual(values.next(), 42)

      def test_iterate_eager(self):
          eager = [EagerResult(gen_unique_id(), 42, states.SUCCESS)
                   for _ in range(2)]
          values = iter(TaskSetResult(gen_unique_id(), eager))
          self.assertEqual(values.next(), 42)
          self.assertEqual(values.next(), 42)

      def test_join_timeout(self):
          # Two stub successes followed by a plain AsyncResult with a
          # fresh id; joining with a tiny timeout must time out.
          members = self._success_pair()
          members.append(AsyncResult(gen_unique_id()))
          taskset = TaskSetResult(gen_unique_id(), members)
          self.assertRaises(TimeoutError, taskset.join, timeout=0.0000001)

      def test_itersubtasks(self):
          for expected, subtask in enumerate(self.ts.itersubtasks()):
              self.assertEqual(subtask.get(), expected)

      def test___iter__(self):
          # Values are sorted before the comparison, so iteration order
          # does not matter here.
          collected = sorted(iter(self.ts))
          self.assertListEqual(collected, list(xrange(self.size)))

      def test_join(self):
          self.assertListEqual(self.ts.join(), list(xrange(self.size)))

      def test_successful(self):
          self.assertTrue(self.ts.successful())

      def test_failed(self):
          self.assertFalse(self.ts.failed())

      def test_waiting(self):
          self.assertFalse(self.ts.waiting())

      def test_ready(self):
          self.assertTrue(self.ts.ready())

      def test_completed_count(self):
          self.assertEqual(self.ts.completed_count(), self.ts.total)
  class TestPendingAsyncResult(unittest.TestCase):
      """Checks an ``AsyncResult`` built from a fresh, unsaved id."""

      def setUp(self):
          fresh_id = gen_unique_id()
          self.task = AsyncResult(fresh_id)

      def test_result(self):
          # Nothing was stored for this id, so the result should be None.
          self.assertIsNone(self.task.result)
  class TestFailedTaskSetResult(TestTaskSetResult):
      """Re-runs the ``TestTaskSetResult`` tests with a failed last member.

      The set holds ten successful results followed by one FAILURE result;
      the tests whose expectations change because of that are overridden.
      """

      def setUp(self):
          self.size = 11
          members = make_mock_taskset(10)
          failed = mock_task("ts11", states.FAILURE, KeyError("Baz"))
          save_result(failed)
          members.append(AsyncResult(failed["id"]))
          self.ts = TaskSetResult(gen_unique_id(), members)

      def test_itersubtasks(self):
          subtasks = self.ts.itersubtasks()

          # All but the last subtask return their index.
          for expected in xrange(self.size - 1):
              self.assertEqual(subtasks.next().get(), expected)
          # The last subtask is the failed one.
          self.assertRaises(KeyError, subtasks.next().get)

      def test_completed_count(self):
          self.assertEqual(self.ts.completed_count(), self.ts.total - 1)

      def test___iter__(self):
          # Exhausting the iterator is expected to raise the stored error.
          self.assertRaises(KeyError, list, iter(self.ts))

      def test_join(self):
          self.assertRaises(KeyError, self.ts.join)

      def test_successful(self):
          self.assertFalse(self.ts.successful())

      def test_failed(self):
          self.assertTrue(self.ts.failed())
  class TestTaskSetPending(unittest.TestCase):
      """Checks a ``TaskSetResult`` whose two members use fresh, unsaved ids."""

      def setUp(self):
          taskset_id = gen_unique_id()
          members = [AsyncResult(gen_unique_id()),
                     AsyncResult(gen_unique_id())]
          self.ts = TaskSetResult(taskset_id, members)

      def test_completed_count(self):
          self.assertEqual(self.ts.completed_count(), 0)

      def test_ready(self):
          self.assertFalse(self.ts.ready())

      def test_waiting(self):
          self.assertTrue(self.ts.waiting())

      # NOTE(review): the two methods below lack the ``test`` prefix, so the
      # default unittest loader will not collect them -- presumably disabled
      # on purpose; confirm before renaming.
      def x_join(self):
          self.assertRaises(TimeoutError, self.ts.join, timeout=0.001)

      @skip_if_quick
      def x_join_longer(self):
          self.assertRaises(TimeoutError, self.ts.join, timeout=1)
  class RaisingTask(Task):
      """Task whose ``run`` always raises ``KeyError("xy")``."""

      def run(self, x, y):
          # Both arguments are ignored; the task exists only to fail.
          raise KeyError("xy")
  class TestEagerResult(unittest.TestCase):
      """Checks ``EagerResult``: built directly and via ``RaisingTask.apply``."""

      def test_wait_raises(self):
          outcome = RaisingTask.apply(args=[3, 3])
          self.assertRaises(KeyError, outcome.wait)

      def test_wait(self):
          outcome = EagerResult("x", "x", states.RETRY)
          outcome.wait()
          # ``state`` and ``status`` must both report RETRY after wait().
          for attribute in ("state", "status"):
              self.assertEqual(getattr(outcome, attribute), states.RETRY)

      def test_revoke(self):
          outcome = RaisingTask.apply(args=[3, 3])
          self.assertFalse(outcome.revoke())