test_events_state.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import time
  2. import unittest2 as unittest
  3. from itertools import count
  4. from celery import states
  5. from celery.events import Event
  6. from celery.events.state import State, HEARTBEAT_EXPIRE
  7. from celery.utils import gen_unique_id
  8. class replay(object):
  9. def __init__(self, state):
  10. self.state = state
  11. self.rewind()
  12. def __iter__(self):
  13. return self
  14. def next(self):
  15. try:
  16. self.state.event(self.events[self.position()])
  17. except IndexError:
  18. raise StopIteration()
  19. def rewind(self):
  20. self.position = count(0).next
  21. return self
  22. def play(self):
  23. for _ in self:
  24. pass
  25. class ev_worker_online_offline(replay):
  26. events = [
  27. Event("worker-online", hostname="utest1"),
  28. Event("worker-offline", hostname="utest1"),
  29. ]
  30. class ev_worker_heartbeats(replay):
  31. events = [
  32. Event("worker-heartbeat", hostname="utest1",
  33. timestamp=time.time() - HEARTBEAT_EXPIRE * 2),
  34. Event("worker-heartbeat", hostname="utest1"),
  35. ]
  36. class ev_task_states(replay):
  37. uuid = gen_unique_id()
  38. events = [
  39. Event("task-received", uuid=uuid, name="task1",
  40. args="(2, 2)", kwargs="{'foo': 'bar'}",
  41. retries=0, eta=None, hostname="utest1"),
  42. Event("task-started", uuid=uuid, hostname="utest1"),
  43. Event("task-succeeded", uuid=uuid, result="4",
  44. runtime=0.1234, hostname="utest1"),
  45. Event("task-failed", uuid=uuid, exception="KeyError('foo')",
  46. traceback="line 1 at main", hostname="utest1"),
  47. Event("task-retried", uuid=uuid, exception="KeyError('bar')",
  48. traceback="line 2 at main", hostname="utest1"),
  49. Event("task-revoked", uuid=uuid, hostname="utest1"),
  50. ]
  51. class ev_snapshot(replay):
  52. events = [
  53. Event("worker-online", hostname="utest1"),
  54. Event("worker-online", hostname="utest2"),
  55. Event("worker-online", hostname="utest3"),
  56. ]
  57. for i in range(20):
  58. worker = not i % 2 and "utest2" or "utest1"
  59. type = not i % 2 and "task2" or "task1"
  60. events.append(Event("task-received", name=type,
  61. uuid=gen_unique_id(), hostname=worker))
  62. class test_State(unittest.TestCase):
  63. def test_worker_online_offline(self):
  64. r = ev_worker_online_offline(State())
  65. r.next()
  66. self.assertTrue(r.state.alive_workers())
  67. self.assertTrue(r.state.workers["utest1"].alive)
  68. r.play()
  69. self.assertFalse(r.state.alive_workers())
  70. self.assertFalse(r.state.workers["utest1"].alive)
  71. def test_worker_heartbeat_expire(self):
  72. r = ev_worker_heartbeats(State())
  73. r.next()
  74. self.assertFalse(r.state.alive_workers())
  75. self.assertFalse(r.state.workers["utest1"].alive)
  76. r.play()
  77. self.assertTrue(r.state.alive_workers())
  78. self.assertTrue(r.state.workers["utest1"].alive)
  79. def test_task_states(self):
  80. r = ev_task_states(State())
  81. # RECEIVED
  82. r.next()
  83. self.assertTrue(r.uuid in r.state.tasks)
  84. task = r.state.tasks[r.uuid]
  85. self.assertEqual(task.state, "RECEIVED")
  86. self.assertTrue(task.received)
  87. self.assertEqual(task.timestamp, task.received)
  88. self.assertEqual(task.worker.hostname, "utest1")
  89. # STARTED
  90. r.next()
  91. self.assertTrue(r.state.workers["utest1"].alive,
  92. "any task event adds worker heartbeat")
  93. self.assertEqual(task.state, states.STARTED)
  94. self.assertTrue(task.started)
  95. self.assertEqual(task.timestamp, task.started)
  96. self.assertEqual(task.worker.hostname, "utest1")
  97. # SUCCESS
  98. r.next()
  99. self.assertEqual(task.state, states.SUCCESS)
  100. self.assertTrue(task.succeeded)
  101. self.assertEqual(task.timestamp, task.succeeded)
  102. self.assertEqual(task.worker.hostname, "utest1")
  103. self.assertEqual(task.result, "4")
  104. self.assertEqual(task.runtime, 0.1234)
  105. # FAILURE
  106. r.next()
  107. self.assertEqual(task.state, states.FAILURE)
  108. self.assertTrue(task.failed)
  109. self.assertEqual(task.timestamp, task.failed)
  110. self.assertEqual(task.worker.hostname, "utest1")
  111. self.assertEqual(task.exception, "KeyError('foo')")
  112. self.assertEqual(task.traceback, "line 1 at main")
  113. # RETRY
  114. r.next()
  115. self.assertEqual(task.state, states.RETRY)
  116. self.assertTrue(task.retried)
  117. self.assertEqual(task.timestamp, task.retried)
  118. self.assertEqual(task.worker.hostname, "utest1")
  119. self.assertEqual(task.exception, "KeyError('bar')")
  120. self.assertEqual(task.traceback, "line 2 at main")
  121. # REVOKED
  122. r.next()
  123. self.assertEqual(task.state, states.REVOKED)
  124. self.assertTrue(task.revoked)
  125. self.assertEqual(task.timestamp, task.revoked)
  126. self.assertEqual(task.worker.hostname, "utest1")
  127. def test_tasks_by_timestamp(self):
  128. r = ev_snapshot(State())
  129. r.play()
  130. self.assertEqual(len(r.state.tasks_by_timestamp()), 20)
  131. def test_tasks_by_type(self):
  132. r = ev_snapshot(State())
  133. r.play()
  134. self.assertEqual(len(r.state.tasks_by_type("task1")), 10)
  135. self.assertEqual(len(r.state.tasks_by_type("task2")), 10)
  136. def test_alive_workers(self):
  137. r = ev_snapshot(State())
  138. r.play()
  139. self.assertEqual(len(r.state.alive_workers()), 3)
  140. def test_tasks_by_worker(self):
  141. r = ev_snapshot(State())
  142. r.play()
  143. self.assertEqual(len(r.state.tasks_by_worker("utest1")), 10)
  144. self.assertEqual(len(r.state.tasks_by_worker("utest2")), 10)