test_amqp.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch
  4. from celery.bin.amqp import (
  5. AMQPAdmin,
  6. AMQShell,
  7. dump_message,
  8. amqp,
  9. main,
  10. )
  11. from celery.five import WhateverIO
  12. class test_AMQShell:
  13. def setup(self):
  14. self.fh = WhateverIO()
  15. self.adm = self.create_adm()
  16. self.shell = AMQShell(connect=self.adm.connect, out=self.fh)
  17. def create_adm(self, *args, **kwargs):
  18. return AMQPAdmin(app=self.app, out=self.fh, *args, **kwargs)
  19. def test_queue_declare(self):
  20. self.shell.onecmd('queue.declare foo')
  21. assert 'ok' in self.fh.getvalue()
  22. def test_missing_command(self):
  23. self.shell.onecmd('foo foo')
  24. assert 'unknown syntax' in self.fh.getvalue()
  25. def RV(self):
  26. raise Exception(self.fh.getvalue())
  27. def test_spec_format_response(self):
  28. spec = self.shell.amqp['exchange.declare']
  29. assert spec.format_response(None) == 'ok.'
  30. assert spec.format_response('NO') == 'NO'
  31. def test_missing_namespace(self):
  32. self.shell.onecmd('ns.cmd arg')
  33. assert 'unknown syntax' in self.fh.getvalue()
  34. def test_help(self):
  35. self.shell.onecmd('help')
  36. assert 'Example:' in self.fh.getvalue()
  37. def test_help_command(self):
  38. self.shell.onecmd('help queue.declare')
  39. assert 'passive:no' in self.fh.getvalue()
  40. def test_help_unknown_command(self):
  41. self.shell.onecmd('help foo.baz')
  42. assert 'unknown syntax' in self.fh.getvalue()
  43. def test_onecmd_error(self):
  44. self.shell.dispatch = Mock()
  45. self.shell.dispatch.side_effect = MemoryError()
  46. self.shell.say = Mock()
  47. assert not self.shell.needs_reconnect
  48. self.shell.onecmd('hello')
  49. self.shell.say.assert_called()
  50. assert self.shell.needs_reconnect
  51. def test_exit(self):
  52. with pytest.raises(SystemExit):
  53. self.shell.onecmd('exit')
  54. assert "don't leave!" in self.fh.getvalue()
  55. def test_note_silent(self):
  56. self.shell.silent = True
  57. self.shell.note('foo bar')
  58. assert 'foo bar' not in self.fh.getvalue()
  59. def test_reconnect(self):
  60. self.shell.onecmd('queue.declare foo')
  61. self.shell.needs_reconnect = True
  62. self.shell.onecmd('queue.delete foo')
  63. def test_completenames(self):
  64. assert self.shell.completenames('queue.dec') == ['queue.declare']
  65. assert (sorted(self.shell.completenames('declare')) ==
  66. sorted(['queue.declare', 'exchange.declare']))
  67. def test_empty_line(self):
  68. self.shell.emptyline = Mock()
  69. self.shell.default = Mock()
  70. self.shell.onecmd('')
  71. self.shell.emptyline.assert_called_with()
  72. self.shell.onecmd('foo')
  73. self.shell.default.assert_called_with('foo')
  74. def test_respond(self):
  75. self.shell.respond({'foo': 'bar'})
  76. assert 'foo' in self.fh.getvalue()
  77. def test_prompt(self):
  78. assert self.shell.prompt
  79. def test_no_returns(self):
  80. self.shell.onecmd('queue.declare foo')
  81. self.shell.onecmd('exchange.declare bar direct yes')
  82. self.shell.onecmd('queue.bind foo bar baz')
  83. self.shell.onecmd('basic.ack 1')
  84. def test_dump_message(self):
  85. m = Mock()
  86. m.body = 'the quick brown fox'
  87. m.properties = {'a': 1}
  88. m.delivery_info = {'exchange': 'bar'}
  89. assert dump_message(m)
  90. def test_dump_message_no_message(self):
  91. assert 'No messages in queue' in dump_message(None)
  92. def test_note(self):
  93. self.adm.silent = True
  94. self.adm.note('FOO')
  95. assert 'FOO' not in self.fh.getvalue()
  96. def test_run(self):
  97. a = self.create_adm('queue.declare', 'foo')
  98. a.run()
  99. assert 'ok' in self.fh.getvalue()
  100. def test_run_loop(self):
  101. a = self.create_adm()
  102. a.Shell = Mock()
  103. shell = a.Shell.return_value = Mock()
  104. shell.cmdloop = Mock()
  105. a.run()
  106. shell.cmdloop.assert_called_with()
  107. shell.cmdloop.side_effect = KeyboardInterrupt()
  108. a.run()
  109. assert 'bibi' in self.fh.getvalue()
  110. @patch('celery.bin.amqp.amqp')
  111. def test_main(self, Command):
  112. c = Command.return_value = Mock()
  113. main()
  114. c.execute_from_commandline.assert_called_with()
  115. @patch('celery.bin.amqp.AMQPAdmin')
  116. def test_command(self, cls):
  117. x = amqp(app=self.app)
  118. x.run()
  119. assert cls.call_args[1]['app'] is self.app