test_utils_info.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from celery.tests.utils import unittest
  2. from celery import Celery
  3. from celery.utils import textindent
  4. RANDTEXT = """\
  5. The quick brown
  6. fox jumps
  7. over the
  8. lazy dog\
  9. """
  10. RANDTEXT_RES = """\
  11. The quick brown
  12. fox jumps
  13. over the
  14. lazy dog\
  15. """
  16. QUEUES = {"queue1": {
  17. "exchange": "exchange1",
  18. "exchange_type": "type1",
  19. "binding_key": "bind1"},
  20. "queue2": {
  21. "exchange": "exchange2",
  22. "exchange_type": "type2",
  23. "binding_key": "bind2"}}
  24. QUEUE_FORMAT1 = """. queue1: exchange:exchange1 (type1) binding:bind1"""
  25. QUEUE_FORMAT2 = """. queue2: exchange:exchange2 (type2) binding:bind2"""
  26. class TestInfo(unittest.TestCase):
  27. def test_textindent(self):
  28. self.assertEqual(textindent(RANDTEXT, 4), RANDTEXT_RES)
  29. def test_format_queues(self):
  30. celery = Celery(set_as_current=False)
  31. celery.amqp.queues = celery.amqp.Queues(QUEUES)
  32. self.assertEqual(sorted(celery.amqp.queues.format().split("\n")),
  33. sorted([QUEUE_FORMAT1, QUEUE_FORMAT2]))