conftest.py 713 B

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import pytest
  2. from celery.contrib.testing.manager import Manager
  3. @pytest.fixture(scope='session')
  4. def celery_config():
  5. return {
  6. 'broker_url': 'pyamqp://',
  7. 'result_backend': 'rpc',
  8. }
  9. @pytest.fixture(scope='session')
  10. def celery_enable_logging():
  11. return True
  12. @pytest.fixture(scope='session')
  13. def celery_worker_pool():
  14. return 'prefork'
  15. @pytest.fixture(scope='session')
  16. def celery_includes():
  17. return {'t.integration.tasks'}
  18. @pytest.fixture
  19. def app(celery_app):
  20. yield celery_app
  21. @pytest.fixture
  22. def manager(app, celery_session_worker):
  23. return Manager(app)
  24. @pytest.fixture(autouse=True)
  25. def ZZZZ_set_app_current(app):
  26. app.set_current()
  27. app.set_default()