test_trace.py 11 KB


  1. import pytest
  2. from case import Mock, patch
  3. from kombu.exceptions import EncodeError
  4. from celery import group, uuid
  5. from celery import signals
  6. from celery import states
  7. from celery.exceptions import Ignore, Retry, Reject
  8. from celery.app.trace import (
  9. TraceInfo,
  10. build_tracer,
  11. get_log_policy,
  12. log_policy_reject,
  13. log_policy_ignore,
  14. log_policy_internal,
  15. log_policy_expected,
  16. log_policy_unexpected,
  17. trace_task,
  18. _trace_task_ret,
  19. _fast_trace_task,
  20. setup_worker_optimizations,
  21. reset_worker_optimizations,
  22. )
  23. def trace(app, task, args=(), kwargs={},
  24. propagate=False, eager=True, request=None, **opts):
  25. t = build_tracer(task.name, task,
  26. eager=eager, propagate=propagate, app=app, **opts)
  27. ret = t('id-1', args, kwargs, request)
  28. return ret.retval, ret.info
  29. class TraceCase:
  30. def setup(self):
  31. @self.app.task(shared=False)
  32. def add(x, y):
  33. return x + y
  34. self.add = add
  35. @self.app.task(shared=False, ignore_result=True)
  36. def add_cast(x, y):
  37. return x + y
  38. self.add_cast = add_cast
  39. @self.app.task(shared=False)
  40. def raises(exc):
  41. raise exc
  42. self.raises = raises
  43. def trace(self, *args, **kwargs):
  44. return trace(self.app, *args, **kwargs)
  45. class test_trace(TraceCase):
  46. def test_trace_successful(self):
  47. retval, info = self.trace(self.add, (2, 2), {})
  48. assert info is None
  49. assert retval == 4
  50. def test_trace_on_success(self):
  51. @self.app.task(shared=False, on_success=Mock())
  52. def add_with_success(x, y):
  53. return x + y
  54. self.trace(add_with_success, (2, 2), {})
  55. add_with_success.on_success.assert_called()
  56. def test_get_log_policy(self):
  57. einfo = Mock(name='einfo')
  58. einfo.internal = False
  59. assert get_log_policy(self.add, einfo, Reject()) is log_policy_reject
  60. assert get_log_policy(self.add, einfo, Ignore()) is log_policy_ignore
  61. self.add.throws = (TypeError,)
  62. assert (get_log_policy(self.add, einfo, KeyError()) is
  63. log_policy_unexpected)
  64. assert (get_log_policy(self.add, einfo, TypeError()) is
  65. log_policy_expected)
  66. einfo2 = Mock(name='einfo2')
  67. einfo2.internal = True
  68. assert (get_log_policy(self.add, einfo2, KeyError()) is
  69. log_policy_internal)
  70. def test_trace_after_return(self):
  71. @self.app.task(shared=False, after_return=Mock())
  72. def add_with_after_return(x, y):
  73. return x + y
  74. self.trace(add_with_after_return, (2, 2), {})
  75. add_with_after_return.after_return.assert_called()
  76. def test_with_prerun_receivers(self):
  77. on_prerun = Mock()
  78. signals.task_prerun.connect(on_prerun)
  79. try:
  80. self.trace(self.add, (2, 2), {})
  81. on_prerun.assert_called()
  82. finally:
  83. signals.task_prerun.receivers[:] = []
  84. def test_with_postrun_receivers(self):
  85. on_postrun = Mock()
  86. signals.task_postrun.connect(on_postrun)
  87. try:
  88. self.trace(self.add, (2, 2), {})
  89. on_postrun.assert_called()
  90. finally:
  91. signals.task_postrun.receivers[:] = []
  92. def test_with_success_receivers(self):
  93. on_success = Mock()
  94. signals.task_success.connect(on_success)
  95. try:
  96. self.trace(self.add, (2, 2), {})
  97. on_success.assert_called()
  98. finally:
  99. signals.task_success.receivers[:] = []
  100. def test_when_chord_part(self):
  101. @self.app.task(shared=False)
  102. def add(x, y):
  103. return x + y
  104. add.backend = Mock()
  105. request = {'chord': uuid()}
  106. self.trace(add, (2, 2), {}, request=request)
  107. add.backend.mark_as_done.assert_called()
  108. args, kwargs = add.backend.mark_as_done.call_args
  109. assert args[0] == 'id-1'
  110. assert args[1] == 4
  111. assert args[2].chord == request['chord']
  112. assert not args[3]
  113. def test_when_backend_cleanup_raises(self):
  114. @self.app.task(shared=False)
  115. def add(x, y):
  116. return x + y
  117. add.backend = Mock(name='backend')
  118. add.backend.process_cleanup.side_effect = KeyError()
  119. self.trace(add, (2, 2), {}, eager=False)
  120. add.backend.process_cleanup.assert_called_with()
  121. add.backend.process_cleanup.side_effect = MemoryError()
  122. with pytest.raises(MemoryError):
  123. self.trace(add, (2, 2), {}, eager=False)
  124. def test_when_Ignore(self):
  125. @self.app.task(shared=False)
  126. def ignored():
  127. raise Ignore()
  128. retval, info = self.trace(ignored, (), {})
  129. assert info.state == states.IGNORED
  130. def test_when_Reject(self):
  131. @self.app.task(shared=False)
  132. def rejecting():
  133. raise Reject()
  134. retval, info = self.trace(rejecting, (), {})
  135. assert info.state == states.REJECTED
  136. def test_backend_cleanup_raises(self):
  137. self.add.backend.process_cleanup = Mock()
  138. self.add.backend.process_cleanup.side_effect = RuntimeError()
  139. self.trace(self.add, (2, 2), {})
  140. @patch('celery.canvas.maybe_signature')
  141. def test_callbacks__scalar(self, maybe_signature):
  142. sig = Mock(name='sig')
  143. request = {'callbacks': [sig], 'root_id': 'root'}
  144. maybe_signature.return_value = sig
  145. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  146. sig.apply_async.assert_called_with(
  147. (4,), parent_id='id-1', root_id='root',
  148. )
  149. @patch('celery.canvas.maybe_signature')
  150. def test_chain_proto2(self, maybe_signature):
  151. sig = Mock(name='sig')
  152. sig2 = Mock(name='sig2')
  153. request = {'chain': [sig2, sig], 'root_id': 'root'}
  154. maybe_signature.return_value = sig
  155. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  156. sig.apply_async.assert_called_with(
  157. (4, ), parent_id='id-1', root_id='root',
  158. chain=[sig2],
  159. )
  160. @patch('celery.canvas.maybe_signature')
  161. def test_callbacks__EncodeError(self, maybe_signature):
  162. sig = Mock(name='sig')
  163. request = {'callbacks': [sig], 'root_id': 'root'}
  164. maybe_signature.return_value = sig
  165. sig.apply_async.side_effect = EncodeError()
  166. retval, einfo = self.trace(self.add, (2, 2), {}, request=request)
  167. assert einfo.state == states.FAILURE
  168. @patch('celery.canvas.maybe_signature')
  169. @patch('celery.app.trace.group.apply_async')
  170. def test_callbacks__sigs(self, group_, maybe_signature):
  171. sig1 = Mock(name='sig')
  172. sig2 = Mock(name='sig2')
  173. sig3 = group([Mock(name='g1'), Mock(name='g2')], app=self.app)
  174. sig3.apply_async = Mock(name='gapply')
  175. request = {'callbacks': [sig1, sig3, sig2], 'root_id': 'root'}
  176. def passt(s, *args, **kwargs):
  177. return s
  178. maybe_signature.side_effect = passt
  179. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  180. group_.assert_called_with(
  181. (4,), parent_id='id-1', root_id='root',
  182. )
  183. sig3.apply_async.assert_called_with(
  184. (4,), parent_id='id-1', root_id='root',
  185. )
  186. @patch('celery.canvas.maybe_signature')
  187. @patch('celery.app.trace.group.apply_async')
  188. def test_callbacks__only_groups(self, group_, maybe_signature):
  189. sig1 = group([Mock(name='g1'), Mock(name='g2')], app=self.app)
  190. sig2 = group([Mock(name='g3'), Mock(name='g4')], app=self.app)
  191. sig1.apply_async = Mock(name='gapply')
  192. sig2.apply_async = Mock(name='gapply')
  193. request = {'callbacks': [sig1, sig2], 'root_id': 'root'}
  194. def passt(s, *args, **kwargs):
  195. return s
  196. maybe_signature.side_effect = passt
  197. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  198. sig1.apply_async.assert_called_with(
  199. (4,), parent_id='id-1', root_id='root',
  200. )
  201. sig2.apply_async.assert_called_with(
  202. (4,), parent_id='id-1', root_id='root',
  203. )
  204. def test_trace_SystemExit(self):
  205. with pytest.raises(SystemExit):
  206. self.trace(self.raises, (SystemExit(),), {})
  207. def test_trace_Retry(self):
  208. exc = Retry('foo', 'bar')
  209. _, info = self.trace(self.raises, (exc,), {})
  210. assert info.state == states.RETRY
  211. assert info.retval is exc
  212. def test_trace_exception(self):
  213. exc = KeyError('foo')
  214. _, info = self.trace(self.raises, (exc,), {})
  215. assert info.state == states.FAILURE
  216. assert info.retval is exc
  217. def test_trace_task_ret__no_content_type(self):
  218. _trace_task_ret(
  219. self.add.name, 'id1', {}, ((2, 2), {}, {}), None, None,
  220. app=self.app,
  221. )
  222. def test_fast_trace_task__no_content_type(self):
  223. self.app.tasks[self.add.name].__trace__ = build_tracer(
  224. self.add.name, self.add, app=self.app,
  225. )
  226. _fast_trace_task(
  227. self.add.name, 'id1', {}, ((2, 2), {}, {}), None, None,
  228. app=self.app, _loc=[self.app.tasks, {}, 'hostname']
  229. )
  230. def test_trace_exception_propagate(self):
  231. with pytest.raises(KeyError):
  232. self.trace(self.raises, (KeyError('foo'),), {}, propagate=True)
  233. @patch('celery.app.trace.build_tracer')
  234. @patch('celery.app.trace.report_internal_error')
  235. def test_outside_body_error(self, report_internal_error, build_tracer):
  236. tracer = Mock()
  237. tracer.side_effect = KeyError('foo')
  238. build_tracer.return_value = tracer
  239. @self.app.task(shared=False)
  240. def xtask():
  241. pass
  242. trace_task(xtask, 'uuid', (), {})
  243. assert report_internal_error.call_count
  244. assert xtask.__trace__ is tracer
  245. class test_TraceInfo(TraceCase):
  246. class TI(TraceInfo):
  247. __slots__ = TraceInfo.__slots__ + ('__dict__',)
  248. def test_handle_error_state(self):
  249. x = self.TI(states.FAILURE)
  250. x.handle_failure = Mock()
  251. x.handle_error_state(self.add_cast, self.add_cast.request)
  252. x.handle_failure.assert_called_with(
  253. self.add_cast, self.add_cast.request,
  254. store_errors=self.add_cast.store_errors_even_if_ignored,
  255. call_errbacks=True,
  256. )
  257. @patch('celery.app.trace.ExceptionInfo')
  258. def test_handle_reject(self, ExceptionInfo):
  259. x = self.TI(states.FAILURE)
  260. x._log_error = Mock(name='log_error')
  261. req = Mock(name='req')
  262. x.handle_reject(self.add, req)
  263. x._log_error.assert_called_with(self.add, req, ExceptionInfo())
  264. class test_stackprotection:
  265. def test_stackprotection(self):
  266. setup_worker_optimizations(self.app)
  267. try:
  268. @self.app.task(shared=False, bind=True)
  269. def foo(self, i):
  270. if i:
  271. return foo(0)
  272. return self.request
  273. assert foo(1).called_directly
  274. finally:
  275. reset_worker_optimizations()