test_concurrency_base.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import os
  2. from itertools import count
  3. from celery.concurrency.base import apply_target, BasePool
  4. from celery.tests.utils import unittest
  5. class test_BasePool(unittest.TestCase):
  6. def test_apply_target(self):
  7. scratch = {}
  8. counter = count(0).next
  9. def gen_callback(name, retval=None):
  10. def callback(*args):
  11. scratch[name] = (counter(), args)
  12. return retval
  13. return callback
  14. apply_target(gen_callback("target", 42),
  15. args=(8, 16),
  16. callback=gen_callback("callback"),
  17. accept_callback=gen_callback("accept_callback"))
  18. self.assertDictContainsSubset({
  19. "target": (1, (8, 16)),
  20. "callback": (2, (42, ))}, scratch)
  21. pa1 = scratch["accept_callback"]
  22. self.assertEqual(0, pa1[0])
  23. self.assertEqual(pa1[1][0], os.getpid())
  24. self.assertTrue(pa1[1][1])
  25. # No accept callback
  26. scratch.clear()
  27. apply_target(gen_callback("target", 42),
  28. args=(8, 16),
  29. callback=gen_callback("callback"),
  30. accept_callback=None)
  31. self.assertDictEqual(scratch,
  32. {"target": (3, (8, 16)),
  33. "callback": (4, (42, ))})
  34. def test_interface_on_start(self):
  35. BasePool(10).on_start()
  36. def test_interface_on_stop(self):
  37. BasePool(10).on_stop()
  38. def test_interface_on_apply(self):
  39. BasePool(10).on_apply()
  40. def test_interface_info(self):
  41. self.assertDictEqual(BasePool(10).info, {})
  42. def test_active(self):
  43. p = BasePool(10)
  44. self.assertFalse(p.active)
  45. p._state = p.RUN
  46. self.assertTrue(p.active)