| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 | from __future__ import absolute_import, unicode_literalsimport pytestfrom cyanide.suite import ManagerMixindef _celerymark(app, redis_results=None, **kwargs):    if redis_results and not app.conf.result_backend.startswith('redis'):        pytest.skip('Test needs Redis result backend.')@pytest.fixturedef app(request):    from .app import app    app.finalize()    app.set_current()    mark = request.node.get_marker('celery')    mark = mark and mark.kwargs or {}    _celerymark(app, **mark)    yield app@pytest.fixturedef manager(app):    with CeleryManager(app) as manager:        yield managerclass CeleryManager(ManagerMixin):    # we don't stop full suite when a task result is missing.    TaskPredicate = AssertionError    def __init__(self, app, no_join=False, **kwargs):        self.app = app        self.no_join = no_join        self._init_manager(app, **kwargs)    def __enter__(self):        return self    def __exit__(self, *exc_info):        self.close()    def close(self):        pass
 |