test_pool.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import unittest
  2. import logging
  3. import itertools
  4. import time
  5. from celery.pool import TaskPool
  6. from celery.datastructures import ExceptionInfo
  7. import sys
  8. def do_something(i):
  9. return i * i
  10. def long_something():
  11. import time
  12. time.sleep(1)
  13. def raise_something(i):
  14. try:
  15. raise KeyError("FOO EXCEPTION")
  16. except KeyError:
  17. return ExceptionInfo(sys.exc_info())
  18. class TestTaskPool(unittest.TestCase):
  19. def test_attrs(self):
  20. p = TaskPool(limit=2)
  21. self.assertEquals(p.limit, 2)
  22. self.assertTrue(isinstance(p.logger, logging.Logger))
  23. self.assertTrue(p._pool is None)
  24. def x_start_stop(self):
  25. p = TaskPool(limit=2)
  26. p.start()
  27. self.assertTrue(p._pool)
  28. p.stop()
  29. self.assertTrue(p._pool is None)
  30. def x_apply(self):
  31. p = TaskPool(limit=2)
  32. p.start()
  33. scratchpad = {}
  34. proc_counter = itertools.count().next
  35. def mycallback(ret_value, meta):
  36. process = proc_counter()
  37. scratchpad[process] = {}
  38. scratchpad[process]["ret_value"] = ret_value
  39. scratchpad[process]["meta"] = meta
  40. myerrback = mycallback
  41. res = p.apply_async(do_something, args=[10], callbacks=[mycallback],
  42. meta={"foo": "bar"})
  43. res2 = p.apply_async(raise_something, args=[10], errbacks=[myerrback],
  44. meta={"foo2": "bar2"})
  45. res3 = p.apply_async(do_something, args=[20], callbacks=[mycallback],
  46. meta={"foo3": "bar3"})
  47. self.assertEquals(res.get(), 100)
  48. time.sleep(0.5)
  49. self.assertTrue(scratchpad.get(0))
  50. self.assertEquals(scratchpad[0]["ret_value"], 100)
  51. self.assertEquals(scratchpad[0]["meta"], {"foo": "bar"})
  52. self.assertTrue(isinstance(res2.get(), ExceptionInfo))
  53. self.assertTrue(scratchpad.get(1))
  54. time.sleep(1)
  55. #self.assertEquals(scratchpad[1]["ret_value"], "FOO")
  56. self.assertTrue(isinstance(scratchpad[1]["ret_value"],
  57. ExceptionInfo))
  58. self.assertEquals(scratchpad[1]["ret_value"].exception.args,
  59. ("FOO EXCEPTION", ))
  60. self.assertEquals(scratchpad[1]["meta"], {"foo2": "bar2"})
  61. self.assertEquals(res3.get(), 400)
  62. time.sleep(0.5)
  63. self.assertTrue(scratchpad.get(2))
  64. self.assertEquals(scratchpad[2]["ret_value"], 400)
  65. self.assertEquals(scratchpad[2]["meta"], {"foo3": "bar3"})
  66. res3 = p.apply_async(do_something, args=[30], callbacks=[mycallback],
  67. meta={"foo4": "bar4"})
  68. self.assertEquals(res3.get(), 900)
  69. time.sleep(0.5)
  70. self.assertTrue(scratchpad.get(3))
  71. self.assertEquals(scratchpad[3]["ret_value"], 900)
  72. self.assertEquals(scratchpad[3]["meta"], {"foo4": "bar4"})
  73. p.stop()