test_trace.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. from __future__ import absolute_import
  2. from mock import Mock, patch
  3. from celery import uuid
  4. from celery import signals
  5. from celery import states
  6. from celery.exceptions import RetryTaskError, Ignore
  7. from celery.task.trace import (
  8. TraceInfo,
  9. eager_trace_task,
  10. trace_task,
  11. setup_worker_optimizations,
  12. reset_worker_optimizations,
  13. )
  14. from celery.tests.utils import AppCase
  def trace(task, args=(), kwargs=None, propagate=False, **opts):
      """Run ``task`` through ``eager_trace_task`` under the fixed id ``'id-1'``.

      :param task: the task to trace.
      :param args: positional arguments for the task.
      :param kwargs: keyword arguments for the task; ``None`` (the default)
          is treated as an empty dict.
      :param propagate: forwarded to ``eager_trace_task`` as ``propagate``.
      :param opts: any further keyword options, forwarded unchanged.
      :returns: whatever ``eager_trace_task`` returns (the tests in this
          module unpack it as a ``(retval, info)`` pair).
      """
      # A ``{}`` default would be a single dict object shared by every call
      # that omits ``kwargs``; build a fresh one per call instead.
      if kwargs is None:
          kwargs = {}
      return eager_trace_task(task, 'id-1', args, kwargs,
                              propagate=propagate, **opts)
  class TraceCase(AppCase):
      """Base case that registers three small tasks on ``self.app``.

      ``setup`` exposes them as ``self.add``, ``self.add_cast`` and
      ``self.raises`` for the test classes that derive from this one.
      """

      def setup(self):
          register = self.app.task

          # Plain two-argument addition.
          @register
          def add(x, y):
              return x + y

          # Same body as ``add`` but registered with ``ignore_result=True``.
          @register(ignore_result=True)
          def add_cast(x, y):
              return x + y

          # Raises whatever exception instance it is handed.
          @register
          def raises(exc):
              raise exc

          self.add, self.add_cast, self.raises = add, add_cast, raises
  class test_trace(TraceCase):
      """Tests that drive tasks through the module-level ``trace`` helper
      (and, for the last test, through ``trace_task`` directly)."""

      def test_trace_successful(self):
          # A task that returns normally yields its value and no info object.
          retval, info = trace(self.add, (2, 2), {})
          self.assertIsNone(info)
          self.assertEqual(retval, 4)

      def test_trace_on_success(self):
          # The task's own ``on_success`` attribute must be called.
          @self.app.task(on_success=Mock())
          def add_with_success(x, y):
              return x + y

          trace(add_with_success, (2, 2), {})
          self.assertTrue(add_with_success.on_success.called)

      def test_trace_after_return(self):
          # The task's own ``after_return`` attribute must be called.
          @self.app.task(after_return=Mock())
          def add_with_after_return(x, y):
              return x + y

          trace(add_with_after_return, (2, 2), {})
          self.assertTrue(add_with_after_return.after_return.called)

      def test_with_prerun_receivers(self):
          on_prerun = Mock()
          signals.task_prerun.connect(on_prerun)
          try:
              trace(self.add, (2, 2), {})
              self.assertTrue(on_prerun.called)
          finally:
              # Remove only the receiver connected above.  Emptying the
              # whole ``receivers`` list would also drop receivers that
              # other code connected to this shared signal.
              signals.task_prerun.disconnect(on_prerun)

      def test_with_postrun_receivers(self):
          on_postrun = Mock()
          signals.task_postrun.connect(on_postrun)
          try:
              trace(self.add, (2, 2), {})
              self.assertTrue(on_postrun.called)
          finally:
              # Remove only this test's receiver (see the prerun test).
              signals.task_postrun.disconnect(on_postrun)

      def test_with_success_receivers(self):
          on_success = Mock()
          signals.task_success.connect(on_success)
          try:
              trace(self.add, (2, 2), {})
              self.assertTrue(on_success.called)
          finally:
              # Remove only this test's receiver (see the prerun test).
              signals.task_success.disconnect(on_success)

      def test_when_chord_part(self):
          # With a ``chord`` key in the request, the backend's
          # ``on_chord_part_return`` must be called with the task.
          @self.app.task
          def add(x, y):
              return x + y

          add.backend = Mock()
          trace(add, (2, 2), {}, request={'chord': uuid()})
          add.backend.on_chord_part_return.assert_called_with(add)

      def test_when_backend_cleanup_raises(self):
          @self.app.task
          def add(x, y):
              return x + y

          add.backend = Mock(name='backend')

          # A KeyError raised by ``process_cleanup`` must not escape ...
          add.backend.process_cleanup.side_effect = KeyError()
          trace(add, (2, 2), {}, eager=False)
          add.backend.process_cleanup.assert_called_with()

          # ... whereas a MemoryError must reach the caller.
          add.backend.process_cleanup.side_effect = MemoryError()
          with self.assertRaises(MemoryError):
              trace(add, (2, 2), {}, eager=False)

      def test_when_Ignore(self):
          @self.app.task
          def ignored():
              raise Ignore()

          # Only the info object matters here; the return value is unused.
          _, info = trace(ignored, (), {})
          self.assertEqual(info.state, states.IGNORED)

      def test_trace_SystemExit(self):
          # SystemExit raised by the task body must reach the caller.
          with self.assertRaises(SystemExit):
              trace(self.raises, (SystemExit(), ), {})

      def test_trace_RetryTaskError(self):
          exc = RetryTaskError('foo', 'bar')
          _, info = trace(self.raises, (exc, ), {})
          self.assertEqual(info.state, states.RETRY)
          self.assertIs(info.retval, exc)

      def test_trace_exception(self):
          exc = KeyError('foo')
          _, info = trace(self.raises, (exc, ), {})
          self.assertEqual(info.state, states.FAILURE)
          self.assertIs(info.retval, exc)

      def test_trace_exception_propagate(self):
          # With ``propagate=True`` the task's exception is re-raised.
          with self.assertRaises(KeyError):
              trace(self.raises, (KeyError('foo'), ), {}, propagate=True)

      @patch('celery.task.trace.build_tracer')
      @patch('celery.task.trace.report_internal_error')
      def test_outside_body_error(self, report_internal_error, build_tracer):
          # Make the tracer itself raise, i.e. an error that does not come
          # from the task body.
          tracer = Mock()
          tracer.side_effect = KeyError('foo')
          build_tracer.return_value = tracer

          @self.app.task
          def xtask():
              pass

          trace_task(xtask, 'uuid', (), {})
          self.assertTrue(report_internal_error.call_count)
          self.assertIs(xtask.__trace__, tracer)
  class test_TraceInfo(TraceCase):
      """Tests for ``TraceInfo`` using a subclass whose methods can be
      replaced on the instance."""

      class TI(TraceInfo):
          # Adding ``__dict__`` to the slots lets a test assign arbitrary
          # instance attributes, e.g. swap ``handle_failure`` for a Mock.
          __slots__ = TraceInfo.__slots__ + ('__dict__', )

      def test_handle_error_state(self):
          # For a FAILURE state, ``handle_error_state`` must delegate to
          # ``handle_failure`` with the task and its
          # ``store_errors_even_if_ignored`` setting.
          task = self.add_cast
          info = self.TI(states.FAILURE)
          info.handle_failure = Mock()

          info.handle_error_state(task)

          info.handle_failure.assert_called_with(
              task,
              store_errors=task.store_errors_even_if_ignored,
          )
  class test_stackprotection(AppCase):
      """Checks a task calling itself while worker optimizations are set up."""

      def test_stackprotection(self):
          setup_worker_optimizations(self.app)
          try:
              # ``foo`` calls itself once more when ``i`` is truthy and
              # returns its request otherwise, so ``foo(1)`` yields the
              # request seen by the nested ``foo(0)`` call.
              @self.app.task(bind=True)
              def foo(self, i):
                  return foo(0) if i else self.request

              request = foo(1)
              self.assertTrue(request.called_directly)
          finally:
              # Always undo the optimizations installed above.
              reset_worker_optimizations()