test_tasks.py 527 B

1234567891011121314151617181920
  1. from __future__ import absolute_import, unicode_literals
  2. from celery import group
  3. from .conftest import flaky
  4. from .tasks import print_unicode, sleeping
  5. class test_tasks:
  6. @flaky
  7. def test_task_accepted(self, manager, sleep=1):
  8. r1 = sleeping.delay(sleep)
  9. sleeping.delay(sleep)
  10. manager.assert_accepted([r1.id])
  11. @flaky
  12. def test_unicode_task(self, manager):
  13. manager.join(
  14. group(print_unicode.s() for _ in range(5))(),
  15. timeout=10, propagate=True,
  16. )