test_state.py 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707
  1. from __future__ import absolute_import, unicode_literals
  2. import pickle
  3. from decimal import Decimal
  4. from random import shuffle
  5. from time import time
  6. from itertools import count
  7. from celery import states
  8. from celery import uuid
  9. from celery.events import Event
  10. from celery.events.state import (
  11. HEARTBEAT_EXPIRE_WINDOW,
  12. HEARTBEAT_DRIFT_MAX,
  13. State,
  14. Worker,
  15. Task,
  16. heartbeat_expires,
  17. )
  18. from celery.tests.case import AppCase, Mock, patch, skip
  class replay(object):
      """Feed a fixed list of events into a state object, one at a time.

      Subclasses override :meth:`setup` to assign ``self.events``; the base
      class defines no events itself.  Instances are their own iterators:
      every step takes the next event and passes it to ``state.event()``.

      The explicit ``object`` base keeps this a new-style class on
      Python 2, which ``super(...)`` calls in subclasses require.
      """

      def __init__(self, state):
          self.state = state
          self.rewind()
          self.setup()
          self.current_clock = 0

      def setup(self):
          """Hook for subclasses to populate ``self.events``."""
          pass

      def next_event(self):
          """Return the next event, stamped with receive time and clock.

          ``local_received`` is copied from the event's ``timestamp``.  An
          event that already has a ``clock`` sets the replay clock; one
          without gets the replay clock plus one.

          Raises:
              IndexError: when all events have been consumed.
          """
          ev = self.events[next(self.position)]
          ev['local_received'] = ev['timestamp']
          try:
              self.current_clock = ev['clock']
          except KeyError:
              # No clock on the event: advance our own counter and use it.
              ev['clock'] = self.current_clock = self.current_clock + 1
          return ev

      def __iter__(self):
          return self

      def __next__(self):
          """Dispatch the next event to the state; returns ``None``."""
          # Only exhaustion of the event list ends iteration.  The state
          # call stays outside the ``try`` so that an IndexError raised
          # while handling an event is not mistaken for end-of-replay.
          try:
              ev = self.next_event()
          except IndexError:
              raise StopIteration()
          self.state.event(ev)
      next = __next__  # Python 2 spelling of the iterator protocol.

      def rewind(self):
          """Restart from the first event and return ``self``."""
          self.position = count(0)
          return self

      def play(self):
          """Dispatch every remaining event."""
          for _ in self:
              pass
  class ev_worker_online_offline(replay):
      """Replay in which worker ``utest1`` goes online, then offline."""

      def setup(self):
          host = 'utest1'
          online = Event('worker-online', hostname=host)
          offline = Event('worker-offline', hostname=host)
          self.events = [online, offline]
  class ev_worker_heartbeats(replay):
      """Replay of two heartbeats from ``utest1``: a back-dated one, then
      one with the default timestamp."""

      def setup(self):
          # The first heartbeat is dated twice the expire window in the past.
          stale_at = time() - HEARTBEAT_EXPIRE_WINDOW * 2
          stale = Event(
              'worker-heartbeat', hostname='utest1', timestamp=stale_at,
          )
          fresh = Event('worker-heartbeat', hostname='utest1')
          self.events = [stale, fresh]
  class ev_task_states(replay):
      """Replay that sends one task through each task event type, followed
      by an unrelated event and a second task naming the first as its
      parent and root.

      The two task ids are exposed as ``self.tid`` and ``self.tid2``.
      """

      def setup(self):
          self.tid = uuid()
          self.tid2 = uuid()
          parent, child = self.tid, self.tid2
          host = 'utest1'

          lifecycle = [
              Event('task-received', uuid=parent, name='task1',
                    args='(2, 2)', kwargs="{'foo': 'bar'}",
                    retries=0, eta=None, hostname=host),
              Event('task-started', uuid=parent, hostname=host),
              Event('task-revoked', uuid=parent, hostname=host),
              Event('task-retried', uuid=parent,
                    exception="KeyError('bar')",
                    traceback='line 2 at main', hostname=host),
              Event('task-failed', uuid=parent,
                    exception="KeyError('foo')",
                    traceback='line 1 at main', hostname=host),
              Event('task-succeeded', uuid=parent, result='4',
                    runtime=0.1234, hostname=host),
          ]
          # An event whose type is neither a worker nor a task event.
          unrelated = Event('foo-bar')
          child_received = Event(
              'task-received', uuid=child, name='task2',
              args='(4, 4)', kwargs="{'foo': 'bar'}",
              retries=0, eta=None, parent_id=parent, root_id=parent,
              hostname=host,
          )
          self.events = lifecycle + [unrelated, child_received]
  def QTEV(type, uuid, hostname, clock, name=None, timestamp=None):
      """Quick task event.

      Builds an ``Event`` whose type is ``task-<type>``.  A falsy
      ``timestamp`` is replaced by the current time.
      """
      event_type = 'task-{0}'.format(type)
      stamp = timestamp or time()
      return Event(
          event_type,
          uuid=uuid,
          hostname=hostname,
          clock=clock,
          name=name,
          timestamp=stamp,
      )
  class ev_logical_clock_ordering(replay):
      """Replay of task events for three tasks with explicit clock values.

      Args:
          state: object whose ``event()`` method receives the events.
          offset: value added to every event's clock (``None`` counts as 0).
          uids: optional sequence of exactly three task ids; three fresh
              ids are generated when omitted or empty.
      """

      def __init__(self, state, offset=0, uids=None):
          # offset and uids must be set first: the base initializer calls
          # setup(), which reads both.
          self.offset = offset or 0
          self.uids = self.setuids(uids)
          super(ev_logical_clock_ordering, self).__init__(state)

      def setuids(self, uids):
          """Store the three ids as ``tA``, ``tB``, ``tC`` and return them.

          The sequence passed in is returned as-is (not copied); a new list
          is created only when ``uids`` is falsy.
          """
          uids = self.tA, self.tB, self.tC = uids or [uuid(), uuid(), uuid()]
          return uids

      def setup(self):
          offset = self.offset
          tA, tB, tC = self.uids
          self.events = [
              QTEV('received', tA, 'w1', name='tA', clock=offset + 1),
              QTEV('received', tB, 'w2', name='tB', clock=offset + 1),
              QTEV('started', tA, 'w1', name='tA', clock=offset + 3),
              QTEV('received', tC, 'w2', name='tC', clock=offset + 3),
              QTEV('started', tB, 'w2', name='tB', clock=offset + 5),
              QTEV('retried', tA, 'w1', name='tA', clock=offset + 7),
              QTEV('succeeded', tB, 'w2', name='tB', clock=offset + 9),
              QTEV('started', tC, 'w2', name='tC', clock=offset + 10),
              # NOTE(review): the next two events are listed out of clock
              # order (13 before 12); presumably deliberate - confirm.
              QTEV('received', tA, 'w3', name='tA', clock=offset + 13),
              QTEV('succeeded', tC, 'w2', name='tC', clock=offset + 12),
              QTEV('started', tA, 'w3', name='tA', clock=offset + 14),
              QTEV('succeeded', tA, 'w3', name='tA', clock=offset + 16),
          ]

      def rewind_with_offset(self, offset, uids=None):
          """Rebuild the events with a new clock offset and start over.

          ``uids`` defaults to the ids currently in use.
          """
          self.offset = offset
          self.uids = self.setuids(uids or self.uids)
          self.setup()
          self.rewind()
  class ev_snapshot(replay):
      """Replay of three workers coming online followed by twenty
      ``task-received`` events.

      Even-numbered tasks are ``task2`` on ``utest2``; odd-numbered tasks
      are ``task1`` on ``utest1``.  ``utest3`` receives no tasks.
      """

      def setup(self):
          self.events = [
              Event('worker-online', hostname='utest1'),
              Event('worker-online', hostname='utest2'),
              Event('worker-online', hostname='utest3'),
          ]
          for i in range(20):
              is_even = i % 2 == 0
              worker = 'utest2' if is_even else 'utest1'
              task_name = 'task2' if is_even else 'task1'
              self.events.append(Event('task-received', name=task_name,
                                       uuid=uuid(), hostname=worker))
  class test_Worker(AppCase):
      """Tests for ``Worker`` and ``heartbeat_expires``."""

      def test_equality(self):
          """Same hostname: equal, same hash.  Different hostname: neither."""
          first = Worker(hostname='foo')
          same = Worker(hostname='foo')
          other = Worker(hostname='bar')

          self.assertEqual(first.hostname, 'foo')
          self.assertEqual(first, same)
          self.assertNotEqual(first, other)
          self.assertEqual(hash(first), hash(same))
          self.assertNotEqual(hash(first), hash(other))

      def test_heartbeat_expires__Decimal(self):
          """A ``Decimal`` timestamp yields a value equal to a plain float."""
          expires = heartbeat_expires(
              Decimal(344313.37), freq=60, expire_window=200,
          )
          self.assertEqual(expires, 344433.37)

      def test_compatible_with_Decimal(self):
          """An online event with ``Decimal`` fields leaves the worker alive."""
          hostname = 'george@vandelay.com'
          worker = Worker(hostname)
          timestamp = Decimal(time())
          local_received = time()
          fields = {
              'hostname': hostname,
              'timestamp': timestamp,
              'local_received': local_received,
              'freq': Decimal(5.6335431),
          }
          worker.event(
              'worker-online', timestamp, local_received, fields=fields,
          )
          self.assertTrue(worker.alive)

      def test_eq_ne_other(self):
          """A worker never equals an arbitrary non-worker object."""
          reference = Worker('a@b.com')
          self.assertEqual(reference, Worker('a@b.com'))
          self.assertNotEqual(reference, Worker('b@b.com'))
          self.assertNotEqual(reference, object())

      def test_reduce_direct(self):
          """Rebuilding from ``__reduce__`` keeps the compared attributes."""
          original = Worker('george@vandelay.com')
          original.event('worker-online', 10.0, 13.0, fields={
              'hostname': 'george@vandelay.com',
              'timestamp': 10.0,
              'local_received': 13.0,
              'freq': 60,
          })
          factory, factory_args = original.__reduce__()
          rebuilt = factory(*factory_args)

          compared = (
              'hostname', 'pid', 'freq', 'heartbeats', 'clock',
              'active', 'processed', 'loadavg', 'sw_ident',
          )
          for attr in compared:
              self.assertEqual(getattr(rebuilt, attr), getattr(original, attr))

      def test_update(self):
          """``update`` accepts a mapping and keywords, setting attributes."""
          worker = Worker('george@vandelay.com')
          worker.update({'idx': '301'}, foo=1, clock=30, bah='foo')

          self.assertEqual(worker.idx, '301')
          self.assertEqual(worker.foo, 1)
          self.assertEqual(worker.clock, 30)
          self.assertEqual(worker.bah, 'foo')

      def test_survives_missing_timestamp(self):
          """An event without timestamps records no heartbeat."""
          subject = Worker(hostname='foo')
          subject.event('heartbeat')
          self.assertEqual(subject.heartbeats, [])

      def test_repr(self):
          text = repr(Worker(hostname='foo'))
          self.assertTrue(text)

      def test_drift_warning(self):
          """A timestamp far ahead of the receive time triggers a warning."""
          subject = Worker(hostname='foo')
          with patch('celery.events.state.warn') as warn:
              # Event timestamp lies twice the allowed drift in the future.
              drifted = time() + (HEARTBEAT_DRIFT_MAX * 2)
              subject.event(None, drifted, time())
              warn.assert_called()
              message = warn.call_args[0][0]
              self.assertIn('Substantial drift', message)

      def test_updates_heartbeat(self):
          """A heartbeat with an older receive time does not become the last."""
          subject = Worker(hostname='foo')
          subject.event(None, time(), time())
          self.assertEqual(len(subject.heartbeats), 1)
          latest = subject.heartbeats[0]

          # Second event: its local receive time is ten seconds older.
          subject.event(None, time(), time() - 10)
          self.assertEqual(len(subject.heartbeats), 2)
          self.assertEqual(subject.heartbeats[-1], latest)
  class test_Task(AppCase):
      """Tests for the ``Task`` event-state object."""

      def test_equality(self):
          """Same uuid: equal, same hash.  Different uuid: neither."""
          first = Task(uuid='foo')
          same = Task(uuid='foo')
          other = Task(uuid='bar')

          self.assertEqual(first.uuid, 'foo')
          self.assertEqual(first, same)
          self.assertNotEqual(first, other)
          self.assertEqual(hash(first), hash(same))
          self.assertNotEqual(hash(first), hash(other))

      def test_info(self):
          """``info()`` keys follow ``_info_fields``, ``extra`` and ``fields``."""
          task = Task(
              uuid='abcdefg',
              name='tasks.add',
              args='(2, 2)',
              kwargs='{}',
              retries=2,
              result=42,
              eta=1,
              runtime=0.0001,
              expires=1,
              parent_id='bdefc',
              root_id='dedfef',
              foo=None,
              exception=1,
              received=time() - 10,
              started=time() - 8,
              exchange='celery',
              routing_key='celery',
              succeeded=time(),
          )

          default_fields = sorted(list(task._info_fields))
          self.assertEqual(default_fields, sorted(task.info().keys()))

          with_received = sorted(list(task._info_fields + ('received',)))
          self.assertEqual(
              with_received, sorted(task.info(extra=('received',))),
          )

          selected = task.info(['args', 'kwargs'])
          self.assertEqual(sorted(['args', 'kwargs']), sorted(selected.keys()))

          self.assertFalse(list(task.info('foo')))

      def test_reduce_direct(self):
          """A task rebuilt from ``__reduce__`` equals the original."""
          original = Task(uuid='uuid', name='tasks.add', args='(2, 2)')
          factory, factory_args = original.__reduce__()
          rebuilt = factory(*factory_args)
          self.assertEqual(original, rebuilt)

      def test_ready(self):
          """Not ready after ``received``; ready after ``succeeded``."""
          task = Task(uuid='abcdefg', name='tasks.add')

          task.event('received', time(), time())
          self.assertFalse(task.ready)

          task.event('succeeded', time(), time())
          self.assertTrue(task.ready)

      def test_sent(self):
          """A ``sent`` event leaves the task in the PENDING state."""
          task = Task(uuid='abcdefg', name='tasks.add')
          task.event('sent', time(), time())
          self.assertEqual(task.state, states.PENDING)

      def test_merge(self):
          """Events arriving after ``failed`` add fields but keep FAILURE;
          a later ``retried`` moves the task to RETRY."""
          task = Task()
          task.event('failed', time(), time())
          task.event('started', time(), time())
          received_fields = {
              'name': 'tasks.add', 'args': (2, 2),
          }
          task.event('received', time(), time(), received_fields)

          self.assertEqual(task.state, states.FAILURE)
          self.assertEqual(task.name, 'tasks.add')
          self.assertTupleEqual(task.args, (2, 2))

          task.event('retried', time(), time())
          self.assertEqual(task.state, states.RETRY)

      def test_repr(self):
          text = repr(Task(uuid='xxx', name='tasks.add'))
          self.assertTrue(text)
  class test_State(AppCase):
      """Tests for ``State``, mostly driven by the replay fixtures."""

      def test_repr(self):
          text = repr(State())
          self.assertTrue(text)

      def test_pickleable(self):
          """A populated state survives a pickle round trip."""
          state = State()
          replayer = ev_logical_clock_ordering(state)
          replayer.play()
          restored = pickle.loads(pickle.dumps(state))
          self.assertTrue(restored)

      def test_task_logical_clock_ordering(self):
          """``tasks_by_time()`` yields tA, tC, tB after each replay."""
          state = State()
          replayer = ev_logical_clock_ordering(state)
          tA, tB, tC = replayer.uids
          replayer.play()
          ordered = list(state.tasks_by_time())
          self.assertEqual(ordered[0][0], tA)
          self.assertEqual(ordered[1][0], tC)
          self.assertEqual(ordered[2][0], tB)

          # Replay again and again with shuffled ids and later clocks.
          for _ in range(1000):
              shuffle(replayer.uids)
              tA, tB, tC = replayer.uids
              replayer.rewind_with_offset(
                  replayer.current_clock + 1, replayer.uids,
              )
              replayer.play()
          ordered = list(state.tasks_by_time())
          self.assertEqual(ordered[0][0], tA)
          self.assertEqual(ordered[1][0], tC)
          self.assertEqual(ordered[2][0], tB)

      @skip.todo(reason='not working')
      def test_task_descending_clock_ordering(self):
          state = State()
          replayer = ev_logical_clock_ordering(state)
          tA, tB, tC = replayer.uids
          replayer.play()
          ordered = list(state.tasks_by_time(reverse=False))
          self.assertEqual(ordered[0][0], tA)
          self.assertEqual(ordered[1][0], tB)
          self.assertEqual(ordered[2][0], tC)

          for _ in range(1000):
              shuffle(replayer.uids)
              tA, tB, tC = replayer.uids
              replayer.rewind_with_offset(
                  replayer.current_clock + 1, replayer.uids,
              )
              replayer.play()
          ordered = list(state.tasks_by_time(reverse=False))
          self.assertEqual(ordered[0][0], tB)
          self.assertEqual(ordered[1][0], tC)
          self.assertEqual(ordered[2][0], tA)

      def test_get_or_create_task(self):
          """The second lookup returns the same task with created=False."""
          state = State()
          first, first_created = state.get_or_create_task('id1')
          self.assertEqual(first.uuid, 'id1')
          self.assertTrue(first_created)

          second, second_created = state.get_or_create_task('id1')
          self.assertIs(second, first)
          self.assertFalse(second_created)

      def test_get_or_create_worker(self):
          """The second lookup returns the same worker with created=False."""
          state = State()
          hostname = 'george@vandelay.com'
          first, first_created = state.get_or_create_worker(hostname)
          self.assertEqual(first.hostname, 'george@vandelay.com')
          self.assertTrue(first_created)

          second, second_created = state.get_or_create_worker(hostname)
          self.assertIs(second, first)
          self.assertFalse(second_created)

      def test_get_or_create_worker__with_defaults(self):
          """Keyword fields apply on creation and on later lookups."""
          state = State()
          first, first_created = state.get_or_create_worker(
              'george@vandelay.com', pid=30,
          )
          self.assertEqual(first.hostname, 'george@vandelay.com')
          self.assertEqual(first.pid, 30)
          self.assertTrue(first_created)

          second, second_created = state.get_or_create_worker(
              'george@vandelay.com', pid=40,
          )
          self.assertIs(second, first)
          self.assertEqual(second.pid, 40)
          self.assertFalse(second_created)

      def test_worker_online_offline(self):
          """Alive after the online event, not alive after the offline one."""
          replayer = ev_worker_online_offline(State())
          next(replayer)
          self.assertTrue(list(replayer.state.alive_workers()))
          self.assertTrue(replayer.state.workers['utest1'].alive)

          replayer.play()
          self.assertFalse(list(replayer.state.alive_workers()))
          self.assertFalse(replayer.state.workers['utest1'].alive)

      def test_itertasks(self):
          """``limit`` caps the number of tasks yielded."""
          state = State()
          state.tasks = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
          limited = list(state.itertasks(limit=2))
          self.assertEqual(len(limited), 2)

      def test_worker_heartbeat_expire(self):
          """Not alive after the stale heartbeat; alive after the fresh one."""
          replayer = ev_worker_heartbeats(State())
          next(replayer)
          self.assertFalse(list(replayer.state.alive_workers()))
          self.assertFalse(replayer.state.workers['utest1'].alive)

          replayer.play()
          self.assertTrue(list(replayer.state.alive_workers()))
          self.assertTrue(replayer.state.workers['utest1'].alive)

      def test_task_states(self):
          """Step through the task replay, checking the task after each event."""
          replayer = ev_task_states(State())
          state = replayer.state

          # RECEIVED
          next(replayer)
          self.assertIn(replayer.tid, state.tasks)
          task = state.tasks[replayer.tid]
          self.assertEqual(task.state, states.RECEIVED)
          self.assertTrue(task.received)
          self.assertEqual(task.timestamp, task.received)
          self.assertEqual(task.worker.hostname, 'utest1')

          # STARTED
          next(replayer)
          self.assertTrue(state.workers['utest1'].alive,
                          'any task event adds worker heartbeat')
          self.assertEqual(task.state, states.STARTED)
          self.assertTrue(task.started)
          self.assertEqual(task.timestamp, task.started)
          self.assertEqual(task.worker.hostname, 'utest1')

          # REVOKED
          next(replayer)
          self.assertEqual(task.state, states.REVOKED)
          self.assertTrue(task.revoked)
          self.assertEqual(task.timestamp, task.revoked)
          self.assertEqual(task.worker.hostname, 'utest1')

          # RETRY
          next(replayer)
          self.assertEqual(task.state, states.RETRY)
          self.assertTrue(task.retried)
          self.assertEqual(task.timestamp, task.retried)
          self.assertEqual(task.worker.hostname, 'utest1')
          self.assertEqual(task.exception, "KeyError('bar')")
          self.assertEqual(task.traceback, 'line 2 at main')

          # FAILURE
          next(replayer)
          self.assertEqual(task.state, states.FAILURE)
          self.assertTrue(task.failed)
          self.assertEqual(task.timestamp, task.failed)
          self.assertEqual(task.worker.hostname, 'utest1')
          self.assertEqual(task.exception, "KeyError('foo')")
          self.assertEqual(task.traceback, 'line 1 at main')

          # SUCCESS
          next(replayer)
          self.assertEqual(task.state, states.SUCCESS)
          self.assertTrue(task.succeeded)
          self.assertEqual(task.timestamp, task.succeeded)
          self.assertEqual(task.worker.hostname, 'utest1')
          self.assertEqual(task.result, '4')
          self.assertEqual(task.runtime, 0.1234)

          # children, parent, root
          replayer.play()
          self.assertIn(replayer.tid2, state.tasks)
          child = state.tasks[replayer.tid2]
          self.assertIs(child.parent, task)
          self.assertIs(child.root, task)
          self.assertIn(child, task.children)

      def test_task_children_set_if_received_in_wrong_order(self):
          """Parent/child links hold when the child event is seen first."""
          replayer = ev_task_states(State())
          # Move the child's event from the end of the list to the front.
          last_event = replayer.events.pop()
          replayer.events.insert(0, last_event)
          replayer.play()

          tasks = replayer.state.tasks
          self.assertIn(tasks[replayer.tid2], tasks[replayer.tid].children)
          self.assertIs(tasks[replayer.tid2].root, tasks[replayer.tid])
          self.assertIs(tasks[replayer.tid2].parent, tasks[replayer.tid])

      def assertStateEmpty(self, state):
          """Assert that tasks, workers and both counters are all falsy."""
          self.assertFalse(state.tasks)
          self.assertFalse(state.workers)
          self.assertFalse(state.event_count)
          self.assertFalse(state.task_count)

      def assertState(self, state):
          """Assert that tasks, workers and both counters are all truthy."""
          self.assertTrue(state.tasks)
          self.assertTrue(state.workers)
          self.assertTrue(state.event_count)
          self.assertTrue(state.task_count)

      def test_freeze_while(self):
          """``clear_after`` decides whether the event count is reset."""
          def work():
              pass

          cleared = State()
          replayer = ev_snapshot(cleared)
          replayer.play()
          cleared.freeze_while(work, clear_after=True)
          self.assertFalse(cleared.event_count)

          kept = State()
          replayer = ev_snapshot(kept)
          replayer.play()
          kept.freeze_while(work, clear_after=False)
          self.assertTrue(kept.event_count)

      def test_clear_tasks(self):
          """``clear_tasks(ready=False)`` empties the task mapping."""
          state = State()
          replayer = ev_snapshot(state)
          replayer.play()
          self.assertTrue(state.tasks)
          state.clear_tasks(ready=False)
          self.assertFalse(state.tasks)

      def test_clear(self):
          """``clear()`` keeps tasks here; ``clear(False)`` removes them."""
          replayer = ev_snapshot(State())
          replayer.play()
          state = replayer.state
          self.assertTrue(state.event_count)
          self.assertTrue(state.workers)
          self.assertTrue(state.tasks)
          self.assertTrue(state.task_count)

          state.clear()
          self.assertFalse(state.event_count)
          self.assertFalse(state.workers)
          self.assertTrue(state.tasks)
          self.assertFalse(state.task_count)

          state.clear(False)
          self.assertFalse(state.tasks)

      def test_task_types(self):
          replayer = ev_snapshot(State())
          replayer.play()
          seen_types = sorted(replayer.state.task_types())
          self.assertEqual(seen_types, ['task1', 'task2'])

      def test_tasks_by_time(self):
          """All twenty snapshot tasks are listed in either direction."""
          replayer = ev_snapshot(State())
          replayer.play()
          state = replayer.state
          self.assertEqual(len(list(state.tasks_by_time())), 20)
          self.assertEqual(len(list(state.tasks_by_time(reverse=False))), 20)

      def test_tasks_by_type(self):
          """Ten tasks per type, by call and by subscript."""
          replayer = ev_snapshot(State())
          replayer.play()
          by_type = replayer.state.tasks_by_type
          self.assertEqual(len(list(by_type('task1'))), 10)
          self.assertEqual(len(list(by_type('task2'))), 10)
          self.assertEqual(len(by_type['task1']), 10)
          self.assertEqual(len(by_type['task2']), 10)

      def test_alive_workers(self):
          replayer = ev_snapshot(State())
          replayer.play()
          alive = list(replayer.state.alive_workers())
          self.assertEqual(len(alive), 3)

      def test_tasks_by_worker(self):
          """Ten tasks per worker, by call and by subscript."""
          replayer = ev_snapshot(State())
          replayer.play()
          by_worker = replayer.state.tasks_by_worker
          self.assertEqual(len(list(by_worker('utest1'))), 10)
          self.assertEqual(len(list(by_worker('utest2'))), 10)
          self.assertEqual(len(by_worker['utest1']), 10)
          self.assertEqual(len(by_worker['utest2']), 10)

      def test_survives_unknown_worker_event(self):
          """Unknown worker events do not raise, with or without a hostname."""
          state = State()
          without_hostname = {
              'type': 'worker-unknown-event-xxx',
              'foo': 'bar',
          }
          state.event(without_hostname)
          with_hostname = {
              'type': 'worker-unknown-event-xxx',
              'hostname': 'xxx',
              'foo': 'bar',
          }
          state.event(with_hostname)

      def test_survives_unknown_worker_leaving(self):
          """An offline event for an unseen worker still fires the callback."""
          state = State(on_node_leave=Mock(name='on_node_leave'))
          offline = {
              'type': 'worker-offline',
              'hostname': 'unknown@vandelay.com',
              'timestamp': time(),
              'local_received': time(),
              'clock': 301030134894833,
          }
          (worker, created), subject = state.event(offline)

          self.assertEqual(worker, Worker('unknown@vandelay.com'))
          self.assertFalse(created)
          self.assertEqual(subject, 'offline')
          self.assertNotIn('unknown@vandelay.com', state.workers)
          state.on_node_leave.assert_called_with(worker)

      def test_on_node_join_callback(self):
          """An online event registers the worker and fires the callback."""
          state = State(on_node_join=Mock(name='on_node_join'))
          online = {
              'type': 'worker-online',
              'hostname': 'george@vandelay.com',
              'timestamp': time(),
              'local_received': time(),
              'clock': 34314,
          }
          (worker, created), subject = state.event(online)

          self.assertTrue(worker)
          self.assertTrue(created)
          self.assertEqual(subject, 'online')
          self.assertIn('george@vandelay.com', state.workers)
          state.on_node_join.assert_called_with(worker)

      def test_survives_unknown_task_event(self):
          """A task event of an unknown type does not raise."""
          state = State()
          unknown = {
              'type': 'task-unknown-event-xxx',
              'foo': 'bar',
              'uuid': 'x',
              'hostname': 'y',
              'timestamp': time(),
              'local_received': time(),
              'clock': 0,
          }
          state.event(unknown)

      def test_limits_maxtasks(self):
          """With a one-task limit and multiplier 2, the heap keeps two entries."""
          state = State(max_tasks_in_memory=1)
          state.heap_multiplier = 2

          for task_id, clock in (('x', 3), ('y', 4), ('z', 5)):
              state.event({
                  'type': 'task-unknown-event-xxx',
                  'foo': 'bar',
                  'uuid': task_id,
                  'hostname': 'y',
                  'clock': clock,
                  'timestamp': time(),
                  'local_received': time(),
              })

          heap = state._taskheap
          self.assertEqual(len(heap), 2)
          self.assertEqual(heap[0].clock, 4)
          self.assertEqual(heap[1].clock, 5)

          # Append a duplicate heap entry; listing by time must still work.
          heap.append(heap[0])
          self.assertTrue(list(state.tasks_by_time()))

      def test_callback(self):
          """The ``callback`` passed to ``State`` runs when an event arrives."""
          received = {}

          def callback(state, event):
              received['recv'] = True

          state = State(callback=callback)
          state.event({'type': 'worker-online'})
          self.assertTrue(received.get('recv'))