test_tasks.py 675 B

123456789101112131415161718192021222324252627
  1. from __future__ import absolute_import, unicode_literals
  2. from celery import group
  3. from .conftest import flaky
  4. from .tasks import print_unicode, retry_once, sleeping
  5. class test_tasks:
  6. @flaky
  7. def test_task_accepted(self, manager, sleep=1):
  8. r1 = sleeping.delay(sleep)
  9. sleeping.delay(sleep)
  10. manager.assert_accepted([r1.id])
  11. @flaky
  12. def test_task_retried(self):
  13. res = retry_once.delay()
  14. assert res.get(timeout=10) == 1 # retried once
  15. @flaky
  16. def test_unicode_task(self, manager):
  17. manager.join(
  18. group(print_unicode.s() for _ in range(5))(),
  19. timeout=10, propagate=True,
  20. )