test_pool.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from __future__ import absolute_import
  2. import time
  3. import itertools
  4. from nose import SkipTest
  5. from celery.datastructures import ExceptionInfo
  6. from celery.tests.utils import Case
  7. def do_something(i):
  8. return i * i
  9. def long_something():
  10. time.sleep(1)
  11. def raise_something(i):
  12. try:
  13. raise KeyError('FOO EXCEPTION')
  14. except KeyError:
  15. return ExceptionInfo()
  16. class test_TaskPool(Case):
  17. def setUp(self):
  18. try:
  19. __import__('multiprocessing')
  20. except ImportError:
  21. raise SkipTest('multiprocessing not supported')
  22. from celery.concurrency.processes import TaskPool
  23. self.TaskPool = TaskPool
  24. def test_attrs(self):
  25. p = self.TaskPool(2)
  26. self.assertEqual(p.limit, 2)
  27. self.assertIsNone(p._pool)
  28. def x_apply(self):
  29. p = self.TaskPool(2)
  30. p.start()
  31. scratchpad = {}
  32. proc_counter = itertools.count()
  33. def mycallback(ret_value):
  34. process = next(proc_counter)
  35. scratchpad[process] = {}
  36. scratchpad[process]['ret_value'] = ret_value
  37. myerrback = mycallback
  38. res = p.apply_async(do_something, args=[10], callback=mycallback)
  39. res2 = p.apply_async(raise_something, args=[10], errback=myerrback)
  40. res3 = p.apply_async(do_something, args=[20], callback=mycallback)
  41. self.assertEqual(res.get(), 100)
  42. time.sleep(0.5)
  43. self.assertDictContainsSubset({'ret_value': 100},
  44. scratchpad.get(0))
  45. self.assertIsInstance(res2.get(), ExceptionInfo)
  46. self.assertTrue(scratchpad.get(1))
  47. time.sleep(1)
  48. self.assertIsInstance(scratchpad[1]['ret_value'],
  49. ExceptionInfo)
  50. self.assertEqual(scratchpad[1]['ret_value'].exception.args,
  51. ('FOO EXCEPTION', ))
  52. self.assertEqual(res3.get(), 400)
  53. time.sleep(0.5)
  54. self.assertDictContainsSubset({'ret_value': 400},
  55. scratchpad.get(2))
  56. res3 = p.apply_async(do_something, args=[30], callback=mycallback)
  57. self.assertEqual(res3.get(), 900)
  58. time.sleep(0.5)
  59. self.assertDictContainsSubset({'ret_value': 900},
  60. scratchpad.get(3))
  61. p.stop()