conftest.py 904 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. import pytest
  4. from celery.contrib.testing.manager import Manager
  5. TEST_BROKER = os.environ.get('TEST_BROKER', 'pyamqp://')
  6. TEST_BACKEND = os.environ.get('TEST_BACKEND', 'redis://')
  7. @pytest.fixture(scope='session')
  8. def celery_config():
  9. return {
  10. 'broker_url': TEST_BROKER,
  11. 'result_backend': TEST_BACKEND
  12. }
  13. @pytest.fixture(scope='session')
  14. def celery_enable_logging():
  15. return True
  16. @pytest.fixture(scope='session')
  17. def celery_worker_pool():
  18. return 'prefork'
  19. @pytest.fixture(scope='session')
  20. def celery_includes():
  21. return {'t.integration.tasks'}
  22. @pytest.fixture
  23. def app(celery_app):
  24. yield celery_app
  25. @pytest.fixture
  26. def manager(app, celery_session_worker):
  27. return Manager(app)
  28. @pytest.fixture(autouse=True)
  29. def ZZZZ_set_app_current(app):
  30. app.set_current()
  31. app.set_default()