test_pool.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import time
  2. import itertools
  3. from case import skip
  4. from billiard.einfo import ExceptionInfo
  def do_something(i):
      """Return ``i`` multiplied by itself.

      Kept as a plain module-level function; the pool test in this file
      passes it to ``apply_async`` as the task to run.
      """
      return i * i
  def long_something():
      """Sleep for one second and return ``None``."""
      time.sleep(1)
  def raise_something(i):
      """Raise a ``KeyError`` internally and return an ``ExceptionInfo`` for it.

      The exception never propagates to the caller: it is caught here and
      an ``ExceptionInfo`` instance is returned instead.

      :param i: accepted so the function can be called with one positional
          argument; the value itself is not used.
      :returns: an ``ExceptionInfo`` created while the ``KeyError`` is
          being handled.
      """
      try:
          raise KeyError('FOO EXCEPTION')
      except KeyError:
          # Built with no arguments inside the handler; presumably it picks
          # up the exception currently being handled -- confirm against
          # billiard.einfo.
          return ExceptionInfo()
  @skip.unless_module('multiprocessing')
  class test_TaskPool:
      """Checks for the prefork ``TaskPool``.

      The class is wrapped in ``skip.unless_module('multiprocessing')``.
      """

      def setup_method(self):
          """Import ``TaskPool`` and store the class on the test instance.

          The import happens here rather than at module level, so importing
          this module does not require ``celery.concurrency.prefork``.
          """
          from celery.concurrency.prefork import TaskPool
          self.TaskPool = TaskPool

      # ``setup`` was the original (nose-style) hook name.  pytest invokes
      # ``setup_method`` before each test; the alias keeps the old name
      # available for runners that still look for ``setup``.
      setup = setup_method

      def test_attrs(self):
          """A new pool records its limit and has no pool object yet."""
          p = self.TaskPool(2)
          assert p.limit == 2
          assert p._pool is None

      def x_apply(self):
          """Run tasks through a started pool and check the callbacks.

          NOTE(review): the ``x_`` prefix presumably keeps this method out of
          test collection -- confirm against the project's test settings.

          Each callback/errback invocation is stored in ``scratchpad`` under
          the next integer from a counter, so the keys reflect the order in
          which the callbacks fired.
          """
          p = self.TaskPool(2)
          p.start()
          # Stop the pool even when an assertion below fails, so a failing
          # run does not leave the started pool behind.
          try:
              scratchpad = {}
              proc_counter = itertools.count()

              def mycallback(ret_value):
                  # One entry per invocation, keyed 0, 1, 2, ...
                  scratchpad[next(proc_counter)] = {'ret_value': ret_value}

              # The same recorder is used for successes and for errors.
              myerrback = mycallback

              res = p.apply_async(
                  do_something, args=[10], callback=mycallback)
              res2 = p.apply_async(
                  raise_something, args=[10], errback=myerrback)
              res3 = p.apply_async(
                  do_something, args=[20], callback=mycallback)

              assert res.get() == 100
              # The sleeps leave time for the callback to be recorded
              # before the scratchpad is inspected.
              time.sleep(0.5)
              assert scratchpad.get(0)['ret_value'] == 100

              assert isinstance(res2.get(), ExceptionInfo)
              assert scratchpad.get(1)
              time.sleep(1)
              assert isinstance(scratchpad[1]['ret_value'], ExceptionInfo)
              assert scratchpad[1]['ret_value'].exception.args == (
                  'FOO EXCEPTION',)

              assert res3.get() == 400
              time.sleep(0.5)
              assert scratchpad.get(2)['ret_value'] == 400

              res3 = p.apply_async(
                  do_something, args=[30], callback=mycallback)
              assert res3.get() == 900
              time.sleep(0.5)
              assert scratchpad.get(3)['ret_value'] == 900
          finally:
              p.stop()