test_state.py 23 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import pickle
  3. from decimal import Decimal
  4. from random import shuffle
  5. from time import time
  6. from itertools import count
  7. from celery import states
  8. from celery import uuid
  9. from celery.events import Event
  10. from celery.events.state import (
  11. HEARTBEAT_EXPIRE_WINDOW,
  12. HEARTBEAT_DRIFT_MAX,
  13. State,
  14. Worker,
  15. Task,
  16. heartbeat_expires,
  17. )
  18. from celery.tests.case import AppCase, Mock, patch, skip
  19. class replay:
  20. def __init__(self, state):
  21. self.state = state
  22. self.rewind()
  23. self.setup()
  24. self.current_clock = 0
  25. def setup(self):
  26. pass
  27. def next_event(self):
  28. ev = self.events[next(self.position)]
  29. ev['local_received'] = ev['timestamp']
  30. try:
  31. self.current_clock = ev['clock']
  32. except KeyError:
  33. ev['clock'] = self.current_clock = self.current_clock + 1
  34. return ev
  35. def __iter__(self):
  36. return self
  37. def __next__(self):
  38. try:
  39. self.state.event(self.next_event())
  40. except IndexError:
  41. raise StopIteration()
  42. next = __next__
  43. def rewind(self):
  44. self.position = count(0)
  45. return self
  46. def play(self):
  47. for _ in self:
  48. pass
  49. class ev_worker_online_offline(replay):
  50. def setup(self):
  51. self.events = [
  52. Event('worker-online', hostname='utest1'),
  53. Event('worker-offline', hostname='utest1'),
  54. ]
  55. class ev_worker_heartbeats(replay):
  56. def setup(self):
  57. self.events = [
  58. Event('worker-heartbeat', hostname='utest1',
  59. timestamp=time() - HEARTBEAT_EXPIRE_WINDOW * 2),
  60. Event('worker-heartbeat', hostname='utest1'),
  61. ]
  62. class ev_task_states(replay):
  63. def setup(self):
  64. tid = self.tid = uuid()
  65. tid2 = self.tid2 = uuid()
  66. self.events = [
  67. Event('task-received', uuid=tid, name='task1',
  68. args='(2, 2)', kwargs="{'foo': 'bar'}",
  69. retries=0, eta=None, hostname='utest1'),
  70. Event('task-started', uuid=tid, hostname='utest1'),
  71. Event('task-revoked', uuid=tid, hostname='utest1'),
  72. Event('task-retried', uuid=tid, exception="KeyError('bar')",
  73. traceback='line 2 at main', hostname='utest1'),
  74. Event('task-failed', uuid=tid, exception="KeyError('foo')",
  75. traceback='line 1 at main', hostname='utest1'),
  76. Event('task-succeeded', uuid=tid, result='4',
  77. runtime=0.1234, hostname='utest1'),
  78. Event('foo-bar'),
  79. Event('task-received', uuid=tid2, name='task2',
  80. args='(4, 4)', kwargs="{'foo': 'bar'}",
  81. retries=0, eta=None, parent_id=tid, root_id=tid,
  82. hostname='utest1'),
  83. ]
  84. def QTEV(type, uuid, hostname, clock, name=None, timestamp=None):
  85. """Quick task event."""
  86. return Event('task-{0}'.format(type), uuid=uuid, hostname=hostname,
  87. clock=clock, name=name, timestamp=timestamp or time())
  88. class ev_logical_clock_ordering(replay):
  89. def __init__(self, state, offset=0, uids=None):
  90. self.offset = offset or 0
  91. self.uids = self.setuids(uids)
  92. super(ev_logical_clock_ordering, self).__init__(state)
  93. def setuids(self, uids):
  94. uids = self.tA, self.tB, self.tC = uids or [uuid(), uuid(), uuid()]
  95. return uids
  96. def setup(self):
  97. offset = self.offset
  98. tA, tB, tC = self.uids
  99. self.events = [
  100. QTEV('received', tA, 'w1', name='tA', clock=offset + 1),
  101. QTEV('received', tB, 'w2', name='tB', clock=offset + 1),
  102. QTEV('started', tA, 'w1', name='tA', clock=offset + 3),
  103. QTEV('received', tC, 'w2', name='tC', clock=offset + 3),
  104. QTEV('started', tB, 'w2', name='tB', clock=offset + 5),
  105. QTEV('retried', tA, 'w1', name='tA', clock=offset + 7),
  106. QTEV('succeeded', tB, 'w2', name='tB', clock=offset + 9),
  107. QTEV('started', tC, 'w2', name='tC', clock=offset + 10),
  108. QTEV('received', tA, 'w3', name='tA', clock=offset + 13),
  109. QTEV('succeded', tC, 'w2', name='tC', clock=offset + 12),
  110. QTEV('started', tA, 'w3', name='tA', clock=offset + 14),
  111. QTEV('succeeded', tA, 'w3', name='TA', clock=offset + 16),
  112. ]
  113. def rewind_with_offset(self, offset, uids=None):
  114. self.offset = offset
  115. self.uids = self.setuids(uids or self.uids)
  116. self.setup()
  117. self.rewind()
  118. class ev_snapshot(replay):
  119. def setup(self):
  120. self.events = [
  121. Event('worker-online', hostname='utest1'),
  122. Event('worker-online', hostname='utest2'),
  123. Event('worker-online', hostname='utest3'),
  124. ]
  125. for i in range(20):
  126. worker = not i % 2 and 'utest2' or 'utest1'
  127. type = not i % 2 and 'task2' or 'task1'
  128. self.events.append(Event('task-received', name=type,
  129. uuid=uuid(), hostname=worker))
  130. class test_Worker(AppCase):
  131. def test_equality(self):
  132. self.assertEqual(Worker(hostname='foo').hostname, 'foo')
  133. self.assertEqual(
  134. Worker(hostname='foo'), Worker(hostname='foo'),
  135. )
  136. self.assertNotEqual(
  137. Worker(hostname='foo'), Worker(hostname='bar'),
  138. )
  139. self.assertEqual(
  140. hash(Worker(hostname='foo')), hash(Worker(hostname='foo')),
  141. )
  142. self.assertNotEqual(
  143. hash(Worker(hostname='foo')), hash(Worker(hostname='bar')),
  144. )
  145. def test_heartbeat_expires__Decimal(self):
  146. self.assertEqual(
  147. heartbeat_expires(Decimal(344313.37), freq=60, expire_window=200),
  148. 344433.37,
  149. )
  150. def test_compatible_with_Decimal(self):
  151. w = Worker('george@vandelay.com')
  152. timestamp, local_received = Decimal(time()), time()
  153. w.event('worker-online', timestamp, local_received, fields={
  154. 'hostname': 'george@vandelay.com',
  155. 'timestamp': timestamp,
  156. 'local_received': local_received,
  157. 'freq': Decimal(5.6335431),
  158. })
  159. self.assertTrue(w.alive)
  160. def test_eq_ne_other(self):
  161. self.assertEqual(Worker('a@b.com'), Worker('a@b.com'))
  162. self.assertNotEqual(Worker('a@b.com'), Worker('b@b.com'))
  163. self.assertNotEqual(Worker('a@b.com'), object())
  164. def test_reduce_direct(self):
  165. w = Worker('george@vandelay.com')
  166. w.event('worker-online', 10.0, 13.0, fields={
  167. 'hostname': 'george@vandelay.com',
  168. 'timestamp': 10.0,
  169. 'local_received': 13.0,
  170. 'freq': 60,
  171. })
  172. fun, args = w.__reduce__()
  173. w2 = fun(*args)
  174. self.assertEqual(w2.hostname, w.hostname)
  175. self.assertEqual(w2.pid, w.pid)
  176. self.assertEqual(w2.freq, w.freq)
  177. self.assertEqual(w2.heartbeats, w.heartbeats)
  178. self.assertEqual(w2.clock, w.clock)
  179. self.assertEqual(w2.active, w.active)
  180. self.assertEqual(w2.processed, w.processed)
  181. self.assertEqual(w2.loadavg, w.loadavg)
  182. self.assertEqual(w2.sw_ident, w.sw_ident)
  183. def test_update(self):
  184. w = Worker('george@vandelay.com')
  185. w.update({'idx': '301'}, foo=1, clock=30, bah='foo')
  186. self.assertEqual(w.idx, '301')
  187. self.assertEqual(w.foo, 1)
  188. self.assertEqual(w.clock, 30)
  189. self.assertEqual(w.bah, 'foo')
  190. def test_survives_missing_timestamp(self):
  191. worker = Worker(hostname='foo')
  192. worker.event('heartbeat')
  193. self.assertEqual(worker.heartbeats, [])
  194. def test_repr(self):
  195. self.assertTrue(repr(Worker(hostname='foo')))
  196. def test_drift_warning(self):
  197. worker = Worker(hostname='foo')
  198. with patch('celery.events.state.warn') as warn:
  199. worker.event(None, time() + (HEARTBEAT_DRIFT_MAX * 2), time())
  200. warn.assert_called()
  201. self.assertIn('Substantial drift', warn.call_args[0][0])
  202. def test_updates_heartbeat(self):
  203. worker = Worker(hostname='foo')
  204. worker.event(None, time(), time())
  205. self.assertEqual(len(worker.heartbeats), 1)
  206. h1 = worker.heartbeats[0]
  207. worker.event(None, time(), time() - 10)
  208. self.assertEqual(len(worker.heartbeats), 2)
  209. self.assertEqual(worker.heartbeats[-1], h1)
  210. class test_Task(AppCase):
  211. def test_equality(self):
  212. self.assertEqual(Task(uuid='foo').uuid, 'foo')
  213. self.assertEqual(
  214. Task(uuid='foo'), Task(uuid='foo'),
  215. )
  216. self.assertNotEqual(
  217. Task(uuid='foo'), Task(uuid='bar'),
  218. )
  219. self.assertEqual(
  220. hash(Task(uuid='foo')), hash(Task(uuid='foo')),
  221. )
  222. self.assertNotEqual(
  223. hash(Task(uuid='foo')), hash(Task(uuid='bar')),
  224. )
  225. def test_info(self):
  226. task = Task(uuid='abcdefg',
  227. name='tasks.add',
  228. args='(2, 2)',
  229. kwargs='{}',
  230. retries=2,
  231. result=42,
  232. eta=1,
  233. runtime=0.0001,
  234. expires=1,
  235. parent_id='bdefc',
  236. root_id='dedfef',
  237. foo=None,
  238. exception=1,
  239. received=time() - 10,
  240. started=time() - 8,
  241. exchange='celery',
  242. routing_key='celery',
  243. succeeded=time())
  244. self.assertEqual(sorted(list(task._info_fields)),
  245. sorted(task.info().keys()))
  246. self.assertEqual(sorted(list(task._info_fields + ('received',))),
  247. sorted(task.info(extra=('received',))))
  248. self.assertEqual(sorted(['args', 'kwargs']),
  249. sorted(task.info(['args', 'kwargs']).keys()))
  250. self.assertFalse(list(task.info('foo')))
  251. def test_reduce_direct(self):
  252. task = Task(uuid='uuid', name='tasks.add', args='(2, 2)')
  253. fun, args = task.__reduce__()
  254. task2 = fun(*args)
  255. self.assertEqual(task, task2)
  256. def test_ready(self):
  257. task = Task(uuid='abcdefg',
  258. name='tasks.add')
  259. task.event('received', time(), time())
  260. self.assertFalse(task.ready)
  261. task.event('succeeded', time(), time())
  262. self.assertTrue(task.ready)
  263. def test_sent(self):
  264. task = Task(uuid='abcdefg',
  265. name='tasks.add')
  266. task.event('sent', time(), time())
  267. self.assertEqual(task.state, states.PENDING)
  268. def test_merge(self):
  269. task = Task()
  270. task.event('failed', time(), time())
  271. task.event('started', time(), time())
  272. task.event('received', time(), time(), {
  273. 'name': 'tasks.add', 'args': (2, 2),
  274. })
  275. self.assertEqual(task.state, states.FAILURE)
  276. self.assertEqual(task.name, 'tasks.add')
  277. self.assertTupleEqual(task.args, (2, 2))
  278. task.event('retried', time(), time())
  279. self.assertEqual(task.state, states.RETRY)
  280. def test_repr(self):
  281. self.assertTrue(repr(Task(uuid='xxx', name='tasks.add')))
  282. class test_State(AppCase):
  283. def test_repr(self):
  284. self.assertTrue(repr(State()))
  285. def test_pickleable(self):
  286. state = State()
  287. r = ev_logical_clock_ordering(state)
  288. r.play()
  289. self.assertTrue(pickle.loads(pickle.dumps(state)))
  290. def test_task_logical_clock_ordering(self):
  291. state = State()
  292. r = ev_logical_clock_ordering(state)
  293. tA, tB, tC = r.uids
  294. r.play()
  295. now = list(state.tasks_by_time())
  296. self.assertEqual(now[0][0], tA)
  297. self.assertEqual(now[1][0], tC)
  298. self.assertEqual(now[2][0], tB)
  299. for _ in range(1000):
  300. shuffle(r.uids)
  301. tA, tB, tC = r.uids
  302. r.rewind_with_offset(r.current_clock + 1, r.uids)
  303. r.play()
  304. now = list(state.tasks_by_time())
  305. self.assertEqual(now[0][0], tA)
  306. self.assertEqual(now[1][0], tC)
  307. self.assertEqual(now[2][0], tB)
  308. @skip.todo(reason='not working')
  309. def test_task_descending_clock_ordering(self):
  310. state = State()
  311. r = ev_logical_clock_ordering(state)
  312. tA, tB, tC = r.uids
  313. r.play()
  314. now = list(state.tasks_by_time(reverse=False))
  315. self.assertEqual(now[0][0], tA)
  316. self.assertEqual(now[1][0], tB)
  317. self.assertEqual(now[2][0], tC)
  318. for _ in range(1000):
  319. shuffle(r.uids)
  320. tA, tB, tC = r.uids
  321. r.rewind_with_offset(r.current_clock + 1, r.uids)
  322. r.play()
  323. now = list(state.tasks_by_time(reverse=False))
  324. self.assertEqual(now[0][0], tB)
  325. self.assertEqual(now[1][0], tC)
  326. self.assertEqual(now[2][0], tA)
  327. def test_get_or_create_task(self):
  328. state = State()
  329. task, created = state.get_or_create_task('id1')
  330. self.assertEqual(task.uuid, 'id1')
  331. self.assertTrue(created)
  332. task2, created2 = state.get_or_create_task('id1')
  333. self.assertIs(task2, task)
  334. self.assertFalse(created2)
  335. def test_get_or_create_worker(self):
  336. state = State()
  337. worker, created = state.get_or_create_worker('george@vandelay.com')
  338. self.assertEqual(worker.hostname, 'george@vandelay.com')
  339. self.assertTrue(created)
  340. worker2, created2 = state.get_or_create_worker('george@vandelay.com')
  341. self.assertIs(worker2, worker)
  342. self.assertFalse(created2)
  343. def test_get_or_create_worker__with_defaults(self):
  344. state = State()
  345. worker, created = state.get_or_create_worker(
  346. 'george@vandelay.com', pid=30,
  347. )
  348. self.assertEqual(worker.hostname, 'george@vandelay.com')
  349. self.assertEqual(worker.pid, 30)
  350. self.assertTrue(created)
  351. worker2, created2 = state.get_or_create_worker(
  352. 'george@vandelay.com', pid=40,
  353. )
  354. self.assertIs(worker2, worker)
  355. self.assertEqual(worker2.pid, 40)
  356. self.assertFalse(created2)
  357. def test_worker_online_offline(self):
  358. r = ev_worker_online_offline(State())
  359. next(r)
  360. self.assertTrue(list(r.state.alive_workers()))
  361. self.assertTrue(r.state.workers['utest1'].alive)
  362. r.play()
  363. self.assertFalse(list(r.state.alive_workers()))
  364. self.assertFalse(r.state.workers['utest1'].alive)
  365. def test_itertasks(self):
  366. s = State()
  367. s.tasks = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
  368. self.assertEqual(len(list(s.itertasks(limit=2))), 2)
  369. def test_worker_heartbeat_expire(self):
  370. r = ev_worker_heartbeats(State())
  371. next(r)
  372. self.assertFalse(list(r.state.alive_workers()))
  373. self.assertFalse(r.state.workers['utest1'].alive)
  374. r.play()
  375. self.assertTrue(list(r.state.alive_workers()))
  376. self.assertTrue(r.state.workers['utest1'].alive)
  377. def test_task_states(self):
  378. r = ev_task_states(State())
  379. # RECEIVED
  380. next(r)
  381. self.assertIn(r.tid, r.state.tasks)
  382. task = r.state.tasks[r.tid]
  383. self.assertEqual(task.state, states.RECEIVED)
  384. self.assertTrue(task.received)
  385. self.assertEqual(task.timestamp, task.received)
  386. self.assertEqual(task.worker.hostname, 'utest1')
  387. # STARTED
  388. next(r)
  389. self.assertTrue(r.state.workers['utest1'].alive,
  390. 'any task event adds worker heartbeat')
  391. self.assertEqual(task.state, states.STARTED)
  392. self.assertTrue(task.started)
  393. self.assertEqual(task.timestamp, task.started)
  394. self.assertEqual(task.worker.hostname, 'utest1')
  395. # REVOKED
  396. next(r)
  397. self.assertEqual(task.state, states.REVOKED)
  398. self.assertTrue(task.revoked)
  399. self.assertEqual(task.timestamp, task.revoked)
  400. self.assertEqual(task.worker.hostname, 'utest1')
  401. # RETRY
  402. next(r)
  403. self.assertEqual(task.state, states.RETRY)
  404. self.assertTrue(task.retried)
  405. self.assertEqual(task.timestamp, task.retried)
  406. self.assertEqual(task.worker.hostname, 'utest1')
  407. self.assertEqual(task.exception, "KeyError('bar')")
  408. self.assertEqual(task.traceback, 'line 2 at main')
  409. # FAILURE
  410. next(r)
  411. self.assertEqual(task.state, states.FAILURE)
  412. self.assertTrue(task.failed)
  413. self.assertEqual(task.timestamp, task.failed)
  414. self.assertEqual(task.worker.hostname, 'utest1')
  415. self.assertEqual(task.exception, "KeyError('foo')")
  416. self.assertEqual(task.traceback, 'line 1 at main')
  417. # SUCCESS
  418. next(r)
  419. self.assertEqual(task.state, states.SUCCESS)
  420. self.assertTrue(task.succeeded)
  421. self.assertEqual(task.timestamp, task.succeeded)
  422. self.assertEqual(task.worker.hostname, 'utest1')
  423. self.assertEqual(task.result, '4')
  424. self.assertEqual(task.runtime, 0.1234)
  425. # children, parent, root
  426. r.play()
  427. self.assertIn(r.tid2, r.state.tasks)
  428. task2 = r.state.tasks[r.tid2]
  429. self.assertIs(task2.parent, task)
  430. self.assertIs(task2.root, task)
  431. self.assertIn(task2, task.children)
  432. def test_task_children_set_if_received_in_wrong_order(self):
  433. r = ev_task_states(State())
  434. r.events.insert(0, r.events.pop())
  435. r.play()
  436. self.assertIn(r.state.tasks[r.tid2], r.state.tasks[r.tid].children)
  437. self.assertIs(r.state.tasks[r.tid2].root, r.state.tasks[r.tid])
  438. self.assertIs(r.state.tasks[r.tid2].parent, r.state.tasks[r.tid])
  439. def assertStateEmpty(self, state):
  440. self.assertFalse(state.tasks)
  441. self.assertFalse(state.workers)
  442. self.assertFalse(state.event_count)
  443. self.assertFalse(state.task_count)
  444. def assertState(self, state):
  445. self.assertTrue(state.tasks)
  446. self.assertTrue(state.workers)
  447. self.assertTrue(state.event_count)
  448. self.assertTrue(state.task_count)
  449. def test_freeze_while(self):
  450. s = State()
  451. r = ev_snapshot(s)
  452. r.play()
  453. def work():
  454. pass
  455. s.freeze_while(work, clear_after=True)
  456. self.assertFalse(s.event_count)
  457. s2 = State()
  458. r = ev_snapshot(s2)
  459. r.play()
  460. s2.freeze_while(work, clear_after=False)
  461. self.assertTrue(s2.event_count)
  462. def test_clear_tasks(self):
  463. s = State()
  464. r = ev_snapshot(s)
  465. r.play()
  466. self.assertTrue(s.tasks)
  467. s.clear_tasks(ready=False)
  468. self.assertFalse(s.tasks)
  469. def test_clear(self):
  470. r = ev_snapshot(State())
  471. r.play()
  472. self.assertTrue(r.state.event_count)
  473. self.assertTrue(r.state.workers)
  474. self.assertTrue(r.state.tasks)
  475. self.assertTrue(r.state.task_count)
  476. r.state.clear()
  477. self.assertFalse(r.state.event_count)
  478. self.assertFalse(r.state.workers)
  479. self.assertTrue(r.state.tasks)
  480. self.assertFalse(r.state.task_count)
  481. r.state.clear(False)
  482. self.assertFalse(r.state.tasks)
  483. def test_task_types(self):
  484. r = ev_snapshot(State())
  485. r.play()
  486. self.assertEqual(sorted(r.state.task_types()), ['task1', 'task2'])
  487. def test_tasks_by_time(self):
  488. r = ev_snapshot(State())
  489. r.play()
  490. self.assertEqual(len(list(r.state.tasks_by_time())), 20)
  491. self.assertEqual(len(list(r.state.tasks_by_time(reverse=False))), 20)
  492. def test_tasks_by_type(self):
  493. r = ev_snapshot(State())
  494. r.play()
  495. self.assertEqual(len(list(r.state.tasks_by_type('task1'))), 10)
  496. self.assertEqual(len(list(r.state.tasks_by_type('task2'))), 10)
  497. self.assertEqual(len(r.state.tasks_by_type['task1']), 10)
  498. self.assertEqual(len(r.state.tasks_by_type['task2']), 10)
  499. def test_alive_workers(self):
  500. r = ev_snapshot(State())
  501. r.play()
  502. self.assertEqual(len(list(r.state.alive_workers())), 3)
  503. def test_tasks_by_worker(self):
  504. r = ev_snapshot(State())
  505. r.play()
  506. self.assertEqual(len(list(r.state.tasks_by_worker('utest1'))), 10)
  507. self.assertEqual(len(list(r.state.tasks_by_worker('utest2'))), 10)
  508. self.assertEqual(len(r.state.tasks_by_worker['utest1']), 10)
  509. self.assertEqual(len(r.state.tasks_by_worker['utest2']), 10)
  510. def test_survives_unknown_worker_event(self):
  511. s = State()
  512. s.event({
  513. 'type': 'worker-unknown-event-xxx',
  514. 'foo': 'bar',
  515. })
  516. s.event({
  517. 'type': 'worker-unknown-event-xxx',
  518. 'hostname': 'xxx',
  519. 'foo': 'bar',
  520. })
  521. def test_survives_unknown_worker_leaving(self):
  522. s = State(on_node_leave=Mock(name='on_node_leave'))
  523. (worker, created), subject = s.event({
  524. 'type': 'worker-offline',
  525. 'hostname': 'unknown@vandelay.com',
  526. 'timestamp': time(),
  527. 'local_received': time(),
  528. 'clock': 301030134894833,
  529. })
  530. self.assertEqual(worker, Worker('unknown@vandelay.com'))
  531. self.assertFalse(created)
  532. self.assertEqual(subject, 'offline')
  533. self.assertNotIn('unknown@vandelay.com', s.workers)
  534. s.on_node_leave.assert_called_with(worker)
  535. def test_on_node_join_callback(self):
  536. s = State(on_node_join=Mock(name='on_node_join'))
  537. (worker, created), subject = s.event({
  538. 'type': 'worker-online',
  539. 'hostname': 'george@vandelay.com',
  540. 'timestamp': time(),
  541. 'local_received': time(),
  542. 'clock': 34314,
  543. })
  544. self.assertTrue(worker)
  545. self.assertTrue(created)
  546. self.assertEqual(subject, 'online')
  547. self.assertIn('george@vandelay.com', s.workers)
  548. s.on_node_join.assert_called_with(worker)
  549. def test_survives_unknown_task_event(self):
  550. s = State()
  551. s.event(
  552. {
  553. 'type': 'task-unknown-event-xxx',
  554. 'foo': 'bar',
  555. 'uuid': 'x',
  556. 'hostname': 'y',
  557. 'timestamp': time(),
  558. 'local_received': time(),
  559. 'clock': 0,
  560. },
  561. )
  562. def test_limits_maxtasks(self):
  563. s = State(max_tasks_in_memory=1)
  564. s.heap_multiplier = 2
  565. s.event({
  566. 'type': 'task-unknown-event-xxx',
  567. 'foo': 'bar',
  568. 'uuid': 'x',
  569. 'hostname': 'y',
  570. 'clock': 3,
  571. 'timestamp': time(),
  572. 'local_received': time(),
  573. })
  574. s.event({
  575. 'type': 'task-unknown-event-xxx',
  576. 'foo': 'bar',
  577. 'uuid': 'y',
  578. 'hostname': 'y',
  579. 'clock': 4,
  580. 'timestamp': time(),
  581. 'local_received': time(),
  582. })
  583. s.event({
  584. 'type': 'task-unknown-event-xxx',
  585. 'foo': 'bar',
  586. 'uuid': 'z',
  587. 'hostname': 'y',
  588. 'clock': 5,
  589. 'timestamp': time(),
  590. 'local_received': time(),
  591. })
  592. self.assertEqual(len(s._taskheap), 2)
  593. self.assertEqual(s._taskheap[0].clock, 4)
  594. self.assertEqual(s._taskheap[1].clock, 5)
  595. s._taskheap.append(s._taskheap[0])
  596. self.assertTrue(list(s.tasks_by_time()))
  597. def test_callback(self):
  598. scratch = {}
  599. def callback(state, event):
  600. scratch['recv'] = True
  601. s = State(callback=callback)
  602. s.event({'type': 'worker-online'})
  603. self.assertTrue(scratch.get('recv'))