test_tasks.py 483 B

1234567891011121314151617
  1. from __future__ import absolute_import, unicode_literals
  2. from celery import group
  3. from cyanide.tasks import print_unicode, sleeping
  4. class test_tasks:
  5. def test_task_accepted(self, manager, sleep=1):
  6. r1 = sleeping.delay(sleep)
  7. sleeping.delay(sleep)
  8. manager.assert_accepted([r1.id])
  9. def test_unicode_task(self, manager):
  10. manager.join(
  11. group(print_unicode.s() for _ in range(5))(),
  12. timeout=1, propagate=True,
  13. )