| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 | from __future__ import absolute_import, unicode_literalsfrom contextlib import contextmanagerimport pytestfrom case import Mock, mock, patchfrom celery.fixups.django import (DjangoFixup, DjangoWorkerFixup,                                  FixupWarning, _maybe_close_fd, fixup)class FixupCase:    Fixup = None    @contextmanager    def fixup_context(self, app):        with patch('celery.fixups.django.DjangoWorkerFixup.validate_models'):            with patch('celery.fixups.django.symbol_by_name') as symbyname:                with patch('celery.fixups.django.import_module') as impmod:                    f = self.Fixup(app)                    yield f, impmod, symbynameclass test_DjangoFixup(FixupCase):    Fixup = DjangoFixup    def test_setting_default_app(self):        from celery import _state        prev, _state.default_app = _state.default_app, None        try:            app = Mock(name='app')            DjangoFixup(app)            app.set_default.assert_called_with()        finally:            _state.default_app = prev    @patch('celery.fixups.django.DjangoWorkerFixup')    def test_worker_fixup_property(self, DjangoWorkerFixup):        f = DjangoFixup(self.app)        f._worker_fixup = None        assert f.worker_fixup is DjangoWorkerFixup()        assert f.worker_fixup is DjangoWorkerFixup()    def test_on_import_modules(self):        f = DjangoFixup(self.app)        f.worker_fixup = Mock(name='worker_fixup')        f.on_import_modules()        f.worker_fixup.validate_models.assert_called_with()    def test_autodiscover_tasks(self, patching):        patching.modules('django.apps')        from django.apps import apps        f = DjangoFixup(self.app)        configs = [Mock(name='c1'), Mock(name='c2')]        apps.get_app_configs.return_value = configs        assert f.autodiscover_tasks() == [c.name for c in configs]    def test_fixup(self, patching):        with patch('celery.fixups.django.DjangoFixup') as Fixup:            patching.setenv('DJANGO_SETTINGS_MODULE', '')            fixup(self.app)            Fixup.assert_not_called()            patching.setenv('DJANGO_SETTINGS_MODULE', 'settings')            with mock.mask_modules('django'):                with pytest.warns(FixupWarning):                    fixup(self.app)                Fixup.assert_not_called()            with mock.module_exists('django'):                import django                django.VERSION = (1, 10, 1)                fixup(self.app)                Fixup.assert_called()    def test_maybe_close_fd(self):        with patch('os.close'):            _maybe_close_fd(Mock())            _maybe_close_fd(object())    def test_init(self):        with self.fixup_context(self.app) as (f, importmod, sym):            assert f    def test_install(self, patching):        self.app.loader = Mock()        self.cw = patching('os.getcwd')        self.p = patching('sys.path')        self.sigs = patching('celery.fixups.django.signals')        with self.fixup_context(self.app) as (f, _, _):            self.cw.return_value = '/opt/vandelay'            f.install()            self.sigs.worker_init.connect.assert_called_with(f.on_worker_init)            assert self.app.loader.now == f.now            self.p.append.assert_called_with('/opt/vandelay')    def test_now(self):        with self.fixup_context(self.app) as (f, _, _):            assert f.now(utc=True)            f._now.assert_not_called()            assert f.now(utc=False)            f._now.assert_called()    def test_on_worker_init(self):        with self.fixup_context(self.app) as (f, _, _):            with patch('celery.fixups.django.DjangoWorkerFixup') as DWF:                f.on_worker_init()                DWF.assert_called_with(f.app)                DWF.return_value.install.assert_called_with()                assert f._worker_fixup is DWF.return_valueclass test_DjangoWorkerFixup(FixupCase):    Fixup = DjangoWorkerFixup    def test_init(self):        with self.fixup_context(self.app) as (f, importmod, sym):            assert f    def test_install(self):        self.app.conf = {'CELERY_DB_REUSE_MAX': None}        self.app.loader = Mock()        with self.fixup_context(self.app) as (f, _, _):            with patch('celery.fixups.django.signals') as sigs:                f.install()                sigs.beat_embedded_init.connect.assert_called_with(                    f.close_database,                )                sigs.worker_ready.connect.assert_called_with(f.on_worker_ready)                sigs.task_prerun.connect.assert_called_with(f.on_task_prerun)                sigs.task_postrun.connect.assert_called_with(f.on_task_postrun)                sigs.worker_process_init.connect.assert_called_with(                    f.on_worker_process_init,                )    def test_on_worker_process_init(self, patching):        with self.fixup_context(self.app) as (f, _, _):            with patch('celery.fixups.django._maybe_close_fd') as mcf:                _all = f._db.connections.all = Mock()                conns = _all.return_value = [                    Mock(), Mock(),                ]                conns[0].connection = None                with patch.object(f, 'close_cache'):                    with patch.object(f, '_close_database'):                        f.on_worker_process_init()                        mcf.assert_called_with(conns[1].connection)                        f.close_cache.assert_called_with()                        f._close_database.assert_called_with()                        f.validate_models = Mock(name='validate_models')                        patching.setenv('FORKED_BY_MULTIPROCESSING', '1')                        f.on_worker_process_init()                        f.validate_models.assert_called_with()    def test_on_task_prerun(self):        task = Mock()        with self.fixup_context(self.app) as (f, _, _):            task.request.is_eager = False            with patch.object(f, 'close_database'):                f.on_task_prerun(task)                f.close_database.assert_called_with()            task.request.is_eager = True            with patch.object(f, 'close_database'):                f.on_task_prerun(task)                f.close_database.assert_not_called()    def test_on_task_postrun(self):        task = Mock()        with self.fixup_context(self.app) as (f, _, _):            with patch.object(f, 'close_cache'):                task.request.is_eager = False                with patch.object(f, 'close_database'):                    f.on_task_postrun(task)                    f.close_database.assert_called()                    f.close_cache.assert_called()            # when a task is eager, don't close connections            with patch.object(f, 'close_cache'):                task.request.is_eager = True                with patch.object(f, 'close_database'):                    f.on_task_postrun(task)                    f.close_database.assert_not_called()                    f.close_cache.assert_not_called()    def test_close_database(self):        with self.fixup_context(self.app) as (f, _, _):            with patch.object(f, '_close_database') as _close:                f.db_reuse_max = None                f.close_database()                _close.assert_called_with()                _close.reset_mock()                f.db_reuse_max = 10                f._db_recycles = 3                f.close_database()                _close.assert_not_called()                assert f._db_recycles == 4                _close.reset_mock()                f._db_recycles = 20                f.close_database()                _close.assert_called_with()                assert f._db_recycles == 1    def test__close_database(self):        with self.fixup_context(self.app) as (f, _, _):            conns = [Mock(), Mock(), Mock()]            conns[1].close.side_effect = KeyError('already closed')            f.DatabaseError = KeyError            f.interface_errors = ()            f._db.connections = Mock()  # ConnectionHandler            f._db.connections.all.side_effect = lambda: conns            f._close_database()            conns[0].close_if_unusable_or_obsolete.assert_called_with()            conns[1].close_if_unusable_or_obsolete.assert_called_with()            conns[2].close_if_unusable_or_obsolete.assert_called_with()            conns[1].close_if_unusable_or_obsolete.side_effect = KeyError(                'omg')            with pytest.raises(KeyError):                f._close_database()    def test_close_cache(self):        with self.fixup_context(self.app) as (f, _, _):            f.close_cache()            f._cache.close_caches.assert_called_with()    def test_on_worker_ready(self):        with self.fixup_context(self.app) as (f, _, _):            f._settings.DEBUG = False            f.on_worker_ready()            with pytest.warns(UserWarning):                f._settings.DEBUG = True                f.on_worker_ready()    def test_validate_models(self, patching):        with mock.module('django', 'django.db', 'django.core',                         'django.core.cache', 'django.conf',                         'django.db.utils'):            f = self.Fixup(self.app)            f.django_setup = Mock(name='django.setup')            patching.modules('django.core.checks')            from django.core.checks import run_checks            f.validate_models()            f.django_setup.assert_called_with()            run_checks.assert_called_with()    def test_django_setup(self, patching):        patching('celery.fixups.django.symbol_by_name')        patching('celery.fixups.django.import_module')        django, = patching.modules('django')        f = self.Fixup(self.app)        f.django_setup()        django.setup.assert_called_with()
 |