# test_concurrency_gevent.py (1.3 KB)

  1. from __future__ import absolute_import
  2. from nose import SkipTest
  3. from celery.concurrency.gevent import TaskPool
  4. from celery.tests.utils import unittest
  5. class GeventCase(unittest.TestCase):
  6. def setUp(self):
  7. try:
  8. self.gevent = __import__("gevent")
  9. except ImportError:
  10. raise SkipTest(
  11. "gevent not installed, skipping related tests.")
  12. class test_TaskPool(GeventCase):
  13. def test_grow(self):
  14. pool = TaskPool(10)
  15. pool.start()
  16. self.assertEqual(pool._pool.size, 10)
  17. pool.grow()
  18. self.assertEqual(pool._pool.size, 11)
  19. def test_grow_many(self):
  20. pool = TaskPool(10)
  21. pool.start()
  22. self.assertEqual(pool._pool.size, 10)
  23. pool.grow(2)
  24. self.assertEqual(pool._pool.size, 12)
  25. def test_shrink(self):
  26. pool = TaskPool(10)
  27. pool.start()
  28. self.assertEqual(pool._pool.size, 10)
  29. pool.shrink()
  30. self.assertEqual(pool._pool.size, 9)
  31. def test_shrink_many(self):
  32. pool = TaskPool(10)
  33. pool.start()
  34. self.assertEqual(pool._pool.size, 10)
  35. pool.shrink(2)
  36. self.assertEqual(pool._pool.size, 8)
  37. def test_num_processes(self):
  38. pool = TaskPool(10)
  39. pool.start()
  40. pool.apply_async(lambda x: x, (2, ), {})
  41. self.assertEqual(pool.num_processes, 1)