"""Unit tests for the Celery application object and related helpers.

Covers app construction and configuration, task registration through
``app.task``, pickling, broker connection info, the ``strtobool`` helper
in ``celery.app.defaults``, the app trace switches and
``pyimplementation``.
"""
from __future__ import absolute_import
from __future__ import with_statement

import os

from mock import Mock, patch
from pickle import loads, dumps

from kombu import Exchange

from celery import Celery
from celery import app as _app
from celery import _state
from celery.app import defaults
from celery.loaders.base import BaseLoader
from celery.platforms import pyimplementation
from celery.utils.serialization import pickle

from celery.tests import config
from celery.tests.utils import (Case, mask_modules, platform_pyimp,
                                sys_platform, pypy_version)
from celery.utils import uuid
from celery.utils.mail import ErrorMail

# Module-level setting looked up by ``test_config_from_envvar``, which
# loads the module named 'celery.tests.app.test_app' as a config object
# (presumably this very module -- confirm against the file's location).
THIS_IS_A_KEY = 'this is a value'


class Object(object):
    """Attribute bag: every keyword argument becomes an instance attribute."""

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)


def _get_test_config():
    """Return the upper-case, non-underscore attributes of the test config.

    The result is a plain dict mapping each such name found in
    ``celery.tests.config`` to its value.
    """
    return dict((key, getattr(config, key))
                for key in dir(config)
                if key.isupper() and not key.startswith('_'))

test_config = _get_test_config()


class test_module(Case):
    """Tests for the module-level helpers exposed by ``celery.app``."""

    def test_default_app(self):
        self.assertEqual(_app.default_app, _state.default_app)

    def test_bugreport(self):
        self.assertTrue(_app.bugreport())


class test_App(Case):
    """Tests for ``Celery`` application instances."""

    def setUp(self):
        # Every test gets a fresh app that is not installed as the
        # current app and is preloaded with the shared test settings.
        self.app = Celery(set_as_current=False)
        self.app.conf.update(test_config)

    def test_task(self):
        app = Celery('foozibari', set_as_current=False)

        def fun():
            pass

        # A function claiming to live in __main__ must be named after
        # the app's main name instead.
        fun.__module__ = '__main__'
        task = app.task(fun)
        self.assertEqual(task.name, app.main + '.fun')

    def test_with_broker(self):
        # Run without CELERY_BROKER_URL in the environment (presumably so
        # it cannot take precedence over the ``broker`` argument).
        prev = os.environ.pop('CELERY_BROKER_URL', None)
        try:
            app = Celery(set_as_current=False, broker='foo://baribaz')
            self.assertEqual(app.conf.BROKER_HOST, 'foo://baribaz')
        finally:
            # os.environ only accepts strings: assigning None raises
            # TypeError, so only restore the variable if it was set.
            if prev is None:
                os.environ.pop('CELERY_BROKER_URL', None)
            else:
                os.environ['CELERY_BROKER_URL'] = prev

    def test_repr(self):
        self.assertTrue(repr(self.app))

    def test_custom_task_registry(self):
        app1 = Celery(set_as_current=False)
        app2 = Celery(set_as_current=False, tasks=app1.tasks)
        self.assertIs(app2.tasks, app1.tasks)

    def test_include_argument(self):
        app = Celery(set_as_current=False, include=('foo', 'bar.foo'))
        self.assertEqual(app.conf.CELERY_IMPORTS, ('foo', 'bar.foo'))

    def test_set_as_current(self):
        # Remember the current app so the thread-local state is restored
        # no matter how the test ends.
        current = _state._tls.current_app
        try:
            app = Celery(set_as_current=True)
            self.assertIs(_state._tls.current_app, app)
        finally:
            _state._tls.current_app = current

    def test_current_task(self):
        app = Celery(set_as_current=False)

        @app.task
        def foo():
            pass

        _state._task_stack.push(foo)
        try:
            self.assertEqual(app.current_task.name, foo.name)
        finally:
            _state._task_stack.pop()

    def test_task_not_shared(self):
        with patch('celery.app.base.shared_task') as shared_task:
            app = Celery(set_as_current=False)

            @app.task(shared=False)
            def foo():
                pass
            self.assertFalse(shared_task.called)

    def test_task_compat_with_filter(self):
        app = Celery(set_as_current=False, accept_magic_kwargs=True)
        check = Mock()

        def filter(task):
            check(task)
            return task

        @app.task(filter=filter)
        def foo():
            pass
        check.assert_called_with(foo)

    def test_task_with_filter(self):
        app = Celery(set_as_current=False, accept_magic_kwargs=False)
        check = Mock()

        def filter(task):
            check(task)
            return task

        @app.task(filter=filter)
        def foo():
            pass
        check.assert_called_with(foo)

    def test_task_sets_main_name_MP_MAIN_FILE(self):
        from celery import utils as _utils
        # Restore whatever value was there before instead of assuming
        # that it was None.
        prev = _utils.MP_MAIN_FILE
        _utils.MP_MAIN_FILE = __file__
        try:
            app = Celery('xuzzy', set_as_current=False)

            @app.task
            def foo():
                pass

            self.assertEqual(foo.name, 'xuzzy.foo')
        finally:
            _utils.MP_MAIN_FILE = prev

    def test_base_task_inherits_magic_kwargs_from_app(self):
        from celery.task import Task as OldTask

        class timkX(OldTask):
            abstract = True

        app = Celery(set_as_current=False, accept_magic_kwargs=True)
        timkX.bind(app)
        # see #918
        self.assertFalse(timkX.accept_magic_kwargs)

        from celery import Task as NewTask

        class timkY(NewTask):
            abstract = True

        timkY.bind(app)
        self.assertFalse(timkY.accept_magic_kwargs)

    def test_annotate_decorator(self):
        from celery.app.task import Task

        class adX(Task):
            abstract = True

            def run(self, y, z, x):
                return y, z, x

        check = Mock()

        def deco(fun):
            # Records the call arguments, then delegates to ``fun``.

            def _inner(*args, **kwargs):
                check(*args, **kwargs)
                return fun(*args, **kwargs)
            return _inner

        app = Celery(set_as_current=False)
        app.conf.CELERY_ANNOTATIONS = {
            adX.name: {'@__call__': deco}
        }
        adX.bind(app)
        self.assertIs(adX.app, app)

        i = adX()
        i(2, 4, x=3)
        check.assert_called_with(i, 2, 4, x=3)

        # Annotating repeatedly must not raise.
        i.annotate()
        i.annotate()

    def test_apply_async_has__self__(self):
        app = Celery(set_as_current=False)

        @app.task(__self__='hello')
        def aawsX():
            pass

        with patch('celery.app.amqp.TaskProducer.publish_task') as dt:
            aawsX.apply_async((4, 5))
            # Second positional argument of the publish_task call: the
            # task args, expected to be prefixed with __self__.
            args = dt.call_args[0][1]
            self.assertEqual(args, ('hello', 4, 5))

    def test_apply_async__connection_arg(self):
        app = Celery(set_as_current=False)

        @app.task()
        def aacaX():
            pass

        connection = app.connection('asd://')
        with self.assertRaises(KeyError):
            aacaX.apply_async(connection=connection)

    def test_apply_async_adds_children(self):
        from celery._state import _task_stack
        app = Celery(set_as_current=False)

        @app.task()
        def a3cX1(self):
            pass

        @app.task()
        def a3cX2(self):
            pass

        # Make a3cX1 the executing task, then check that a result
        # created from within it is recorded as one of its children.
        _task_stack.push(a3cX1)
        try:
            a3cX1.push_request(called_directly=False)
            try:
                res = a3cX2.apply_async(add_to_parent=True)
                self.assertIn(res, a3cX1.request.children)
            finally:
                a3cX1.pop_request()
        finally:
            _task_stack.pop()

    def test_TaskSet(self):
        ts = self.app.TaskSet()
        self.assertListEqual(ts.tasks, [])
        self.assertIs(ts.app, self.app)

    def test_pickle_app(self):
        changes = dict(THE_FOO_BAR='bars',
                       THE_MII_MAR='jars')
        self.app.conf.update(changes)
        saved = pickle.dumps(self.app)
        self.assertLess(len(saved), 2048)
        restored = pickle.loads(saved)
        self.assertDictContainsSubset(changes, restored.conf)

    def test_worker_main(self):
        from celery.bin import celeryd

        class WorkerCommand(celeryd.WorkerCommand):

            def execute_from_commandline(self, argv):
                return argv

        # Swap in the stub command and always put the real one back.
        prev, celeryd.WorkerCommand = celeryd.WorkerCommand, WorkerCommand
        try:
            ret = self.app.worker_main(argv=['--version'])
            self.assertListEqual(ret, ['--version'])
        finally:
            celeryd.WorkerCommand = prev

    def test_config_from_envvar(self):
        envvar = 'CELERYTEST_CONFIG_OBJECT'
        prev = os.environ.get(envvar)
        os.environ[envvar] = 'celery.tests.app.test_app'
        try:
            self.app.config_from_envvar(envvar)
            self.assertEqual(self.app.conf.THIS_IS_A_KEY, 'this is a value')
        finally:
            # Do not leak the variable into tests that run afterwards.
            if prev is None:
                os.environ.pop(envvar, None)
            else:
                os.environ[envvar] = prev

    def test_config_from_object(self):

        class Object(object):
            LEAVE_FOR_WORK = True
            MOMENT_TO_STOP = True
            CALL_ME_BACK = 123456789
            WANT_ME_TO = False
            UNDERSTAND_ME = True

        self.app.config_from_object(Object())

        self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
        self.assertTrue(self.app.conf.MOMENT_TO_STOP)
        self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
        self.assertFalse(self.app.conf.WANT_ME_TO)
        self.assertTrue(self.app.conf.UNDERSTAND_ME)

    def test_config_from_cmdline(self):
        cmdline = ['.always_eager=no',
                   '.result_backend=/dev/null',
                   '.task_error_whitelist=(list)["a", "b", "c"]',
                   'celeryd.prefetch_multiplier=368',
                   '.foobarstring=(string)300',
                   '.foobarint=(int)300',
                   '.result_engine_options=(dict){"foo": "bar"}']
        self.app.config_from_cmdline(cmdline, namespace='celery')
        self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
        self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, '/dev/null')
        self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
        self.assertListEqual(self.app.conf.CELERY_TASK_ERROR_WHITELIST,
                             ['a', 'b', 'c'])
        self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, '300')
        self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
        self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
                             {'foo': 'bar'})

    def test_compat_setting_CELERY_BACKEND(self):
        self.app.config_from_object(Object(CELERY_BACKEND='set_by_us'))
        self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, 'set_by_us')

    def test_setting_BROKER_TRANSPORT_OPTIONS(self):
        _args = {'foo': 'bar', 'spam': 'baz'}

        self.app.config_from_object(Object())
        self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})

        self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
        self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)

    def test_Windows_log_color_disabled(self):
        self.app.IS_WINDOWS = True
        self.assertFalse(self.app.log.supports_color(True))

    def test_compat_setting_CARROT_BACKEND(self):
        self.app.config_from_object(Object(CARROT_BACKEND='set_by_us'))
        self.assertEqual(self.app.conf.BROKER_TRANSPORT, 'set_by_us')

    def test_WorkController(self):
        x = self.app.WorkController
        self.assertIs(x.app, self.app)

    def test_Worker(self):
        x = self.app.Worker
        self.assertIs(x.app, self.app)

    def test_AsyncResult(self):
        x = self.app.AsyncResult('1')
        self.assertIs(x.app, self.app)
        r = loads(dumps(x))
        # not set as current, so ends up as default app after reduce
        self.assertIs(r.app, _state.default_app)

    @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
    def test_start(self, execute):
        self.app.start()
        self.assertTrue(execute.called)

    def test_mail_admins(self):

        class Loader(BaseLoader):

            # No explicit ``self``: the instance simply arrives as
            # args[0], so the returned tuple is always truthy.
            def mail_admins(*args, **kwargs):
                return args, kwargs

        self.app.loader = Loader()
        self.app.conf.ADMINS = None
        self.assertFalse(self.app.mail_admins('Subject', 'Body'))
        self.app.conf.ADMINS = [('George Costanza', 'george@vandelay.com')]
        self.assertTrue(self.app.mail_admins('Subject', 'Body'))

    def test_amqp_get_broker_info(self):
        self.assertDictContainsSubset(
            {'hostname': 'localhost',
             'userid': 'guest',
             'password': 'guest',
             'virtual_host': '/'},
            self.app.connection('amqp://').info(),
        )
        self.app.conf.BROKER_PORT = 1978
        self.app.conf.BROKER_VHOST = 'foo'
        self.assertDictContainsSubset(
            {'port': 1978, 'virtual_host': 'foo'},
            self.app.connection('amqp://:1978/foo').info(),
        )
        conn = self.app.connection('amqp:////value')
        self.assertDictContainsSubset({'virtual_host': '/value'},
                                      conn.info())

    def test_BROKER_BACKEND_alias(self):
        self.assertEqual(self.app.conf.BROKER_BACKEND,
                         self.app.conf.BROKER_TRANSPORT)

    def test_with_default_connection(self):

        @self.app.with_default_connection
        def handler(connection=None, foo=None):
            return connection, foo

        # No connection is passed in; the decorator must supply one.
        connection, foo = handler(foo=42)
        self.assertEqual(foo, 42)
        self.assertTrue(connection)

    def test_after_fork(self):
        p = self.app._pool = Mock()
        self.app._after_fork(self.app)
        p.force_close_all.assert_called_with()
        self.assertIsNone(self.app._pool)
        # A second call, now with no pool left, must not raise.
        self.app._after_fork(self.app)

    def test_pool_no_multiprocessing(self):
        with mask_modules('multiprocessing.util'):
            pool = self.app.pool
            self.assertIs(pool, self.app._pool)

    def test_bugreport(self):
        self.assertTrue(self.app.bugreport())

    def test_send_task_sent_event(self):

        class Dispatcher(object):
            # Minimal event dispatcher that records what is published.

            def __init__(self):
                # Per-instance list, so recorded events are never
                # shared between dispatcher instances.
                self.sent = []

            def publish(self, type, fields, *args, **kwargs):
                self.sent.append((type, fields))

        conn = self.app.connection()
        chan = conn.channel()
        try:
            for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
                chan.exchange_declare(e, 'direct', durable=True)
                chan.queue_declare(e, durable=True)
                chan.queue_bind(e, e, e)
        finally:
            chan.close()
        # unittest assertion instead of a bare ``assert``, which is
        # stripped when Python runs with -O.
        self.assertEqual(conn.transport_cls, 'memory')

        prod = self.app.amqp.TaskProducer(
            conn, exchange=Exchange('foo_exchange'),
            send_sent_event=True,
        )

        dispatcher = Dispatcher()
        self.assertTrue(prod.publish_task('footask', (), {},
                                          exchange='moo_exchange',
                                          routing_key='moo_exchange',
                                          event_dispatcher=dispatcher))
        self.assertTrue(dispatcher.sent)
        self.assertEqual(dispatcher.sent[0][0], 'task-sent')
        self.assertTrue(prod.publish_task('footask', (), {},
                                          event_dispatcher=dispatcher,
                                          exchange='bar_exchange',
                                          routing_key='bar_exchange'))

    def test_error_mail_sender(self):
        x = ErrorMail.subject % {'name': 'task_name',
                                 'id': uuid(),
                                 'exc': 'FOOBARBAZ',
                                 'hostname': 'lana'}
        self.assertTrue(x)


class test_defaults(Case):
    """Tests for helpers in ``celery.app.defaults``."""

    def test_str_to_bool(self):
        for s in ('false', 'no', '0'):
            self.assertFalse(defaults.strtobool(s))
        for s in ('true', 'yes', '1'):
            self.assertTrue(defaults.strtobool(s))
        with self.assertRaises(TypeError):
            defaults.strtobool('unsure')


class test_debugging_utils(Case):
    """Tests for the ``enable_trace``/``disable_trace`` switches."""

    def test_enable_disable_trace(self):
        try:
            _app.enable_trace()
            self.assertEqual(_app.app_or_default, _app._app_or_default_trace)
            _app.disable_trace()
            self.assertEqual(_app.app_or_default, _app._app_or_default)
        finally:
            # Never leave tracing enabled, even if an assertion failed.
            _app.disable_trace()


class test_pyimplementation(Case):
    """Tests for ``celery.platforms.pyimplementation``."""

    def test_platform_python_implementation(self):
        with platform_pyimp(lambda: 'Xython'):
            self.assertEqual(pyimplementation(), 'Xython')

    def test_platform_jython(self):
        with platform_pyimp():
            with sys_platform('java 1.6.51'):
                self.assertIn('Jython', pyimplementation())

    def test_platform_pypy(self):
        with platform_pyimp():
            with sys_platform('darwin'):
                with pypy_version((1, 4, 3)):
                    self.assertIn('PyPy', pyimplementation())
                with pypy_version((1, 4, 3, 'a4')):
                    self.assertIn('PyPy', pyimplementation())

    def test_platform_fallback(self):
        with platform_pyimp():
            with sys_platform('darwin'):
                with pypy_version():
                    self.assertEqual('CPython', pyimplementation())