conftest.py 878 B

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. import os
  2. import pytest
  3. from celery.contrib.testing.manager import Manager
  4. TEST_BROKER = os.environ.get('TEST_BROKER', 'pyamqp://')
  5. TEST_BACKEND = os.environ.get('TEST_BACKEND', 'redis://')
  6. @pytest.fixture(scope='session')
  7. def celery_config():
  8. assert TEST_BACKEND == 'rpc'
  9. return {
  10. 'broker_url': TEST_BROKER,
  11. 'result_backend': TEST_BACKEND
  12. }
  13. @pytest.fixture(scope='session')
  14. def celery_enable_logging():
  15. return True
  16. @pytest.fixture(scope='session')
  17. def celery_worker_pool():
  18. return 'prefork'
  19. @pytest.fixture(scope='session')
  20. def celery_includes():
  21. return {'t.integration.tasks'}
  22. @pytest.fixture
  23. def app(celery_app):
  24. yield celery_app
  25. @pytest.fixture
  26. def manager(app, celery_session_worker):
  27. return Manager(app)
  28. @pytest.fixture(autouse=True)
  29. def ZZZZ_set_app_current(app):
  30. app.set_current()
  31. app.set_default()