test_trace.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. from __future__ import absolute_import
  2. from celery import uuid
  3. from celery import signals
  4. from celery import states
  5. from celery.exceptions import Ignore, Retry
  6. from celery.app.trace import (
  7. TraceInfo,
  8. eager_trace_task,
  9. trace_task,
  10. setup_worker_optimizations,
  11. reset_worker_optimizations,
  12. )
  13. from celery.tests.case import AppCase, Mock, patch
  def trace(app, task, args=(), kwargs=None, propagate=False, **opts):
      """Run ``task`` through ``eager_trace_task`` under a fixed task id.

      Thin wrapper that always uses ``'id-1'`` as the task id and forwards
      everything else unchanged.

      :param app: forwarded to ``eager_trace_task`` as ``app``.
      :param task: the task to execute.
      :param args: positional arguments for the task.
      :param kwargs: keyword arguments for the task; ``None`` (the default)
          is treated as an empty dict.
      :param propagate: forwarded to ``eager_trace_task`` as ``propagate``.
      :param opts: extra keyword options, forwarded unchanged.
      :returns: whatever ``eager_trace_task`` returns; the tests in this
          module unpack it as ``(retval, info)``.
      """
      # Build a fresh dict per call: a ``{}`` default would be a single
      # object shared by every call that omits ``kwargs``.
      if kwargs is None:
          kwargs = {}
      return eager_trace_task(task, 'id-1', args, kwargs,
                              propagate=propagate, app=app, **opts)
  class TraceCase(AppCase):
      """Base test case providing three simple tasks bound to ``self.app``.

      ``setup`` stores the tasks as ``self.add``, ``self.add_cast`` and
      ``self.raises``; ``trace`` runs a task through the module-level
      ``trace`` helper using this case's app.
      """

      def setup(self):
          # All three tasks are created with shared=False on this case's app.
          task = self.app.task

          @task(shared=False)
          def add(x, y):
              return x + y

          # Same body as ``add`` but declared with ignore_result=True.
          @task(shared=False, ignore_result=True)
          def add_cast(x, y):
              return x + y

          # Raises whatever exception instance it is handed.
          @task(shared=False)
          def raises(exc):
              raise exc

          self.add, self.add_cast, self.raises = add, add_cast, raises

      def trace(self, *args, **kwargs):
          # Delegate to the module-level helper, supplying this case's app.
          return trace(self.app, *args, **kwargs)
  class test_trace(TraceCase):
      """Tests for tracing a task: return values, handlers, signals, errors."""

      def _assert_signal_sent(self, signal):
          # Connect a mock receiver to ``signal``, trace ``self.add`` and
          # check the receiver was called.  Cleanup disconnects only the
          # receiver added here, so receivers registered elsewhere on the
          # same signal are left in place.
          receiver = Mock()
          signal.connect(receiver)
          try:
              self.trace(self.add, (2, 2), {})
              self.assertTrue(receiver.called)
          finally:
              signal.disconnect(receiver)

      def test_trace_successful(self):
          # A successful run yields the return value and no info object.
          retval, info = self.trace(self.add, (2, 2), {})
          self.assertIsNone(info)
          self.assertEqual(retval, 4)

      def test_trace_on_success(self):
          # The task's on_success handler is invoked.
          @self.app.task(shared=False, on_success=Mock())
          def add_with_success(x, y):
              return x + y

          self.trace(add_with_success, (2, 2), {})
          self.assertTrue(add_with_success.on_success.called)

      def test_trace_after_return(self):
          # The task's after_return handler is invoked.
          @self.app.task(shared=False, after_return=Mock())
          def add_with_after_return(x, y):
              return x + y

          self.trace(add_with_after_return, (2, 2), {})
          self.assertTrue(add_with_after_return.after_return.called)

      def test_with_prerun_receivers(self):
          self._assert_signal_sent(signals.task_prerun)

      def test_with_postrun_receivers(self):
          self._assert_signal_sent(signals.task_postrun)

      def test_with_success_receivers(self):
          self._assert_signal_sent(signals.task_success)

      def test_when_chord_part(self):
          # With a 'chord' key in the request, the backend's
          # on_chord_part_return is called with the task, state and result.
          @self.app.task(shared=False)
          def add(x, y):
              return x + y

          add.backend = Mock()
          self.trace(add, (2, 2), {}, request={'chord': uuid()})
          add.backend.on_chord_part_return.assert_called_with(add, 'SUCCESS', 4)

      def test_when_backend_cleanup_raises(self):
          @self.app.task(shared=False)
          def add(x, y):
              return x + y

          add.backend = Mock(name='backend')

          # A KeyError from process_cleanup does not escape the trace call.
          add.backend.process_cleanup.side_effect = KeyError()
          self.trace(add, (2, 2), {}, eager=False)
          add.backend.process_cleanup.assert_called_with()

          # A MemoryError from process_cleanup does.
          add.backend.process_cleanup.side_effect = MemoryError()
          with self.assertRaises(MemoryError):
              self.trace(add, (2, 2), {}, eager=False)

      def test_when_Ignore(self):
          # Raising Ignore results in the IGNORED state.
          @self.app.task(shared=False)
          def ignored():
              raise Ignore()

          _, info = self.trace(ignored, (), {})
          self.assertEqual(info.state, states.IGNORED)

      def test_trace_SystemExit(self):
          # SystemExit raised by the task propagates out of the trace call.
          with self.assertRaises(SystemExit):
              self.trace(self.raises, (SystemExit(), ), {})

      def test_trace_Retry(self):
          # Retry yields the RETRY state with the exception as retval.
          exc = Retry('foo', 'bar')
          _, info = self.trace(self.raises, (exc, ), {})
          self.assertEqual(info.state, states.RETRY)
          self.assertIs(info.retval, exc)

      def test_trace_exception(self):
          # An ordinary exception yields FAILURE with the exception as retval.
          exc = KeyError('foo')
          _, info = self.trace(self.raises, (exc, ), {})
          self.assertEqual(info.state, states.FAILURE)
          self.assertIs(info.retval, exc)

      def test_trace_exception_propagate(self):
          # With propagate=True the task's exception is re-raised.
          with self.assertRaises(KeyError):
              self.trace(self.raises, (KeyError('foo'), ), {}, propagate=True)

      @patch('celery.app.trace.build_tracer')
      @patch('celery.app.trace.report_internal_error')
      def test_outside_body_error(self, report_internal_error, build_tracer):
          # When the tracer itself raises, trace_task reports an internal
          # error, and the built tracer is stored on the task as __trace__.
          tracer = Mock()
          tracer.side_effect = KeyError('foo')
          build_tracer.return_value = tracer

          @self.app.task(shared=False)
          def xtask():
              pass

          trace_task(xtask, 'uuid', (), {})
          self.assertTrue(report_internal_error.call_count)
          self.assertIs(xtask.__trace__, tracer)
  class test_TraceInfo(TraceCase):
      """Tests for ``TraceInfo.handle_error_state``."""

      class TI(TraceInfo):
          # Adding '__dict__' to the slots gives instances a dict, so the
          # test below can replace ``handle_failure`` with a Mock.
          __slots__ = TraceInfo.__slots__ + ('__dict__', )

      def test_handle_error_state(self):
          # In the FAILURE state, handle_error_state calls handle_failure
          # with the task and its store_errors_even_if_ignored setting.
          info = self.TI(states.FAILURE)
          failure_handler = info.handle_failure = Mock()

          info.handle_error_state(self.add_cast)

          failure_handler.assert_called_with(
              self.add_cast,
              store_errors=self.add_cast.store_errors_even_if_ignored,
          )
  class test_stackprotection(AppCase):
      """Checks direct task calls while worker optimizations are installed."""

      def test_stackprotection(self):
          setup_worker_optimizations(self.app)
          try:
              @self.app.task(shared=False, bind=True)
              def foo(self, i):
                  # Truthy ``i``: call the task again directly; otherwise
                  # hand back the request seen by this invocation.
                  return foo(0) if i else self.request

              # The request returned through the nested direct call must be
              # flagged as called directly.
              request = foo(1)
              self.assertTrue(request.called_directly)
          finally:
              # Always undo the optimizations installed above.
              reset_worker_optimizations()