# test_amqp.py (4.3 KB)
  1. import io
  2. import pytest
  3. from case import Mock, patch
  4. from celery.bin.amqp import (
  5. AMQPAdmin,
  6. AMQShell,
  7. dump_message,
  8. amqp,
  9. main,
  10. )
  11. class test_AMQShell:
  12. def setup(self):
  13. self.fh = io.StringIO()
  14. self.adm = self.create_adm()
  15. self.shell = AMQShell(connect=self.adm.connect, out=self.fh)
  16. def create_adm(self, *args, **kwargs):
  17. return AMQPAdmin(app=self.app, out=self.fh, *args, **kwargs)
  18. def test_queue_declare(self):
  19. self.shell.onecmd('queue.declare foo')
  20. assert 'ok' in self.fh.getvalue()
  21. def test_missing_command(self):
  22. self.shell.onecmd('foo foo')
  23. assert 'unknown syntax' in self.fh.getvalue()
  24. def RV(self):
  25. raise Exception(self.fh.getvalue())
  26. def test_spec_format_response(self):
  27. spec = self.shell.amqp['exchange.declare']
  28. assert spec.format_response(None) == 'ok.'
  29. assert spec.format_response('NO') == 'NO'
  30. def test_missing_namespace(self):
  31. self.shell.onecmd('ns.cmd arg')
  32. assert 'unknown syntax' in self.fh.getvalue()
  33. def test_help(self):
  34. self.shell.onecmd('help')
  35. assert 'Example:' in self.fh.getvalue()
  36. def test_help_command(self):
  37. self.shell.onecmd('help queue.declare')
  38. assert 'passive:no' in self.fh.getvalue()
  39. def test_help_unknown_command(self):
  40. self.shell.onecmd('help foo.baz')
  41. assert 'unknown syntax' in self.fh.getvalue()
  42. def test_onecmd_error(self):
  43. self.shell.dispatch = Mock()
  44. self.shell.dispatch.side_effect = MemoryError()
  45. self.shell.say = Mock()
  46. assert not self.shell.needs_reconnect
  47. self.shell.onecmd('hello')
  48. self.shell.say.assert_called()
  49. assert self.shell.needs_reconnect
  50. def test_exit(self):
  51. with pytest.raises(SystemExit):
  52. self.shell.onecmd('exit')
  53. assert "don't leave!" in self.fh.getvalue()
  54. def test_note_silent(self):
  55. self.shell.silent = True
  56. self.shell.note('foo bar')
  57. assert 'foo bar' not in self.fh.getvalue()
  58. def test_reconnect(self):
  59. self.shell.onecmd('queue.declare foo')
  60. self.shell.needs_reconnect = True
  61. self.shell.onecmd('queue.delete foo')
  62. def test_completenames(self):
  63. assert self.shell.completenames('queue.dec') == ['queue.declare']
  64. assert (sorted(self.shell.completenames('declare')) ==
  65. sorted(['queue.declare', 'exchange.declare']))
  66. def test_empty_line(self):
  67. self.shell.emptyline = Mock()
  68. self.shell.default = Mock()
  69. self.shell.onecmd('')
  70. self.shell.emptyline.assert_called_with()
  71. self.shell.onecmd('foo')
  72. self.shell.default.assert_called_with('foo')
  73. def test_respond(self):
  74. self.shell.respond({'foo': 'bar'})
  75. assert 'foo' in self.fh.getvalue()
  76. def test_prompt(self):
  77. assert self.shell.prompt
  78. def test_no_returns(self):
  79. self.shell.onecmd('queue.declare foo')
  80. self.shell.onecmd('exchange.declare bar direct yes')
  81. self.shell.onecmd('queue.bind foo bar baz')
  82. self.shell.onecmd('basic.ack 1')
  83. def test_dump_message(self):
  84. m = Mock()
  85. m.body = 'the quick brown fox'
  86. m.properties = {'a': 1}
  87. m.delivery_info = {'exchange': 'bar'}
  88. assert dump_message(m)
  89. def test_dump_message_no_message(self):
  90. assert 'No messages in queue' in dump_message(None)
  91. def test_note(self):
  92. self.adm.silent = True
  93. self.adm.note('FOO')
  94. assert 'FOO' not in self.fh.getvalue()
  95. def test_run(self):
  96. a = self.create_adm('queue.declare', 'foo')
  97. a.run()
  98. assert 'ok' in self.fh.getvalue()
  99. def test_run_loop(self):
  100. a = self.create_adm()
  101. a.Shell = Mock()
  102. shell = a.Shell.return_value = Mock()
  103. shell.cmdloop = Mock()
  104. a.run()
  105. shell.cmdloop.assert_called_with()
  106. shell.cmdloop.side_effect = KeyboardInterrupt()
  107. a.run()
  108. assert 'bibi' in self.fh.getvalue()
  109. @patch('celery.bin.amqp.amqp')
  110. def test_main(self, Command):
  111. c = Command.return_value = Mock()
  112. main()
  113. c.execute_from_commandline.assert_called_with()
  114. @patch('celery.bin.amqp.AMQPAdmin')
  115. def test_command(self, cls):
  116. x = amqp(app=self.app)
  117. x.run()
  118. assert cls.call_args[1]['app'] is self.app