test_pool.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import unittest
  2. import logging
  3. import itertools
  4. import time
  5. from celery.worker.pool import TaskPool
  6. from celery.datastructures import ExceptionInfo
  7. import sys
  8. def do_something(i):
  9. return i * i
  10. def long_something():
  11. time.sleep(1)
  12. def raise_something(i):
  13. try:
  14. raise KeyError("FOO EXCEPTION")
  15. except KeyError:
  16. return ExceptionInfo(sys.exc_info())
  17. class TestTaskPool(unittest.TestCase):
  18. def test_attrs(self):
  19. p = TaskPool(limit=2)
  20. self.assertEquals(p.limit, 2)
  21. self.assertTrue(isinstance(p.logger, logging.Logger))
  22. self.assertTrue(p._pool is None)
  23. def x_start_stop(self):
  24. p = TaskPool(limit=2)
  25. p.start()
  26. self.assertTrue(p._pool)
  27. p.stop()
  28. self.assertTrue(p._pool is None)
  29. def x_apply(self):
  30. p = TaskPool(limit=2)
  31. p.start()
  32. scratchpad = {}
  33. proc_counter = itertools.count().next
  34. def mycallback(ret_value, meta):
  35. process = proc_counter()
  36. scratchpad[process] = {}
  37. scratchpad[process]["ret_value"] = ret_value
  38. scratchpad[process]["meta"] = meta
  39. myerrback = mycallback
  40. res = p.apply_async(do_something, args=[10], callbacks=[mycallback],
  41. meta={"foo": "bar"})
  42. res2 = p.apply_async(raise_something, args=[10], errbacks=[myerrback],
  43. meta={"foo2": "bar2"})
  44. res3 = p.apply_async(do_something, args=[20], callbacks=[mycallback],
  45. meta={"foo3": "bar3"})
  46. self.assertEquals(res.get(), 100)
  47. time.sleep(0.5)
  48. self.assertTrue(scratchpad.get(0))
  49. self.assertEquals(scratchpad[0]["ret_value"], 100)
  50. self.assertEquals(scratchpad[0]["meta"], {"foo": "bar"})
  51. self.assertTrue(isinstance(res2.get(), ExceptionInfo))
  52. self.assertTrue(scratchpad.get(1))
  53. time.sleep(1)
  54. #self.assertEquals(scratchpad[1]["ret_value"], "FOO")
  55. self.assertTrue(isinstance(scratchpad[1]["ret_value"],
  56. ExceptionInfo))
  57. self.assertEquals(scratchpad[1]["ret_value"].exception.args,
  58. ("FOO EXCEPTION", ))
  59. self.assertEquals(scratchpad[1]["meta"], {"foo2": "bar2"})
  60. self.assertEquals(res3.get(), 400)
  61. time.sleep(0.5)
  62. self.assertTrue(scratchpad.get(2))
  63. self.assertEquals(scratchpad[2]["ret_value"], 400)
  64. self.assertEquals(scratchpad[2]["meta"], {"foo3": "bar3"})
  65. res3 = p.apply_async(do_something, args=[30], callbacks=[mycallback],
  66. meta={"foo4": "bar4"})
  67. self.assertEquals(res3.get(), 900)
  68. time.sleep(0.5)
  69. self.assertTrue(scratchpad.get(3))
  70. self.assertEquals(scratchpad[3]["ret_value"], 900)
  71. self.assertEquals(scratchpad[3]["meta"], {"foo4": "bar4"})
  72. p.stop()