| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 | 
							- from __future__ import absolute_import, unicode_literals
 
- import os
 
- import pytest
 
- from functools import wraps
 
- from celery.contrib.testing.manager import Manager
 
# Broker URL for the integration tests; the TEST_BROKER environment
# variable overrides the 'pyamqp://' default.
TEST_BROKER = os.getenv('TEST_BROKER', 'pyamqp://')

# Result backend URL for the integration tests; the TEST_BACKEND environment
# variable overrides the 'redis://' default.
TEST_BACKEND = os.getenv('TEST_BACKEND', 'redis://')
 
def flaky(fun):
    """Decorate ``fun`` so that it is attempted up to three times.

    The wrapper returns the result of the first attempt that does not raise.
    An ``Exception`` raised by the first or second attempt is discarded and
    the call is repeated with the same arguments; an ``Exception`` raised by
    the third attempt propagates to the caller.

    The returned wrapper exposes the undecorated callable as ``__wrapped__``.
    """
    @wraps(fun)
    def _retrying(*args, **kwargs):
        retries_left = 3
        while True:
            retries_left -= 1
            try:
                return fun(*args, **kwargs)
            except Exception:
                # Out of retries: let the failure from the last attempt
                # surface unchanged.
                if retries_left == 0:
                    raise

    # Assigned explicitly so the attribute exists even where functools.wraps
    # does not provide it (e.g. Python 2).
    _retrying.__wrapped__ = fun
    return _retrying
 
@pytest.fixture(scope='session')
def celery_config():
    """Return the session-scoped Celery configuration mapping.

    ``broker_url`` is taken from ``TEST_BROKER`` and ``result_backend`` from
    ``TEST_BACKEND``.
    """
    settings = {}
    settings['broker_url'] = TEST_BROKER
    settings['result_backend'] = TEST_BACKEND
    return settings
 
@pytest.fixture(scope='session')
def celery_enable_logging():
    """Session-scoped fixture whose value is always ``True``.

    NOTE(review): presumably read by the Celery pytest plugin to turn on
    logging in the test app -- confirm against the plugin documentation.
    """
    enabled = True
    return enabled
 
@pytest.fixture(scope='session')
def celery_worker_pool():
    """Session-scoped fixture returning the pool name ``'prefork'``.

    NOTE(review): presumably selects the pool implementation of the test
    worker -- confirm against the Celery pytest plugin documentation.
    """
    pool_name = 'prefork'
    return pool_name
 
@pytest.fixture(scope='session')
def celery_includes():
    """Session-scoped fixture returning a set with one module path.

    The only entry is ``'t.integration.tasks'``.
    NOTE(review): presumably the modules the test app should import --
    confirm against the Celery pytest plugin documentation.
    """
    modules = {'t.integration.tasks'}
    return modules
 
@pytest.fixture
def app(celery_app):
    """Expose the ``celery_app`` fixture value under the name ``app``.

    Written as a generator fixture: the incoming ``celery_app`` object is
    yielded unchanged and no teardown work follows the yield.
    """
    current_app = celery_app
    yield current_app
 
@pytest.fixture
def manager(app, celery_session_worker):
    """Return a ``Manager`` constructed from the ``app`` fixture.

    ``celery_session_worker`` is requested but never used in the body.
    NOTE(review): presumably requested only so the session worker is set up
    before the manager is used -- confirm.
    """
    test_manager = Manager(app)
    return test_manager
 
@pytest.fixture(autouse=True)
def ZZZZ_set_app_current(app):
    """Autouse fixture that calls ``set_current`` then ``set_default`` on ``app``.

    Because of ``autouse=True`` it applies to every test in scope without
    being requested by name.
    NOTE(review): the ``ZZZZ_`` prefix presumably influences fixture ordering
    -- confirm before renaming.
    """
    # Keep this order: set_current() first, set_default() second.
    app.set_current()
    app.set_default()
 
 
  |