| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pickle
 
- from decimal import Decimal
 
- from itertools import count
 
- from random import shuffle
 
- from time import time
 
- from case import Mock, patch, skip
 
- from celery import states, uuid
 
- from celery.events import Event
 
- from celery.events.state import (HEARTBEAT_DRIFT_MAX, HEARTBEAT_EXPIRE_WINDOW,
 
-                                  State, Task, Worker, heartbeat_expires)
 
- from celery.five import range
 
class replay(object):
    """Feed a scripted sequence of events into a state object.

    Subclasses populate ``self.events`` in :meth:`setup`.  Each iteration
    step delivers exactly one event to ``state.event`` and yields ``None``.
    """

    def __init__(self, state):
        self.state = state
        self.rewind()
        self.setup()
        self.current_clock = 0

    def setup(self):
        """Hook for subclasses; the base implementation does nothing."""

    def rewind(self):
        """Restart from the first event and return ``self``."""
        self.position = count(0)
        return self

    def next_event(self):
        """Return the next scripted event, stamped for delivery.

        ``local_received`` is copied from the event's ``timestamp``.  An
        event without a ``clock`` gets one more than the last clock seen;
        otherwise its own clock becomes the current clock.

        Raises:
            IndexError: when the script is exhausted.
        """
        index = next(self.position)
        event = self.events[index]
        event['local_received'] = event['timestamp']
        try:
            clock = event['clock']
        except KeyError:
            clock = event['clock'] = self.current_clock + 1
        self.current_clock = clock
        return event

    def __iter__(self):
        return self

    def __next__(self):
        # Delivery stays inside the ``try`` so that an IndexError raised
        # while dispatching also ends the iteration.
        try:
            upcoming = self.next_event()
            self.state.event(upcoming)
        except IndexError:
            raise StopIteration()
    next = __next__  # Python 2 iterator protocol

    def play(self):
        """Deliver every remaining event."""
        for _step in self:
            continue
 
class ev_worker_online_offline(replay):
    """Replay script: worker ``utest1`` comes online, then goes offline."""

    def setup(self):
        host = 'utest1'
        self.events = [
            Event(kind, hostname=host)
            for kind in ('worker-online', 'worker-offline')
        ]
 
class ev_worker_heartbeats(replay):
    """Replay script: a back-dated heartbeat followed by a current one."""

    def setup(self):
        # The first heartbeat is dated two expiry windows in the past.
        stale_at = time() - HEARTBEAT_EXPIRE_WINDOW * 2
        stale = Event('worker-heartbeat', hostname='utest1',
                      timestamp=stale_at)
        fresh = Event('worker-heartbeat', hostname='utest1')
        self.events = [stale, fresh]
 
class ev_task_states(replay):
    """Replay script walking one task through several task events.

    A second task (``self.tid2``) is received last, naming the first task
    (``self.tid``) as both its parent and its root.
    """

    def setup(self):
        first = self.tid = uuid()
        second = self.tid2 = uuid()
        host = 'utest1'
        self.events = [
            Event('task-received', uuid=first, name='task1',
                  args='(2, 2)', kwargs="{'foo': 'bar'}",
                  retries=0, eta=None, hostname=host),
            Event('task-started', uuid=first, hostname=host),
            Event('task-revoked', uuid=first, hostname=host),
            Event('task-retried', uuid=first, exception="KeyError('bar')",
                  traceback='line 2 at main', hostname=host),
            Event('task-failed', uuid=first, exception="KeyError('foo')",
                  traceback='line 1 at main', hostname=host),
            Event('task-succeeded', uuid=first, result='4',
                  runtime=0.1234, hostname=host),
            # An event type outside the task-/worker- families.
            Event('foo-bar'),
            Event('task-received', uuid=second, name='task2',
                  args='(4, 4)', kwargs="{'foo': 'bar'}",
                  retries=0, eta=None, parent_id=first, root_id=first,
                  hostname=host),
        ]
 
def QTEV(type, uuid, hostname, clock, name=None, timestamp=None):
    """Quick task event.

    Build a ``task-<type>`` event carrying the given uuid, hostname, clock
    and name.  A falsy ``timestamp`` is replaced by the current time.
    """
    if not timestamp:
        timestamp = time()
    event_type = 'task-{0}'.format(type)
    return Event(event_type, uuid=uuid, hostname=hostname,
                 clock=clock, name=name, timestamp=timestamp)
 
class ev_logical_clock_ordering(replay):
    """Replay script for three tasks with explicit logical clock values.

    Every clock value in the script is shifted by ``offset``, so the same
    script can be replayed again later in logical time.
    """

    def __init__(self, state, offset=0, uids=None):
        self.offset = offset if offset else 0
        self.uids = self.setuids(uids)
        super(ev_logical_clock_ordering, self).__init__(state)

    def setuids(self, uids):
        """Bind ``tA``/``tB``/``tC`` from ``uids`` and return that sequence.

        Three fresh uuids are generated when ``uids`` is falsy.  The
        sequence passed in is returned as-is, not copied.
        """
        if not uids:
            uids = [uuid(), uuid(), uuid()]
        self.tA, self.tB, self.tC = uids
        return uids

    def setup(self):
        base = self.offset
        tA, tB, tC = self.uids
        # (event kind, task id, worker, task name, clock relative to base)
        # NOTE(review): the 'succeded' spelling, the 'TA' name and the
        # 13-before-12 clock order are reproduced verbatim from the
        # original script -- confirm whether each is intentional.
        script = (
            ('received', tA, 'w1', 'tA', 1),
            ('received', tB, 'w2', 'tB', 1),
            ('started', tA, 'w1', 'tA', 3),
            ('received', tC, 'w2', 'tC', 3),
            ('started', tB, 'w2', 'tB', 5),
            ('retried', tA, 'w1', 'tA', 7),
            ('succeeded', tB, 'w2', 'tB', 9),
            ('started', tC, 'w2', 'tC', 10),
            ('received', tA, 'w3', 'tA', 13),
            ('succeded', tC, 'w2', 'tC', 12),
            ('started', tA, 'w3', 'tA', 14),
            ('succeeded', tA, 'w3', 'TA', 16),
        )
        self.events = [
            QTEV(kind, uid, host, name=label, clock=base + tick)
            for kind, uid, host, label, tick in script
        ]

    def rewind_with_offset(self, offset, uids=None):
        """Rebuild the script at a new clock offset and restart it.

        The current uids are reused when ``uids`` is falsy.
        """
        self.offset = offset
        chosen = uids or self.uids
        self.uids = self.setuids(chosen)
        self.setup()
        self.rewind()
 
class ev_snapshot(replay):
    """Replay script: three workers come online, then twenty tasks arrive.

    The tasks alternate between ``utest2``/``task2`` (even iterations) and
    ``utest1``/``task1`` (odd iterations); ``utest3`` receives none.
    """

    def setup(self):
        self.events = [
            Event('worker-online', hostname='utest1'),
            Event('worker-online', hostname='utest2'),
            Event('worker-online', hostname='utest3'),
        ]
        for i in range(20):
            # Conditional expressions replace the fragile ``a and b or c``
            # idiom, and the local no longer shadows the builtin ``type``.
            even = not i % 2
            worker = 'utest2' if even else 'utest1'
            task_name = 'task2' if even else 'task1'
            self.events.append(Event('task-received', name=task_name,
                                     uuid=uuid(), hostname=worker))
 
class test_Worker:
    """Tests for ``Worker``: equality, reduce round-trip and heartbeats."""

    def test_equality(self):
        foo, bar = 'foo', 'bar'
        assert Worker(hostname=foo).hostname == 'foo'
        assert Worker(hostname=foo) == Worker(hostname=foo)
        assert Worker(hostname=foo) != Worker(hostname=bar)
        assert hash(Worker(hostname=foo)) == hash(Worker(hostname=foo))
        assert hash(Worker(hostname=foo)) != hash(Worker(hostname=bar))

    def test_heartbeat_expires__Decimal(self):
        expires = heartbeat_expires(
            Decimal(344313.37), freq=60, expire_window=200)
        assert expires == 344433.37

    def test_compatible_with_Decimal(self):
        w = Worker('george@vandelay.com')
        timestamp = Decimal(time())
        local_received = time()
        fields = {
            'hostname': 'george@vandelay.com',
            'timestamp': timestamp,
            'local_received': local_received,
            'freq': Decimal(5.6335431),
        }
        w.event('worker-online', timestamp, local_received, fields=fields)
        assert w.alive

    def test_eq_ne_other(self):
        assert Worker('a@b.com') == Worker('a@b.com')
        assert Worker('a@b.com') != Worker('b@b.com')
        # An unrelated object never compares equal to a worker.
        assert Worker('a@b.com') != object()

    def test_reduce_direct(self):
        w = Worker('george@vandelay.com')
        fields = {
            'hostname': 'george@vandelay.com',
            'timestamp': 10.0,
            'local_received': 13.0,
            'freq': 60,
        }
        w.event('worker-online', 10.0, 13.0, fields=fields)
        fun, args = w.__reduce__()
        w2 = fun(*args)
        # The rebuilt worker must match on each of these attributes.
        compared = ('hostname', 'pid', 'freq', 'heartbeats', 'clock',
                    'active', 'processed', 'loadavg', 'sw_ident')
        for attr in compared:
            assert getattr(w2, attr) == getattr(w, attr)

    def test_update(self):
        w = Worker('george@vandelay.com')
        w.update({'idx': '301'}, foo=1, clock=30, bah='foo')
        assert w.idx == '301'
        assert w.foo == 1
        assert w.clock == 30
        assert w.bah == 'foo'

    def test_survives_missing_timestamp(self):
        worker = Worker(hostname='foo')
        worker.event('heartbeat')
        assert worker.heartbeats == []

    def test_repr(self):
        assert repr(Worker(hostname='foo'))

    def test_drift_warning(self):
        worker = Worker(hostname='foo')
        with patch('celery.events.state.warn') as warn:
            # Timestamp is twice the allowed drift ahead of local receipt.
            drifted = time() + (HEARTBEAT_DRIFT_MAX * 2)
            worker.event(None, drifted, time())
            warn.assert_called()
            message = warn.call_args[0][0]
            assert 'Substantial drift' in message

    def test_updates_heartbeat(self):
        worker = Worker(hostname='foo')
        worker.event(None, time(), time())
        assert len(worker.heartbeats) == 1
        h1 = worker.heartbeats[0]
        # Second event is received ten seconds "earlier" than the first.
        worker.event(None, time(), time() - 10)
        assert len(worker.heartbeats) == 2
        assert worker.heartbeats[-1] == h1
 
class test_Task:
    """Tests for ``Task``: equality, info fields and state transitions."""

    def test_equality(self):
        foo, bar = 'foo', 'bar'
        assert Task(uuid=foo).uuid == 'foo'
        assert Task(uuid=foo) == Task(uuid=foo)
        assert Task(uuid=foo) != Task(uuid=bar)
        assert hash(Task(uuid=foo)) == hash(Task(uuid=foo))
        assert hash(Task(uuid=foo)) != hash(Task(uuid=bar))

    def test_info(self):
        task = Task(
            uuid='abcdefg',
            name='tasks.add',
            args='(2, 2)',
            kwargs='{}',
            retries=2,
            result=42,
            eta=1,
            runtime=0.0001,
            expires=1,
            parent_id='bdefc',
            root_id='dedfef',
            foo=None,
            exception=1,
            received=time() - 10,
            started=time() - 8,
            exchange='celery',
            routing_key='celery',
            succeeded=time(),
        )
        default_fields = sorted(list(task._info_fields))
        assert default_fields == sorted(task.info().keys())

        with_received = sorted(list(task._info_fields + ('received',)))
        assert with_received == sorted(task.info(extra=('received',)))

        selected = task.info(['args', 'kwargs'])
        assert sorted(['args', 'kwargs']) == sorted(selected.keys())

        assert not list(task.info('foo'))

    def test_reduce_direct(self):
        task = Task(uuid='uuid', name='tasks.add', args='(2, 2)')
        fun, args = task.__reduce__()
        task2 = fun(*args)
        assert task == task2

    def test_ready(self):
        task = Task(uuid='abcdefg', name='tasks.add')
        task.event('received', time(), time())
        assert not task.ready
        task.event('succeeded', time(), time())
        assert task.ready

    def test_sent(self):
        task = Task(uuid='abcdefg', name='tasks.add')
        task.event('sent', time(), time())
        assert task.state == states.PENDING

    def test_merge(self):
        task = Task()
        task.event('failed', time(), time())
        task.event('started', time(), time())
        received_fields = {'name': 'tasks.add', 'args': (2, 2)}
        task.event('received', time(), time(), received_fields)
        # State stays FAILURE while the late fields are still applied.
        assert task.state == states.FAILURE
        assert task.name == 'tasks.add'
        assert task.args == (2, 2)
        task.event('retried', time(), time())
        assert task.state == states.RETRY

    def test_repr(self):
        assert repr(Task(uuid='xxx', name='tasks.add'))
 
class test_State:
    """Tests for ``State`` driven mostly by the replay scripts above."""

    def test_repr(self):
        assert repr(State())

    def test_pickleable(self):
        """A populated state survives a pickle round-trip."""
        state = State()
        r = ev_logical_clock_ordering(state)
        r.play()
        assert pickle.loads(pickle.dumps(state))

    def test_task_logical_clock_ordering(self):
        """``tasks_by_time`` order holds after many shuffled replays."""
        state = State()
        r = ev_logical_clock_ordering(state)
        tA, tB, tC = r.uids
        r.play()
        now = list(state.tasks_by_time())
        assert now[0][0] == tA
        assert now[1][0] == tC
        assert now[2][0] == tB
        for _ in range(1000):
            shuffle(r.uids)
            tA, tB, tC = r.uids
            r.rewind_with_offset(r.current_clock + 1, r.uids)
            r.play()
        now = list(state.tasks_by_time())
        assert now[0][0] == tA
        assert now[1][0] == tC
        assert now[2][0] == tB

    @skip.todo(reason='not working')
    def test_task_descending_clock_ordering(self):
        state = State()
        r = ev_logical_clock_ordering(state)
        tA, tB, tC = r.uids
        r.play()
        now = list(state.tasks_by_time(reverse=False))
        assert now[0][0] == tA
        assert now[1][0] == tB
        assert now[2][0] == tC
        for _ in range(1000):
            shuffle(r.uids)
            tA, tB, tC = r.uids
            r.rewind_with_offset(r.current_clock + 1, r.uids)
            r.play()
        now = list(state.tasks_by_time(reverse=False))
        assert now[0][0] == tB
        assert now[1][0] == tC
        assert now[2][0] == tA

    def test_get_or_create_task(self):
        state = State()
        task, created = state.get_or_create_task('id1')
        assert task.uuid == 'id1'
        assert created
        task2, created2 = state.get_or_create_task('id1')
        assert task2 is task
        assert not created2

    def test_get_or_create_worker(self):
        state = State()
        worker, created = state.get_or_create_worker('george@vandelay.com')
        assert worker.hostname == 'george@vandelay.com'
        assert created
        worker2, created2 = state.get_or_create_worker('george@vandelay.com')
        assert worker2 is worker
        assert not created2

    def test_get_or_create_worker__with_defaults(self):
        """Keyword fields are applied on creation and on later lookups."""
        state = State()
        worker, created = state.get_or_create_worker(
            'george@vandelay.com', pid=30,
        )
        assert worker.hostname == 'george@vandelay.com'
        assert worker.pid == 30
        assert created
        worker2, created2 = state.get_or_create_worker(
            'george@vandelay.com', pid=40,
        )
        assert worker2 is worker
        assert worker2.pid == 40
        assert not created2

    def test_worker_online_offline(self):
        r = ev_worker_online_offline(State())
        next(r)
        assert list(r.state.alive_workers())
        assert r.state.workers['utest1'].alive
        r.play()
        assert not list(r.state.alive_workers())
        assert not r.state.workers['utest1'].alive

    def test_itertasks(self):
        s = State()
        s.tasks = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
        assert len(list(s.itertasks(limit=2))) == 2

    def test_worker_heartbeat_expire(self):
        r = ev_worker_heartbeats(State())
        next(r)
        assert not list(r.state.alive_workers())
        assert not r.state.workers['utest1'].alive
        r.play()
        assert list(r.state.alive_workers())
        assert r.state.workers['utest1'].alive

    def test_task_states(self):
        """Step through ``ev_task_states`` checking the task at each event."""
        r = ev_task_states(State())

        # RECEIVED
        next(r)
        assert r.tid in r.state.tasks
        task = r.state.tasks[r.tid]
        assert task.state == states.RECEIVED
        assert task.received
        assert task.timestamp == task.received
        assert task.worker.hostname == 'utest1'

        # STARTED
        next(r)
        assert r.state.workers['utest1'].alive
        assert task.state == states.STARTED
        assert task.started
        assert task.timestamp == task.started
        assert task.worker.hostname == 'utest1'

        # REVOKED
        next(r)
        assert task.state == states.REVOKED
        assert task.revoked
        assert task.timestamp == task.revoked
        assert task.worker.hostname == 'utest1'

        # RETRY
        next(r)
        assert task.state == states.RETRY
        assert task.retried
        assert task.timestamp == task.retried
        # Was ``assert task.worker.hostname, 'utest1'``, which only checked
        # truthiness and used 'utest1' as the failure message.
        assert task.worker.hostname == 'utest1'
        assert task.exception == "KeyError('bar')"
        assert task.traceback == 'line 2 at main'

        # FAILURE
        next(r)
        assert task.state == states.FAILURE
        assert task.failed
        assert task.timestamp == task.failed
        assert task.worker.hostname == 'utest1'
        assert task.exception == "KeyError('foo')"
        assert task.traceback == 'line 1 at main'

        # SUCCESS
        next(r)
        assert task.state == states.SUCCESS
        assert task.succeeded
        assert task.timestamp == task.succeeded
        assert task.worker.hostname == 'utest1'
        assert task.result == '4'
        assert task.runtime == 0.1234

        # children, parent, root
        r.play()
        assert r.tid2 in r.state.tasks
        task2 = r.state.tasks[r.tid2]

        assert task2.parent is task
        assert task2.root is task
        assert task2 in task.children

    def test_task_children_set_if_received_in_wrong_order(self):
        r = ev_task_states(State())
        # Move the child's 'task-received' event to the front of the script.
        r.events.insert(0, r.events.pop())
        r.play()
        assert r.state.tasks[r.tid2] in r.state.tasks[r.tid].children
        assert r.state.tasks[r.tid2].root is r.state.tasks[r.tid]
        assert r.state.tasks[r.tid2].parent is r.state.tasks[r.tid]

    def assertStateEmpty(self, state):
        """Assert that ``state`` holds no tasks, workers or counters."""
        assert not state.tasks
        assert not state.workers
        assert not state.event_count
        assert not state.task_count

    def assertState(self, state):
        """Assert that ``state`` holds tasks, workers and non-zero counters."""
        assert state.tasks
        assert state.workers
        assert state.event_count
        assert state.task_count

    def test_freeze_while(self):
        s = State()
        r = ev_snapshot(s)
        r.play()

        def work():
            pass

        s.freeze_while(work, clear_after=True)
        assert not s.event_count

        s2 = State()
        r = ev_snapshot(s2)
        r.play()
        s2.freeze_while(work, clear_after=False)
        assert s2.event_count

    def test_clear_tasks(self):
        s = State()
        r = ev_snapshot(s)
        r.play()
        assert s.tasks
        s.clear_tasks(ready=False)
        assert not s.tasks

    def test_clear(self):
        """``clear()`` keeps the tasks here; ``clear(False)`` drops them."""
        r = ev_snapshot(State())
        r.play()
        assert r.state.event_count
        assert r.state.workers
        assert r.state.tasks
        assert r.state.task_count

        r.state.clear()
        assert not r.state.event_count
        assert not r.state.workers
        assert r.state.tasks
        assert not r.state.task_count

        r.state.clear(False)
        assert not r.state.tasks

    def test_task_types(self):
        r = ev_snapshot(State())
        r.play()
        assert sorted(r.state.task_types()) == ['task1', 'task2']

    def test_tasks_by_time(self):
        r = ev_snapshot(State())
        r.play()
        assert len(list(r.state.tasks_by_time())) == 20
        assert len(list(r.state.tasks_by_time(reverse=False))) == 20

    def test_tasks_by_type(self):
        r = ev_snapshot(State())
        r.play()
        assert len(list(r.state.tasks_by_type('task1'))) == 10
        assert len(list(r.state.tasks_by_type('task2'))) == 10

        assert len(r.state.tasks_by_type['task1']) == 10
        assert len(r.state.tasks_by_type['task2']) == 10

    def test_alive_workers(self):
        r = ev_snapshot(State())
        r.play()
        assert len(list(r.state.alive_workers())) == 3

    def test_tasks_by_worker(self):
        r = ev_snapshot(State())
        r.play()
        assert len(list(r.state.tasks_by_worker('utest1'))) == 10
        assert len(list(r.state.tasks_by_worker('utest2'))) == 10

        assert len(r.state.tasks_by_worker['utest1']) == 10
        assert len(r.state.tasks_by_worker['utest2']) == 10

    def test_survives_unknown_worker_event(self):
        """Unknown worker events must not raise, with or without hostname."""
        s = State()
        s.event({
            'type': 'worker-unknown-event-xxx',
            'foo': 'bar',
        })
        s.event({
            'type': 'worker-unknown-event-xxx',
            'hostname': 'xxx',
            'foo': 'bar',
        })

    def test_survives_unknown_worker_leaving(self):
        s = State(on_node_leave=Mock(name='on_node_leave'))
        (worker, created), subject = s.event({
            'type': 'worker-offline',
            'hostname': 'unknown@vandelay.com',
            'timestamp': time(),
            'local_received': time(),
            'clock': 301030134894833,
        })
        assert worker == Worker('unknown@vandelay.com')
        assert not created
        assert subject == 'offline'
        assert 'unknown@vandelay.com' not in s.workers
        s.on_node_leave.assert_called_with(worker)

    def test_on_node_join_callback(self):
        s = State(on_node_join=Mock(name='on_node_join'))
        (worker, created), subject = s.event({
            'type': 'worker-online',
            'hostname': 'george@vandelay.com',
            'timestamp': time(),
            'local_received': time(),
            'clock': 34314,
        })
        assert worker
        assert created
        assert subject == 'online'
        assert 'george@vandelay.com' in s.workers
        s.on_node_join.assert_called_with(worker)

    def test_survives_unknown_task_event(self):
        """An unknown task event type must not raise."""
        s = State()
        s.event({
            'type': 'task-unknown-event-xxx',
            'foo': 'bar',
            'uuid': 'x',
            'hostname': 'y',
            'timestamp': time(),
            'local_received': time(),
            'clock': 0,
        })

    def test_limits_maxtasks(self):
        """With ``max_tasks_in_memory=1`` only the two newest entries stay."""
        s = State(max_tasks_in_memory=1)
        s.heap_multiplier = 2
        s.event({
            'type': 'task-unknown-event-xxx',
            'foo': 'bar',
            'uuid': 'x',
            'hostname': 'y',
            'clock': 3,
            'timestamp': time(),
            'local_received': time(),
        })
        s.event({
            'type': 'task-unknown-event-xxx',
            'foo': 'bar',
            'uuid': 'y',
            'hostname': 'y',
            'clock': 4,
            'timestamp': time(),
            'local_received': time(),
        })
        s.event({
            'type': 'task-unknown-event-xxx',
            'foo': 'bar',
            'uuid': 'z',
            'hostname': 'y',
            'clock': 5,
            'timestamp': time(),
            'local_received': time(),
        })
        assert len(s._taskheap) == 2
        assert s._taskheap[0].clock == 4
        assert s._taskheap[1].clock == 5

        # A duplicated heap entry must not break tasks_by_time().
        s._taskheap.append(s._taskheap[0])
        assert list(s.tasks_by_time())

    def test_callback(self):
        scratch = {}

        def callback(state, event):
            scratch['recv'] = True

        s = State(callback=callback)
        s.event({'type': 'worker-online'})
        assert scratch.get('recv')
 
 
  |