test_bootsteps.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import pytest
  2. from case import Mock, patch
  3. from celery import bootsteps
  4. class test_StepFormatter:
  5. def test_get_prefix(self):
  6. f = bootsteps.StepFormatter()
  7. s = Mock()
  8. s.last = True
  9. assert f._get_prefix(s) == f.blueprint_prefix
  10. s2 = Mock()
  11. s2.last = False
  12. s2.conditional = True
  13. assert f._get_prefix(s2) == f.conditional_prefix
  14. s3 = Mock()
  15. s3.last = s3.conditional = False
  16. assert f._get_prefix(s3) == ''
  17. def test_node(self):
  18. f = bootsteps.StepFormatter()
  19. f.draw_node = Mock()
  20. step = Mock()
  21. step.last = False
  22. f.node(step, x=3)
  23. f.draw_node.assert_called_with(step, f.node_scheme, {'x': 3})
  24. step.last = True
  25. f.node(step, x=3)
  26. f.draw_node.assert_called_with(step, f.blueprint_scheme, {'x': 3})
  27. def test_edge(self):
  28. f = bootsteps.StepFormatter()
  29. f.draw_edge = Mock()
  30. a, b = Mock(), Mock()
  31. a.last = True
  32. f.edge(a, b, x=6)
  33. f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
  34. 'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3',
  35. })
  36. a.last = False
  37. f.edge(a, b, x=6)
  38. f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
  39. 'x': 6,
  40. })
  41. class test_Step:
  42. class Def(bootsteps.StartStopStep):
  43. name = 'test_Step.Def'
  44. def setup(self):
  45. self.steps = []
  46. def test_blueprint_name(self, bp='test_blueprint_name'):
  47. class X(bootsteps.Step):
  48. blueprint = bp
  49. name = 'X'
  50. assert X.name == 'X'
  51. class Y(bootsteps.Step):
  52. name = '%s.Y' % bp
  53. assert Y.name == '{0}.Y'.format(bp)
  54. def test_init(self):
  55. assert self.Def(self)
  56. def test_create(self):
  57. self.Def(self).create(self)
  58. def test_include_if(self):
  59. x = self.Def(self)
  60. x.enabled = True
  61. assert x.include_if(self)
  62. x.enabled = False
  63. assert not x.include_if(self)
  64. def test_instantiate(self):
  65. assert isinstance(
  66. self.Def(self).instantiate(self.Def, self),
  67. self.Def,
  68. )
  69. def test_include_when_enabled(self):
  70. x = self.Def(self)
  71. x.create = Mock()
  72. x.create.return_value = 'George'
  73. assert x.include(self)
  74. assert x.obj == 'George'
  75. x.create.assert_called_with(self)
  76. def test_include_when_disabled(self):
  77. x = self.Def(self)
  78. x.enabled = False
  79. x.create = Mock()
  80. assert not x.include(self)
  81. x.create.assert_not_called()
  82. def test_repr(self):
  83. x = self.Def(self)
  84. assert repr(x)
  85. class test_ConsumerStep:
  86. def test_interface(self):
  87. step = bootsteps.ConsumerStep(self)
  88. with pytest.raises(NotImplementedError):
  89. step.get_consumers(self)
  90. def test_start_stop_shutdown(self):
  91. consumer = Mock()
  92. self.connection = Mock()
  93. class Step(bootsteps.ConsumerStep):
  94. def get_consumers(self, c):
  95. return [consumer]
  96. step = Step(self)
  97. assert step.get_consumers(self) == [consumer]
  98. step.start(self)
  99. consumer.consume.assert_called_with()
  100. step.stop(self)
  101. consumer.cancel.assert_called_with()
  102. step.shutdown(self)
  103. consumer.channel.close.assert_called_with()
  104. def test_start_no_consumers(self):
  105. self.connection = Mock()
  106. class Step(bootsteps.ConsumerStep):
  107. def get_consumers(self, c):
  108. return ()
  109. step = Step(self)
  110. step.start(self)
  111. def test_close_no_consumer_channel(self):
  112. step = bootsteps.ConsumerStep(Mock())
  113. step.consumers = [Mock()]
  114. step.consumers[0].channel = None
  115. step._close(Mock())
  116. class test_StartStopStep:
  117. class Def(bootsteps.StartStopStep):
  118. name = 'test_StartStopStep.Def'
  119. def setup(self):
  120. self.steps = []
  121. def test_start__stop(self):
  122. x = self.Def(self)
  123. x.create = Mock()
  124. # include creates the underlying object and sets
  125. # its x.obj attribute to it, as well as appending
  126. # it to the parent.steps list.
  127. x.include(self)
  128. assert self.steps
  129. assert self.steps[0] is x
  130. x.start(self)
  131. x.obj.start.assert_called_with()
  132. x.stop(self)
  133. x.obj.stop.assert_called_with()
  134. x.obj = None
  135. assert x.start(self) is None
  136. def test_terminate__no_obj(self):
  137. x = self.Def(self)
  138. x.obj = None
  139. x.terminate(Mock())
  140. def test_include_when_disabled(self):
  141. x = self.Def(self)
  142. x.enabled = False
  143. x.include(self)
  144. assert not self.steps
  145. def test_terminate(self):
  146. x = self.Def(self)
  147. x.create = Mock()
  148. x.include(self)
  149. delattr(x.obj, 'terminate')
  150. x.terminate(self)
  151. x.obj.stop.assert_called_with()
  152. class test_Blueprint:
  153. class Blueprint(bootsteps.Blueprint):
  154. name = 'test_Blueprint'
  155. def test_steps_added_to_unclaimed(self):
  156. class tnA(bootsteps.Step):
  157. name = 'test_Blueprint.A'
  158. class tnB(bootsteps.Step):
  159. name = 'test_Blueprint.B'
  160. class xxA(bootsteps.Step):
  161. name = 'xx.A'
  162. class Blueprint(self.Blueprint):
  163. default_steps = [tnA, tnB]
  164. blueprint = Blueprint()
  165. assert tnA in blueprint.types
  166. assert tnB in blueprint.types
  167. assert xxA not in blueprint.types
  168. def test_init(self):
  169. blueprint = self.Blueprint()
  170. assert blueprint.name == 'test_Blueprint'
  171. def test_close__on_close_is_None(self):
  172. blueprint = self.Blueprint()
  173. blueprint.on_close = None
  174. blueprint.send_all = Mock()
  175. blueprint.close(1)
  176. blueprint.send_all.assert_called_with(
  177. 1, 'close', 'closing', reverse=False,
  178. )
  179. def test_send_all_with_None_steps(self):
  180. parent = Mock()
  181. blueprint = self.Blueprint()
  182. parent.steps = [None, None, None]
  183. blueprint.send_all(parent, 'close', 'Closing', reverse=False)
  184. def test_send_all_raises(self):
  185. parent = Mock()
  186. blueprint = self.Blueprint()
  187. parent.steps = [Mock()]
  188. parent.steps[0].foo.side_effect = KeyError()
  189. blueprint.send_all(parent, 'foo', propagate=False)
  190. with pytest.raises(KeyError):
  191. blueprint.send_all(parent, 'foo', propagate=True)
  192. def test_stop_state_in_TERMINATE(self):
  193. blueprint = self.Blueprint()
  194. blueprint.state = bootsteps.TERMINATE
  195. blueprint.stop(Mock())
  196. def test_join_raises_IGNORE_ERRORS(self):
  197. prev, bootsteps.IGNORE_ERRORS = bootsteps.IGNORE_ERRORS, (KeyError,)
  198. try:
  199. blueprint = self.Blueprint()
  200. blueprint.shutdown_complete = Mock()
  201. blueprint.shutdown_complete.wait.side_effect = KeyError('luke')
  202. blueprint.join(timeout=10)
  203. blueprint.shutdown_complete.wait.assert_called_with(timeout=10)
  204. finally:
  205. bootsteps.IGNORE_ERRORS = prev
  206. def test_connect_with(self):
  207. class b1s1(bootsteps.Step):
  208. pass
  209. class b1s2(bootsteps.Step):
  210. last = True
  211. class b2s1(bootsteps.Step):
  212. pass
  213. class b2s2(bootsteps.Step):
  214. last = True
  215. b1 = self.Blueprint([b1s1, b1s2])
  216. b2 = self.Blueprint([b2s1, b2s2])
  217. b1.apply(Mock())
  218. b2.apply(Mock())
  219. b1.connect_with(b2)
  220. assert b1s1 in b1.graph
  221. assert b2s1 in b1.graph
  222. assert b2s2 in b1.graph
  223. assert repr(b1s1)
  224. assert str(b1s1)
  225. def test_topsort_raises_KeyError(self):
  226. class Step(bootsteps.Step):
  227. requires = ('xyxxx.fsdasewe.Unknown',)
  228. b = self.Blueprint([Step])
  229. b.steps = b.claim_steps()
  230. with pytest.raises(ImportError):
  231. b._finalize_steps(b.steps)
  232. Step.requires = ()
  233. b.steps = b.claim_steps()
  234. b._finalize_steps(b.steps)
  235. with patch('celery.bootsteps.DependencyGraph') as Dep:
  236. g = Dep.return_value = Mock()
  237. g.topsort.side_effect = KeyError('foo')
  238. with pytest.raises(KeyError):
  239. b._finalize_steps(b.steps)
  240. def test_apply(self):
  241. class MyBlueprint(bootsteps.Blueprint):
  242. name = 'test_apply'
  243. def modules(self):
  244. return ['A', 'B']
  245. class B(bootsteps.Step):
  246. name = 'test_apply.B'
  247. class C(bootsteps.Step):
  248. name = 'test_apply.C'
  249. requires = [B]
  250. class A(bootsteps.Step):
  251. name = 'test_apply.A'
  252. requires = [C]
  253. class D(bootsteps.Step):
  254. name = 'test_apply.D'
  255. last = True
  256. x = MyBlueprint([A, D])
  257. x.apply(self)
  258. assert isinstance(x.order[0], B)
  259. assert isinstance(x.order[1], C)
  260. assert isinstance(x.order[2], A)
  261. assert isinstance(x.order[3], D)
  262. assert A in x.types
  263. assert x[A.name] is x.order[2]
  264. def test_find_last_but_no_steps(self):
  265. class MyBlueprint(bootsteps.Blueprint):
  266. name = 'qwejwioqjewoqiej'
  267. x = MyBlueprint()
  268. x.apply(self)
  269. assert x._find_last() is None