12345678910111213141516171819 |
- from celery import group
- from .conftest import flaky
- from .tasks import print_unicode, sleeping
- class test_tasks:
- @flaky
- def test_task_accepted(self, manager, sleep=1):
- r1 = sleeping.delay(sleep)
- sleeping.delay(sleep)
- manager.assert_accepted([r1.id])
- @flaky
- def test_unicode_task(self, manager):
- manager.join(
- group(print_unicode.s() for _ in range(5))(),
- timeout=10, propagate=True,
- )
|