from __future__ import absolute_import, unicode_literals import pytest from celery.contrib.testing.manager import Manager @pytest.fixture(scope='session') def celery_config(): return { 'broker_url': 'pyamqp://', 'result_backend': 'rpc', } @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_worker_pool(): return 'prefork' @pytest.fixture(scope='session') def celery_includes(): return {'t.integration.tasks'} @pytest.fixture def app(celery_app): yield celery_app @pytest.fixture def manager(app, celery_session_worker): return Manager(app) @pytest.fixture(autouse=True) def ZZZZ_set_app_current(app): app.set_current() app.set_default()