test_concurrency.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. from itertools import count
  4. import pytest
  5. from case import Mock, patch
  6. from celery.concurrency.base import BasePool, apply_target
  7. from celery.exceptions import WorkerShutdown, WorkerTerminate
  8. class test_BasePool:
  9. def test_apply_target(self):
  10. scratch = {}
  11. counter = count(0)
  12. def gen_callback(name, retval=None):
  13. def callback(*args):
  14. scratch[name] = (next(counter), args)
  15. return retval
  16. return callback
  17. apply_target(gen_callback('target', 42),
  18. args=(8, 16),
  19. callback=gen_callback('callback'),
  20. accept_callback=gen_callback('accept_callback'))
  21. assert scratch['target'] == (1, (8, 16))
  22. assert scratch['callback'] == (2, (42,))
  23. pa1 = scratch['accept_callback']
  24. assert pa1[0] == 0
  25. assert pa1[1][0] == os.getpid()
  26. assert pa1[1][1]
  27. # No accept callback
  28. scratch.clear()
  29. apply_target(gen_callback('target', 42),
  30. args=(8, 16),
  31. callback=gen_callback('callback'),
  32. accept_callback=None)
  33. assert scratch == {
  34. 'target': (3, (8, 16)),
  35. 'callback': (4, (42,)),
  36. }
  37. def test_apply_target__propagate(self):
  38. target = Mock(name='target')
  39. target.side_effect = KeyError()
  40. with pytest.raises(KeyError):
  41. apply_target(target, propagate=(KeyError,))
  42. def test_apply_target__raises(self):
  43. target = Mock(name='target')
  44. target.side_effect = KeyError()
  45. with pytest.raises(KeyError):
  46. apply_target(target)
  47. def test_apply_target__raises_WorkerShutdown(self):
  48. target = Mock(name='target')
  49. target.side_effect = WorkerShutdown()
  50. with pytest.raises(WorkerShutdown):
  51. apply_target(target)
  52. def test_apply_target__raises_WorkerTerminate(self):
  53. target = Mock(name='target')
  54. target.side_effect = WorkerTerminate()
  55. with pytest.raises(WorkerTerminate):
  56. apply_target(target)
  57. def test_apply_target__raises_BaseException(self):
  58. target = Mock(name='target')
  59. callback = Mock(name='callback')
  60. target.side_effect = BaseException()
  61. apply_target(target, callback=callback)
  62. callback.assert_called()
  63. @patch('celery.concurrency.base.reraise')
  64. def test_apply_target__raises_BaseException_raises_else(self, reraise):
  65. target = Mock(name='target')
  66. callback = Mock(name='callback')
  67. reraise.side_effect = KeyError()
  68. target.side_effect = BaseException()
  69. with pytest.raises(KeyError):
  70. apply_target(target, callback=callback)
  71. callback.assert_not_called()
  72. def test_does_not_debug(self):
  73. x = BasePool(10)
  74. x._does_debug = False
  75. x.apply_async(object)
  76. def test_num_processes(self):
  77. assert BasePool(7).num_processes == 7
  78. def test_interface_on_start(self):
  79. BasePool(10).on_start()
  80. def test_interface_on_stop(self):
  81. BasePool(10).on_stop()
  82. def test_interface_on_apply(self):
  83. BasePool(10).on_apply()
  84. def test_interface_info(self):
  85. assert BasePool(10).info == {
  86. 'max-concurrency': 10,
  87. }
  88. def test_interface_flush(self):
  89. assert BasePool(10).flush() is None
  90. def test_active(self):
  91. p = BasePool(10)
  92. assert not p.active
  93. p._state = p.RUN
  94. assert p.active
  95. def test_restart(self):
  96. p = BasePool(10)
  97. with pytest.raises(NotImplementedError):
  98. p.restart()
  99. def test_interface_on_terminate(self):
  100. p = BasePool(10)
  101. p.on_terminate()
  102. def test_interface_terminate_job(self):
  103. with pytest.raises(NotImplementedError):
  104. BasePool(10).terminate_job(101)
  105. def test_interface_did_start_ok(self):
  106. assert BasePool(10).did_start_ok()
  107. def test_interface_register_with_event_loop(self):
  108. assert BasePool(10).register_with_event_loop(Mock()) is None
  109. def test_interface_on_soft_timeout(self):
  110. assert BasePool(10).on_soft_timeout(Mock()) is None
  111. def test_interface_on_hard_timeout(self):
  112. assert BasePool(10).on_hard_timeout(Mock()) is None
  113. def test_interface_close(self):
  114. p = BasePool(10)
  115. p.on_close = Mock()
  116. p.close()
  117. assert p._state == p.CLOSE
  118. p.on_close.assert_called_with()
  119. def test_interface_no_close(self):
  120. assert BasePool(10).on_close() is None