test_processes.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. from __future__ import absolute_import
  2. import errno
  3. import socket
  4. import time
  5. from itertools import cycle
  6. from mock import Mock, call, patch
  7. from nose import SkipTest
  8. from celery.five import items, range
  9. from celery.utils.functional import noop
  10. from celery.tests.utils import AppCase
  11. try:
  12. from celery.concurrency import processes as mp
  13. except ImportError:
  14. class _mp(object):
  15. RUN = 0x1
  16. class TaskPool(object):
  17. _pool = Mock()
  18. def __init__(self, *args, **kwargs):
  19. pass
  20. def start(self):
  21. pass
  22. def stop(self):
  23. pass
  24. def apply_async(self, *args, **kwargs):
  25. pass
  26. mp = _mp() # noqa
  27. class Object(object): # for writeable attributes.
  28. def __init__(self, **kwargs):
  29. [setattr(self, k, v) for k, v in items(kwargs)]
  30. class MockResult(object):
  31. def __init__(self, value, pid):
  32. self.value = value
  33. self.pid = pid
  34. def worker_pids(self):
  35. return [self.pid]
  36. def get(self):
  37. return self.value
  38. class MockPool(object):
  39. started = False
  40. closed = False
  41. joined = False
  42. terminated = False
  43. _state = None
  44. def __init__(self, *args, **kwargs):
  45. self.started = True
  46. self._timeout_handler = Mock()
  47. self._result_handler = Mock()
  48. self.maintain_pool = Mock()
  49. self._state = mp.RUN
  50. self._processes = kwargs.get('processes')
  51. self._pool = [Object(pid=i, inqW_fd=1, outqR_fd=2)
  52. for i in range(self._processes)]
  53. self._current_proc = cycle(range(self._processes))
  54. def close(self):
  55. self.closed = True
  56. self._state = 'CLOSE'
  57. def join(self):
  58. self.joined = True
  59. def terminate(self):
  60. self.terminated = True
  61. def terminate_job(self, *args, **kwargs):
  62. pass
  63. def restart(self, *args, **kwargs):
  64. pass
  65. def handle_result_event(self, *args, **kwargs):
  66. pass
  67. def grow(self, n=1):
  68. self._processes += n
  69. def shrink(self, n=1):
  70. self._processes -= n
  71. def apply_async(self, *args, **kwargs):
  72. pass
  73. class ExeMockPool(MockPool):
  74. def apply_async(self, target, args=(), kwargs={}, callback=noop):
  75. from threading import Timer
  76. res = target(*args, **kwargs)
  77. Timer(0.1, callback, (res, )).start()
  78. return MockResult(res, next(self._current_proc))
  79. class TaskPool(mp.TaskPool):
  80. Pool = BlockingPool = MockPool
  81. class ExeMockTaskPool(mp.TaskPool):
  82. Pool = BlockingPool = ExeMockPool
  83. class PoolCase(AppCase):
  84. def setup(self):
  85. try:
  86. import multiprocessing # noqa
  87. except ImportError:
  88. raise SkipTest('multiprocessing not supported')
  89. class test_AsynPool(PoolCase):
  90. def test_gen_not_started(self):
  91. def gen():
  92. yield 1
  93. yield 2
  94. g = gen()
  95. self.assertTrue(mp.gen_not_started(g))
  96. next(g)
  97. self.assertFalse(mp.gen_not_started(g))
  98. list(g)
  99. self.assertFalse(mp.gen_not_started(g))
  100. def test_select(self):
  101. ebadf = socket.error()
  102. ebadf.errno = errno.EBADF
  103. with patch('select.select') as select:
  104. select.return_value = ([3], [], [])
  105. self.assertEqual(
  106. mp._select(set([3])),
  107. ([3], [], 0),
  108. )
  109. select.return_value = ([], [], [3])
  110. self.assertEqual(
  111. mp._select(set([3]), None, set([3])),
  112. ([3], [], 0),
  113. )
  114. eintr = socket.error()
  115. eintr.errno = errno.EINTR
  116. select.side_effect = eintr
  117. readers = set([3])
  118. self.assertEqual(mp._select(readers), ([], [], 1))
  119. self.assertIn(3, readers)
  120. with patch('select.select') as select:
  121. select.side_effect = ebadf
  122. readers = set([3])
  123. self.assertEqual(mp._select(readers), ([], [], 1))
  124. select.assert_has_calls([call([3], [], [], 0)])
  125. self.assertNotIn(3, readers)
  126. with patch('select.select') as select:
  127. select.side_effect = MemoryError()
  128. with self.assertRaises(MemoryError):
  129. mp._select(set([1]))
  130. with patch('select.select') as select:
  131. def se(*args):
  132. select.side_effect = MemoryError()
  133. raise ebadf
  134. select.side_effect = se
  135. with self.assertRaises(MemoryError):
  136. mp._select(set([3]))
  137. with patch('select.select') as select:
  138. def se(*args):
  139. select.side_effect = socket.error()
  140. select.side_effect.errno = 1321
  141. raise ebadf
  142. select.side_effect = se
  143. with self.assertRaises(socket.error):
  144. mp._select(set([3]))
  145. with patch('select.select') as select:
  146. select.side_effect = socket.error()
  147. select.side_effect.errno = 34134
  148. with self.assertRaises(socket.error):
  149. mp._select(set([3]))
  150. def test_promise(self):
  151. fun = Mock()
  152. x = mp.promise(fun, 1, foo=1)
  153. x()
  154. self.assertTrue(x.ready)
  155. fun.assert_called_with(1, foo=1)
  156. def test_Worker(self):
  157. w = mp.Worker(Mock(), Mock())
  158. w.on_loop_start(1234)
  159. w.outq.put.assert_called_with((mp.WORKER_UP, (1234, )))
  160. class test_ResultHandler(PoolCase):
  161. def test_process_result(self):
  162. x = mp.ResultHandler(
  163. Mock(), Mock(), {}, Mock(),
  164. Mock(), Mock(), Mock(), Mock(),
  165. fileno_to_outq={},
  166. on_process_alive=Mock(),
  167. )
  168. self.assertTrue(x)
  169. x.on_state_change = Mock()
  170. proc = x.fileno_to_outq[3] = Mock()
  171. reader = proc.outq._reader
  172. reader.poll.return_value = False
  173. x.handle_event(6) # KeyError
  174. x.handle_event(3)
  175. reader.poll.assert_called_with(0)
  176. self.assertFalse(x.on_state_change.called)
  177. reader.poll.reset()
  178. reader.poll.return_value = True
  179. task = reader.recv.return_value = (1, (2, 3))
  180. x.handle_event(3)
  181. reader.poll.assert_called_with(0)
  182. reader.recv.assert_called_with()
  183. x.on_state_change.assert_called_with(task)
  184. self.assertTrue(x._it)
  185. reader.recv.return_value = None
  186. x.handle_event(3)
  187. self.assertIsNone(x._it)
  188. x._state = mp.TERMINATE
  189. it = x._process_result()
  190. next(it)
  191. with self.assertRaises(mp.CoroStop):
  192. it.send(3)
  193. x.handle_event(3)
  194. self.assertIsNone(x._it)
  195. x._state == mp.RUN
  196. reader.recv.side_effect = EOFError()
  197. it = x._process_result()
  198. next(it)
  199. with self.assertRaises(mp.CoroStop):
  200. it.send(3)
  201. reader.recv.side_effect = None
  202. class test_TaskPool(PoolCase):
  203. def test_start(self):
  204. pool = TaskPool(10)
  205. pool.start()
  206. self.assertTrue(pool._pool.started)
  207. self.assertTrue(pool._pool._state == mp.RUN)
  208. _pool = pool._pool
  209. pool.stop()
  210. self.assertTrue(_pool.closed)
  211. self.assertTrue(_pool.joined)
  212. pool.stop()
  213. pool.start()
  214. _pool = pool._pool
  215. pool.terminate()
  216. pool.terminate()
  217. self.assertTrue(_pool.terminated)
  218. def test_apply_async(self):
  219. pool = TaskPool(10)
  220. pool.start()
  221. pool.apply_async(lambda x: x, (2, ), {})
  222. def test_grow_shrink(self):
  223. pool = TaskPool(10)
  224. pool.start()
  225. self.assertEqual(pool._pool._processes, 10)
  226. pool.grow()
  227. self.assertEqual(pool._pool._processes, 11)
  228. pool.shrink(2)
  229. self.assertEqual(pool._pool._processes, 9)
  230. def test_info(self):
  231. pool = TaskPool(10)
  232. procs = [Object(pid=i) for i in range(pool.limit)]
  233. pool._pool = Object(_pool=procs,
  234. _maxtasksperchild=None,
  235. timeout=10,
  236. soft_timeout=5)
  237. info = pool.info
  238. self.assertEqual(info['max-concurrency'], pool.limit)
  239. self.assertEqual(info['max-tasks-per-child'], 'N/A')
  240. self.assertEqual(info['timeouts'], (5, 10))
  241. def test_num_processes(self):
  242. pool = TaskPool(7)
  243. pool.start()
  244. self.assertEqual(pool.num_processes, 7)
  245. def test_restart(self):
  246. raise SkipTest('functional test')
  247. def get_pids(pool):
  248. return set([p.pid for p in pool._pool._pool])
  249. tp = self.TaskPool(5)
  250. time.sleep(0.5)
  251. tp.start()
  252. pids = get_pids(tp)
  253. tp.restart()
  254. time.sleep(0.5)
  255. self.assertEqual(pids, get_pids(tp))