test_tasks.py 420 B

12345678910111213141516
  1. from celery import group
  2. from .tasks import print_unicode, sleeping
  3. class test_tasks:
  4. def test_task_accepted(self, manager, sleep=1):
  5. r1 = sleeping.delay(sleep)
  6. sleeping.delay(sleep)
  7. manager.assert_accepted([r1.id])
  8. def test_unicode_task(self, manager):
  9. manager.join(
  10. group(print_unicode.s() for _ in range(5))(),
  11. timeout=10, propagate=True,
  12. )