test_tasks.py 508 B

12345678910111213141516171819
  1. from __future__ import absolute_import, unicode_literals
  2. from flaky import flaky
  3. from celery import group
  4. from .tasks import print_unicode, sleeping
  5. @flaky
  6. class test_tasks:
  7. def test_task_accepted(self, manager, sleep=1):
  8. r1 = sleeping.delay(sleep)
  9. sleeping.delay(sleep)
  10. manager.assert_accepted([r1.id])
  11. def test_unicode_task(self, manager):
  12. manager.join(
  13. group(print_unicode.s() for _ in range(5))(),
  14. timeout=10, propagate=True,
  15. )