| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 | from __future__ import absolute_import, unicode_literalsimport osimport sysimport warningsimport pytestfrom case import Mock, mock, patchfrom celery import loadersfrom celery.exceptions import NotConfiguredfrom celery.five import bytes_if_py2from celery.loaders import base, defaultfrom celery.loaders.app import AppLoaderfrom celery.utils.imports import NotAPackageclass DummyLoader(base.BaseLoader):    def read_configuration(self):        return {'foo': 'bar', 'imports': ('os', 'sys')}class test_loaders:    def test_get_loader_cls(self):        assert loaders.get_loader_cls('default') is default.Loaderclass test_LoaderBase:    message_options = {'subject': 'Subject',                       'body': 'Body',                       'sender': 'x@x.com',                       'to': 'y@x.com'}    server_options = {'host': 'smtp.x.com',                      'port': 1234,                      'user': 'x',                      'password': 'qwerty',                      'timeout': 3}    def setup(self):        self.loader = DummyLoader(app=self.app)    def test_handlers_pass(self):        self.loader.on_task_init('foo.task', 'feedface-cafebabe')        self.loader.on_worker_init()    def test_now(self):        assert self.loader.now(utc=True)        assert self.loader.now(utc=False)    def test_read_configuration_no_env(self):        assert base.BaseLoader(app=self.app).read_configuration(            'FOO_X_S_WE_WQ_Q_WE') is None    def test_autodiscovery(self):        with patch('celery.loaders.base.autodiscover_tasks') as auto:            auto.return_value = [Mock()]            auto.return_value[0].__name__ = 'moo'            self.loader.autodiscover_tasks(['A', 'B'])            assert 'moo' in self.loader.task_modules            self.loader.task_modules.discard('moo')    def test_import_task_module(self):        assert sys == self.loader.import_task_module('sys')    def test_init_worker_process(self):        self.loader.on_worker_process_init()        m = self.loader.on_worker_process_init = Mock()        self.loader.init_worker_process()        m.assert_called_with()    def test_config_from_object_module(self):        self.loader.import_from_cwd = Mock()        self.loader.config_from_object('module_name')        self.loader.import_from_cwd.assert_called_with('module_name')    def test_conf_property(self):        assert self.loader.conf['foo'] == 'bar'        assert self.loader._conf['foo'] == 'bar'        assert self.loader.conf['foo'] == 'bar'    def test_import_default_modules(self):        def modnames(l):            return [m.__name__ for m in l]        self.app.conf.imports = ('os', 'sys')        assert (sorted(modnames(self.loader.import_default_modules())) ==                sorted(modnames([os, sys])))    def test_import_default_modules_with_exception(self):        """ Make sure exceptions are not silenced since this step is prior to            setup logging. """        def trigger_exception(**kwargs):            raise ImportError('Dummy ImportError')        from celery.signals import import_modules        import_modules.connect(trigger_exception)        self.app.conf.imports = ('os', 'sys')        with pytest.raises(ImportError):            self.loader.import_default_modules()    def test_import_from_cwd_custom_imp(self):        imp = Mock(name='imp')        self.loader.import_from_cwd('foo', imp=imp)        imp.assert_called()    def test_cmdline_config_ValueError(self):        with pytest.raises(ValueError):            self.loader.cmdline_config_parser(['broker.port=foobar'])class test_DefaultLoader:    @patch('celery.loaders.base.find_module')    def test_read_configuration_not_a_package(self, find_module):        find_module.side_effect = NotAPackage()        l = default.Loader(app=self.app)        with pytest.raises(NotAPackage):            l.read_configuration(fail_silently=False)    @patch('celery.loaders.base.find_module')    @mock.environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')    def test_read_configuration_py_in_name(self, find_module):        find_module.side_effect = NotAPackage()        l = default.Loader(app=self.app)        with pytest.raises(NotAPackage):            l.read_configuration(fail_silently=False)    @patch('celery.loaders.base.find_module')    def test_read_configuration_importerror(self, find_module):        default.C_WNOCONF = True        find_module.side_effect = ImportError()        l = default.Loader(app=self.app)        with pytest.warns(NotConfigured):            l.read_configuration(fail_silently=True)        default.C_WNOCONF = False        l.read_configuration(fail_silently=True)    def test_read_configuration(self):        from types import ModuleType        class ConfigModule(ModuleType):            pass        configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'        celeryconfig = ConfigModule(bytes_if_py2(configname))        celeryconfig.imports = ('os', 'sys')        prevconfig = sys.modules.get(configname)        sys.modules[configname] = celeryconfig        try:            l = default.Loader(app=self.app)            l.find_module = Mock(name='find_module')            settings = l.read_configuration(fail_silently=False)            assert settings.imports == ('os', 'sys')            settings = l.read_configuration(fail_silently=False)            assert settings.imports == ('os', 'sys')            l.on_worker_init()        finally:            if prevconfig:                sys.modules[configname] = prevconfig    def test_read_configuration_ImportError(self):        sentinel = object()        prev, os.environ['CELERY_CONFIG_MODULE'] = (            os.environ.get('CELERY_CONFIG_MODULE', sentinel), 'daweqew.dweqw',        )        try:            l = default.Loader(app=self.app)            with pytest.raises(ImportError):                l.read_configuration(fail_silently=False)            l.read_configuration(fail_silently=True)        finally:            if prev is not sentinel:                os.environ['CELERY_CONFIG_MODULE'] = prev            else:                os.environ.pop('CELERY_CONFIG_MODULE', None)    def test_import_from_cwd(self):        l = default.Loader(app=self.app)        old_path = list(sys.path)        try:            sys.path.remove(os.getcwd())        except ValueError:            pass        celery = sys.modules.pop('celery', None)        sys.modules.pop('celery.local', None)        try:            assert l.import_from_cwd('celery')            sys.modules.pop('celery', None)            sys.modules.pop('celery.local', None)            sys.path.insert(0, os.getcwd())            assert l.import_from_cwd('celery')        finally:            sys.path = old_path            sys.modules['celery'] = celery    def test_unconfigured_settings(self):        context_executed = [False]        class _Loader(default.Loader):            def find_module(self, name):                raise ImportError(name)        with warnings.catch_warnings(record=True):            l = _Loader(app=self.app)            assert not l.configured            context_executed[0] = True        assert context_executed[0]class test_AppLoader:    def setup(self):        self.loader = AppLoader(app=self.app)    def test_on_worker_init(self):        self.app.conf.imports = ('subprocess',)        sys.modules.pop('subprocess', None)        self.loader.init_worker()        assert 'subprocess' in sys.modulesclass test_autodiscovery:    def test_autodiscover_tasks(self):        base._RACE_PROTECTION = True        try:            base.autodiscover_tasks(['foo'])        finally:            base._RACE_PROTECTION = False        with patch('celery.loaders.base.find_related_module') as frm:            base.autodiscover_tasks(['foo'])            frm.assert_called()    def test_find_related_module(self):        with patch('importlib.import_module') as imp:            with patch('imp.find_module') as find:                imp.return_value = Mock()                imp.return_value.__path__ = 'foo'                base.find_related_module(base, 'tasks')                find.side_effect = ImportError()                base.find_related_module(base, 'tasks')
 |