test_concurrency_processes.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import sys
  2. import unittest2 as unittest
  3. from celery.concurrency import processes as mp
  4. from celery.datastructures import ExceptionInfo
  5. def to_excinfo(exc):
  6. try:
  7. raise exc
  8. except:
  9. return ExceptionInfo(sys.exc_info())
  10. class MockPool(object):
  11. started = False
  12. closed = False
  13. joined = False
  14. terminated = False
  15. _state = None
  16. def __init__(self, *args, **kwargs):
  17. self.started = True
  18. self._state = mp.RUN
  19. def close(self):
  20. self.closed = True
  21. self._state = "CLOSE"
  22. def join(self):
  23. self.joined = True
  24. def terminate(self):
  25. self.terminated = True
  26. def apply_async(self, *args, **kwargs):
  27. pass
  28. class TaskPool(mp.TaskPool):
  29. Pool = MockPool
  30. class test_TaskPool(unittest.TestCase):
  31. def test_start(self):
  32. pool = TaskPool(10)
  33. pool.start()
  34. self.assertTrue(pool._pool.started)
  35. _pool = pool._pool
  36. pool.stop()
  37. self.assertTrue(_pool.closed)
  38. self.assertTrue(_pool.joined)
  39. pool.stop()
  40. pool.start()
  41. _pool = pool._pool
  42. pool.terminate()
  43. pool.terminate()
  44. self.assertTrue(_pool.terminated)
  45. def test_on_ready_exception(self):
  46. scratch = [None]
  47. def errback(retval):
  48. scratch[0] = retval
  49. pool = TaskPool(10)
  50. exc = to_excinfo(KeyError("foo"))
  51. pool.on_ready([], [errback], exc)
  52. self.assertEqual(exc, scratch[0])
  53. def test_on_ready_value(self):
  54. scratch = [None]
  55. def callback(retval):
  56. scratch[0] = retval
  57. pool = TaskPool(10)
  58. retval = "the quick brown fox"
  59. pool.on_ready([callback], [], retval)
  60. self.assertEqual(retval, scratch[0])
  61. def test_on_ready_exit_exception(self):
  62. pool = TaskPool(10)
  63. exc = to_excinfo(SystemExit("foo"))
  64. self.assertRaises(SystemExit, pool.on_ready, [], [], exc)
  65. def test_apply_async(self):
  66. pool = TaskPool(10)
  67. pool.start()
  68. pool.apply_async(lambda x: x, (2, ), {})