# test_amqp.py (4.4 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch
  4. from celery.bin.amqp import AMQPAdmin, AMQShell, amqp, dump_message, main
  5. from celery.five import WhateverIO
  6. class test_AMQShell:
  7. def setup(self):
  8. self.fh = WhateverIO()
  9. self.adm = self.create_adm()
  10. self.shell = AMQShell(connect=self.adm.connect, out=self.fh)
  11. def create_adm(self, *args, **kwargs):
  12. return AMQPAdmin(app=self.app, out=self.fh, *args, **kwargs)
  13. def test_queue_declare(self):
  14. self.shell.onecmd('queue.declare foo')
  15. assert 'ok' in self.fh.getvalue()
  16. def test_missing_command(self):
  17. self.shell.onecmd('foo foo')
  18. assert 'unknown syntax' in self.fh.getvalue()
  19. def RV(self):
  20. raise Exception(self.fh.getvalue())
  21. def test_spec_format_response(self):
  22. spec = self.shell.amqp['exchange.declare']
  23. assert spec.format_response(None) == 'ok.'
  24. assert spec.format_response('NO') == 'NO'
  25. def test_missing_namespace(self):
  26. self.shell.onecmd('ns.cmd arg')
  27. assert 'unknown syntax' in self.fh.getvalue()
  28. def test_help(self):
  29. self.shell.onecmd('help')
  30. assert 'Example:' in self.fh.getvalue()
  31. def test_help_command(self):
  32. self.shell.onecmd('help queue.declare')
  33. assert 'passive:no' in self.fh.getvalue()
  34. def test_help_unknown_command(self):
  35. self.shell.onecmd('help foo.baz')
  36. assert 'unknown syntax' in self.fh.getvalue()
  37. def test_onecmd_error(self):
  38. self.shell.dispatch = Mock()
  39. self.shell.dispatch.side_effect = MemoryError()
  40. self.shell.say = Mock()
  41. assert not self.shell.needs_reconnect
  42. self.shell.onecmd('hello')
  43. self.shell.say.assert_called()
  44. assert self.shell.needs_reconnect
  45. def test_exit(self):
  46. with pytest.raises(SystemExit):
  47. self.shell.onecmd('exit')
  48. assert "don't leave!" in self.fh.getvalue()
  49. def test_note_silent(self):
  50. self.shell.silent = True
  51. self.shell.note('foo bar')
  52. assert 'foo bar' not in self.fh.getvalue()
  53. def test_reconnect(self):
  54. self.shell.onecmd('queue.declare foo')
  55. self.shell.needs_reconnect = True
  56. self.shell.onecmd('queue.delete foo')
  57. def test_completenames(self):
  58. assert self.shell.completenames('queue.dec') == ['queue.declare']
  59. assert (sorted(self.shell.completenames('declare')) ==
  60. sorted(['queue.declare', 'exchange.declare']))
  61. def test_empty_line(self):
  62. self.shell.emptyline = Mock()
  63. self.shell.default = Mock()
  64. self.shell.onecmd('')
  65. self.shell.emptyline.assert_called_with()
  66. self.shell.onecmd('foo')
  67. self.shell.default.assert_called_with('foo')
  68. def test_respond(self):
  69. self.shell.respond({'foo': 'bar'})
  70. assert 'foo' in self.fh.getvalue()
  71. def test_prompt(self):
  72. assert self.shell.prompt
  73. def test_no_returns(self):
  74. self.shell.onecmd('queue.declare foo')
  75. self.shell.onecmd('exchange.declare bar direct yes')
  76. self.shell.onecmd('queue.bind foo bar baz')
  77. self.shell.onecmd('basic.ack 1')
  78. def test_dump_message(self):
  79. m = Mock()
  80. m.body = 'the quick brown fox'
  81. m.properties = {'a': 1}
  82. m.delivery_info = {'exchange': 'bar'}
  83. assert dump_message(m)
  84. def test_dump_message_no_message(self):
  85. assert 'No messages in queue' in dump_message(None)
  86. def test_note(self):
  87. self.adm.silent = True
  88. self.adm.note('FOO')
  89. assert 'FOO' not in self.fh.getvalue()
  90. def test_run(self):
  91. a = self.create_adm('queue.declare', 'foo')
  92. a.run()
  93. assert 'ok' in self.fh.getvalue()
  94. def test_run_loop(self):
  95. a = self.create_adm()
  96. a.Shell = Mock()
  97. shell = a.Shell.return_value = Mock()
  98. shell.cmdloop = Mock()
  99. a.run()
  100. shell.cmdloop.assert_called_with()
  101. shell.cmdloop.side_effect = KeyboardInterrupt()
  102. a.run()
  103. assert 'bibi' in self.fh.getvalue()
  104. @patch('celery.bin.amqp.amqp')
  105. def test_main(self, Command):
  106. c = Command.return_value = Mock()
  107. main()
  108. c.execute_from_commandline.assert_called_with()
  109. @patch('celery.bin.amqp.AMQPAdmin')
  110. def test_command(self, cls):
  111. x = amqp(app=self.app)
  112. x.run()
  113. assert cls.call_args[1]['app'] is self.app