# test_concurrency_processes.py (4.6 KB)

import sys
from itertools import cycle

from celery.concurrency import processes as mp
from celery.datastructures import ExceptionInfo
from celery.tests.utils import unittest
from celery.utils import noop
  7. class Object(object): # for writeable attributes.
  8. def __init__(self, **kwargs):
  9. [setattr(self, k, v) for k, v in kwargs.items()]
  10. def to_excinfo(exc):
  11. try:
  12. raise exc
  13. except:
  14. return ExceptionInfo(sys.exc_info())
  15. class MockResult(object):
  16. def __init__(self, value, pid):
  17. self.value = value
  18. self.pid = pid
  19. def worker_pids(self):
  20. return [self.pid]
  21. def get(self):
  22. return self.value
  23. class MockPool(object):
  24. started = False
  25. closed = False
  26. joined = False
  27. terminated = False
  28. _state = None
  29. def __init__(self, *args, **kwargs):
  30. self.started = True
  31. self._state = mp.RUN
  32. self.processes = kwargs.get("processes")
  33. self._pool = [Object(pid=i) for i in range(self.processes)]
  34. self._current_proc = cycle(xrange(self.processes)).next
  35. def close(self):
  36. self.closed = True
  37. self._state = "CLOSE"
  38. def join(self):
  39. self.joined = True
  40. def terminate(self):
  41. self.terminated = True
  42. def grow(self, n=1):
  43. self.processes += n
  44. def shrink(self, n=1):
  45. self.processes -= n
  46. def apply_async(self, *args, **kwargs):
  47. pass
  48. class ExeMockPool(MockPool):
  49. def apply_async(self, target, args=(), kwargs={}, callback=noop):
  50. from threading import Timer
  51. res = target(*args, **kwargs)
  52. Timer(0.1, callback, (res, )).start()
  53. return MockResult(res, self._current_proc())
  54. class TaskPool(mp.TaskPool):
  55. Pool = MockPool
  56. class ExeMockTaskPool(mp.TaskPool):
  57. Pool = ExeMockPool
  58. class test_TaskPool(unittest.TestCase):
  59. def test_start(self):
  60. pool = TaskPool(10)
  61. pool.start()
  62. self.assertTrue(pool._pool.started)
  63. _pool = pool._pool
  64. pool.stop()
  65. self.assertTrue(_pool.closed)
  66. self.assertTrue(_pool.joined)
  67. pool.stop()
  68. pool.start()
  69. _pool = pool._pool
  70. pool.terminate()
  71. pool.terminate()
  72. self.assertTrue(_pool.terminated)
  73. def test_on_worker_error(self):
  74. scratch = [None]
  75. def errback(einfo):
  76. scratch[0] = einfo
  77. pool = TaskPool(10)
  78. exc = KeyError("foo")
  79. pool.on_worker_error([errback], exc)
  80. self.assertTrue(scratch[0])
  81. self.assertIs(scratch[0].exception, exc)
  82. self.assertTrue(scratch[0].traceback)
  83. def test_on_ready_exception(self):
  84. scratch = [None]
  85. def errback(retval):
  86. scratch[0] = retval
  87. pool = TaskPool(10)
  88. exc = to_excinfo(KeyError("foo"))
  89. pool.on_ready([], [errback], exc)
  90. self.assertEqual(exc, scratch[0])
  91. def test_safe_apply_callback(self):
  92. _good_called = [0]
  93. _evil_called = [0]
  94. def good(x):
  95. _good_called[0] = 1
  96. return x
  97. def evil(x):
  98. _evil_called[0] = 1
  99. raise KeyError(x)
  100. pool = TaskPool(10)
  101. self.assertIsNone(pool.safe_apply_callback(good, 10))
  102. self.assertIsNone(pool.safe_apply_callback(evil, 10))
  103. self.assertTrue(_good_called[0])
  104. self.assertTrue(_evil_called[0])
  105. def test_on_ready_value(self):
  106. scratch = [None]
  107. def callback(retval):
  108. scratch[0] = retval
  109. pool = TaskPool(10)
  110. retval = "the quick brown fox"
  111. pool.on_ready([callback], [], retval)
  112. self.assertEqual(retval, scratch[0])
  113. def test_on_ready_exit_exception(self):
  114. pool = TaskPool(10)
  115. exc = to_excinfo(SystemExit("foo"))
  116. self.assertRaises(SystemExit, pool.on_ready, [], [], exc)
  117. def test_apply_async(self):
  118. pool = TaskPool(10)
  119. pool.start()
  120. pool.apply_async(lambda x: x, (2, ), {})
  121. def test_grow_shrink(self):
  122. pool = TaskPool(10)
  123. pool.start()
  124. self.assertEqual(pool._pool.processes, 10)
  125. pool.grow()
  126. self.assertEqual(pool._pool.processes, 11)
  127. pool.shrink(2)
  128. self.assertEqual(pool._pool.processes, 9)
  129. def test_info(self):
  130. pool = TaskPool(10)
  131. procs = [Object(pid=i) for i in range(pool.limit)]
  132. pool._pool = Object(_pool=procs,
  133. _maxtasksperchild=None,
  134. timeout=10,
  135. soft_timeout=5)
  136. info = pool.info
  137. self.assertEqual(info["max-concurrency"], pool.limit)
  138. self.assertIsNone(info["max-tasks-per-child"])
  139. self.assertEqual(info["timeouts"], (5, 10))