# test_concurrency.py (4.5 KB)
  1. import os
  2. import pytest
  3. from itertools import count
  4. from case import Mock, patch
  5. from celery.concurrency.base import apply_target, BasePool
  6. from celery.exceptions import WorkerShutdown, WorkerTerminate
  class test_BasePool:
      """Tests for ``apply_target`` and the default ``BasePool`` interface."""

      def test_apply_target(self):
          """Check the order and arguments of target/callback invocations."""
          scratch = {}
          counter = count(0)

          def gen_callback(name, retval=None):
              """Return a callable that records its call under *name*.

              Each call stores ``(sequence_number, args)`` in ``scratch`` so
              the assertions below can check both ordering and arguments.
              """

              def callback(*args):
                  scratch[name] = (next(counter), args)
                  return retval

              return callback

          apply_target(gen_callback('target', 42),
                       args=(8, 16),
                       callback=gen_callback('callback'),
                       accept_callback=gen_callback('accept_callback'))

          assert scratch['target'] == (1, (8, 16))
          assert scratch['callback'] == (2, (42,))
          accepted = scratch['accept_callback']
          # The accept callback is recorded first (sequence 0); its first
          # argument is this process' pid and its second must be truthy.
          assert accepted[0] == 0
          assert accepted[1][0] == os.getpid()
          assert accepted[1][1]

          # No accept callback
          scratch.clear()
          apply_target(gen_callback('target', 42),
                       args=(8, 16),
                       callback=gen_callback('callback'),
                       accept_callback=None)
          assert scratch == {
              'target': (3, (8, 16)),
              'callback': (4, (42,)),
          }

      def test_apply_target__propagate(self):
          """A KeyError listed in ``propagate`` is raised to the caller."""
          target = Mock(name='target')
          target.side_effect = KeyError()
          with pytest.raises(KeyError):
              apply_target(target, propagate=(KeyError,))

      def test_apply_target__raises(self):
          """A KeyError from the target is raised without ``propagate``."""
          target = Mock(name='target')
          target.side_effect = KeyError()
          with pytest.raises(KeyError):
              apply_target(target)

      def test_apply_target__raises_WorkerShutdown(self):
          """WorkerShutdown raised by the target reaches the caller."""
          target = Mock(name='target')
          target.side_effect = WorkerShutdown()
          with pytest.raises(WorkerShutdown):
              apply_target(target)

      def test_apply_target__raises_WorkerTerminate(self):
          """WorkerTerminate raised by the target reaches the caller."""
          target = Mock(name='target')
          target.side_effect = WorkerTerminate()
          with pytest.raises(WorkerTerminate):
              apply_target(target)

      def test_apply_target__raises_BaseException(self):
          """A BaseException from the target still results in a callback."""
          target = Mock(name='target')
          callback = Mock(name='callback')
          target.side_effect = BaseException()
          apply_target(target, callback=callback)
          callback.assert_called()

      @patch('celery.concurrency.base.reraise')
      def test_apply_target__raises_BaseException_raises_else(self, reraise):
          """If the patched ``reraise`` raises, the callback is never called."""
          target = Mock(name='target')
          callback = Mock(name='callback')
          reraise.side_effect = KeyError()
          target.side_effect = BaseException()
          with pytest.raises(KeyError):
              apply_target(target, callback=callback)
          # Must sit outside the ``with`` block: a statement placed after the
          # raising call inside it would never execute.
          callback.assert_not_called()

      def test_does_not_debug(self):
          """``apply_async`` can be called with ``_does_debug`` disabled."""
          pool = BasePool(10)
          pool._does_debug = False
          pool.apply_async(object)

      def test_num_processes(self):
          """``num_processes`` reflects the limit given to the constructor."""
          assert BasePool(7).num_processes == 7

      def test_interface_on_start(self):
          """The default ``on_start`` hook can be called without arguments."""
          BasePool(10).on_start()

      def test_interface_on_stop(self):
          """The default ``on_stop`` hook can be called without arguments."""
          BasePool(10).on_stop()

      def test_interface_on_apply(self):
          """The default ``on_apply`` hook can be called without arguments."""
          BasePool(10).on_apply()

      def test_interface_info(self):
          """``info`` reports the configured limit as ``max-concurrency``."""
          assert BasePool(10).info == {
              'max-concurrency': 10,
          }

      def test_interface_flush(self):
          """The default ``flush`` returns ``None``."""
          assert BasePool(10).flush() is None

      def test_active(self):
          """``active`` is falsy on a new pool and truthy in state ``RUN``."""
          pool = BasePool(10)
          assert not pool.active
          pool._state = pool.RUN
          assert pool.active

      def test_restart(self):
          """``restart`` raises NotImplementedError on the base pool."""
          pool = BasePool(10)
          with pytest.raises(NotImplementedError):
              pool.restart()

      def test_interface_on_terminate(self):
          """The default ``on_terminate`` hook can be called."""
          pool = BasePool(10)
          pool.on_terminate()

      def test_interface_terminate_job(self):
          """``terminate_job`` raises NotImplementedError on the base pool."""
          with pytest.raises(NotImplementedError):
              BasePool(10).terminate_job(101)

      def test_interface_did_start_ok(self):
          """``did_start_ok`` returns a truthy value on the base pool."""
          assert BasePool(10).did_start_ok()

      def test_interface_register_with_event_loop(self):
          """The default ``register_with_event_loop`` returns ``None``."""
          assert BasePool(10).register_with_event_loop(Mock()) is None

      def test_interface_on_soft_timeout(self):
          """The default ``on_soft_timeout`` returns ``None``."""
          assert BasePool(10).on_soft_timeout(Mock()) is None

      def test_interface_on_hard_timeout(self):
          """The default ``on_hard_timeout`` returns ``None``."""
          assert BasePool(10).on_hard_timeout(Mock()) is None

      def test_interface_close(self):
          """``close`` sets the state to ``CLOSE`` and calls ``on_close``."""
          pool = BasePool(10)
          pool.on_close = Mock()
          pool.close()
          assert pool._state == pool.CLOSE
          pool.on_close.assert_called_with()

      def test_interface_no_close(self):
          """The default ``on_close`` returns ``None``."""
          assert BasePool(10).on_close() is None