| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 | import picklefrom decimal import Decimalfrom random import shufflefrom time import timefrom itertools import countfrom case import Mock, patch, skipfrom celery import statesfrom celery import uuidfrom celery.events import Eventfrom celery.events.state import (    HEARTBEAT_EXPIRE_WINDOW,    HEARTBEAT_DRIFT_MAX,    State,    Worker,    Task,    heartbeat_expires,)class replay:    def __init__(self, state):        self.state = state        self.rewind()        self.setup()        self.current_clock = 0    def setup(self):        pass    def next_event(self):        ev = self.events[next(self.position)]        ev['local_received'] = ev['timestamp']        try:            self.current_clock = ev['clock']        except KeyError:            ev['clock'] = self.current_clock = self.current_clock + 1        return ev    def __iter__(self):        return self    def __next__(self):        try:            self.state.event(self.next_event())        except IndexError:            raise StopIteration()    next = __next__    def rewind(self):        self.position = count(0)        return self    def play(self):        for _ in self:            passclass ev_worker_online_offline(replay):    def setup(self):        self.events = [            Event('worker-online', hostname='utest1'),            Event('worker-offline', hostname='utest1'),        ]class ev_worker_heartbeats(replay):    def setup(self):        self.events = [            Event('worker-heartbeat', hostname='utest1',                  timestamp=time() - HEARTBEAT_EXPIRE_WINDOW * 2),            Event('worker-heartbeat', hostname='utest1'),        ]class ev_task_states(replay):    def setup(self):        tid = self.tid = uuid()        tid2 = self.tid2 = uuid()        self.events = [            Event('task-received', uuid=tid, name='task1',                  args='(2, 2)', kwargs="{'foo': 'bar'}",                  retries=0, eta=None, hostname='utest1'),            Event('task-started', uuid=tid, hostname='utest1'),            Event('task-revoked', uuid=tid, hostname='utest1'),            Event('task-retried', uuid=tid, exception="KeyError('bar')",                  traceback='line 2 at main', hostname='utest1'),            Event('task-failed', uuid=tid, exception="KeyError('foo')",                  traceback='line 1 at main', hostname='utest1'),            Event('task-succeeded', uuid=tid, result='4',                  runtime=0.1234, hostname='utest1'),            Event('foo-bar'),            Event('task-received', uuid=tid2, name='task2',                  args='(4, 4)', kwargs="{'foo': 'bar'}",                  retries=0, eta=None, parent_id=tid, root_id=tid,                  hostname='utest1'),        ]def QTEV(type, uuid, hostname, clock, name=None, timestamp=None):    """Quick task event."""    return Event('task-{0}'.format(type), uuid=uuid, hostname=hostname,                 clock=clock, name=name, timestamp=timestamp or time())class ev_logical_clock_ordering(replay):    def __init__(self, state, offset=0, uids=None):        self.offset = offset or 0        self.uids = self.setuids(uids)        super().__init__(state)    def setuids(self, uids):        uids = self.tA, self.tB, self.tC = uids or [uuid(), uuid(), uuid()]        return uids    def setup(self):        offset = self.offset        tA, tB, tC = self.uids        self.events = [            QTEV('received', tA, 'w1', name='tA', clock=offset + 1),            QTEV('received', tB, 'w2', name='tB', clock=offset + 1),            QTEV('started', tA, 'w1', name='tA', clock=offset + 3),            QTEV('received', tC, 'w2', name='tC', clock=offset + 3),            QTEV('started', tB, 'w2', name='tB', clock=offset + 5),            QTEV('retried', tA, 'w1', name='tA', clock=offset + 7),            QTEV('succeeded', tB, 'w2', name='tB', clock=offset + 9),            QTEV('started', tC, 'w2', name='tC', clock=offset + 10),            QTEV('received', tA, 'w3', name='tA', clock=offset + 13),            QTEV('succeded', tC, 'w2', name='tC', clock=offset + 12),            QTEV('started', tA, 'w3', name='tA', clock=offset + 14),            QTEV('succeeded', tA, 'w3', name='TA', clock=offset + 16),        ]    def rewind_with_offset(self, offset, uids=None):        self.offset = offset        self.uids = self.setuids(uids or self.uids)        self.setup()        self.rewind()class ev_snapshot(replay):    def setup(self):        self.events = [            Event('worker-online', hostname='utest1'),            Event('worker-online', hostname='utest2'),            Event('worker-online', hostname='utest3'),        ]        for i in range(20):            worker = not i % 2 and 'utest2' or 'utest1'            type = not i % 2 and 'task2' or 'task1'            self.events.append(Event('task-received', name=type,                               uuid=uuid(), hostname=worker))class test_Worker:    def test_equality(self):        assert Worker(hostname='foo').hostname == 'foo'        assert Worker(hostname='foo') == Worker(hostname='foo')        assert Worker(hostname='foo') != Worker(hostname='bar')        assert hash(Worker(hostname='foo')) == hash(Worker(hostname='foo'))        assert hash(Worker(hostname='foo')) != hash(Worker(hostname='bar'))    def test_heartbeat_expires__Decimal(self):        assert heartbeat_expires(            Decimal(344313.37), freq=60, expire_window=200) == 344433.37    def test_compatible_with_Decimal(self):        w = Worker('george@vandelay.com')        timestamp, local_received = Decimal(time()), time()        w.event('worker-online', timestamp, local_received, fields={            'hostname': 'george@vandelay.com',            'timestamp': timestamp,            'local_received': local_received,            'freq': Decimal(5.6335431),        })        assert w.alive    def test_eq_ne_other(self):        assert Worker('a@b.com') == Worker('a@b.com')        assert Worker('a@b.com') != Worker('b@b.com')        assert Worker('a@b.com') != object()    def test_reduce_direct(self):        w = Worker('george@vandelay.com')        w.event('worker-online', 10.0, 13.0, fields={            'hostname': 'george@vandelay.com',            'timestamp': 10.0,            'local_received': 13.0,            'freq': 60,        })        fun, args = w.__reduce__()        w2 = fun(*args)        assert w2.hostname == w.hostname        assert w2.pid == w.pid        assert w2.freq == w.freq        assert w2.heartbeats == w.heartbeats        assert w2.clock == w.clock        assert w2.active == w.active        assert w2.processed == w.processed        assert w2.loadavg == w.loadavg        assert w2.sw_ident == w.sw_ident    def test_update(self):        w = Worker('george@vandelay.com')        w.update({'idx': '301'}, foo=1, clock=30, bah='foo')        assert w.idx == '301'        assert w.foo == 1        assert w.clock == 30        assert w.bah == 'foo'    def test_survives_missing_timestamp(self):        worker = Worker(hostname='foo')        worker.event('heartbeat')        assert worker.heartbeats == []    def test_repr(self):        assert repr(Worker(hostname='foo'))    def test_drift_warning(self):        worker = Worker(hostname='foo')        with patch('celery.events.state.warn') as warn:            worker.event(None, time() + (HEARTBEAT_DRIFT_MAX * 2), time())            warn.assert_called()            assert 'Substantial drift' in warn.call_args[0][0]    def test_updates_heartbeat(self):        worker = Worker(hostname='foo')        worker.event(None, time(), time())        assert len(worker.heartbeats) == 1        h1 = worker.heartbeats[0]        worker.event(None, time(), time() - 10)        assert len(worker.heartbeats) == 2        assert worker.heartbeats[-1] == h1class test_Task:    def test_equality(self):        assert Task(uuid='foo').uuid == 'foo'        assert Task(uuid='foo') == Task(uuid='foo')        assert Task(uuid='foo') != Task(uuid='bar')        assert hash(Task(uuid='foo')) == hash(Task(uuid='foo'))        assert hash(Task(uuid='foo')) != hash(Task(uuid='bar'))    def test_info(self):        task = Task(uuid='abcdefg',                    name='tasks.add',                    args='(2, 2)',                    kwargs='{}',                    retries=2,                    result=42,                    eta=1,                    runtime=0.0001,                    expires=1,                    parent_id='bdefc',                    root_id='dedfef',                    foo=None,                    exception=1,                    received=time() - 10,                    started=time() - 8,                    exchange='celery',                    routing_key='celery',                    succeeded=time())        assert sorted(list(task._info_fields)) == sorted(task.info().keys())        assert (sorted(list(task._info_fields + ('received',))) ==                sorted(task.info(extra=('received',))))        assert (sorted(['args', 'kwargs']) ==                sorted(task.info(['args', 'kwargs']).keys()))        assert not list(task.info('foo'))    def test_reduce_direct(self):        task = Task(uuid='uuid', name='tasks.add', args='(2, 2)')        fun, args = task.__reduce__()        task2 = fun(*args)        assert task == task2    def test_ready(self):        task = Task(uuid='abcdefg',                    name='tasks.add')        task.event('received', time(), time())        assert not task.ready        task.event('succeeded', time(), time())        assert task.ready    def test_sent(self):        task = Task(uuid='abcdefg',                    name='tasks.add')        task.event('sent', time(), time())        assert task.state == states.PENDING    def test_merge(self):        task = Task()        task.event('failed', time(), time())        task.event('started', time(), time())        task.event('received', time(), time(), {            'name': 'tasks.add', 'args': (2, 2),        })        assert task.state == states.FAILURE        assert task.name == 'tasks.add'        assert task.args == (2, 2)        task.event('retried', time(), time())        assert task.state == states.RETRY    def test_repr(self):        assert repr(Task(uuid='xxx', name='tasks.add'))class test_State:    def test_repr(self):        assert repr(State())    def test_pickleable(self):        state = State()        r = ev_logical_clock_ordering(state)        r.play()        assert pickle.loads(pickle.dumps(state))    def test_task_logical_clock_ordering(self):        state = State()        r = ev_logical_clock_ordering(state)        tA, tB, tC = r.uids        r.play()        now = list(state.tasks_by_time())        assert now[0][0] == tA        assert now[1][0] == tC        assert now[2][0] == tB        for _ in range(1000):            shuffle(r.uids)            tA, tB, tC = r.uids            r.rewind_with_offset(r.current_clock + 1, r.uids)            r.play()        now = list(state.tasks_by_time())        assert now[0][0] == tA        assert now[1][0] == tC        assert now[2][0] == tB    @skip.todo(reason='not working')    def test_task_descending_clock_ordering(self):        state = State()        r = ev_logical_clock_ordering(state)        tA, tB, tC = r.uids        r.play()        now = list(state.tasks_by_time(reverse=False))        assert now[0][0] == tA        assert now[1][0] == tB        assert now[2][0] == tC        for _ in range(1000):            shuffle(r.uids)            tA, tB, tC = r.uids            r.rewind_with_offset(r.current_clock + 1, r.uids)            r.play()        now = list(state.tasks_by_time(reverse=False))        assert now[0][0] == tB        assert now[1][0] == tC        assert now[2][0] == tA    def test_get_or_create_task(self):        state = State()        task, created = state.get_or_create_task('id1')        assert task.uuid == 'id1'        assert created        task2, created2 = state.get_or_create_task('id1')        assert task2 is task        assert not created2    def test_get_or_create_worker(self):        state = State()        worker, created = state.get_or_create_worker('george@vandelay.com')        assert worker.hostname == 'george@vandelay.com'        assert created        worker2, created2 = state.get_or_create_worker('george@vandelay.com')        assert worker2 is worker        assert not created2    def test_get_or_create_worker__with_defaults(self):        state = State()        worker, created = state.get_or_create_worker(            'george@vandelay.com', pid=30,        )        assert worker.hostname == 'george@vandelay.com'        assert worker.pid == 30        assert created        worker2, created2 = state.get_or_create_worker(            'george@vandelay.com', pid=40,        )        assert worker2 is worker        assert worker2.pid == 40        assert not created2    def test_worker_online_offline(self):        r = ev_worker_online_offline(State())        next(r)        assert list(r.state.alive_workers())        assert r.state.workers['utest1'].alive        r.play()        assert not list(r.state.alive_workers())        assert not r.state.workers['utest1'].alive    def test_itertasks(self):        s = State()        s.tasks = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}        assert len(list(s.itertasks(limit=2))) == 2    def test_worker_heartbeat_expire(self):        r = ev_worker_heartbeats(State())        next(r)        assert not list(r.state.alive_workers())        assert not r.state.workers['utest1'].alive        r.play()        assert list(r.state.alive_workers())        assert r.state.workers['utest1'].alive    def test_task_states(self):        r = ev_task_states(State())        # RECEIVED        next(r)        assert r.tid in r.state.tasks        task = r.state.tasks[r.tid]        assert task.state == states.RECEIVED        assert task.received        assert task.timestamp == task.received        assert task.worker.hostname == 'utest1'        # STARTED        next(r)        assert r.state.workers['utest1'].alive        assert task.state == states.STARTED        assert task.started        assert task.timestamp == task.started        assert task.worker.hostname == 'utest1'        # REVOKED        next(r)        assert task.state == states.REVOKED        assert task.revoked        assert task.timestamp == task.revoked        assert task.worker.hostname == 'utest1'        # RETRY        next(r)        assert task.state == states.RETRY        assert task.retried        assert task.timestamp == task.retried        assert task.worker.hostname, 'utest1'        assert task.exception == "KeyError('bar')"        assert task.traceback == 'line 2 at main'        # FAILURE        next(r)        assert task.state == states.FAILURE        assert task.failed        assert task.timestamp == task.failed        assert task.worker.hostname == 'utest1'        assert task.exception == "KeyError('foo')"        assert task.traceback == 'line 1 at main'        # SUCCESS        next(r)        assert task.state == states.SUCCESS        assert task.succeeded        assert task.timestamp == task.succeeded        assert task.worker.hostname == 'utest1'        assert task.result == '4'        assert task.runtime == 0.1234        # children, parent, root        r.play()        assert r.tid2 in r.state.tasks        task2 = r.state.tasks[r.tid2]        assert task2.parent is task        assert task2.root is task        assert task2 in task.children    def test_task_children_set_if_received_in_wrong_order(self):        r = ev_task_states(State())        r.events.insert(0, r.events.pop())        r.play()        assert r.state.tasks[r.tid2] in r.state.tasks[r.tid].children        assert r.state.tasks[r.tid2].root is r.state.tasks[r.tid]        assert r.state.tasks[r.tid2].parent is r.state.tasks[r.tid]    def assertStateEmpty(self, state):        assert not state.tasks        assert not state.workers        assert not state.event_count        assert not state.task_count    def assertState(self, state):        assert state.tasks        assert state.workers        assert state.event_count        assert state.task_count    def test_freeze_while(self):        s = State()        r = ev_snapshot(s)        r.play()        def work():            pass        s.freeze_while(work, clear_after=True)        assert not s.event_count        s2 = State()        r = ev_snapshot(s2)        r.play()        s2.freeze_while(work, clear_after=False)        assert s2.event_count    def test_clear_tasks(self):        s = State()        r = ev_snapshot(s)        r.play()        assert s.tasks        s.clear_tasks(ready=False)        assert not s.tasks    def test_clear(self):        r = ev_snapshot(State())        r.play()        assert r.state.event_count        assert r.state.workers        assert r.state.tasks        assert r.state.task_count        r.state.clear()        assert not r.state.event_count        assert not r.state.workers        assert r.state.tasks        assert not r.state.task_count        r.state.clear(False)        assert not r.state.tasks    def test_task_types(self):        r = ev_snapshot(State())        r.play()        assert sorted(r.state.task_types()) == ['task1', 'task2']    def test_tasks_by_time(self):        r = ev_snapshot(State())        r.play()        assert len(list(r.state.tasks_by_time())) == 20        assert len(list(r.state.tasks_by_time(reverse=False))) == 20    def test_tasks_by_type(self):        r = ev_snapshot(State())        r.play()        assert len(list(r.state.tasks_by_type('task1'))) == 10        assert len(list(r.state.tasks_by_type('task2'))) == 10        assert len(r.state.tasks_by_type['task1']) == 10        assert len(r.state.tasks_by_type['task2']) == 10    def test_alive_workers(self):        r = ev_snapshot(State())        r.play()        assert len(list(r.state.alive_workers())) == 3    def test_tasks_by_worker(self):        r = ev_snapshot(State())        r.play()        assert len(list(r.state.tasks_by_worker('utest1'))) == 10        assert len(list(r.state.tasks_by_worker('utest2'))) == 10        assert len(r.state.tasks_by_worker['utest1']) == 10        assert len(r.state.tasks_by_worker['utest2']) == 10    def test_survives_unknown_worker_event(self):        s = State()        s.event({            'type': 'worker-unknown-event-xxx',            'foo': 'bar',        })        s.event({            'type': 'worker-unknown-event-xxx',            'hostname': 'xxx',            'foo': 'bar',        })    def test_survives_unknown_worker_leaving(self):        s = State(on_node_leave=Mock(name='on_node_leave'))        (worker, created), subject = s.event({            'type': 'worker-offline',            'hostname': 'unknown@vandelay.com',            'timestamp': time(),            'local_received': time(),            'clock': 301030134894833,        })        assert worker == Worker('unknown@vandelay.com')        assert not created        assert subject == 'offline'        assert 'unknown@vandelay.com' not in s.workers        s.on_node_leave.assert_called_with(worker)    def test_on_node_join_callback(self):        s = State(on_node_join=Mock(name='on_node_join'))        (worker, created), subject = s.event({            'type': 'worker-online',            'hostname': 'george@vandelay.com',            'timestamp': time(),            'local_received': time(),            'clock': 34314,        })        assert worker        assert created        assert subject == 'online'        assert 'george@vandelay.com' in s.workers        s.on_node_join.assert_called_with(worker)    def test_survives_unknown_task_event(self):        s = State()        s.event({            'type': 'task-unknown-event-xxx',            'foo': 'bar',            'uuid': 'x',            'hostname': 'y',            'timestamp': time(),            'local_received': time(),            'clock': 0,        })    def test_limits_maxtasks(self):        s = State(max_tasks_in_memory=1)        s.heap_multiplier = 2        s.event({            'type': 'task-unknown-event-xxx',            'foo': 'bar',            'uuid': 'x',            'hostname': 'y',            'clock': 3,            'timestamp': time(),            'local_received': time(),        })        s.event({            'type': 'task-unknown-event-xxx',            'foo': 'bar',            'uuid': 'y',            'hostname': 'y',            'clock': 4,            'timestamp': time(),            'local_received': time(),        })        s.event({            'type': 'task-unknown-event-xxx',            'foo': 'bar',            'uuid': 'z',            'hostname': 'y',            'clock': 5,            'timestamp': time(),            'local_received': time(),        })        assert len(s._taskheap) == 2        assert s._taskheap[0].clock == 4        assert s._taskheap[1].clock == 5        s._taskheap.append(s._taskheap[0])        assert list(s.tasks_by_time())    def test_callback(self):        scratch = {}        def callback(state, event):            scratch['recv'] = True        s = State(callback=callback)        s.event({'type': 'worker-online'})        assert scratch.get('recv')
 |