test_state.py 21 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import pickle
  3. from decimal import Decimal
  4. from itertools import count
  5. from random import shuffle
  6. from time import time
  7. from case import Mock, patch, skip
  8. from celery import states, uuid
  9. from celery.events import Event
  10. from celery.events.state import (HEARTBEAT_DRIFT_MAX, HEARTBEAT_EXPIRE_WINDOW,
  11. State, Task, Worker, heartbeat_expires)
  12. from celery.five import range
  13. class replay(object):
  14. def __init__(self, state):
  15. self.state = state
  16. self.rewind()
  17. self.setup()
  18. self.current_clock = 0
  19. def setup(self):
  20. pass
  21. def next_event(self):
  22. ev = self.events[next(self.position)]
  23. ev['local_received'] = ev['timestamp']
  24. try:
  25. self.current_clock = ev['clock']
  26. except KeyError:
  27. ev['clock'] = self.current_clock = self.current_clock + 1
  28. return ev
  29. def __iter__(self):
  30. return self
  31. def __next__(self):
  32. try:
  33. self.state.event(self.next_event())
  34. except IndexError:
  35. raise StopIteration()
  36. next = __next__
  37. def rewind(self):
  38. self.position = count(0)
  39. return self
  40. def play(self):
  41. for _ in self:
  42. pass
  43. class ev_worker_online_offline(replay):
  44. def setup(self):
  45. self.events = [
  46. Event('worker-online', hostname='utest1'),
  47. Event('worker-offline', hostname='utest1'),
  48. ]
  49. class ev_worker_heartbeats(replay):
  50. def setup(self):
  51. self.events = [
  52. Event('worker-heartbeat', hostname='utest1',
  53. timestamp=time() - HEARTBEAT_EXPIRE_WINDOW * 2),
  54. Event('worker-heartbeat', hostname='utest1'),
  55. ]
  56. class ev_task_states(replay):
  57. def setup(self):
  58. tid = self.tid = uuid()
  59. tid2 = self.tid2 = uuid()
  60. self.events = [
  61. Event('task-received', uuid=tid, name='task1',
  62. args='(2, 2)', kwargs="{'foo': 'bar'}",
  63. retries=0, eta=None, hostname='utest1'),
  64. Event('task-started', uuid=tid, hostname='utest1'),
  65. Event('task-revoked', uuid=tid, hostname='utest1'),
  66. Event('task-retried', uuid=tid, exception="KeyError('bar')",
  67. traceback='line 2 at main', hostname='utest1'),
  68. Event('task-failed', uuid=tid, exception="KeyError('foo')",
  69. traceback='line 1 at main', hostname='utest1'),
  70. Event('task-succeeded', uuid=tid, result='4',
  71. runtime=0.1234, hostname='utest1'),
  72. Event('foo-bar'),
  73. Event('task-received', uuid=tid2, name='task2',
  74. args='(4, 4)', kwargs="{'foo': 'bar'}",
  75. retries=0, eta=None, parent_id=tid, root_id=tid,
  76. hostname='utest1'),
  77. ]
  78. def QTEV(type, uuid, hostname, clock, name=None, timestamp=None):
  79. """Quick task event."""
  80. return Event('task-{0}'.format(type), uuid=uuid, hostname=hostname,
  81. clock=clock, name=name, timestamp=timestamp or time())
  82. class ev_logical_clock_ordering(replay):
  83. def __init__(self, state, offset=0, uids=None):
  84. self.offset = offset or 0
  85. self.uids = self.setuids(uids)
  86. super(ev_logical_clock_ordering, self).__init__(state)
  87. def setuids(self, uids):
  88. uids = self.tA, self.tB, self.tC = uids or [uuid(), uuid(), uuid()]
  89. return uids
  90. def setup(self):
  91. offset = self.offset
  92. tA, tB, tC = self.uids
  93. self.events = [
  94. QTEV('received', tA, 'w1', name='tA', clock=offset + 1),
  95. QTEV('received', tB, 'w2', name='tB', clock=offset + 1),
  96. QTEV('started', tA, 'w1', name='tA', clock=offset + 3),
  97. QTEV('received', tC, 'w2', name='tC', clock=offset + 3),
  98. QTEV('started', tB, 'w2', name='tB', clock=offset + 5),
  99. QTEV('retried', tA, 'w1', name='tA', clock=offset + 7),
  100. QTEV('succeeded', tB, 'w2', name='tB', clock=offset + 9),
  101. QTEV('started', tC, 'w2', name='tC', clock=offset + 10),
  102. QTEV('received', tA, 'w3', name='tA', clock=offset + 13),
  103. QTEV('succeded', tC, 'w2', name='tC', clock=offset + 12),
  104. QTEV('started', tA, 'w3', name='tA', clock=offset + 14),
  105. QTEV('succeeded', tA, 'w3', name='TA', clock=offset + 16),
  106. ]
  107. def rewind_with_offset(self, offset, uids=None):
  108. self.offset = offset
  109. self.uids = self.setuids(uids or self.uids)
  110. self.setup()
  111. self.rewind()
  112. class ev_snapshot(replay):
  113. def setup(self):
  114. self.events = [
  115. Event('worker-online', hostname='utest1'),
  116. Event('worker-online', hostname='utest2'),
  117. Event('worker-online', hostname='utest3'),
  118. ]
  119. for i in range(20):
  120. worker = not i % 2 and 'utest2' or 'utest1'
  121. type = not i % 2 and 'task2' or 'task1'
  122. self.events.append(Event('task-received', name=type,
  123. uuid=uuid(), hostname=worker))
  124. class test_Worker:
  125. def test_equality(self):
  126. assert Worker(hostname='foo').hostname == 'foo'
  127. assert Worker(hostname='foo') == Worker(hostname='foo')
  128. assert Worker(hostname='foo') != Worker(hostname='bar')
  129. assert hash(Worker(hostname='foo')) == hash(Worker(hostname='foo'))
  130. assert hash(Worker(hostname='foo')) != hash(Worker(hostname='bar'))
  131. def test_heartbeat_expires__Decimal(self):
  132. assert heartbeat_expires(
  133. Decimal(344313.37), freq=60, expire_window=200) == 344433.37
  134. def test_compatible_with_Decimal(self):
  135. w = Worker('george@vandelay.com')
  136. timestamp, local_received = Decimal(time()), time()
  137. w.event('worker-online', timestamp, local_received, fields={
  138. 'hostname': 'george@vandelay.com',
  139. 'timestamp': timestamp,
  140. 'local_received': local_received,
  141. 'freq': Decimal(5.6335431),
  142. })
  143. assert w.alive
  144. def test_eq_ne_other(self):
  145. assert Worker('a@b.com') == Worker('a@b.com')
  146. assert Worker('a@b.com') != Worker('b@b.com')
  147. assert Worker('a@b.com') != object()
  148. def test_reduce_direct(self):
  149. w = Worker('george@vandelay.com')
  150. w.event('worker-online', 10.0, 13.0, fields={
  151. 'hostname': 'george@vandelay.com',
  152. 'timestamp': 10.0,
  153. 'local_received': 13.0,
  154. 'freq': 60,
  155. })
  156. fun, args = w.__reduce__()
  157. w2 = fun(*args)
  158. assert w2.hostname == w.hostname
  159. assert w2.pid == w.pid
  160. assert w2.freq == w.freq
  161. assert w2.heartbeats == w.heartbeats
  162. assert w2.clock == w.clock
  163. assert w2.active == w.active
  164. assert w2.processed == w.processed
  165. assert w2.loadavg == w.loadavg
  166. assert w2.sw_ident == w.sw_ident
  167. def test_update(self):
  168. w = Worker('george@vandelay.com')
  169. w.update({'idx': '301'}, foo=1, clock=30, bah='foo')
  170. assert w.idx == '301'
  171. assert w.foo == 1
  172. assert w.clock == 30
  173. assert w.bah == 'foo'
  174. def test_survives_missing_timestamp(self):
  175. worker = Worker(hostname='foo')
  176. worker.event('heartbeat')
  177. assert worker.heartbeats == []
  178. def test_repr(self):
  179. assert repr(Worker(hostname='foo'))
  180. def test_drift_warning(self):
  181. worker = Worker(hostname='foo')
  182. with patch('celery.events.state.warn') as warn:
  183. worker.event(None, time() + (HEARTBEAT_DRIFT_MAX * 2), time())
  184. warn.assert_called()
  185. assert 'Substantial drift' in warn.call_args[0][0]
  186. def test_updates_heartbeat(self):
  187. worker = Worker(hostname='foo')
  188. worker.event(None, time(), time())
  189. assert len(worker.heartbeats) == 1
  190. h1 = worker.heartbeats[0]
  191. worker.event(None, time(), time() - 10)
  192. assert len(worker.heartbeats) == 2
  193. assert worker.heartbeats[-1] == h1
  194. class test_Task:
  195. def test_equality(self):
  196. assert Task(uuid='foo').uuid == 'foo'
  197. assert Task(uuid='foo') == Task(uuid='foo')
  198. assert Task(uuid='foo') != Task(uuid='bar')
  199. assert hash(Task(uuid='foo')) == hash(Task(uuid='foo'))
  200. assert hash(Task(uuid='foo')) != hash(Task(uuid='bar'))
  201. def test_info(self):
  202. task = Task(uuid='abcdefg',
  203. name='tasks.add',
  204. args='(2, 2)',
  205. kwargs='{}',
  206. retries=2,
  207. result=42,
  208. eta=1,
  209. runtime=0.0001,
  210. expires=1,
  211. parent_id='bdefc',
  212. root_id='dedfef',
  213. foo=None,
  214. exception=1,
  215. received=time() - 10,
  216. started=time() - 8,
  217. exchange='celery',
  218. routing_key='celery',
  219. succeeded=time())
  220. assert sorted(list(task._info_fields)) == sorted(task.info().keys())
  221. assert (sorted(list(task._info_fields + ('received',))) ==
  222. sorted(task.info(extra=('received',))))
  223. assert (sorted(['args', 'kwargs']) ==
  224. sorted(task.info(['args', 'kwargs']).keys()))
  225. assert not list(task.info('foo'))
  226. def test_reduce_direct(self):
  227. task = Task(uuid='uuid', name='tasks.add', args='(2, 2)')
  228. fun, args = task.__reduce__()
  229. task2 = fun(*args)
  230. assert task == task2
  231. def test_ready(self):
  232. task = Task(uuid='abcdefg',
  233. name='tasks.add')
  234. task.event('received', time(), time())
  235. assert not task.ready
  236. task.event('succeeded', time(), time())
  237. assert task.ready
  238. def test_sent(self):
  239. task = Task(uuid='abcdefg',
  240. name='tasks.add')
  241. task.event('sent', time(), time())
  242. assert task.state == states.PENDING
  243. def test_merge(self):
  244. task = Task()
  245. task.event('failed', time(), time())
  246. task.event('started', time(), time())
  247. task.event('received', time(), time(), {
  248. 'name': 'tasks.add', 'args': (2, 2),
  249. })
  250. assert task.state == states.FAILURE
  251. assert task.name == 'tasks.add'
  252. assert task.args == (2, 2)
  253. task.event('retried', time(), time())
  254. assert task.state == states.RETRY
  255. def test_repr(self):
  256. assert repr(Task(uuid='xxx', name='tasks.add'))
  257. class test_State:
  258. def test_repr(self):
  259. assert repr(State())
  260. def test_pickleable(self):
  261. state = State()
  262. r = ev_logical_clock_ordering(state)
  263. r.play()
  264. assert pickle.loads(pickle.dumps(state))
  265. def test_task_logical_clock_ordering(self):
  266. state = State()
  267. r = ev_logical_clock_ordering(state)
  268. tA, tB, tC = r.uids
  269. r.play()
  270. now = list(state.tasks_by_time())
  271. assert now[0][0] == tA
  272. assert now[1][0] == tC
  273. assert now[2][0] == tB
  274. for _ in range(1000):
  275. shuffle(r.uids)
  276. tA, tB, tC = r.uids
  277. r.rewind_with_offset(r.current_clock + 1, r.uids)
  278. r.play()
  279. now = list(state.tasks_by_time())
  280. assert now[0][0] == tA
  281. assert now[1][0] == tC
  282. assert now[2][0] == tB
  283. @skip.todo(reason='not working')
  284. def test_task_descending_clock_ordering(self):
  285. state = State()
  286. r = ev_logical_clock_ordering(state)
  287. tA, tB, tC = r.uids
  288. r.play()
  289. now = list(state.tasks_by_time(reverse=False))
  290. assert now[0][0] == tA
  291. assert now[1][0] == tB
  292. assert now[2][0] == tC
  293. for _ in range(1000):
  294. shuffle(r.uids)
  295. tA, tB, tC = r.uids
  296. r.rewind_with_offset(r.current_clock + 1, r.uids)
  297. r.play()
  298. now = list(state.tasks_by_time(reverse=False))
  299. assert now[0][0] == tB
  300. assert now[1][0] == tC
  301. assert now[2][0] == tA
  302. def test_get_or_create_task(self):
  303. state = State()
  304. task, created = state.get_or_create_task('id1')
  305. assert task.uuid == 'id1'
  306. assert created
  307. task2, created2 = state.get_or_create_task('id1')
  308. assert task2 is task
  309. assert not created2
  310. def test_get_or_create_worker(self):
  311. state = State()
  312. worker, created = state.get_or_create_worker('george@vandelay.com')
  313. assert worker.hostname == 'george@vandelay.com'
  314. assert created
  315. worker2, created2 = state.get_or_create_worker('george@vandelay.com')
  316. assert worker2 is worker
  317. assert not created2
  318. def test_get_or_create_worker__with_defaults(self):
  319. state = State()
  320. worker, created = state.get_or_create_worker(
  321. 'george@vandelay.com', pid=30,
  322. )
  323. assert worker.hostname == 'george@vandelay.com'
  324. assert worker.pid == 30
  325. assert created
  326. worker2, created2 = state.get_or_create_worker(
  327. 'george@vandelay.com', pid=40,
  328. )
  329. assert worker2 is worker
  330. assert worker2.pid == 40
  331. assert not created2
  332. def test_worker_online_offline(self):
  333. r = ev_worker_online_offline(State())
  334. next(r)
  335. assert list(r.state.alive_workers())
  336. assert r.state.workers['utest1'].alive
  337. r.play()
  338. assert not list(r.state.alive_workers())
  339. assert not r.state.workers['utest1'].alive
  340. def test_itertasks(self):
  341. s = State()
  342. s.tasks = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
  343. assert len(list(s.itertasks(limit=2))) == 2
  344. def test_worker_heartbeat_expire(self):
  345. r = ev_worker_heartbeats(State())
  346. next(r)
  347. assert not list(r.state.alive_workers())
  348. assert not r.state.workers['utest1'].alive
  349. r.play()
  350. assert list(r.state.alive_workers())
  351. assert r.state.workers['utest1'].alive
  352. def test_task_states(self):
  353. r = ev_task_states(State())
  354. # RECEIVED
  355. next(r)
  356. assert r.tid in r.state.tasks
  357. task = r.state.tasks[r.tid]
  358. assert task.state == states.RECEIVED
  359. assert task.received
  360. assert task.timestamp == task.received
  361. assert task.worker.hostname == 'utest1'
  362. # STARTED
  363. next(r)
  364. assert r.state.workers['utest1'].alive
  365. assert task.state == states.STARTED
  366. assert task.started
  367. assert task.timestamp == task.started
  368. assert task.worker.hostname == 'utest1'
  369. # REVOKED
  370. next(r)
  371. assert task.state == states.REVOKED
  372. assert task.revoked
  373. assert task.timestamp == task.revoked
  374. assert task.worker.hostname == 'utest1'
  375. # RETRY
  376. next(r)
  377. assert task.state == states.RETRY
  378. assert task.retried
  379. assert task.timestamp == task.retried
  380. assert task.worker.hostname, 'utest1'
  381. assert task.exception == "KeyError('bar')"
  382. assert task.traceback == 'line 2 at main'
  383. # FAILURE
  384. next(r)
  385. assert task.state == states.FAILURE
  386. assert task.failed
  387. assert task.timestamp == task.failed
  388. assert task.worker.hostname == 'utest1'
  389. assert task.exception == "KeyError('foo')"
  390. assert task.traceback == 'line 1 at main'
  391. # SUCCESS
  392. next(r)
  393. assert task.state == states.SUCCESS
  394. assert task.succeeded
  395. assert task.timestamp == task.succeeded
  396. assert task.worker.hostname == 'utest1'
  397. assert task.result == '4'
  398. assert task.runtime == 0.1234
  399. # children, parent, root
  400. r.play()
  401. assert r.tid2 in r.state.tasks
  402. task2 = r.state.tasks[r.tid2]
  403. assert task2.parent is task
  404. assert task2.root is task
  405. assert task2 in task.children
  406. def test_task_children_set_if_received_in_wrong_order(self):
  407. r = ev_task_states(State())
  408. r.events.insert(0, r.events.pop())
  409. r.play()
  410. assert r.state.tasks[r.tid2] in r.state.tasks[r.tid].children
  411. assert r.state.tasks[r.tid2].root is r.state.tasks[r.tid]
  412. assert r.state.tasks[r.tid2].parent is r.state.tasks[r.tid]
  413. def assertStateEmpty(self, state):
  414. assert not state.tasks
  415. assert not state.workers
  416. assert not state.event_count
  417. assert not state.task_count
  418. def assertState(self, state):
  419. assert state.tasks
  420. assert state.workers
  421. assert state.event_count
  422. assert state.task_count
  423. def test_freeze_while(self):
  424. s = State()
  425. r = ev_snapshot(s)
  426. r.play()
  427. def work():
  428. pass
  429. s.freeze_while(work, clear_after=True)
  430. assert not s.event_count
  431. s2 = State()
  432. r = ev_snapshot(s2)
  433. r.play()
  434. s2.freeze_while(work, clear_after=False)
  435. assert s2.event_count
  436. def test_clear_tasks(self):
  437. s = State()
  438. r = ev_snapshot(s)
  439. r.play()
  440. assert s.tasks
  441. s.clear_tasks(ready=False)
  442. assert not s.tasks
  443. def test_clear(self):
  444. r = ev_snapshot(State())
  445. r.play()
  446. assert r.state.event_count
  447. assert r.state.workers
  448. assert r.state.tasks
  449. assert r.state.task_count
  450. r.state.clear()
  451. assert not r.state.event_count
  452. assert not r.state.workers
  453. assert r.state.tasks
  454. assert not r.state.task_count
  455. r.state.clear(False)
  456. assert not r.state.tasks
  457. def test_task_types(self):
  458. r = ev_snapshot(State())
  459. r.play()
  460. assert sorted(r.state.task_types()) == ['task1', 'task2']
  461. def test_tasks_by_time(self):
  462. r = ev_snapshot(State())
  463. r.play()
  464. assert len(list(r.state.tasks_by_time())) == 20
  465. assert len(list(r.state.tasks_by_time(reverse=False))) == 20
  466. def test_tasks_by_type(self):
  467. r = ev_snapshot(State())
  468. r.play()
  469. assert len(list(r.state.tasks_by_type('task1'))) == 10
  470. assert len(list(r.state.tasks_by_type('task2'))) == 10
  471. assert len(r.state.tasks_by_type['task1']) == 10
  472. assert len(r.state.tasks_by_type['task2']) == 10
  473. def test_alive_workers(self):
  474. r = ev_snapshot(State())
  475. r.play()
  476. assert len(list(r.state.alive_workers())) == 3
  477. def test_tasks_by_worker(self):
  478. r = ev_snapshot(State())
  479. r.play()
  480. assert len(list(r.state.tasks_by_worker('utest1'))) == 10
  481. assert len(list(r.state.tasks_by_worker('utest2'))) == 10
  482. assert len(r.state.tasks_by_worker['utest1']) == 10
  483. assert len(r.state.tasks_by_worker['utest2']) == 10
  484. def test_survives_unknown_worker_event(self):
  485. s = State()
  486. s.event({
  487. 'type': 'worker-unknown-event-xxx',
  488. 'foo': 'bar',
  489. })
  490. s.event({
  491. 'type': 'worker-unknown-event-xxx',
  492. 'hostname': 'xxx',
  493. 'foo': 'bar',
  494. })
  495. def test_survives_unknown_worker_leaving(self):
  496. s = State(on_node_leave=Mock(name='on_node_leave'))
  497. (worker, created), subject = s.event({
  498. 'type': 'worker-offline',
  499. 'hostname': 'unknown@vandelay.com',
  500. 'timestamp': time(),
  501. 'local_received': time(),
  502. 'clock': 301030134894833,
  503. })
  504. assert worker == Worker('unknown@vandelay.com')
  505. assert not created
  506. assert subject == 'offline'
  507. assert 'unknown@vandelay.com' not in s.workers
  508. s.on_node_leave.assert_called_with(worker)
  509. def test_on_node_join_callback(self):
  510. s = State(on_node_join=Mock(name='on_node_join'))
  511. (worker, created), subject = s.event({
  512. 'type': 'worker-online',
  513. 'hostname': 'george@vandelay.com',
  514. 'timestamp': time(),
  515. 'local_received': time(),
  516. 'clock': 34314,
  517. })
  518. assert worker
  519. assert created
  520. assert subject == 'online'
  521. assert 'george@vandelay.com' in s.workers
  522. s.on_node_join.assert_called_with(worker)
  523. def test_survives_unknown_task_event(self):
  524. s = State()
  525. s.event({
  526. 'type': 'task-unknown-event-xxx',
  527. 'foo': 'bar',
  528. 'uuid': 'x',
  529. 'hostname': 'y',
  530. 'timestamp': time(),
  531. 'local_received': time(),
  532. 'clock': 0,
  533. })
  534. def test_limits_maxtasks(self):
  535. s = State(max_tasks_in_memory=1)
  536. s.heap_multiplier = 2
  537. s.event({
  538. 'type': 'task-unknown-event-xxx',
  539. 'foo': 'bar',
  540. 'uuid': 'x',
  541. 'hostname': 'y',
  542. 'clock': 3,
  543. 'timestamp': time(),
  544. 'local_received': time(),
  545. })
  546. s.event({
  547. 'type': 'task-unknown-event-xxx',
  548. 'foo': 'bar',
  549. 'uuid': 'y',
  550. 'hostname': 'y',
  551. 'clock': 4,
  552. 'timestamp': time(),
  553. 'local_received': time(),
  554. })
  555. s.event({
  556. 'type': 'task-unknown-event-xxx',
  557. 'foo': 'bar',
  558. 'uuid': 'z',
  559. 'hostname': 'y',
  560. 'clock': 5,
  561. 'timestamp': time(),
  562. 'local_received': time(),
  563. })
  564. assert len(s._taskheap) == 2
  565. assert s._taskheap[0].clock == 4
  566. assert s._taskheap[1].clock == 5
  567. s._taskheap.append(s._taskheap[0])
  568. assert list(s.tasks_by_time())
  569. def test_callback(self):
  570. scratch = {}
  571. def callback(state, event):
  572. scratch['recv'] = True
  573. s = State(callback=callback)
  574. s.event({'type': 'worker-online'})
  575. assert scratch.get('recv')