test_tasks.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from celery import group
  4. from .conftest import flaky, get_active_redis_channels
  5. from .tasks import add, add_ignore_result, print_unicode, retry_once, sleeping
  6. class test_tasks:
  7. @flaky
  8. def test_task_accepted(self, manager, sleep=1):
  9. r1 = sleeping.delay(sleep)
  10. sleeping.delay(sleep)
  11. manager.assert_accepted([r1.id])
  12. @flaky
  13. def test_task_retried(self):
  14. res = retry_once.delay()
  15. assert res.get(timeout=10) == 1 # retried once
  16. @flaky
  17. def test_unicode_task(self, manager):
  18. manager.join(
  19. group(print_unicode.s() for _ in range(5))(),
  20. timeout=10, propagate=True,
  21. )
  22. class tests_task_redis_result_backend:
  23. def setup(self, manager):
  24. if not manager.app.conf.result_backend.startswith('redis'):
  25. raise pytest.skip('Requires redis result backend.')
  26. def test_ignoring_result_no_subscriptions(self):
  27. assert get_active_redis_channels() == []
  28. result = add_ignore_result.delay(1, 2)
  29. assert result.ignored is True
  30. assert get_active_redis_channels() == []
  31. def test_asyncresult_forget_cancels_subscription(self):
  32. result = add.delay(1, 2)
  33. assert get_active_redis_channels() == [
  34. "celery-task-meta-{}".format(result.id)
  35. ]
  36. result.forget()
  37. assert get_active_redis_channels() == []
  38. def test_asyncresult_get_cancels_subscription(self):
  39. result = add.delay(1, 2)
  40. assert get_active_redis_channels() == [
  41. "celery-task-meta-{}".format(result.id)
  42. ]
  43. assert result.get(timeout=3) == 3
  44. assert get_active_redis_channels() == []