conftest.py 772 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from celery.contrib.testing.manager import Manager
  4. @pytest.fixture(scope='session')
  5. def celery_config():
  6. return {
  7. 'broker_url': 'pyamqp://',
  8. 'result_backend': 'rpc',
  9. }
  10. @pytest.fixture(scope='session')
  11. def celery_enable_logging():
  12. return True
  13. @pytest.fixture(scope='session')
  14. def celery_worker_pool():
  15. return 'prefork'
  16. @pytest.fixture(scope='session')
  17. def celery_includes():
  18. return {'t.integration.tasks'}
  19. @pytest.fixture
  20. def app(celery_app):
  21. yield celery_app
  22. @pytest.fixture
  23. def manager(app, celery_session_worker):
  24. return Manager(app)
  25. @pytest.fixture(autouse=True)
  26. def ZZZZ_set_app_current(app):
  27. app.set_current()
  28. app.set_default()