| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pickle
 
- from time import time
 
- import pytest
 
- from case import Mock, patch
 
- from celery import uuid
 
- from celery.exceptions import WorkerShutdown, WorkerTerminate
 
- from celery.utils.collections import LimitedSet
 
- from celery.worker import state
 
@pytest.fixture
def reset_state():
    """Empty the shared worker-state containers once the test has run.

    Nothing is done before the test; after it finishes the module-level
    ``active_requests``, ``revoked`` and ``total_count`` objects on
    ``state`` are cleared, in that order.
    """
    yield
    # Look each container up at teardown time, not at fixture setup, so
    # whatever object is bound on ``state`` when the test ends is cleared.
    for attr in ('active_requests', 'revoked', 'total_count'):
        getattr(state, attr).clear()
 
class MockShelve(dict):
    """In-memory ``dict`` that records ``open``/``sync``/``close`` calls.

    The class-level values below are the defaults; each method call
    overrides the matching attribute on the instance it was called on.
    """

    # Set to True by close().
    closed = False
    # Set to True by sync().
    in_sync = False
    # Last filename handed to open().
    filename = None

    def open(self, filename, **kwargs):
        """Remember *filename* and return this same object.

        Any extra keyword arguments are accepted and ignored.
        """
        self.filename = filename
        return self

    def sync(self):
        """Flag the store as synced; nothing is written anywhere."""
        self.in_sync = True

    def close(self):
        """Flag the store as closed; the stored items are left as-is."""
        self.closed = True
 
class MyPersistent(state.Persistent):
    """``state.Persistent`` variant whose ``storage`` is a ``MockShelve``."""

    # NOTE: a single MockShelve is created when the class body runs, so
    # every MyPersistent instance sees the same ``storage`` object.
    storage = MockShelve()
 
class test_maybe_shutdown:
    """Tests for ``state.maybe_shutdown`` driven by the module-level flags."""

    def teardown_method(self):
        """Reset both shutdown flags after every test in this class.

        This is the pytest-native per-test hook.  The previous nose-style
        ``teardown`` name is not invoked by recent pytest releases, which
        would leave ``should_stop``/``should_terminate`` set for whatever
        test runs next.
        """
        state.should_stop = None
        state.should_terminate = None

    def test_should_stop(self):
        """``should_stop`` triggers a shutdown unless it is False/None."""
        # Both True and 0 count as "stop requested".
        for flag in (True, 0):
            state.should_stop = flag
            with pytest.raises(WorkerShutdown):
                state.maybe_shutdown()

        # False and None mean "keep running": no exit may be raised.
        for flag in (False, None):
            state.should_stop = flag
            try:
                state.maybe_shutdown()
            except SystemExit:
                raise RuntimeError('should not have exited')

        # The flag value must be carried through as the exit code.
        for code in (0, 303):
            state.should_stop = code
            with pytest.raises(SystemExit) as excinfo:
                state.maybe_shutdown()
            assert excinfo.value.code == code

    def test_should_terminate(self):
        """A truthy ``should_terminate`` raises ``WorkerTerminate``."""
        state.should_terminate = True
        with pytest.raises(WorkerTerminate):
            state.maybe_shutdown()
 
@pytest.mark.usefixtures('reset_state')
class test_Persistent:
    """Tests for ``state.Persistent`` through the ``MyPersistent`` subclass."""

    @pytest.fixture
    def p(self):
        """Build a ``MyPersistent`` for ``state`` named 'celery-state'."""
        return MyPersistent(state, filename='celery-state')

    def test_close_twice(self, p):
        """``close()`` on an instance flagged as not open must not raise."""
        p._is_open = False
        p.close()

    def test_constructor(self, p):
        """A new instance has an empty db opened under its own filename."""
        assert p.db == {}
        assert p.filename == p.db.filename

    def test_save(self, p):
        """``save()`` leaves the backing store both synced and closed."""
        p.db['foo'] = 'bar'
        p.save()
        assert p.db.in_sync
        assert p.db.closed

    def add_revoked(self, p, *ids):
        """Helper: add every id to the 'revoked' ``LimitedSet`` in ``p.db``.

        The entry is created on first use; nothing is touched when no ids
        are given.
        """
        for task_id in ids:
            p.db.setdefault(str('revoked'), LimitedSet()).add(task_id)

    def test_merge(self, p, data=['foo', 'bar', 'baz']):
        """Items already in ``state.revoked`` are still there after merge."""
        state.revoked.update(data)
        p.merge()
        missing = [item for item in data if item not in state.revoked]
        assert not missing

    def test_merge_dict(self, p):
        """``_merge_with`` adjusts the clock and loads revoked ids."""
        clock = Mock()
        clock.adjust.return_value = 626
        p.clock = clock
        payload = {
            str('revoked'): {str('abc'): time()},
            str('clock'): 313,
        }
        p._merge_with(payload)
        # The stored clock value is fed to adjust() and replaced by its
        # return value.
        p.clock.adjust.assert_called_with(313)
        assert payload[str('clock')] == 626
        assert str('abc') in state.revoked

    def test_sync_clock_and_purge(self, p):
        """``_sync_with`` purges revoked, forwards the clock, stores both."""
        # Identity stand-in for serialisation/compression so the stored
        # 'zrevoked' value can be compared by identity below.
        identity = Mock(side_effect=lambda x: x)
        with patch('celery.worker.state.revoked') as revoked:
            target = {str('clock'): 0}
            p.clock = Mock()
            p.clock.forward.return_value = 627
            p._dumps = identity
            p.compress = identity
            p._sync_with(target)
            revoked.purge.assert_called_with()
            assert target[str('clock')] == 627
            assert str('revoked') not in target
            assert target[str('zrevoked')] is revoked

    def test_sync(self, p,
                  data1=['foo', 'bar', 'baz'], data2=['baz', 'ini', 'koz']):
        """``sync()`` writes the live revoked set as a compressed pickle."""
        self.add_revoked(p, *data1)
        for item in data2:
            state.revoked.add(item)
        p.sync()

        assert p.db[str('zrevoked')]
        raw = p.decompress(p.db[str('zrevoked')])
        assert raw
        # The payload was produced by p.sync() just above, so unpickling
        # it here does not involve external data.
        restored = pickle.loads(raw)
        absent = [item for item in data2 if item not in restored]
        assert not absent
 
class SimpleReq(object):
    """Minimal request stand-in carrying only ``id`` and ``name``."""

    def __init__(self, name):
        """Store *name* and assign a fresh id obtained from ``uuid()``."""
        self.name = name
        self.id = uuid()
 
@pytest.mark.usefixtures('reset_state')
class test_state:
    """Tests for ``state.task_accepted`` and ``state.task_ready``."""

    def test_accepted(self, requests=[SimpleReq('foo'),
                                      SimpleReq('bar'),
                                      SimpleReq('baz'),
                                      SimpleReq('baz')]):
        """Accepted requests become active and are counted per name."""
        for req in requests:
            state.task_accepted(req)

        not_active = [req for req in requests
                      if req not in state.active_requests]
        assert not not_active

        # Two of the four requests share the name 'baz'.
        for name, expected in (('foo', 1), ('bar', 1), ('baz', 2)):
            assert state.total_count[name] == expected

    def test_ready(self, requests=[SimpleReq('foo'),
                                   SimpleReq('bar')]):
        """Requests marked ready are no longer counted as active."""
        for req in requests:
            state.task_accepted(req)
        assert len(state.active_requests) == 2

        for req in requests:
            state.task_ready(req)
        assert len(state.active_requests) == 0
 
 
  |