test_state.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. from __future__ import absolute_import, unicode_literals
  2. import pickle
  3. import pytest
  4. from time import time
  5. from case import Mock, patch
  6. from celery import uuid
  7. from celery.exceptions import WorkerShutdown, WorkerTerminate
  8. from celery.worker import state
  9. from celery.utils.collections import LimitedSet
  10. @pytest.fixture
  11. def reset_state(request):
  12. def fin():
  13. state.active_requests.clear()
  14. state.revoked.clear()
  15. state.total_count.clear()
  16. request.addfinalizer(fin)
  17. class MockShelve(dict):
  18. filename = None
  19. in_sync = False
  20. closed = False
  21. def open(self, filename, **kwargs):
  22. self.filename = filename
  23. return self
  24. def sync(self):
  25. self.in_sync = True
  26. def close(self):
  27. self.closed = True
  28. class MyPersistent(state.Persistent):
  29. storage = MockShelve()
  30. class test_maybe_shutdown:
  31. def teardown(self):
  32. state.should_stop = None
  33. state.should_terminate = None
  34. def test_should_stop(self):
  35. state.should_stop = True
  36. with pytest.raises(WorkerShutdown):
  37. state.maybe_shutdown()
  38. state.should_stop = 0
  39. with pytest.raises(WorkerShutdown):
  40. state.maybe_shutdown()
  41. state.should_stop = False
  42. try:
  43. state.maybe_shutdown()
  44. except SystemExit:
  45. raise RuntimeError('should not have exited')
  46. state.should_stop = None
  47. try:
  48. state.maybe_shutdown()
  49. except SystemExit:
  50. raise RuntimeError('should not have exited')
  51. state.should_stop = 0
  52. try:
  53. state.maybe_shutdown()
  54. except SystemExit as exc:
  55. assert exc.code == 0
  56. else:
  57. raise RuntimeError('should have exited')
  58. state.should_stop = 303
  59. try:
  60. state.maybe_shutdown()
  61. except SystemExit as exc:
  62. assert exc.code == 303
  63. else:
  64. raise RuntimeError('should have exited')
  65. def test_should_terminate(self):
  66. state.should_terminate = True
  67. with pytest.raises(WorkerTerminate):
  68. state.maybe_shutdown()
  69. @pytest.mark.usefixtures('reset_state')
  70. class test_Persistent:
  71. @pytest.fixture
  72. def p(self):
  73. return MyPersistent(state, filename='celery-state')
  74. def test_close_twice(self, p):
  75. p._is_open = False
  76. p.close()
  77. def test_constructor(self, p):
  78. assert p.db == {}
  79. assert p.db.filename == p.filename
  80. def test_save(self, p):
  81. p.db['foo'] = 'bar'
  82. p.save()
  83. assert p.db.in_sync
  84. assert p.db.closed
  85. def add_revoked(self, p, *ids):
  86. for id in ids:
  87. p.db.setdefault('revoked', LimitedSet()).add(id)
  88. def test_merge(self, p, data=['foo', 'bar', 'baz']):
  89. state.revoked.update(data)
  90. p.merge()
  91. for item in data:
  92. assert item in state.revoked
  93. def test_merge_dict(self, p):
  94. p.clock = Mock()
  95. p.clock.adjust.return_value = 626
  96. d = {'revoked': {'abc': time()}, 'clock': 313}
  97. p._merge_with(d)
  98. p.clock.adjust.assert_called_with(313)
  99. assert d['clock'] == 626
  100. assert 'abc' in state.revoked
  101. def test_sync_clock_and_purge(self, p):
  102. passthrough = Mock()
  103. passthrough.side_effect = lambda x: x
  104. with patch('celery.worker.state.revoked') as revoked:
  105. d = {'clock': 0}
  106. p.clock = Mock()
  107. p.clock.forward.return_value = 627
  108. p._dumps = passthrough
  109. p.compress = passthrough
  110. p._sync_with(d)
  111. revoked.purge.assert_called_with()
  112. assert d['clock'] == 627
  113. assert 'revoked' not in d
  114. assert d['zrevoked'] is revoked
  115. def test_sync(self, p,
  116. data1=['foo', 'bar', 'baz'], data2=['baz', 'ini', 'koz']):
  117. self.add_revoked(p, *data1)
  118. for item in data2:
  119. state.revoked.add(item)
  120. p.sync()
  121. assert p.db['zrevoked']
  122. pickled = p.decompress(p.db['zrevoked'])
  123. assert pickled
  124. saved = pickle.loads(pickled)
  125. for item in data2:
  126. assert item in saved
  127. class SimpleReq(object):
  128. def __init__(self, name):
  129. self.id = uuid()
  130. self.name = name
  131. @pytest.mark.usefixtures('reset_state')
  132. class test_state:
  133. def test_accepted(self, requests=[SimpleReq('foo'),
  134. SimpleReq('bar'),
  135. SimpleReq('baz'),
  136. SimpleReq('baz')]):
  137. for request in requests:
  138. state.task_accepted(request)
  139. for req in requests:
  140. assert req in state.active_requests
  141. assert state.total_count['foo'] == 1
  142. assert state.total_count['bar'] == 1
  143. assert state.total_count['baz'] == 2
  144. def test_ready(self, requests=[SimpleReq('foo'),
  145. SimpleReq('bar')]):
  146. for request in requests:
  147. state.task_accepted(request)
  148. assert len(state.active_requests) == 2
  149. for request in requests:
  150. state.task_ready(request)
  151. assert len(state.active_requests) == 0