test_tasks.py 470 B

12345678910111213141516171819
  1. from celery import group
  2. from .conftest import flaky
  3. from .tasks import print_unicode, sleeping
  4. class test_tasks:
  5. @flaky
  6. def test_task_accepted(self, manager, sleep=1):
  7. r1 = sleeping.delay(sleep)
  8. sleeping.delay(sleep)
  9. manager.assert_accepted([r1.id])
  10. @flaky
  11. def test_unicode_task(self, manager):
  12. manager.join(
  13. group(print_unicode.s() for _ in range(5))(),
  14. timeout=10, propagate=True,
  15. )