test_pool.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import unittest2 as unittest
  2. import logging
  3. import itertools
  4. import time
  5. from celery.worker.pool import TaskPool
  6. from celery.datastructures import ExceptionInfo
  7. import sys
  8. def do_something(i):
  9. return i * i
  10. def long_something():
  11. time.sleep(1)
  12. def raise_something(i):
  13. try:
  14. raise KeyError("FOO EXCEPTION")
  15. except KeyError:
  16. return ExceptionInfo(sys.exc_info())
  17. class TestTaskPool(unittest.TestCase):
  18. def test_attrs(self):
  19. p = TaskPool(limit=2)
  20. self.assertEqual(p.limit, 2)
  21. self.assertIsInstance(p.logger, logging.Logger)
  22. self.assertIsNone(p._pool)
  23. def x_apply(self):
  24. p = TaskPool(limit=2)
  25. p.start()
  26. scratchpad = {}
  27. proc_counter = itertools.count().next
  28. def mycallback(ret_value):
  29. process = proc_counter()
  30. scratchpad[process] = {}
  31. scratchpad[process]["ret_value"] = ret_value
  32. myerrback = mycallback
  33. res = p.apply_async(do_something, args=[10], callbacks=[mycallback])
  34. res2 = p.apply_async(raise_something, args=[10], errbacks=[myerrback])
  35. res3 = p.apply_async(do_something, args=[20], callbacks=[mycallback])
  36. self.assertEqual(res.get(), 100)
  37. time.sleep(0.5)
  38. self.assertDictContainsSubset({"ret_value": 100},
  39. scratchpad.get(0))
  40. self.assertIsInstance(res2.get(), ExceptionInfo)
  41. self.assertTrue(scratchpad.get(1))
  42. time.sleep(1)
  43. self.assertIsInstance(scratchpad[1]["ret_value"],
  44. ExceptionInfo)
  45. self.assertEqual(scratchpad[1]["ret_value"].exception.args,
  46. ("FOO EXCEPTION", ))
  47. self.assertEqual(res3.get(), 400)
  48. time.sleep(0.5)
  49. self.assertDictContainsSubset({"ret_value": 400},
  50. scratchpad.get(2))
  51. res3 = p.apply_async(do_something, args=[30], callbacks=[mycallback])
  52. self.assertEqual(res3.get(), 900)
  53. time.sleep(0.5)
  54. self.assertDictContainsSubset({"ret_value": 900},
  55. scratchpad.get(3))
  56. p.stop()