test_pool.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from __future__ import absolute_import, unicode_literals
  2. import itertools
  3. import time
  4. from billiard.einfo import ExceptionInfo
  5. from case import skip
  6. def do_something(i):
  7. return i * i
  8. def long_something():
  9. time.sleep(1)
  10. def raise_something(i):
  11. try:
  12. raise KeyError('FOO EXCEPTION')
  13. except KeyError:
  14. return ExceptionInfo()
  15. @skip.unless_module('multiprocessing')
  16. class test_TaskPool:
  17. def setup(self):
  18. from celery.concurrency.prefork import TaskPool
  19. self.TaskPool = TaskPool
  20. def test_attrs(self):
  21. p = self.TaskPool(2)
  22. assert p.limit == 2
  23. assert p._pool is None
  24. def x_apply(self):
  25. p = self.TaskPool(2)
  26. p.start()
  27. scratchpad = {}
  28. proc_counter = itertools.count()
  29. def mycallback(ret_value):
  30. process = next(proc_counter)
  31. scratchpad[process] = {}
  32. scratchpad[process]['ret_value'] = ret_value
  33. myerrback = mycallback
  34. res = p.apply_async(do_something, args=[10], callback=mycallback)
  35. res2 = p.apply_async(raise_something, args=[10], errback=myerrback)
  36. res3 = p.apply_async(do_something, args=[20], callback=mycallback)
  37. assert res.get() == 100
  38. time.sleep(0.5)
  39. assert scratchpad.get(0)['ret_value'] == 100
  40. assert isinstance(res2.get(), ExceptionInfo)
  41. assert scratchpad.get(1)
  42. time.sleep(1)
  43. assert isinstance(scratchpad[1]['ret_value'], ExceptionInfo)
  44. assert scratchpad[1]['ret_value'].exception.args == ('FOO EXCEPTION',)
  45. assert res3.get() == 400
  46. time.sleep(0.5)
  47. assert scratchpad.get(2)['ret_value'] == 400
  48. res3 = p.apply_async(do_something, args=[30], callback=mycallback)
  49. assert res3.get() == 900
  50. time.sleep(0.5)
  51. assert scratchpad.get(3)['ret_value'] == 900
  52. p.stop()