test_trace.py 11 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch
  4. from kombu.exceptions import EncodeError
  5. from celery import group, uuid
  6. from celery import signals
  7. from celery import states
  8. from celery.exceptions import Ignore, Retry, Reject
  9. from celery.app.trace import (
  10. TraceInfo,
  11. build_tracer,
  12. get_log_policy,
  13. log_policy_reject,
  14. log_policy_ignore,
  15. log_policy_internal,
  16. log_policy_expected,
  17. log_policy_unexpected,
  18. trace_task,
  19. _trace_task_ret,
  20. _fast_trace_task,
  21. setup_worker_optimizations,
  22. reset_worker_optimizations,
  23. )
  24. def trace(app, task, args=(), kwargs={},
  25. propagate=False, eager=True, request=None, **opts):
  26. t = build_tracer(task.name, task,
  27. eager=eager, propagate=propagate, app=app, **opts)
  28. ret = t('id-1', args, kwargs, request)
  29. return ret.retval, ret.info
  30. class TraceCase:
  31. def setup(self):
  32. @self.app.task(shared=False)
  33. def add(x, y):
  34. return x + y
  35. self.add = add
  36. @self.app.task(shared=False, ignore_result=True)
  37. def add_cast(x, y):
  38. return x + y
  39. self.add_cast = add_cast
  40. @self.app.task(shared=False)
  41. def raises(exc):
  42. raise exc
  43. self.raises = raises
  44. def trace(self, *args, **kwargs):
  45. return trace(self.app, *args, **kwargs)
  46. class test_trace(TraceCase):
  47. def test_trace_successful(self):
  48. retval, info = self.trace(self.add, (2, 2), {})
  49. assert info is None
  50. assert retval == 4
  51. def test_trace_on_success(self):
  52. @self.app.task(shared=False, on_success=Mock())
  53. def add_with_success(x, y):
  54. return x + y
  55. self.trace(add_with_success, (2, 2), {})
  56. add_with_success.on_success.assert_called()
  57. def test_get_log_policy(self):
  58. einfo = Mock(name='einfo')
  59. einfo.internal = False
  60. assert get_log_policy(self.add, einfo, Reject()) is log_policy_reject
  61. assert get_log_policy(self.add, einfo, Ignore()) is log_policy_ignore
  62. self.add.throws = (TypeError,)
  63. assert (get_log_policy(self.add, einfo, KeyError()) is
  64. log_policy_unexpected)
  65. assert (get_log_policy(self.add, einfo, TypeError()) is
  66. log_policy_expected)
  67. einfo2 = Mock(name='einfo2')
  68. einfo2.internal = True
  69. assert (get_log_policy(self.add, einfo2, KeyError()) is
  70. log_policy_internal)
  71. def test_trace_after_return(self):
  72. @self.app.task(shared=False, after_return=Mock())
  73. def add_with_after_return(x, y):
  74. return x + y
  75. self.trace(add_with_after_return, (2, 2), {})
  76. add_with_after_return.after_return.assert_called()
  77. def test_with_prerun_receivers(self):
  78. on_prerun = Mock()
  79. signals.task_prerun.connect(on_prerun)
  80. try:
  81. self.trace(self.add, (2, 2), {})
  82. on_prerun.assert_called()
  83. finally:
  84. signals.task_prerun.receivers[:] = []
  85. def test_with_postrun_receivers(self):
  86. on_postrun = Mock()
  87. signals.task_postrun.connect(on_postrun)
  88. try:
  89. self.trace(self.add, (2, 2), {})
  90. on_postrun.assert_called()
  91. finally:
  92. signals.task_postrun.receivers[:] = []
  93. def test_with_success_receivers(self):
  94. on_success = Mock()
  95. signals.task_success.connect(on_success)
  96. try:
  97. self.trace(self.add, (2, 2), {})
  98. on_success.assert_called()
  99. finally:
  100. signals.task_success.receivers[:] = []
  101. def test_when_chord_part(self):
  102. @self.app.task(shared=False)
  103. def add(x, y):
  104. return x + y
  105. add.backend = Mock()
  106. request = {'chord': uuid()}
  107. self.trace(add, (2, 2), {}, request=request)
  108. add.backend.mark_as_done.assert_called()
  109. args, kwargs = add.backend.mark_as_done.call_args
  110. assert args[0] == 'id-1'
  111. assert args[1] == 4
  112. assert args[2].chord == request['chord']
  113. assert not args[3]
  114. def test_when_backend_cleanup_raises(self):
  115. @self.app.task(shared=False)
  116. def add(x, y):
  117. return x + y
  118. add.backend = Mock(name='backend')
  119. add.backend.process_cleanup.side_effect = KeyError()
  120. self.trace(add, (2, 2), {}, eager=False)
  121. add.backend.process_cleanup.assert_called_with()
  122. add.backend.process_cleanup.side_effect = MemoryError()
  123. with pytest.raises(MemoryError):
  124. self.trace(add, (2, 2), {}, eager=False)
  125. def test_when_Ignore(self):
  126. @self.app.task(shared=False)
  127. def ignored():
  128. raise Ignore()
  129. retval, info = self.trace(ignored, (), {})
  130. assert info.state == states.IGNORED
  131. def test_when_Reject(self):
  132. @self.app.task(shared=False)
  133. def rejecting():
  134. raise Reject()
  135. retval, info = self.trace(rejecting, (), {})
  136. assert info.state == states.REJECTED
  137. def test_backend_cleanup_raises(self):
  138. self.add.backend.process_cleanup = Mock()
  139. self.add.backend.process_cleanup.side_effect = RuntimeError()
  140. self.trace(self.add, (2, 2), {})
  141. @patch('celery.canvas.maybe_signature')
  142. def test_callbacks__scalar(self, maybe_signature):
  143. sig = Mock(name='sig')
  144. request = {'callbacks': [sig], 'root_id': 'root'}
  145. maybe_signature.return_value = sig
  146. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  147. sig.apply_async.assert_called_with(
  148. (4,), parent_id='id-1', root_id='root',
  149. )
  150. @patch('celery.canvas.maybe_signature')
  151. def test_chain_proto2(self, maybe_signature):
  152. sig = Mock(name='sig')
  153. sig2 = Mock(name='sig2')
  154. request = {'chain': [sig2, sig], 'root_id': 'root'}
  155. maybe_signature.return_value = sig
  156. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  157. sig.apply_async.assert_called_with(
  158. (4, ), parent_id='id-1', root_id='root',
  159. chain=[sig2],
  160. )
  161. @patch('celery.canvas.maybe_signature')
  162. def test_callbacks__EncodeError(self, maybe_signature):
  163. sig = Mock(name='sig')
  164. request = {'callbacks': [sig], 'root_id': 'root'}
  165. maybe_signature.return_value = sig
  166. sig.apply_async.side_effect = EncodeError()
  167. retval, einfo = self.trace(self.add, (2, 2), {}, request=request)
  168. assert einfo.state == states.FAILURE
  169. @patch('celery.canvas.maybe_signature')
  170. @patch('celery.app.trace.group.apply_async')
  171. def test_callbacks__sigs(self, group_, maybe_signature):
  172. sig1 = Mock(name='sig')
  173. sig2 = Mock(name='sig2')
  174. sig3 = group([Mock(name='g1'), Mock(name='g2')], app=self.app)
  175. sig3.apply_async = Mock(name='gapply')
  176. request = {'callbacks': [sig1, sig3, sig2], 'root_id': 'root'}
  177. def passt(s, *args, **kwargs):
  178. return s
  179. maybe_signature.side_effect = passt
  180. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  181. group_.assert_called_with(
  182. (4,), parent_id='id-1', root_id='root',
  183. )
  184. sig3.apply_async.assert_called_with(
  185. (4,), parent_id='id-1', root_id='root',
  186. )
  187. @patch('celery.canvas.maybe_signature')
  188. @patch('celery.app.trace.group.apply_async')
  189. def test_callbacks__only_groups(self, group_, maybe_signature):
  190. sig1 = group([Mock(name='g1'), Mock(name='g2')], app=self.app)
  191. sig2 = group([Mock(name='g3'), Mock(name='g4')], app=self.app)
  192. sig1.apply_async = Mock(name='gapply')
  193. sig2.apply_async = Mock(name='gapply')
  194. request = {'callbacks': [sig1, sig2], 'root_id': 'root'}
  195. def passt(s, *args, **kwargs):
  196. return s
  197. maybe_signature.side_effect = passt
  198. retval, _ = self.trace(self.add, (2, 2), {}, request=request)
  199. sig1.apply_async.assert_called_with(
  200. (4,), parent_id='id-1', root_id='root',
  201. )
  202. sig2.apply_async.assert_called_with(
  203. (4,), parent_id='id-1', root_id='root',
  204. )
  205. def test_trace_SystemExit(self):
  206. with pytest.raises(SystemExit):
  207. self.trace(self.raises, (SystemExit(),), {})
  208. def test_trace_Retry(self):
  209. exc = Retry('foo', 'bar')
  210. _, info = self.trace(self.raises, (exc,), {})
  211. assert info.state == states.RETRY
  212. assert info.retval is exc
  213. def test_trace_exception(self):
  214. exc = KeyError('foo')
  215. _, info = self.trace(self.raises, (exc,), {})
  216. assert info.state == states.FAILURE
  217. assert info.retval is exc
  218. def test_trace_task_ret__no_content_type(self):
  219. _trace_task_ret(
  220. self.add.name, 'id1', {}, ((2, 2), {}, {}), None, None,
  221. app=self.app,
  222. )
  223. def test_fast_trace_task__no_content_type(self):
  224. self.app.tasks[self.add.name].__trace__ = build_tracer(
  225. self.add.name, self.add, app=self.app,
  226. )
  227. _fast_trace_task(
  228. self.add.name, 'id1', {}, ((2, 2), {}, {}), None, None,
  229. app=self.app, _loc=[self.app.tasks, {}, 'hostname']
  230. )
  231. def test_trace_exception_propagate(self):
  232. with pytest.raises(KeyError):
  233. self.trace(self.raises, (KeyError('foo'),), {}, propagate=True)
  234. @patch('celery.app.trace.build_tracer')
  235. @patch('celery.app.trace.report_internal_error')
  236. def test_outside_body_error(self, report_internal_error, build_tracer):
  237. tracer = Mock()
  238. tracer.side_effect = KeyError('foo')
  239. build_tracer.return_value = tracer
  240. @self.app.task(shared=False)
  241. def xtask():
  242. pass
  243. trace_task(xtask, 'uuid', (), {})
  244. assert report_internal_error.call_count
  245. assert xtask.__trace__ is tracer
  246. class test_TraceInfo(TraceCase):
  247. class TI(TraceInfo):
  248. __slots__ = TraceInfo.__slots__ + ('__dict__',)
  249. def test_handle_error_state(self):
  250. x = self.TI(states.FAILURE)
  251. x.handle_failure = Mock()
  252. x.handle_error_state(self.add_cast, self.add_cast.request)
  253. x.handle_failure.assert_called_with(
  254. self.add_cast, self.add_cast.request,
  255. store_errors=self.add_cast.store_errors_even_if_ignored,
  256. call_errbacks=True,
  257. )
  258. @patch('celery.app.trace.ExceptionInfo')
  259. def test_handle_reject(self, ExceptionInfo):
  260. x = self.TI(states.FAILURE)
  261. x._log_error = Mock(name='log_error')
  262. req = Mock(name='req')
  263. x.handle_reject(self.add, req)
  264. x._log_error.assert_called_with(self.add, req, ExceptionInfo())
  265. class test_stackprotection:
  266. def test_stackprotection(self):
  267. setup_worker_optimizations(self.app)
  268. try:
  269. @self.app.task(shared=False, bind=True)
  270. def foo(self, i):
  271. if i:
  272. return foo(0)
  273. return self.request
  274. assert foo(1).called_directly
  275. finally:
  276. reset_worker_optimizations()