test_pool.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from __future__ import absolute_import
  2. import time
  3. import itertools
  4. from billiard.einfo import ExceptionInfo
  5. from celery.tests.case import AppCase, SkipTest
  6. def do_something(i):
  7. return i * i
  8. def long_something():
  9. time.sleep(1)
  10. def raise_something(i):
  11. try:
  12. raise KeyError('FOO EXCEPTION')
  13. except KeyError:
  14. return ExceptionInfo()
  15. class test_TaskPool(AppCase):
  16. def setup(self):
  17. try:
  18. __import__('multiprocessing')
  19. except ImportError:
  20. raise SkipTest('multiprocessing not supported')
  21. from celery.concurrency.prefork import TaskPool
  22. self.TaskPool = TaskPool
  23. def test_attrs(self):
  24. p = self.TaskPool(2)
  25. self.assertEqual(p.limit, 2)
  26. self.assertIsNone(p._pool)
  27. def x_apply(self):
  28. p = self.TaskPool(2)
  29. p.start()
  30. scratchpad = {}
  31. proc_counter = itertools.count()
  32. def mycallback(ret_value):
  33. process = next(proc_counter)
  34. scratchpad[process] = {}
  35. scratchpad[process]['ret_value'] = ret_value
  36. myerrback = mycallback
  37. res = p.apply_async(do_something, args=[10], callback=mycallback)
  38. res2 = p.apply_async(raise_something, args=[10], errback=myerrback)
  39. res3 = p.apply_async(do_something, args=[20], callback=mycallback)
  40. self.assertEqual(res.get(), 100)
  41. time.sleep(0.5)
  42. self.assertDictContainsSubset({'ret_value': 100},
  43. scratchpad.get(0))
  44. self.assertIsInstance(res2.get(), ExceptionInfo)
  45. self.assertTrue(scratchpad.get(1))
  46. time.sleep(1)
  47. self.assertIsInstance(scratchpad[1]['ret_value'],
  48. ExceptionInfo)
  49. self.assertEqual(scratchpad[1]['ret_value'].exception.args,
  50. ('FOO EXCEPTION', ))
  51. self.assertEqual(res3.get(), 400)
  52. time.sleep(0.5)
  53. self.assertDictContainsSubset({'ret_value': 400},
  54. scratchpad.get(2))
  55. res3 = p.apply_async(do_something, args=[30], callback=mycallback)
  56. self.assertEqual(res3.get(), 900)
  57. time.sleep(0.5)
  58. self.assertDictContainsSubset({'ret_value': 900},
  59. scratchpad.get(3))
  60. p.stop()