# test_state.py (5.0 KB)
  1. import pickle
  2. import pytest
  3. from time import time
  4. from case import Mock, patch
  5. from celery import uuid
  6. from celery.exceptions import WorkerShutdown, WorkerTerminate
  7. from celery.worker import state
  8. from celery.utils.collections import LimitedSet
  9. @pytest.fixture
  10. def reset_state():
  11. yield
  12. state.active_requests.clear()
  13. state.revoked.clear()
  14. state.total_count.clear()
  15. class MockShelve(dict):
  16. filename = None
  17. in_sync = False
  18. closed = False
  19. def open(self, filename, **kwargs):
  20. self.filename = filename
  21. return self
  22. def sync(self):
  23. self.in_sync = True
  24. def close(self):
  25. self.closed = True
  26. class MyPersistent(state.Persistent):
  27. storage = MockShelve()
  28. class test_maybe_shutdown:
  29. def teardown(self):
  30. state.should_stop = None
  31. state.should_terminate = None
  32. def test_should_stop(self):
  33. state.should_stop = True
  34. with pytest.raises(WorkerShutdown):
  35. state.maybe_shutdown()
  36. state.should_stop = 0
  37. with pytest.raises(WorkerShutdown):
  38. state.maybe_shutdown()
  39. state.should_stop = False
  40. try:
  41. state.maybe_shutdown()
  42. except SystemExit:
  43. raise RuntimeError('should not have exited')
  44. state.should_stop = None
  45. try:
  46. state.maybe_shutdown()
  47. except SystemExit:
  48. raise RuntimeError('should not have exited')
  49. state.should_stop = 0
  50. try:
  51. state.maybe_shutdown()
  52. except SystemExit as exc:
  53. assert exc.code == 0
  54. else:
  55. raise RuntimeError('should have exited')
  56. state.should_stop = 303
  57. try:
  58. state.maybe_shutdown()
  59. except SystemExit as exc:
  60. assert exc.code == 303
  61. else:
  62. raise RuntimeError('should have exited')
  63. def test_should_terminate(self):
  64. state.should_terminate = True
  65. with pytest.raises(WorkerTerminate):
  66. state.maybe_shutdown()
  67. @pytest.mark.usefixtures('reset_state')
  68. class test_Persistent:
  69. @pytest.fixture
  70. def p(self):
  71. return MyPersistent(state, filename='celery-state')
  72. def test_close_twice(self, p):
  73. p._is_open = False
  74. p.close()
  75. def test_constructor(self, p):
  76. assert p.db == {}
  77. assert p.db.filename == p.filename
  78. def test_save(self, p):
  79. p.db['foo'] = 'bar'
  80. p.save()
  81. assert p.db.in_sync
  82. assert p.db.closed
  83. def add_revoked(self, p, *ids):
  84. for id in ids:
  85. p.db.setdefault(str('revoked'), LimitedSet()).add(id)
  86. def test_merge(self, p, data=['foo', 'bar', 'baz']):
  87. state.revoked.update(data)
  88. p.merge()
  89. for item in data:
  90. assert item in state.revoked
  91. def test_merge_dict(self, p):
  92. p.clock = Mock()
  93. p.clock.adjust.return_value = 626
  94. d = {str('revoked'): {str('abc'): time()}, str('clock'): 313}
  95. p._merge_with(d)
  96. p.clock.adjust.assert_called_with(313)
  97. assert d[str('clock')] == 626
  98. assert str('abc') in state.revoked
  99. def test_sync_clock_and_purge(self, p):
  100. passthrough = Mock()
  101. passthrough.side_effect = lambda x: x
  102. with patch('celery.worker.state.revoked') as revoked:
  103. d = {str('clock'): 0}
  104. p.clock = Mock()
  105. p.clock.forward.return_value = 627
  106. p._dumps = passthrough
  107. p.compress = passthrough
  108. p._sync_with(d)
  109. revoked.purge.assert_called_with()
  110. assert d[str('clock')] == 627
  111. assert str('revoked') not in d
  112. assert d[str('zrevoked')] is revoked
  113. def test_sync(self, p,
  114. data1=['foo', 'bar', 'baz'], data2=['baz', 'ini', 'koz']):
  115. self.add_revoked(p, *data1)
  116. for item in data2:
  117. state.revoked.add(item)
  118. p.sync()
  119. assert p.db[str('zrevoked')]
  120. pickled = p.decompress(p.db[str('zrevoked')])
  121. assert pickled
  122. saved = pickle.loads(pickled)
  123. for item in data2:
  124. assert item in saved
  125. class SimpleReq:
  126. def __init__(self, name):
  127. self.id = uuid()
  128. self.name = name
  129. @pytest.mark.usefixtures('reset_state')
  130. class test_state:
  131. def test_accepted(self, requests=[SimpleReq('foo'),
  132. SimpleReq('bar'),
  133. SimpleReq('baz'),
  134. SimpleReq('baz')]):
  135. for request in requests:
  136. state.task_accepted(request)
  137. for req in requests:
  138. assert req in state.active_requests
  139. assert state.total_count['foo'] == 1
  140. assert state.total_count['bar'] == 1
  141. assert state.total_count['baz'] == 2
  142. def test_ready(self, requests=[SimpleReq('foo'),
  143. SimpleReq('bar')]):
  144. for request in requests:
  145. state.task_accepted(request)
  146. assert len(state.active_requests) == 2
  147. for request in requests:
  148. state.task_ready(request)
  149. assert len(state.active_requests) == 0