| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048 | from __future__ import absolute_import, unicode_literalsfrom datetime import datetime, timedeltaimport gcimport itertoolsimport osimport pytestfrom copy import deepcopyfrom pickle import loads, dumpsfrom case import ContextMock, Mock, mock, patchfrom vine import promisefrom celery import Celeryfrom celery import shared_task, current_appfrom celery import app as _appfrom celery import _statefrom celery.app import base as _appbasefrom celery.app import defaultsfrom celery.exceptions import ImproperlyConfiguredfrom celery.five import items, keysfrom celery.loaders.base import unconfiguredfrom celery.platforms import pyimplementationfrom celery.utils.collections import DictAttributefrom celery.utils.serialization import picklefrom celery.utils.time import timezone, to_utc, localizefrom celery.utils.objects import BunchTHIS_IS_A_KEY = 'this is a value'class ObjectConfig(object):    FOO = 1    BAR = 2object_config = ObjectConfig()dict_config = {'FOO': 10, 'BAR': 20}class ObjectConfig2(object):    LEAVE_FOR_WORK = True    MOMENT_TO_STOP = True    CALL_ME_BACK = 123456789    WANT_ME_TO = False    UNDERSTAND_ME = Trueclass test_module:    def test_default_app(self):        assert _app.default_app == _state.default_app    def test_bugreport(self, app):        assert _app.bugreport(app=app)class test_task_join_will_block:    def test_task_join_will_block(self, patching):        patching('celery._state._task_join_will_block', 0)        assert _state._task_join_will_block == 0        _state._set_task_join_will_block(True)        assert _state._task_join_will_block is True        # fixture 'app' sets this, so need to use orig_ function        # set there by that fixture.        res = _state.orig_task_join_will_block()        assert res is Trueclass test_App:    def setup(self):        self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG))    def test_now(self):        timezone_setting_value = 'US/Eastern'        tz_utc = timezone.get_timezone('UTC')        tz_us_eastern = timezone.get_timezone(timezone_setting_value)        now = to_utc(datetime.utcnow())        app_now = self.app.now()        assert app_now.tzinfo is tz_utc        assert app_now - now <= timedelta(seconds=1)        # Check that timezone conversion is applied from configuration        self.app.conf.enable_utc = False        self.app.conf.timezone = timezone_setting_value        # timezone is a cached property        del self.app.timezone        app_now = self.app.now()        assert app_now.tzinfo.zone == tz_us_eastern.zone        diff = to_utc(datetime.utcnow()) - localize(app_now, tz_utc)        assert diff <= timedelta(seconds=1)        # Verify that timezone setting overrides enable_utc=on setting        self.app.conf.enable_utc = True        del self.app.timezone        app_now = self.app.now()        assert self.app.timezone == tz_us_eastern        assert app_now.tzinfo.zone == tz_us_eastern.zone    @patch('celery.app.base.set_default_app')    def test_set_default(self, set_default_app):        self.app.set_default()        set_default_app.assert_called_with(self.app)    @patch('celery.security.setup_security')    def test_setup_security(self, setup_security):        self.app.setup_security(            {'json'}, 'key', 'cert', 'store', 'digest', 'serializer')        setup_security.assert_called_with(            {'json'}, 'key', 'cert', 'store', 'digest', 'serializer',            app=self.app)    def test_task_autofinalize_disabled(self):        with self.Celery('xyzibari', autofinalize=False) as app:            @app.task            def ttafd():                return 42            with pytest.raises(RuntimeError):                ttafd()        with self.Celery('xyzibari', autofinalize=False) as app:            @app.task            def ttafd2():                return 42            app.finalize()            assert ttafd2() == 42    def test_registry_autofinalize_disabled(self):        with self.Celery('xyzibari', autofinalize=False) as app:            with pytest.raises(RuntimeError):                app.tasks['celery.chain']            app.finalize()            assert app.tasks['celery.chain']    def test_task(self):        with self.Celery('foozibari') as app:            def fun():                pass            fun.__module__ = '__main__'            task = app.task(fun)            assert task.name == app.main + '.fun'    def test_task_too_many_args(self):        with pytest.raises(TypeError):            self.app.task(Mock(name='fun'), True)        with pytest.raises(TypeError):            self.app.task(Mock(name='fun'), True, 1, 2)    def test_with_config_source(self):        with self.Celery(config_source=ObjectConfig) as app:            assert app.conf.FOO == 1            assert app.conf.BAR == 2    @pytest.mark.usefixtures('depends_on_current_app')    def test_task_windows_execv(self):        prev, _appbase.USING_EXECV = _appbase.USING_EXECV, True        try:            @self.app.task(shared=False)            def foo():                pass            assert foo._get_current_object()  # is proxy        finally:            _appbase.USING_EXECV = prev        assert not _appbase.USING_EXECV    def test_task_takes_no_args(self):        with pytest.raises(TypeError):            @self.app.task(1)            def foo():                pass    def test_add_defaults(self):        assert not self.app.configured        _conf = {'foo': 300}        def conf():            return _conf        self.app.add_defaults(conf)        assert conf in self.app._pending_defaults        assert not self.app.configured        assert self.app.conf.foo == 300        assert self.app.configured        assert not self.app._pending_defaults        # defaults not pickled        appr = loads(dumps(self.app))        with pytest.raises(AttributeError):            appr.conf.foo        # add more defaults after configured        conf2 = {'foo': 'BAR'}        self.app.add_defaults(conf2)        assert self.app.conf.foo == 'BAR'        assert _conf in self.app.conf.defaults        assert conf2 in self.app.conf.defaults    def test_connection_or_acquire(self):        with self.app.connection_or_acquire(block=True):            assert self.app.pool._dirty        with self.app.connection_or_acquire(pool=False):            assert not self.app.pool._dirty    def test_using_v1_reduce(self):        self.app._using_v1_reduce = True        assert loads(dumps(self.app))    def test_autodiscover_tasks_force(self):        self.app.loader.autodiscover_tasks = Mock()        self.app.autodiscover_tasks(['proj.A', 'proj.B'], force=True)        self.app.loader.autodiscover_tasks.assert_called_with(            ['proj.A', 'proj.B'], 'tasks',        )        self.app.loader.autodiscover_tasks = Mock()        def lazy_list():            return ['proj.A', 'proj.B']        self.app.autodiscover_tasks(            lazy_list,            related_name='george',            force=True,        )        self.app.loader.autodiscover_tasks.assert_called_with(            ['proj.A', 'proj.B'], 'george',        )    def test_autodiscover_tasks_lazy(self):        with patch('celery.signals.import_modules') as import_modules:            def lazy_list():                return [1, 2, 3]            self.app.autodiscover_tasks(lazy_list)            import_modules.connect.assert_called()            prom = import_modules.connect.call_args[0][0]            assert isinstance(prom, promise)            assert prom.fun == self.app._autodiscover_tasks            assert prom.args[0](), [1, 2 == 3]    def test_autodiscover_tasks__no_packages(self):        fixup1 = Mock(name='fixup')        fixup2 = Mock(name='fixup')        self.app._autodiscover_tasks_from_names = Mock(name='auto')        self.app._fixups = [fixup1, fixup2]        fixup1.autodiscover_tasks.return_value = ['A', 'B', 'C']        fixup2.autodiscover_tasks.return_value = ['D', 'E', 'F']        self.app.autodiscover_tasks(force=True)        self.app._autodiscover_tasks_from_names.assert_called_with(            ['A', 'B', 'C', 'D', 'E', 'F'], related_name='tasks',        )    def test_with_broker(self, patching):        patching.setenv('CELERY_BROKER_URL', '')        with self.Celery(broker='foo://baribaz') as app:            assert app.conf.broker_url == 'foo://baribaz'    def test_pending_configuration__setattr(self):        with self.Celery(broker='foo://bar') as app:            app.conf.task_default_delivery_mode = 44            app.conf.worker_agent = 'foo:Bar'            assert not app.configured            assert app.conf.worker_agent == 'foo:Bar'            assert app.conf.broker_url == 'foo://bar'            assert app._preconf['worker_agent'] == 'foo:Bar'            assert app.configured            reapp = pickle.loads(pickle.dumps(app))            assert reapp._preconf['worker_agent'] == 'foo:Bar'            assert not reapp.configured            assert reapp.conf.worker_agent == 'foo:Bar'            assert reapp.configured            assert reapp.conf.broker_url == 'foo://bar'            assert reapp._preconf['worker_agent'] == 'foo:Bar'    def test_pending_configuration__update(self):        with self.Celery(broker='foo://bar') as app:            app.conf.update(                task_default_delivery_mode=44,                worker_agent='foo:Bar',            )            assert not app.configured            assert app.conf.worker_agent == 'foo:Bar'            assert app.conf.broker_url == 'foo://bar'            assert app._preconf['worker_agent'] == 'foo:Bar'    def test_pending_configuration__compat_settings(self):        with self.Celery(broker='foo://bar', backend='foo') as app:            app.conf.update(                CELERY_ALWAYS_EAGER=4,                CELERY_DEFAULT_DELIVERY_MODE=63,                CELERYD_AGENT='foo:Barz',            )            assert app.conf.task_always_eager == 4            assert app.conf.task_default_delivery_mode == 63            assert app.conf.worker_agent == 'foo:Barz'            assert app.conf.broker_url == 'foo://bar'            assert app.conf.result_backend == 'foo'    def test_pending_configuration__compat_settings_mixing(self):        with self.Celery(broker='foo://bar', backend='foo') as app:            app.conf.update(                CELERY_ALWAYS_EAGER=4,                CELERY_DEFAULT_DELIVERY_MODE=63,                CELERYD_AGENT='foo:Barz',                worker_consumer='foo:Fooz',            )            with pytest.raises(ImproperlyConfigured):                assert app.conf.task_always_eager == 4    def test_pending_configuration__django_settings(self):        with self.Celery(broker='foo://bar', backend='foo') as app:            app.config_from_object(DictAttribute(Bunch(                CELERY_TASK_ALWAYS_EAGER=4,                CELERY_TASK_DEFAULT_DELIVERY_MODE=63,                CELERY_WORKER_AGENT='foo:Barz',                CELERY_RESULT_SERIALIZER='pickle',            )), namespace='CELERY')            assert app.conf.result_serializer == 'pickle'            assert app.conf.CELERY_RESULT_SERIALIZER == 'pickle'            assert app.conf.task_always_eager == 4            assert app.conf.task_default_delivery_mode == 63            assert app.conf.worker_agent == 'foo:Barz'            assert app.conf.broker_url == 'foo://bar'            assert app.conf.result_backend == 'foo'    def test_pending_configuration__compat_settings_mixing_new(self):        with self.Celery(broker='foo://bar', backend='foo') as app:            app.conf.update(                task_always_eager=4,                task_default_delivery_mode=63,                worker_agent='foo:Barz',                CELERYD_CONSUMER='foo:Fooz',                CELERYD_AUTOSCALER='foo:Xuzzy',            )            with pytest.raises(ImproperlyConfigured):                assert app.conf.worker_consumer == 'foo:Fooz'    def test_pending_configuration__compat_settings_mixing_alt(self):        with self.Celery(broker='foo://bar', backend='foo') as app:            app.conf.update(                task_always_eager=4,                task_default_delivery_mode=63,                worker_agent='foo:Barz',                CELERYD_CONSUMER='foo:Fooz',                worker_consumer='foo:Fooz',                CELERYD_AUTOSCALER='foo:Xuzzy',                worker_autoscaler='foo:Xuzzy'            )    def test_pending_configuration__setdefault(self):        with self.Celery(broker='foo://bar') as app:            assert not app.configured            app.conf.setdefault('worker_agent', 'foo:Bar')            assert not app.configured    def test_pending_configuration__iter(self):        with self.Celery(broker='foo://bar') as app:            app.conf.worker_agent = 'foo:Bar'            assert not app.configured            assert list(keys(app.conf))            assert app.configured            assert 'worker_agent' in app.conf            assert dict(app.conf)    def test_pending_configuration__raises_ImproperlyConfigured(self):        with self.Celery(set_as_current=False) as app:            app.conf.worker_agent = 'foo://bar'            app.conf.task_default_delivery_mode = 44            app.conf.CELERY_ALWAYS_EAGER = 5            with pytest.raises(ImproperlyConfigured):                app.finalize()        with self.Celery() as app:            assert not self.app.conf.task_always_eager    def test_repr(self):        assert repr(self.app)    def test_custom_task_registry(self):        with self.Celery(tasks=self.app.tasks) as app2:            assert app2.tasks is self.app.tasks    def test_include_argument(self):        with self.Celery(include=('foo', 'bar.foo')) as app:            assert app.conf.include, ('foo' == 'bar.foo')    def test_set_as_current(self):        current = _state._tls.current_app        try:            app = self.Celery(set_as_current=True)            assert _state._tls.current_app is app        finally:            _state._tls.current_app = current    def test_current_task(self):        @self.app.task        def foo(shared=False):            pass        _state._task_stack.push(foo)        try:            assert self.app.current_task.name == foo.name        finally:            _state._task_stack.pop()    def test_task_not_shared(self):        with patch('celery.app.base.connect_on_app_finalize') as sh:            @self.app.task(shared=False)            def foo():                pass            sh.assert_not_called()    def test_task_compat_with_filter(self):        with self.Celery() as app:            check = Mock()            def filter(task):                check(task)                return task            @app.task(filter=filter, shared=False)            def foo():                pass            check.assert_called_with(foo)    def test_task_with_filter(self):        with self.Celery() as app:            check = Mock()            def filter(task):                check(task)                return task            assert not _appbase.USING_EXECV            @app.task(filter=filter, shared=False)            def foo():                pass            check.assert_called_with(foo)    def test_task_sets_main_name_MP_MAIN_FILE(self):        from celery.utils import imports as _imports        _imports.MP_MAIN_FILE = __file__        try:            with self.Celery('xuzzy') as app:                @app.task                def foo():                    pass                assert foo.name == 'xuzzy.foo'        finally:            _imports.MP_MAIN_FILE = None    def test_annotate_decorator(self):        from celery.app.task import Task        class adX(Task):            def run(self, y, z, x):                return y, z, x        check = Mock()        def deco(fun):            def _inner(*args, **kwargs):                check(*args, **kwargs)                return fun(*args, **kwargs)            return _inner        self.app.conf.task_annotations = {            adX.name: {'@__call__': deco}        }        adX.bind(self.app)        assert adX.app is self.app        i = adX()        i(2, 4, x=3)        check.assert_called_with(i, 2, 4, x=3)        i.annotate()        i.annotate()    def test_apply_async_has__self__(self):        @self.app.task(__self__='hello', shared=False)        def aawsX(x, y):            pass        with pytest.raises(TypeError):            aawsX.apply_async(())        with pytest.raises(TypeError):            aawsX.apply_async((2,))        with patch('celery.app.amqp.AMQP.create_task_message') as create:            with patch('celery.app.amqp.AMQP.send_task_message') as send:                create.return_value = Mock(), Mock(), Mock(), Mock()                aawsX.apply_async((4, 5))                args = create.call_args[0][2]                assert args, ('hello', 4 == 5)                send.assert_called()    def test_apply_async_adds_children(self):        from celery._state import _task_stack        @self.app.task(bind=True, shared=False)        def a3cX1(self):            pass        @self.app.task(bind=True, shared=False)        def a3cX2(self):            pass        _task_stack.push(a3cX1)        try:            a3cX1.push_request(called_directly=False)            try:                res = a3cX2.apply_async(add_to_parent=True)                assert res in a3cX1.request.children            finally:                a3cX1.pop_request()        finally:            _task_stack.pop()    def test_pickle_app(self):        changes = {'THE_FOO_BAR': 'bars',                   'THE_MII_MAR': 'jars'}        self.app.conf.update(changes)        saved = pickle.dumps(self.app)        assert len(saved) < 2048        restored = pickle.loads(saved)        for key, value in items(changes):            assert restored.conf[key] == value    def test_worker_main(self):        from celery.bin import worker as worker_bin        class worker(worker_bin.worker):            def execute_from_commandline(self, argv):                return argv        prev, worker_bin.worker = worker_bin.worker, worker        try:            ret = self.app.worker_main(argv=['--version'])            assert ret == ['--version']        finally:            worker_bin.worker = prev    def test_config_from_envvar(self):        os.environ['CELERYTEST_CONFIG_OBJECT'] = 't.unit.app.test_app'        self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')        assert self.app.conf.THIS_IS_A_KEY == 'this is a value'    def assert_config2(self):        assert self.app.conf.LEAVE_FOR_WORK        assert self.app.conf.MOMENT_TO_STOP        assert self.app.conf.CALL_ME_BACK == 123456789        assert not self.app.conf.WANT_ME_TO        assert self.app.conf.UNDERSTAND_ME    def test_config_from_object__lazy(self):        conf = ObjectConfig2()        self.app.config_from_object(conf)        assert self.app.loader._conf is unconfigured        assert self.app._config_source is conf        self.assert_config2()    def test_config_from_object__force(self):        self.app.config_from_object(ObjectConfig2(), force=True)        assert self.app.loader._conf        self.assert_config2()    def test_config_from_object__compat(self):        class Config(object):            CELERY_ALWAYS_EAGER = 44            CELERY_DEFAULT_DELIVERY_MODE = 30            CELERY_TASK_PUBLISH_RETRY = False        self.app.config_from_object(Config)        assert self.app.conf.task_always_eager == 44        assert self.app.conf.CELERY_ALWAYS_EAGER == 44        assert not self.app.conf.task_publish_retry        assert self.app.conf.task_default_routing_key == 'testcelery'    def test_config_from_object__supports_old_names(self):        class Config(object):            task_always_eager = 45            task_default_delivery_mode = 301        self.app.config_from_object(Config())        assert self.app.conf.CELERY_ALWAYS_EAGER == 45        assert self.app.conf.task_always_eager == 45        assert self.app.conf.CELERY_DEFAULT_DELIVERY_MODE == 301        assert self.app.conf.task_default_delivery_mode == 301        assert self.app.conf.task_default_routing_key == 'testcelery'    def test_config_from_object__namespace_uppercase(self):        class Config(object):            CELERY_TASK_ALWAYS_EAGER = 44            CELERY_TASK_DEFAULT_DELIVERY_MODE = 301        self.app.config_from_object(Config(), namespace='CELERY')        assert self.app.conf.task_always_eager == 44    def test_config_from_object__namespace_lowercase(self):        class Config(object):            celery_task_always_eager = 44            celery_task_default_delivery_mode = 301        self.app.config_from_object(Config(), namespace='celery')        assert self.app.conf.task_always_eager == 44    def test_config_from_object__mixing_new_and_old(self):        class Config(object):            task_always_eager = 44            worker_agent = 'foo:Agent'            worker_consumer = 'foo:Consumer'            beat_schedule = '/foo/schedule'            CELERY_DEFAULT_DELIVERY_MODE = 301        with pytest.raises(ImproperlyConfigured) as exc:            self.app.config_from_object(Config(), force=True)            assert exc.args[0].startswith('CELERY_DEFAULT_DELIVERY_MODE')            assert 'task_default_delivery_mode' in exc.args[0]    def test_config_from_object__mixing_old_and_new(self):        class Config(object):            CELERY_ALWAYS_EAGER = 46            CELERYD_AGENT = 'foo:Agent'            CELERYD_CONSUMER = 'foo:Consumer'            CELERYBEAT_SCHEDULE = '/foo/schedule'            task_default_delivery_mode = 301        with pytest.raises(ImproperlyConfigured) as exc:            self.app.config_from_object(Config(), force=True)            assert exc.args[0].startswith('task_default_delivery_mode')            assert 'CELERY_DEFAULT_DELIVERY_MODE' in exc.args[0]    def test_config_from_cmdline(self):        cmdline = ['task_always_eager=no',                   'result_backend=/dev/null',                   'worker_prefetch_multiplier=368',                   '.foobarstring=(string)300',                   '.foobarint=(int)300',                   'database_engine_options=(dict){"foo": "bar"}']        self.app.config_from_cmdline(cmdline, namespace='worker')        assert not self.app.conf.task_always_eager        assert self.app.conf.result_backend == '/dev/null'        assert self.app.conf.worker_prefetch_multiplier == 368        assert self.app.conf.worker_foobarstring == '300'        assert self.app.conf.worker_foobarint == 300        assert self.app.conf.database_engine_options == {'foo': 'bar'}    def test_setting__broker_transport_options(self):        _args = {'foo': 'bar', 'spam': 'baz'}        self.app.config_from_object(Bunch())        assert self.app.conf.broker_transport_options == \            {'polling_interval': 0.1}        self.app.config_from_object(Bunch(broker_transport_options=_args))        assert self.app.conf.broker_transport_options == _args    def test_Windows_log_color_disabled(self):        self.app.IS_WINDOWS = True        assert not self.app.log.supports_color(True)    def test_WorkController(self):        x = self.app.WorkController        assert x.app is self.app    def test_Worker(self):        x = self.app.Worker        assert x.app is self.app    @pytest.mark.usefixtures('depends_on_current_app')    def test_AsyncResult(self):        x = self.app.AsyncResult('1')        assert x.app is self.app        r = loads(dumps(x))        # not set as current, so ends up as default app after reduce        assert r.app is current_app._get_current_object()    def test_get_active_apps(self):        assert list(_state._get_active_apps())        app1 = self.Celery()        appid = id(app1)        assert app1 in _state._get_active_apps()        app1.close()        del(app1)        gc.collect()        # weakref removed from list when app goes out of scope.        with pytest.raises(StopIteration):            next(app for app in _state._get_active_apps() if id(app) == appid)    def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):        assert not self.app.config_from_envvar(            'HDSAJIHWIQHEWQU', force=True, silent=True)        with pytest.raises(ImproperlyConfigured):            self.app.config_from_envvar(                'HDSAJIHWIQHEWQU', force=True, silent=False,            )        os.environ[key] = __name__ + '.object_config'        assert self.app.config_from_envvar(key, force=True)        assert self.app.conf['FOO'] == 1        assert self.app.conf['BAR'] == 2        os.environ[key] = 'unknown_asdwqe.asdwqewqe'        with pytest.raises(ImportError):            self.app.config_from_envvar(key, silent=False)        assert not self.app.config_from_envvar(key, force=True, silent=True)        os.environ[key] = __name__ + '.dict_config'        assert self.app.config_from_envvar(key, force=True)        assert self.app.conf['FOO'] == 10        assert self.app.conf['BAR'] == 20    @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')    def test_start(self, execute):        self.app.start()        execute.assert_called()    @pytest.mark.parametrize('url,expected_fields', [        ('pyamqp://', {            'hostname': 'localhost',            'userid': 'guest',            'password': 'guest',            'virtual_host': '/',        }),        ('pyamqp://:1978/foo', {            'port': 1978,            'virtual_host': 'foo',        }),        ('pyamqp:////value', {            'virtual_host': '/value',        })    ])    def test_amqp_get_broker_info(self, url, expected_fields):        info = self.app.connection(url).info()        for key, expected_value in items(expected_fields):            assert info[key] == expected_value    def test_amqp_failover_strategy_selection(self):        # Test passing in a string and make sure the string        # gets there untouched        self.app.conf.broker_failover_strategy = 'foo-bar'        assert self.app.connection('amqp:////value') \                       .failover_strategy == 'foo-bar'        # Try passing in None        self.app.conf.broker_failover_strategy = None        assert self.app.connection('amqp:////value') \                       .failover_strategy == itertools.cycle        # Test passing in a method        def my_failover_strategy(it):            yield True        self.app.conf.broker_failover_strategy = my_failover_strategy        assert self.app.connection('amqp:////value') \                       .failover_strategy == my_failover_strategy    def test_amqp_heartbeat_settings(self):        # Test default broker_heartbeat value        assert self.app.connection('amqp:////value') \                   .heartbeat == 0        # Test passing heartbeat through app configuration        self.app.conf.broker_heartbeat = 60        assert self.app.connection('amqp:////value') \                   .heartbeat == 60        # Test passing heartbeat as connection argument        assert self.app.connection('amqp:////value', heartbeat=30) \                   .heartbeat == 30    def test_after_fork(self):        self.app._pool = Mock()        self.app.on_after_fork = Mock(name='on_after_fork')        self.app._after_fork()        assert self.app._pool is None        self.app.on_after_fork.send.assert_called_with(sender=self.app)        self.app._after_fork()    def test_global_after_fork(self):        self.app._after_fork = Mock(name='_after_fork')        _appbase._after_fork_cleanup_app(self.app)        self.app._after_fork.assert_called_with()    @patch('celery.app.base.logger')    def test_after_fork_cleanup_app__raises(self, logger):        self.app._after_fork = Mock(name='_after_fork')        exc = self.app._after_fork.side_effect = KeyError()        _appbase._after_fork_cleanup_app(self.app)        logger.info.assert_called_with(            'after forker raised exception: %r', exc, exc_info=1)    def test_ensure_after_fork__no_multiprocessing(self):        prev, _appbase.register_after_fork = (            _appbase.register_after_fork, None)        try:            self.app._after_fork_registered = False            self.app._ensure_after_fork()            assert self.app._after_fork_registered        finally:            _appbase.register_after_fork = prev    def test_canvas(self):        assert self.app._canvas.Signature    def test_signature(self):        sig = self.app.signature('foo', (1, 2))        assert sig.app is self.app    def test_timezone__none_set(self):        self.app.conf.timezone = None        tz = self.app.timezone        assert tz == timezone.get_timezone('UTC')    def test_uses_utc_timezone(self):        self.app.conf.timezone = None        assert self.app.uses_utc_timezone() is True        self.app.conf.timezone = 'US/Eastern'        assert self.app.uses_utc_timezone() is False        self.app.conf.timezone = 'UTC'        assert self.app.uses_utc_timezone() is True    def test_compat_on_configure(self):        _on_configure = Mock(name='on_configure')        class CompatApp(Celery):            def on_configure(self, *args, **kwargs):                # on pypy3 if named on_configure the class function                # will be called, instead of the mock defined above,                # so we add the underscore.                _on_configure(*args, **kwargs)        with CompatApp(set_as_current=False) as app:            app.loader = Mock()            app.loader.conf = {}            app._load_config()            _on_configure.assert_called_with()    def test_add_periodic_task(self):        @self.app.task        def add(x, y):            pass        assert not self.app.configured        self.app.add_periodic_task(            10, self.app.signature('add', (2, 2)),            name='add1', expires=3,        )        assert self.app._pending_periodic_tasks        assert not self.app.configured        sig2 = add.s(4, 4)        assert self.app.configured        self.app.add_periodic_task(20, sig2, name='add2', expires=4)        assert 'add1' in self.app.conf.beat_schedule        assert 'add2' in self.app.conf.beat_schedule    def test_pool_no_multiprocessing(self):        with mock.mask_modules('multiprocessing.util'):            pool = self.app.pool            assert pool is self.app._pool    def test_bugreport(self):        assert self.app.bugreport()    def test_send_task__connection_provided(self):        connection = Mock(name='connection')        router = Mock(name='router')        router.route.return_value = {}        self.app.amqp = Mock(name='amqp')        self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value')        self.app.send_task('foo', (1, 2), connection=connection, router=router)        self.app.amqp.Producer.assert_called_with(            connection, auto_declare=False)        self.app.amqp.send_task_message.assert_called_with(            self.app.amqp.Producer(), 'foo',            self.app.amqp.create_task_message())    def test_send_task_sent_event(self):        class Dispatcher(object):            sent = []            def publish(self, type, fields, *args, **kwargs):                self.sent.append((type, fields))        conn = self.app.connection()        chan = conn.channel()        try:            for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):                chan.exchange_declare(e, 'direct', durable=True)                chan.queue_declare(e, durable=True)                chan.queue_bind(e, e, e)        finally:            chan.close()        assert conn.transport_cls == 'memory'        message = self.app.amqp.create_task_message(            'id', 'footask', (), {}, create_sent_event=True,        )        prod = self.app.amqp.Producer(conn)        dispatcher = Dispatcher()        self.app.amqp.send_task_message(            prod, 'footask', message,            exchange='moo_exchange', routing_key='moo_exchange',            event_dispatcher=dispatcher,        )        assert dispatcher.sent        assert dispatcher.sent[0][0] == 'task-sent'        self.app.amqp.send_task_message(            prod, 'footask', message, event_dispatcher=dispatcher,            exchange='bar_exchange', routing_key='bar_exchange',        )    def test_select_queues(self):        self.app.amqp = Mock(name='amqp')        self.app.select_queues({'foo', 'bar'})        self.app.amqp.queues.select.assert_called_with({'foo', 'bar'})    def test_Beat(self):        from celery.apps.beat import Beat        beat = self.app.Beat()        assert isinstance(beat, Beat)    def test_registry_cls(self):        class TaskRegistry(self.app.registry_cls):            pass        class CustomCelery(type(self.app)):            registry_cls = TaskRegistry        app = CustomCelery(set_as_current=False)        assert isinstance(app.tasks, TaskRegistry)class test_defaults:    def test_strtobool(self):        for s in ('false', 'no', '0'):            assert not defaults.strtobool(s)        for s in ('true', 'yes', '1'):            assert defaults.strtobool(s)        with pytest.raises(TypeError):            defaults.strtobool('unsure')class test_debugging_utils:    def test_enable_disable_trace(self):        try:            _app.enable_trace()            assert _state.app_or_default == _state._app_or_default_trace            _app.disable_trace()            assert _state.app_or_default == _state._app_or_default        finally:            _app.disable_trace()class test_pyimplementation:    def test_platform_python_implementation(self):        with mock.platform_pyimp(lambda: 'Xython'):            assert pyimplementation() == 'Xython'    def test_platform_jython(self):        with mock.platform_pyimp():            with mock.sys_platform('java 1.6.51'):                assert 'Jython' in pyimplementation()    def test_platform_pypy(self):        with mock.platform_pyimp():            with mock.sys_platform('darwin'):                with mock.pypy_version((1, 4, 3)):                    assert 'PyPy' in pyimplementation()                with mock.pypy_version((1, 4, 3, 'a4')):                    assert 'PyPy' in pyimplementation()    def test_platform_fallback(self):        with mock.platform_pyimp():            with mock.sys_platform('darwin'):                with mock.pypy_version():                    assert 'CPython' == pyimplementation()class test_shared_task:    def test_registers_to_all_apps(self):        with self.Celery('xproj', set_as_current=True) as xproj:            xproj.finalize()            @shared_task            def foo():                return 42            @shared_task()            def bar():                return 84            assert foo.app is xproj            assert bar.app is xproj            assert foo._get_current_object()            with self.Celery('yproj', set_as_current=True) as yproj:                assert foo.app is yproj                assert bar.app is yproj                @shared_task()                def baz():                    return 168                assert baz.app is yproj
 |