123456789101112131415161718192021222324252627 |
- from __future__ import absolute_import, unicode_literals
- from celery import group
- from .conftest import flaky
- from .tasks import print_unicode, retry_once, sleeping
class test_tasks:
    """Tests that dispatch tasks via ``delay()`` / ``group`` and check the outcome."""

    @flaky
    def test_task_accepted(self, manager, sleep=1):
        """Queue two ``sleeping`` tasks and assert the first one is accepted.

        ``sleep`` is forwarded unchanged to both ``sleeping.delay`` calls.
        """
        first = sleeping.delay(sleep)
        # A second task is queued too; only the first task's id is checked.
        sleeping.delay(sleep)
        accepted_ids = [first.id]
        manager.assert_accepted(accepted_ids)

    @flaky
    def test_task_retried(self):
        """``retry_once`` must produce ``1`` when fetched with a 10s timeout."""
        outcome = retry_once.delay().get(timeout=10)
        assert outcome == 1  # retried once

    @flaky
    def test_unicode_task(self, manager):
        """Launch five ``print_unicode`` signatures as a group and join them.

        The join uses ``timeout=10`` and ``propagate=True``.
        """
        signatures = (print_unicode.s() for _ in range(5))
        pending = group(signatures)()
        manager.join(pending, timeout=10, propagate=True)
|