test_components.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch, skip
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery.worker.components import Beat, Hub, Pool, Timer
  6. # some of these are tested in test_worker, so I've only written tests
  7. # here to complete coverage. Should move everyting to this module at some
  8. # point [-ask]
  9. class test_Timer:
  10. def test_create__eventloop(self):
  11. w = Mock(name='w')
  12. w.use_eventloop = True
  13. Timer(w).create(w)
  14. assert not w.timer.queue
  15. class test_Hub:
  16. def setup(self):
  17. self.w = Mock(name='w')
  18. self.hub = Hub(self.w)
  19. self.w.hub = Mock(name='w.hub')
  20. @patch('celery.worker.components.set_event_loop')
  21. @patch('celery.worker.components.get_event_loop')
  22. def test_create(self, get_event_loop, set_event_loop):
  23. self.hub._patch_thread_primitives = Mock(name='ptp')
  24. assert self.hub.create(self.w) is self.hub
  25. self.hub._patch_thread_primitives.assert_called_with(self.w)
  26. def test_start(self):
  27. self.hub.start(self.w)
  28. def test_stop(self):
  29. self.hub.stop(self.w)
  30. self.w.hub.close.assert_called_with()
  31. def test_terminate(self):
  32. self.hub.terminate(self.w)
  33. self.w.hub.close.assert_called_with()
  34. class test_Pool:
  35. def test_close_terminate(self):
  36. w = Mock()
  37. comp = Pool(w)
  38. pool = w.pool = Mock()
  39. comp.close(w)
  40. pool.close.assert_called_with()
  41. comp.terminate(w)
  42. pool.terminate.assert_called_with()
  43. w.pool = None
  44. comp.close(w)
  45. comp.terminate(w)
  46. @skip.if_win32()
  47. def test_create_when_eventloop(self):
  48. w = Mock()
  49. w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
  50. comp = Pool(w)
  51. w.pool = Mock()
  52. comp.create(w)
  53. assert w.process_task is w._process_task_sem
  54. def test_create_calls_instantiate_with_max_memory(self):
  55. w = Mock()
  56. w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
  57. comp = Pool(w)
  58. comp.instantiate = Mock()
  59. w.max_memory_per_child = 32
  60. comp.create(w)
  61. assert comp.instantiate.call_args[1]['max_memory_per_child'] == 32
  62. class test_Beat:
  63. def test_create__green(self):
  64. w = Mock(name='w')
  65. w.pool_cls.__module__ = 'foo_gevent'
  66. with pytest.raises(ImproperlyConfigured):
  67. Beat(w).create(w)