test_trace.py 6.6 KB


  1. from __future__ import absolute_import
  2. from celery import uuid
  3. from celery import signals
  4. from celery import states
  5. from celery.exceptions import Ignore, Retry
  6. from celery.app.trace import (
  7. TraceInfo,
  8. eager_trace_task,
  9. trace_task,
  10. setup_worker_optimizations,
  11. reset_worker_optimizations,
  12. )
  13. from celery.tests.case import AppCase, Mock, patch
  14. def trace(app, task, args=(), kwargs={}, propagate=False, **opts):
  15. return eager_trace_task(task, 'id-1', args, kwargs,
  16. propagate=propagate, app=app, **opts)
  17. class TraceCase(AppCase):
  18. def setup(self):
  19. @self.app.task(shared=False)
  20. def add(x, y):
  21. return x + y
  22. self.add = add
  23. @self.app.task(shared=False, ignore_result=True)
  24. def add_cast(x, y):
  25. return x + y
  26. self.add_cast = add_cast
  27. @self.app.task(shared=False)
  28. def raises(exc):
  29. raise exc
  30. self.raises = raises
  31. def trace(self, *args, **kwargs):
  32. return trace(self.app, *args, **kwargs)
  33. class test_trace(TraceCase):
  34. def test_trace_successful(self):
  35. retval, info = self.trace(self.add, (2, 2), {})
  36. self.assertIsNone(info)
  37. self.assertEqual(retval, 4)
  38. def test_trace_on_success(self):
  39. @self.app.task(shared=False, on_success=Mock())
  40. def add_with_success(x, y):
  41. return x + y
  42. self.trace(add_with_success, (2, 2), {})
  43. self.assertTrue(add_with_success.on_success.called)
  44. def test_trace_after_return(self):
  45. @self.app.task(shared=False, after_return=Mock())
  46. def add_with_after_return(x, y):
  47. return x + y
  48. self.trace(add_with_after_return, (2, 2), {})
  49. self.assertTrue(add_with_after_return.after_return.called)
  50. def test_with_prerun_receivers(self):
  51. on_prerun = Mock()
  52. signals.task_prerun.connect(on_prerun)
  53. try:
  54. self.trace(self.add, (2, 2), {})
  55. self.assertTrue(on_prerun.called)
  56. finally:
  57. signals.task_prerun.receivers[:] = []
  58. def test_with_postrun_receivers(self):
  59. on_postrun = Mock()
  60. signals.task_postrun.connect(on_postrun)
  61. try:
  62. self.trace(self.add, (2, 2), {})
  63. self.assertTrue(on_postrun.called)
  64. finally:
  65. signals.task_postrun.receivers[:] = []
  66. def test_with_success_receivers(self):
  67. on_success = Mock()
  68. signals.task_success.connect(on_success)
  69. try:
  70. self.trace(self.add, (2, 2), {})
  71. self.assertTrue(on_success.called)
  72. finally:
  73. signals.task_success.receivers[:] = []
  74. def test_multiple_callbacks(self):
  75. """
  76. Regression test on trace with multiple callbacks
  77. Uses the signature of the following canvas:
  78. chain(
  79. empty.subtask(link=empty.subtask()),
  80. group(empty.subtask(), empty.subtask())
  81. )
  82. """
  83. @self.app.task(shared=False)
  84. def empty(*args, **kwargs):
  85. pass
  86. empty.backend = Mock()
  87. sig = {
  88. 'chord_size': None, 'task': 'empty', 'args': (), 'options': {},
  89. 'subtask_type': None, 'kwargs': {}, 'immutable': False
  90. }
  91. group_sig = {
  92. 'chord_size': None, 'task': 'celery.group', 'args': (),
  93. 'options': {}, 'subtask_type': 'group',
  94. 'kwargs': {'tasks': (empty(), empty())}, 'immutable': False
  95. }
  96. callbacks = [sig, group_sig]
  97. # should not raise an exception
  98. self.trace(empty, [], {}, request={'callbacks': callbacks})
  99. def test_when_chord_part(self):
  100. @self.app.task(shared=False)
  101. def add(x, y):
  102. return x + y
  103. add.backend = Mock()
  104. self.trace(add, (2, 2), {}, request={'chord': uuid()})
  105. add.backend.on_chord_part_return.assert_called_with(add, 'SUCCESS', 4)
  106. def test_when_backend_cleanup_raises(self):
  107. @self.app.task(shared=False)
  108. def add(x, y):
  109. return x + y
  110. add.backend = Mock(name='backend')
  111. add.backend.process_cleanup.side_effect = KeyError()
  112. self.trace(add, (2, 2), {}, eager=False)
  113. add.backend.process_cleanup.assert_called_with()
  114. add.backend.process_cleanup.side_effect = MemoryError()
  115. with self.assertRaises(MemoryError):
  116. self.trace(add, (2, 2), {}, eager=False)
  117. def test_when_Ignore(self):
  118. @self.app.task(shared=False)
  119. def ignored():
  120. raise Ignore()
  121. retval, info = self.trace(ignored, (), {})
  122. self.assertEqual(info.state, states.IGNORED)
  123. def test_trace_SystemExit(self):
  124. with self.assertRaises(SystemExit):
  125. self.trace(self.raises, (SystemExit(), ), {})
  126. def test_trace_Retry(self):
  127. exc = Retry('foo', 'bar')
  128. _, info = self.trace(self.raises, (exc, ), {})
  129. self.assertEqual(info.state, states.RETRY)
  130. self.assertIs(info.retval, exc)
  131. def test_trace_exception(self):
  132. exc = KeyError('foo')
  133. _, info = self.trace(self.raises, (exc, ), {})
  134. self.assertEqual(info.state, states.FAILURE)
  135. self.assertIs(info.retval, exc)
  136. def test_trace_exception_propagate(self):
  137. with self.assertRaises(KeyError):
  138. self.trace(self.raises, (KeyError('foo'), ), {}, propagate=True)
  139. @patch('celery.app.trace.build_tracer')
  140. @patch('celery.app.trace.report_internal_error')
  141. def test_outside_body_error(self, report_internal_error, build_tracer):
  142. tracer = Mock()
  143. tracer.side_effect = KeyError('foo')
  144. build_tracer.return_value = tracer
  145. @self.app.task(shared=False)
  146. def xtask():
  147. pass
  148. trace_task(xtask, 'uuid', (), {})
  149. self.assertTrue(report_internal_error.call_count)
  150. self.assertIs(xtask.__trace__, tracer)
  151. class test_TraceInfo(TraceCase):
  152. class TI(TraceInfo):
  153. __slots__ = TraceInfo.__slots__ + ('__dict__', )
  154. def test_handle_error_state(self):
  155. x = self.TI(states.FAILURE)
  156. x.handle_failure = Mock()
  157. x.handle_error_state(self.add_cast)
  158. x.handle_failure.assert_called_with(
  159. self.add_cast,
  160. store_errors=self.add_cast.store_errors_even_if_ignored,
  161. )
  162. class test_stackprotection(AppCase):
  163. def test_stackprotection(self):
  164. setup_worker_optimizations(self.app)
  165. try:
  166. @self.app.task(shared=False, bind=True)
  167. def foo(self, i):
  168. if i:
  169. return foo(0)
  170. return self.request
  171. self.assertTrue(foo(1).called_directly)
  172. finally:
  173. reset_worker_optimizations()