| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 | from __future__ import absolute_import, unicode_literalsimport osfrom functools import wrapsimport pytestfrom celery.contrib.testing.manager import ManagerTEST_BROKER = os.environ.get('TEST_BROKER', 'pyamqp://')TEST_BACKEND = os.environ.get('TEST_BACKEND', 'redis://')def flaky(fun):    @wraps(fun)    def _inner(*args, **kwargs):        for i in reversed(range(3)):            try:                return fun(*args, **kwargs)            except Exception:                if not i:                    raise    _inner.__wrapped__ = fun    return _inner@pytest.fixture(scope='session')def celery_config():    return {        'broker_url': TEST_BROKER,        'result_backend': TEST_BACKEND    }@pytest.fixture(scope='session')def celery_enable_logging():    return True@pytest.fixture(scope='session')def celery_worker_pool():    return 'prefork'@pytest.fixture(scope='session')def celery_includes():    return {'t.integration.tasks'}@pytest.fixturedef app(celery_app):    yield celery_app@pytest.fixturedef manager(app, celery_session_worker):    return Manager(app)@pytest.fixture(autouse=True)def ZZZZ_set_app_current(app):    app.set_current()    app.set_default()
 |