# test_bootsteps.py (9.2 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch
  4. from celery import bootsteps
  class test_StepFormatter:
      """Checks for ``bootsteps.StepFormatter`` prefixes, nodes and edges."""

      def test_get_prefix(self):
          """The prefix follows the step's ``last``/``conditional`` flags."""
          formatter = bootsteps.StepFormatter()

          # A step flagged as ``last`` gets the blueprint prefix.
          last_step = Mock()
          last_step.last = True
          assert formatter._get_prefix(last_step) == formatter.blueprint_prefix

          # A conditional, non-last step gets the conditional prefix.
          conditional_step = Mock()
          conditional_step.last = False
          conditional_step.conditional = True
          assert (
              formatter._get_prefix(conditional_step) ==
              formatter.conditional_prefix
          )

          # Neither flag set: the prefix is empty.
          plain_step = Mock()
          plain_step.last = False
          plain_step.conditional = False
          assert formatter._get_prefix(plain_step) == ''

      def test_node(self):
          """``node`` forwards to ``draw_node`` with the matching scheme."""
          formatter = bootsteps.StepFormatter()
          formatter.draw_node = Mock()
          step = Mock()

          step.last = False
          formatter.node(step, x=3)
          formatter.draw_node.assert_called_with(
              step, formatter.node_scheme, {'x': 3},
          )

          step.last = True
          formatter.node(step, x=3)
          formatter.draw_node.assert_called_with(
              step, formatter.blueprint_scheme, {'x': 3},
          )

      def test_edge(self):
          """``edge`` adds arrow/color attributes only when ``a.last`` is set."""
          formatter = bootsteps.StepFormatter()
          formatter.draw_edge = Mock()
          source, target = Mock(), Mock()

          source.last = True
          formatter.edge(source, target, x=6)
          formatter.draw_edge.assert_called_with(
              source, target, formatter.edge_scheme,
              {'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3'},
          )

          source.last = False
          formatter.edge(source, target, x=6)
          formatter.draw_edge.assert_called_with(
              source, target, formatter.edge_scheme, {'x': 6},
          )
  class test_Step:
      """Checks for the basic step API, exercised through a small subclass."""

      # Minimal concrete step; the test instance itself acts as the parent.
      class Def(bootsteps.StartStopStep):
          name = 'test_Step.Def'

      def setup(self):
          # Fresh step list for every test, since ``self`` is used as parent.
          self.steps = []

      def test_blueprint_name(self, bp='test_blueprint_name'):
          """An explicitly declared ``name`` is kept as-is on the class."""

          class X(bootsteps.Step):
              blueprint = bp
              name = 'X'

          assert X.name == 'X'

          class Y(bootsteps.Step):
              name = '%s.Y' % bp

          assert Y.name == '{0}.Y'.format(bp)

      def test_init(self):
          """A freshly constructed step is truthy."""
          step = self.Def(self)
          assert step

      def test_create(self):
          """``create`` can be called on a new step without raising."""
          step = self.Def(self)
          step.create(self)

      def test_include_if(self):
          """``include_if`` mirrors the ``enabled`` flag."""
          step = self.Def(self)

          step.enabled = True
          assert step.include_if(self)

          step.enabled = False
          assert not step.include_if(self)

      def test_instantiate(self):
          """``instantiate`` returns an instance of the class it was given."""
          step = self.Def(self)
          created = step.instantiate(self.Def, self)
          assert isinstance(created, self.Def)

      def test_include_when_enabled(self):
          """``include`` stores the result of ``create`` on ``obj``."""
          step = self.Def(self)
          step.create = Mock(return_value='George')

          assert step.include(self)
          assert step.obj == 'George'
          step.create.assert_called_with(self)

      def test_include_when_disabled(self):
          """A disabled step is not included and ``create`` is never called."""
          step = self.Def(self)
          step.enabled = False
          step.create = Mock()

          assert not step.include(self)
          step.create.assert_not_called()

      def test_repr(self):
          """``repr`` of a step yields a non-empty string."""
          step = self.Def(self)
          assert repr(step)
  class test_ConsumerStep:
      """Checks for ``bootsteps.ConsumerStep`` lifecycle handling."""

      def test_interface(self):
          """The base class leaves ``get_consumers`` unimplemented."""
          base_step = bootsteps.ConsumerStep(self)
          with pytest.raises(NotImplementedError):
              base_step.get_consumers(self)

      def test_start_stop_shutdown(self):
          """start/stop/shutdown reach consume/cancel/channel.close."""
          consumer = Mock()
          self.connection = Mock()

          class Step(bootsteps.ConsumerStep):

              def get_consumers(self, c):
                  return [consumer]

          step = Step(self)
          assert step.get_consumers(self) == [consumer]

          step.start(self)
          consumer.consume.assert_called_with()

          step.stop(self)
          consumer.cancel.assert_called_with()

          step.shutdown(self)
          consumer.channel.close.assert_called_with()

      def test_start_no_consumers(self):
          """Starting with an empty consumer collection does not raise."""
          self.connection = Mock()

          class Step(bootsteps.ConsumerStep):

              def get_consumers(self, c):
                  return ()

          step = Step(self)
          step.start(self)

      def test_close_no_consumer_channel(self):
          """``_close`` copes with a consumer whose ``channel`` is None."""
          step = bootsteps.ConsumerStep(Mock())
          channelless = Mock()
          step.consumers = [channelless]
          step.consumers[0].channel = None
          step._close(Mock())
  class test_StartStopStep:
      """Checks for ``bootsteps.StartStopStep`` delegation to its ``obj``."""

      # Minimal concrete step; the test instance itself acts as the parent.
      class Def(bootsteps.StartStopStep):
          name = 'test_StartStopStep.Def'

      def setup(self):
          # Fresh step list for every test, since ``self`` is used as parent.
          self.steps = []

      def test_start__stop(self):
          """start/stop are forwarded to ``obj``; start without obj is None."""
          step = self.Def(self)
          step.create = Mock()

          # Including the step builds the underlying object, stores it
          # on ``step.obj`` and registers the step in the parent's
          # ``steps`` list.
          step.include(self)
          assert self.steps
          assert self.steps[0] is step

          step.start(self)
          step.obj.start.assert_called_with()

          step.stop(self)
          step.obj.stop.assert_called_with()

          step.obj = None
          assert step.start(self) is None

      def test_terminate__no_obj(self):
          """Terminating a step without an ``obj`` does not raise."""
          step = self.Def(self)
          step.obj = None
          step.terminate(Mock())

      def test_include_when_disabled(self):
          """A disabled step is not added to the parent's ``steps``."""
          step = self.Def(self)
          step.enabled = False
          step.include(self)
          assert not self.steps

      def test_terminate(self):
          """Without ``obj.terminate``, terminating calls ``obj.stop``."""
          step = self.Def(self)
          step.create = Mock()
          step.include(self)

          # Remove the mock's ``terminate`` so that attribute is missing.
          del step.obj.terminate
          step.terminate(self)
          step.obj.stop.assert_called_with()
  class test_Blueprint:
      """Checks for ``bootsteps.Blueprint`` step claiming, ordering, signals."""

      # Named blueprint shared by the tests below.
      class Blueprint(bootsteps.Blueprint):
          name = 'test_Blueprint'

      def test_steps_added_to_unclaimed(self):
          """Only the declared ``default_steps`` appear in ``types``."""

          class tnA(bootsteps.Step):
              name = 'test_Blueprint.A'

          class tnB(bootsteps.Step):
              name = 'test_Blueprint.B'

          class xxA(bootsteps.Step):
              name = 'xx.A'

          class Blueprint(self.Blueprint):
              default_steps = [tnA, tnB]

          instance = Blueprint()
          assert tnA in instance.types
          assert tnB in instance.types
          assert xxA not in instance.types

      def test_init(self):
          """The class-level ``name`` is visible on the instance."""
          instance = self.Blueprint()
          assert instance.name == 'test_Blueprint'

      def test_close__on_close_is_None(self):
          """``close`` still broadcasts 'close' when ``on_close`` is None."""
          instance = self.Blueprint()
          instance.on_close = None
          instance.send_all = Mock()

          instance.close(1)
          instance.send_all.assert_called_with(
              1, 'close', 'closing', reverse=False,
          )

      def test_send_all_with_None_steps(self):
          """``send_all`` does not raise when every step entry is None."""
          parent = Mock()
          instance = self.Blueprint()
          parent.steps = [None] * 3
          instance.send_all(parent, 'close', 'Closing', reverse=False)

      def test_send_all_raises(self):
          """Step errors are raised only when ``propagate`` is true."""
          parent = Mock()
          instance = self.Blueprint()
          failing_step = Mock()
          failing_step.foo.side_effect = KeyError()
          parent.steps = [failing_step]

          instance.send_all(parent, 'foo', propagate=False)
          with pytest.raises(KeyError):
              instance.send_all(parent, 'foo', propagate=True)

      def test_stop_state_in_TERMINATE(self):
          """``stop`` does not raise when the state is already TERMINATE."""
          instance = self.Blueprint()
          instance.state = bootsteps.TERMINATE
          instance.stop(Mock())

      def test_join_raises_IGNORE_ERRORS(self):
          """``join`` does not re-raise errors listed in ``IGNORE_ERRORS``."""
          # Swap the module-level tuple for the test and always restore it.
          saved = bootsteps.IGNORE_ERRORS
          bootsteps.IGNORE_ERRORS = (KeyError,)
          try:
              instance = self.Blueprint()
              instance.shutdown_complete = Mock()
              instance.shutdown_complete.wait.side_effect = KeyError('luke')
              instance.join(timeout=10)
              instance.shutdown_complete.wait.assert_called_with(timeout=10)
          finally:
              bootsteps.IGNORE_ERRORS = saved

      def test_connect_with(self):
          """``connect_with`` puts the other blueprint's steps in the graph."""

          class b1s1(bootsteps.Step):
              pass

          class b1s2(bootsteps.Step):
              last = True

          class b2s1(bootsteps.Step):
              pass

          class b2s2(bootsteps.Step):
              last = True

          first = self.Blueprint([b1s1, b1s2])
          second = self.Blueprint([b2s1, b2s2])
          first.apply(Mock())
          second.apply(Mock())
          first.connect_with(second)

          assert b1s1 in first.graph
          assert b2s1 in first.graph
          assert b2s2 in first.graph

          assert repr(b1s1)
          assert str(b1s1)

      def test_topsort_raises_KeyError(self):
          """Unknown requirements and topsort failures surface as errors."""

          class Step(bootsteps.Step):
              requires = ('xyxxx.fsdasewe.Unknown',)

          blueprint = self.Blueprint([Step])
          blueprint.steps = blueprint.claim_steps()
          with pytest.raises(ImportError):
              blueprint._finalize_steps(blueprint.steps)

          # With no requirements left, finalizing goes through.
          Step.requires = ()
          blueprint.steps = blueprint.claim_steps()
          blueprint._finalize_steps(blueprint.steps)

          with patch('celery.bootsteps.DependencyGraph') as Dep:
              graph = Mock()
              Dep.return_value = graph
              graph.topsort.side_effect = KeyError('foo')
              with pytest.raises(KeyError):
                  blueprint._finalize_steps(blueprint.steps)

      def test_apply(self):
          """``apply`` orders steps by ``requires`` with the last step last."""

          class MyBlueprint(bootsteps.Blueprint):
              name = 'test_apply'

              def modules(self):
                  return ['A', 'B']

          class B(bootsteps.Step):
              name = 'test_apply.B'

          class C(bootsteps.Step):
              name = 'test_apply.C'
              requires = [B]

          class A(bootsteps.Step):
              name = 'test_apply.A'
              requires = [C]

          class D(bootsteps.Step):
              name = 'test_apply.D'
              last = True

          blueprint = MyBlueprint([A, D])
          blueprint.apply(self)

          # Expected order: B, then C, then A, then D (flagged ``last``).
          for position, step_cls in enumerate((B, C, A, D)):
              assert isinstance(blueprint.order[position], step_cls)

          assert A in blueprint.types
          assert blueprint[A.name] is blueprint.order[2]

      def test_find_last_but_no_steps(self):
          """``_find_last`` returns None for a blueprint without steps."""

          class MyBlueprint(bootsteps.Blueprint):
              name = 'qwejwioqjewoqiej'

          blueprint = MyBlueprint()
          blueprint.apply(self)
          assert blueprint._find_last() is None