| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 | 
							- from __future__ import absolute_import, unicode_literals
 
- import sys
 
- import pytest
 
- from case import Mock, patch, skip
 
- from celery.concurrency.eventlet import TaskPool, Timer, apply_target
 
- eventlet_modules = (
 
-     'eventlet',
 
-     'eventlet.debug',
 
-     'eventlet.greenthread',
 
-     'eventlet.greenpool',
 
-     'greenlet',
 
- )
 
- @skip.if_pypy()
 
- @skip.unless_module('eventlet')
 
- class EventletCase:
 
-     def setup(self):
 
-         self.patching.modules(*eventlet_modules)
 
-     def teardown(self):
 
-         for mod in [mod for mod in sys.modules
 
-                     if mod.startswith('eventlet')]:
 
-             try:
 
-                 del(sys.modules[mod])
 
-             except KeyError:
 
-                 pass
 
- class test_aaa_eventlet_patch(EventletCase):
 
-     def test_aaa_is_patched(self):
 
-         with patch('eventlet.monkey_patch', create=True) as monkey_patch:
 
-             from celery import maybe_patch_concurrency
 
-             maybe_patch_concurrency(['x', '-P', 'eventlet'])
 
-             monkey_patch.assert_called_with()
 
-     @patch('eventlet.debug.hub_blocking_detection', create=True)
 
-     @patch('eventlet.monkey_patch', create=True)
 
-     def test_aaa_blockdetecet(
 
-             self, monkey_patch, hub_blocking_detection, patching):
 
-         patching.setenv('EVENTLET_NOBLOCK', '10.3')
 
-         from celery import maybe_patch_concurrency
 
-         maybe_patch_concurrency(['x', '-P', 'eventlet'])
 
-         monkey_patch.assert_called_with()
 
-         hub_blocking_detection.assert_called_with(10.3, 10.3)
 
- class test_Timer(EventletCase):
 
-     @pytest.fixture(autouse=True)
 
-     def setup_patches(self, patching):
 
-         self.spawn_after = patching('eventlet.greenthread.spawn_after')
 
-         self.GreenletExit = patching('greenlet.GreenletExit')
 
-     def test_sched(self):
 
-         x = Timer()
 
-         x.GreenletExit = KeyError
 
-         entry = Mock()
 
-         g = x._enter(1, 0, entry)
 
-         assert x.queue
 
-         x._entry_exit(g, entry)
 
-         g.wait.side_effect = KeyError()
 
-         x._entry_exit(g, entry)
 
-         entry.cancel.assert_called_with()
 
-         assert not x._queue
 
-         x._queue.add(g)
 
-         x.clear()
 
-         x._queue.add(g)
 
-         g.cancel.side_effect = KeyError()
 
-         x.clear()
 
-     def test_cancel(self):
 
-         x = Timer()
 
-         tref = Mock(name='tref')
 
-         x.cancel(tref)
 
-         tref.cancel.assert_called_with()
 
-         x.GreenletExit = KeyError
 
-         tref.cancel.side_effect = KeyError()
 
-         x.cancel(tref)
 
- class test_TaskPool(EventletCase):
 
-     @pytest.fixture(autouse=True)
 
-     def setup_patches(self, patching):
 
-         self.GreenPool = patching('eventlet.greenpool.GreenPool')
 
-         self.greenthread = patching('eventlet.greenthread')
 
-     def test_pool(self):
 
-         x = TaskPool()
 
-         x.on_start()
 
-         x.on_stop()
 
-         x.on_apply(Mock())
 
-         x._pool = None
 
-         x.on_stop()
 
-         assert x.getpid()
 
-     @patch('celery.concurrency.eventlet.base')
 
-     def test_apply_target(self, base):
 
-         apply_target(Mock(), getpid=Mock())
 
-         base.apply_target.assert_called()
 
-     def test_grow(self):
 
-         x = TaskPool(10)
 
-         x._pool = Mock(name='_pool')
 
-         x.grow(2)
 
-         assert x.limit == 12
 
-         x._pool.resize.assert_called_with(12)
 
-     def test_shrink(self):
 
-         x = TaskPool(10)
 
-         x._pool = Mock(name='_pool')
 
-         x.shrink(2)
 
-         assert x.limit == 8
 
-         x._pool.resize.assert_called_with(8)
 
-     def test_get_info(self):
 
-         x = TaskPool(10)
 
-         x._pool = Mock(name='_pool')
 
-         assert x._get_info() == {
 
-             'max-concurrency': 10,
 
-             'free-threads': x._pool.free(),
 
-             'running-threads': x._pool.running(),
 
-         }
 
 
  |