test_result.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import unittest
  2. from celery.backends import default_backend
  3. from celery.result import AsyncResult
  4. from celery.result import TaskSetResult
  5. from celery.result import TimeoutError
  6. from celery.utils import gen_unique_id
  7. def mock_task(name, status, result):
  8. return dict(id=gen_unique_id(), name=name, status=status, result=result)
  9. def save_result(task):
  10. if task["status"] == "DONE":
  11. default_backend.mark_as_done(task["id"], task["result"])
  12. else:
  13. default_backend.mark_as_failure(task["id"], task["result"])
  14. def make_mock_taskset(size=10):
  15. tasks = [mock_task("ts%d" % i, "DONE", i) for i in xrange(size)]
  16. [save_result(task) for task in tasks]
  17. return [AsyncResult(task["id"]) for task in tasks]
  18. class TestAsyncResult(unittest.TestCase):
  19. def setUp(self):
  20. self.task1 = mock_task("task1", "DONE", "the")
  21. self.task2 = mock_task("task2", "DONE", "quick")
  22. self.task3 = mock_task("task3", "FAILURE", KeyError("brown"))
  23. for task in (self.task1, self.task2, self.task3):
  24. save_result(task)
  25. def test_is_done(self):
  26. ok_res = AsyncResult(self.task1["id"])
  27. nok_res = AsyncResult(self.task3["id"])
  28. self.assertTrue(ok_res.is_done())
  29. self.assertFalse(nok_res.is_done())
  30. def test_sucessful(self):
  31. ok_res = AsyncResult(self.task1["id"])
  32. nok_res = AsyncResult(self.task3["id"])
  33. self.assertTrue(ok_res.successful())
  34. self.assertFalse(nok_res.successful())
  35. def test_str(self):
  36. ok_res = AsyncResult(self.task1["id"])
  37. ok2_res = AsyncResult(self.task2["id"])
  38. nok_res = AsyncResult(self.task3["id"])
  39. self.assertEquals(str(ok_res), self.task1["id"])
  40. self.assertEquals(str(ok2_res), self.task2["id"])
  41. self.assertEquals(str(nok_res), self.task3["id"])
  42. def test_repr(self):
  43. ok_res = AsyncResult(self.task1["id"])
  44. ok2_res = AsyncResult(self.task2["id"])
  45. nok_res = AsyncResult(self.task3["id"])
  46. self.assertEquals(repr(ok_res), "<AsyncResult: %s>" % (
  47. self.task1["id"]))
  48. self.assertEquals(repr(ok2_res), "<AsyncResult: %s>" % (
  49. self.task2["id"]))
  50. self.assertEquals(repr(nok_res), "<AsyncResult: %s>" % (
  51. self.task3["id"]))
  52. def test_get(self):
  53. ok_res = AsyncResult(self.task1["id"])
  54. ok2_res = AsyncResult(self.task2["id"])
  55. nok_res = AsyncResult(self.task3["id"])
  56. self.assertEquals(ok_res.get(), "the")
  57. self.assertEquals(ok2_res.get(), "quick")
  58. self.assertRaises(KeyError, nok_res.get)
  59. def test_ready(self):
  60. oks = (AsyncResult(self.task1["id"]),
  61. AsyncResult(self.task2["id"]),
  62. AsyncResult(self.task3["id"]))
  63. [self.assertTrue(ok.ready()) for ok in oks]
  64. class TestTaskSetResult(unittest.TestCase):
  65. def setUp(self):
  66. self.size = 10
  67. self.ts = TaskSetResult(gen_unique_id(), make_mock_taskset(self.size))
  68. def test_total(self):
  69. self.assertEquals(self.ts.total, self.size)
  70. def test_itersubtasks(self):
  71. it = self.ts.itersubtasks()
  72. for i, t in enumerate(it):
  73. self.assertEquals(t.get(), i)
  74. def test___iter__(self):
  75. it = iter(self.ts)
  76. results = sorted(list(it))
  77. self.assertEquals(results, list(xrange(self.size)))
  78. def test_join(self):
  79. joined = self.ts.join()
  80. self.assertEquals(joined, list(xrange(self.size)))
  81. def test_successful(self):
  82. self.assertTrue(self.ts.successful())
  83. def test_failed(self):
  84. self.assertFalse(self.ts.failed())
  85. def test_waiting(self):
  86. self.assertFalse(self.ts.waiting())
  87. def test_ready(self):
  88. self.assertTrue(self.ts.ready())
  89. def test_completed_count(self):
  90. self.assertEquals(self.ts.completed_count(), self.ts.total)
  91. class TestPendingAsyncResult(unittest.TestCase):
  92. def setUp(self):
  93. self.task = AsyncResult(gen_unique_id())
  94. def test_result(self):
  95. self.assertTrue(self.task.result is None)
  96. class TestFailedTaskSetResult(TestTaskSetResult):
  97. def setUp(self):
  98. self.size = 11
  99. subtasks = make_mock_taskset(10)
  100. failed = mock_task("ts11", "FAILED", KeyError("Baz"))
  101. save_result(failed)
  102. failed_res = AsyncResult(failed["id"])
  103. self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
  104. def test_itersubtasks(self):
  105. it = self.ts.itersubtasks()
  106. for i in xrange(self.size - 1):
  107. t = it.next()
  108. self.assertEquals(t.get(), i)
  109. self.assertRaises(KeyError, it.next().get)
  110. def test_completed_count(self):
  111. self.assertEquals(self.ts.completed_count(), self.ts.total - 1)
  112. def test___iter__(self):
  113. it = iter(self.ts)
  114. def consume():
  115. return list(it)
  116. self.assertRaises(KeyError, consume)
  117. def test_join(self):
  118. self.assertRaises(KeyError, self.ts.join)
  119. def test_successful(self):
  120. self.assertFalse(self.ts.successful())
  121. def test_failed(self):
  122. self.assertTrue(self.ts.failed())
  123. class TestTaskSetPending(unittest.TestCase):
  124. def setUp(self):
  125. self.ts = TaskSetResult(gen_unique_id(), [
  126. AsyncResult(gen_unique_id()),
  127. AsyncResult(gen_unique_id())])
  128. def test_completed_count(self):
  129. self.assertEquals(self.ts.completed_count(), 0)
  130. def test_ready(self):
  131. self.assertFalse(self.ts.ready())
  132. def test_waiting(self):
  133. self.assertTrue(self.ts.waiting())
  134. def x_join(self):
  135. self.assertRaises(TimeoutError, self.ts.join, timeout=0.001)