test_celeryd_multi.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. from __future__ import absolute_import
  2. import errno
  3. import signal
  4. import sys
  5. from mock import Mock, patch
  6. from celery.bin.celeryd_multi import (
  7. main,
  8. MultiTool,
  9. findsig,
  10. abbreviations,
  11. parse_ns_range,
  12. format_opt,
  13. quote,
  14. NamespacedOptionParser,
  15. multi_args,
  16. __doc__ as doc,
  17. )
  18. from celery.tests.utils import Case, WhateverIO
  19. class test_functions(Case):
  20. def test_findsig(self):
  21. self.assertEqual(findsig(['a', 'b', 'c', '-1']), 1)
  22. self.assertEqual(findsig(['--foo=1', '-9']), 9)
  23. self.assertEqual(findsig(['-INT']), signal.SIGINT)
  24. self.assertEqual(findsig([]), signal.SIGTERM)
  25. self.assertEqual(findsig(['-s']), signal.SIGTERM)
  26. self.assertEqual(findsig(['-log']), signal.SIGTERM)
  27. def test_abbreviations(self):
  28. expander = abbreviations({'%s': 'START',
  29. '%x': 'STOP'})
  30. self.assertEqual(expander('foo%s'), 'fooSTART')
  31. self.assertEqual(expander('foo%x'), 'fooSTOP')
  32. self.assertEqual(expander('foo%y'), 'foo%y')
  33. self.assertIsNone(expander(None))
  34. def test_parse_ns_range(self):
  35. self.assertEqual(parse_ns_range('1-3', True), ['1', '2', '3'])
  36. self.assertEqual(parse_ns_range('1-3', False), ['1-3'])
  37. self.assertEqual(parse_ns_range('1-3,10,11,20', True),
  38. ['1', '2', '3', '10', '11', '20'])
  39. def test_format_opt(self):
  40. self.assertEqual(format_opt('--foo', None), '--foo')
  41. self.assertEqual(format_opt('-c', 1), '-c 1')
  42. self.assertEqual(format_opt('--log', 'foo'), '--log=foo')
  43. def test_quote(self):
  44. self.assertEqual(quote("the 'quick"), "'the '\\''quick'")
  45. class test_NamespacedOptionParser(Case):
  46. def test_parse(self):
  47. x = NamespacedOptionParser(['-c:1,3', '4'])
  48. self.assertEqual(x.namespaces.get('1,3'), {'-c': '4'})
  49. x = NamespacedOptionParser(['-c:jerry,elaine', '5',
  50. '--loglevel:kramer=DEBUG',
  51. '--flag',
  52. '--logfile=foo', '-Q', 'bar', 'a', 'b',
  53. '--', '.disable_rate_limits=1'])
  54. self.assertEqual(x.options, {'--logfile': 'foo',
  55. '-Q': 'bar',
  56. '--flag': None})
  57. self.assertEqual(x.values, ['a', 'b'])
  58. self.assertEqual(x.namespaces.get('jerry,elaine'), {'-c': '5'})
  59. self.assertEqual(x.namespaces.get('kramer'), {'--loglevel': 'DEBUG'})
  60. self.assertEqual(x.passthrough, '-- .disable_rate_limits=1')
  61. class test_multi_args(Case):
  62. @patch('socket.gethostname')
  63. def test_parse(self, gethostname):
  64. p = NamespacedOptionParser(['-c:jerry,elaine', '5',
  65. '--loglevel:kramer=DEBUG',
  66. '--flag',
  67. '--logfile=foo', '-Q', 'bar', 'jerry',
  68. 'elaine', 'kramer',
  69. '--', '.disable_rate_limits=1'])
  70. it = multi_args(p, cmd='COMMAND', append='*AP*',
  71. prefix='*P*', suffix='*S*')
  72. names = list(it)
  73. self.assertEqual(names[0][0:2], ('*P*jerry*S*',
  74. [
  75. 'COMMAND', '-n *P*jerry*S*', '-Q bar',
  76. '-c 5', '--flag', '--logfile=foo',
  77. '-- .disable_rate_limits=1', '*AP*',
  78. ]
  79. ))
  80. self.assertEqual(names[1][0:2], ('*P*elaine*S*',
  81. [
  82. 'COMMAND', '-n *P*elaine*S*', '-Q bar',
  83. '-c 5', '--flag', '--logfile=foo',
  84. '-- .disable_rate_limits=1', '*AP*',
  85. ]
  86. ))
  87. self.assertEqual(names[2][0:2], ('*P*kramer*S*',
  88. [
  89. 'COMMAND', '--loglevel=DEBUG', '-n *P*kramer*S*',
  90. '-Q bar', '--flag', '--logfile=foo',
  91. '-- .disable_rate_limits=1', '*AP*',
  92. ]
  93. ))
  94. expand = names[0][2]
  95. self.assertEqual(expand('%h'), '*P*jerry*S*')
  96. self.assertEqual(expand('%n'), 'jerry')
  97. names2 = list(multi_args(p, cmd='COMMAND', append='',
  98. prefix='*P*', suffix='*S*'))
  99. self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1')
  100. gethostname.return_value = 'example.com'
  101. p2 = NamespacedOptionParser(['10', '-c:1', '5'])
  102. names3 = list(multi_args(p2, cmd='COMMAND'))
  103. self.assertEqual(len(names3), 10)
  104. self.assertEqual(names3[0][0:2], ('celery1.example.com',
  105. ['COMMAND', '-n celery1.example.com', '-c 5', '']))
  106. for i, worker in enumerate(names3[1:]):
  107. self.assertEqual(worker[0:2], ('celery%s.example.com' % (i + 2),
  108. ['COMMAND', '-n celery%s.example.com' % (i + 2), '']))
  109. names4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))
  110. self.assertEqual(len(names4), 10)
  111. self.assertEqual(names4[0][0:2], ('celery1',
  112. ['COMMAND', '-n celery1', '-c 5', '']))
  113. p3 = NamespacedOptionParser(['foo', '-c:foo', '5'])
  114. names5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))
  115. self.assertEqual(names5[0][0:2], ('foo',
  116. ['COMMAND', '-n foo', '-c 5', '']))
  117. class test_MultiTool(Case):
  118. def setUp(self):
  119. self.fh = WhateverIO()
  120. self.env = {}
  121. self.t = MultiTool(env=self.env, fh=self.fh)
  122. def test_note(self):
  123. self.t.note('hello world')
  124. self.assertEqual(self.fh.getvalue(), 'hello world\n')
  125. def test_note_quiet(self):
  126. self.t.quiet = True
  127. self.t.note('hello world')
  128. self.assertFalse(self.fh.getvalue())
  129. def test_info(self):
  130. self.t.verbose = True
  131. self.t.info('hello info')
  132. self.assertEqual(self.fh.getvalue(), 'hello info\n')
  133. def test_info_not_verbose(self):
  134. self.t.verbose = False
  135. self.t.info('hello info')
  136. self.assertFalse(self.fh.getvalue())
  137. def test_error(self):
  138. self.t.say = Mock()
  139. self.t.usage = Mock()
  140. self.assertEqual(self.t.error('foo'), 1)
  141. self.t.say.assert_called_with('foo')
  142. self.t.usage.assert_called_with()
  143. self.t.say = Mock()
  144. self.assertEqual(self.t.error(), 1)
  145. self.assertFalse(self.t.say.called)
  146. self.assertEqual(self.t.retcode, 1)
  147. @patch('celery.bin.celeryd_multi.Popen')
  148. def test_waitexec(self, Popen):
  149. self.t.note = Mock()
  150. pipe = Popen.return_value = Mock()
  151. pipe.wait.return_value = -10
  152. self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 10)
  153. Popen.assert_called_with(['path', '-m', 'foo'], env=self.t.env)
  154. self.t.note.assert_called_with('* Child was terminated by signal 10')
  155. pipe.wait.return_value = 2
  156. self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 2)
  157. self.t.note.assert_called_with(
  158. '* Child terminated with errorcode 2')
  159. pipe.wait.return_value = 0
  160. self.assertFalse(self.t.waitexec(['-m', 'foo', 'path']))
  161. def test_nosplash(self):
  162. self.t.nosplash = True
  163. self.t.splash()
  164. self.assertFalse(self.fh.getvalue())
  165. def test_splash(self):
  166. self.t.nosplash = False
  167. self.t.splash()
  168. self.assertIn('celeryd-multi', self.fh.getvalue())
  169. def test_usage(self):
  170. self.t.usage()
  171. self.assertTrue(self.fh.getvalue())
  172. def test_help(self):
  173. self.t.help([])
  174. self.assertIn(doc, self.fh.getvalue())
  175. def test_expand(self):
  176. self.t.expand(['foo%n', 'ask', 'klask', 'dask'])
  177. self.assertEqual(self.fh.getvalue(),
  178. 'fooask\nfooklask\nfoodask\n')
  179. def test_restart(self):
  180. stop = self.t._stop_nodes = Mock()
  181. self.t.restart(['jerry', 'george'], 'celeryd')
  182. waitexec = self.t.waitexec = Mock()
  183. self.assertTrue(stop.called)
  184. callback = stop.call_args[1]['callback']
  185. self.assertTrue(callback)
  186. waitexec.return_value = 0
  187. callback('jerry', ['arg'], 13)
  188. waitexec.assert_called_with(['arg'])
  189. self.assertIn('OK', self.fh.getvalue())
  190. self.fh.seek(0)
  191. self.fh.truncate()
  192. waitexec.return_value = 1
  193. callback('jerry', ['arg'], 13)
  194. self.assertIn('FAILED', self.fh.getvalue())
  195. def test_stop(self):
  196. self.t.getpids = Mock()
  197. self.t.getpids.return_value = [2, 3, 4]
  198. self.t.shutdown_nodes = Mock()
  199. self.t.stop(['a', 'b', '-INT'], 'celeryd')
  200. self.t.shutdown_nodes.assert_called_with(
  201. [2, 3, 4], sig=signal.SIGINT, retry=None, callback=None,
  202. )
  203. def test_kill(self):
  204. self.t.getpids = Mock()
  205. self.t.getpids.return_value = [
  206. ('a', None, 10),
  207. ('b', None, 11),
  208. ('c', None, 12)
  209. ]
  210. sig = self.t.signal_node = Mock()
  211. self.t.kill(['a', 'b', 'c'], 'celeryd')
  212. sigs = sig.call_args_list
  213. self.assertEqual(len(sigs), 3)
  214. self.assertEqual(sigs[0][0], ('a', 10, signal.SIGKILL))
  215. self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
  216. self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
  217. def prepare_pidfile_for_getpids(self, Pidfile):
  218. class pids(object):
  219. def __init__(self, path):
  220. self.path = path
  221. def read_pid(self):
  222. try:
  223. return {'celeryd@foo.pid': 10,
  224. 'celeryd@bar.pid': 11}[self.path]
  225. except KeyError:
  226. raise ValueError()
  227. Pidfile.side_effect = pids
  228. @patch('celery.bin.celeryd_multi.Pidfile')
  229. @patch('socket.gethostname')
  230. def test_getpids(self, gethostname, Pidfile):
  231. gethostname.return_value = 'e.com'
  232. self.prepare_pidfile_for_getpids(Pidfile)
  233. callback = Mock()
  234. p = NamespacedOptionParser(['foo', 'bar', 'baz'])
  235. nodes = self.t.getpids(p, 'celeryd', callback=callback)
  236. self.assertEqual(nodes, [
  237. ('foo.e.com',
  238. ('celeryd', '--pidfile=celeryd@foo.pid', '-n foo.e.com', ''),
  239. 10),
  240. ('bar.e.com',
  241. ('celeryd', '--pidfile=celeryd@bar.pid', '-n bar.e.com', ''),
  242. 11),
  243. ])
  244. self.assertTrue(callback.called)
  245. callback.assert_called_with(
  246. 'baz.e.com',
  247. ['celeryd', '--pidfile=celeryd@baz.pid', '-n baz.e.com', ''],
  248. None,
  249. )
  250. self.assertIn('DOWN', self.fh.getvalue())
  251. # without callback, should work
  252. nodes = self.t.getpids(p, 'celeryd', callback=None)
  253. @patch('celery.bin.celeryd_multi.Pidfile')
  254. @patch('socket.gethostname')
  255. @patch('celery.bin.celeryd_multi.sleep')
  256. def test_shutdown_nodes(self, slepp, gethostname, Pidfile):
  257. gethostname.return_value = 'e.com'
  258. self.prepare_pidfile_for_getpids(Pidfile)
  259. self.assertIsNone(self.t.shutdown_nodes([]))
  260. self.t.signal_node = Mock()
  261. node_alive = self.t.node_alive = Mock()
  262. self.t.node_alive.return_value = False
  263. callback = Mock()
  264. self.t.stop(['foo', 'bar', 'baz'], 'celeryd', callback=callback)
  265. sigs = self.t.signal_node.call_args_list
  266. self.assertEqual(len(sigs), 2)
  267. self.assertEqual(sigs[0][0], ('foo.e.com', 10, signal.SIGTERM))
  268. self.assertEqual(sigs[1][0], ('bar.e.com', 11, signal.SIGTERM))
  269. self.t.signal_node.return_value = False
  270. self.assertTrue(callback.called)
  271. self.t.stop(['foo', 'bar', 'baz'], 'celeryd', callback=None)
  272. def on_node_alive(pid):
  273. if node_alive.call_count > 4:
  274. return True
  275. return False
  276. self.t.signal_node.return_value = True
  277. self.t.node_alive.side_effect = on_node_alive
  278. self.t.stop(['foo', 'bar', 'baz'], 'celeryd', retry=True)
  279. @patch('os.kill')
  280. def test_node_alive(self, kill):
  281. kill.return_value = True
  282. self.assertTrue(self.t.node_alive(13))
  283. esrch = OSError()
  284. esrch.errno = errno.ESRCH
  285. kill.side_effect = esrch
  286. self.assertFalse(self.t.node_alive(13))
  287. kill.assert_called_with(13, 0)
  288. enoent = OSError()
  289. enoent.errno = errno.ENOENT
  290. kill.side_effect = enoent
  291. with self.assertRaises(OSError):
  292. self.t.node_alive(13)
  293. @patch('os.kill')
  294. def test_signal_node(self, kill):
  295. kill.return_value = True
  296. self.assertTrue(self.t.signal_node('foo', 13, 9))
  297. esrch = OSError()
  298. esrch.errno = errno.ESRCH
  299. kill.side_effect = esrch
  300. self.assertFalse(self.t.signal_node('foo', 13, 9))
  301. kill.assert_called_with(13, 9)
  302. self.assertIn('Could not signal foo', self.fh.getvalue())
  303. enoent = OSError()
  304. enoent.errno = errno.ENOENT
  305. kill.side_effect = enoent
  306. with self.assertRaises(OSError):
  307. self.t.signal_node('foo', 13, 9)
  308. def test_start(self):
  309. self.t.waitexec = Mock()
  310. self.t.waitexec.return_value = 0
  311. self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celeryd'))
  312. self.t.waitexec.return_value = 1
  313. self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celeryd'))
  314. def test_show(self):
  315. self.t.show(['foo', 'bar', 'baz'], 'celeryd')
  316. self.assertTrue(self.fh.getvalue())
  317. @patch('socket.gethostname')
  318. def test_get(self, gethostname):
  319. gethostname.return_value = 'e.com'
  320. self.t.get(['xuzzy.e.com', 'foo', 'bar', 'baz'], 'celeryd')
  321. self.assertFalse(self.fh.getvalue())
  322. self.t.get(['foo.e.com', 'foo', 'bar', 'baz'], 'celeryd')
  323. self.assertTrue(self.fh.getvalue())
  324. @patch('socket.gethostname')
  325. def test_names(self, gethostname):
  326. gethostname.return_value = 'e.com'
  327. self.t.names(['foo', 'bar', 'baz'], 'celeryd')
  328. self.assertIn('foo.e.com\nbar.e.com\nbaz.e.com', self.fh.getvalue())
  329. def test_execute_from_commandline(self):
  330. start = self.t.commands['start'] = Mock()
  331. self.t.error = Mock()
  332. self.t.execute_from_commandline(['multi', 'start', 'foo', 'bar'])
  333. self.assertFalse(self.t.error.called)
  334. start.assert_called_with(['foo', 'bar'], 'celeryd')
  335. self.t.error = Mock()
  336. self.t.execute_from_commandline(['multi', 'frob', 'foo', 'bar'])
  337. self.t.error.assert_called_with('Invalid command: frob')
  338. self.t.error = Mock()
  339. self.t.execute_from_commandline(['multi'])
  340. self.t.error.assert_called_with()
  341. self.t.error = Mock()
  342. self.t.execute_from_commandline(['multi', '-foo'])
  343. self.t.error.assert_called_with()
  344. self.t.execute_from_commandline(['multi', 'start', 'foo',
  345. '--nosplash', '--quiet', '-q', '--verbose', '--no-color'])
  346. self.assertTrue(self.t.nosplash)
  347. self.assertTrue(self.t.quiet)
  348. self.assertTrue(self.t.verbose)
  349. self.assertTrue(self.t.no_color)
  350. def test_stopwait(self):
  351. self.t._stop_nodes = Mock()
  352. self.t.stopwait(['foo', 'bar', 'baz'], 'celeryd')
  353. self.assertEqual(self.t._stop_nodes.call_args[1]['retry'], 2)
  354. @patch('celery.bin.celeryd_multi.MultiTool')
  355. def test_main(self, MultiTool):
  356. m = MultiTool.return_value = Mock()
  357. with self.assertRaises(SystemExit):
  358. main()
  359. m.execute_from_commandline.assert_called_with(sys.argv)