conftest.py 845 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import os
  2. import pytest
  3. from celery.contrib.testing.manager import Manager
  4. TEST_BROKER = os.environ.get('TEST_BROKER', 'pyamqp://')
  5. TEST_BACKEND = os.environ.get('TEST_BACKEND', 'redis://')
  6. @pytest.fixture(scope='session')
  7. def celery_config():
  8. return {
  9. 'broker_url': TEST_BROKER,
  10. 'result_backend': TEST_BACKEND
  11. }
  12. @pytest.fixture(scope='session')
  13. def celery_enable_logging():
  14. return True
  15. @pytest.fixture(scope='session')
  16. def celery_worker_pool():
  17. return 'prefork'
  18. @pytest.fixture(scope='session')
  19. def celery_includes():
  20. return {'t.integration.tasks'}
  21. @pytest.fixture
  22. def app(celery_app):
  23. yield celery_app
  24. @pytest.fixture
  25. def manager(app, celery_session_worker):
  26. return Manager(app)
  27. @pytest.fixture(autouse=True)
  28. def ZZZZ_set_app_current(app):
  29. app.set_current()
  30. app.set_default()