test_multi.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. from __future__ import absolute_import
  2. import errno
  3. import signal
  4. import sys
  5. from mock import Mock, patch
  6. from celery.bin.multi import (
  7. main,
  8. MultiTool,
  9. findsig,
  10. abbreviations,
  11. parse_ns_range,
  12. format_opt,
  13. quote,
  14. NamespacedOptionParser,
  15. multi_args,
  16. __doc__ as doc,
  17. )
  18. from celery.tests.case import AppCase, WhateverIO
  19. class test_functions(AppCase):
  20. def test_findsig(self):
  21. self.assertEqual(findsig(['a', 'b', 'c', '-1']), 1)
  22. self.assertEqual(findsig(['--foo=1', '-9']), 9)
  23. self.assertEqual(findsig(['-INT']), signal.SIGINT)
  24. self.assertEqual(findsig([]), signal.SIGTERM)
  25. self.assertEqual(findsig(['-s']), signal.SIGTERM)
  26. self.assertEqual(findsig(['-log']), signal.SIGTERM)
  27. def test_abbreviations(self):
  28. expander = abbreviations({'%s': 'START',
  29. '%x': 'STOP'})
  30. self.assertEqual(expander('foo%s'), 'fooSTART')
  31. self.assertEqual(expander('foo%x'), 'fooSTOP')
  32. self.assertEqual(expander('foo%y'), 'foo%y')
  33. self.assertIsNone(expander(None))
  34. def test_parse_ns_range(self):
  35. self.assertEqual(parse_ns_range('1-3', True), ['1', '2', '3'])
  36. self.assertEqual(parse_ns_range('1-3', False), ['1-3'])
  37. self.assertEqual(parse_ns_range(
  38. '1-3,10,11,20', True),
  39. ['1', '2', '3', '10', '11', '20'],
  40. )
  41. def test_format_opt(self):
  42. self.assertEqual(format_opt('--foo', None), '--foo')
  43. self.assertEqual(format_opt('-c', 1), '-c 1')
  44. self.assertEqual(format_opt('--log', 'foo'), '--log=foo')
  45. def test_quote(self):
  46. self.assertEqual(quote("the 'quick"), "'the '\\''quick'")
  47. class test_NamespacedOptionParser(AppCase):
  48. def test_parse(self):
  49. x = NamespacedOptionParser(['-c:1,3', '4'])
  50. self.assertEqual(x.namespaces.get('1,3'), {'-c': '4'})
  51. x = NamespacedOptionParser(['-c:jerry,elaine', '5',
  52. '--loglevel:kramer=DEBUG',
  53. '--flag',
  54. '--logfile=foo', '-Q', 'bar', 'a', 'b',
  55. '--', '.disable_rate_limits=1'])
  56. self.assertEqual(x.options, {'--logfile': 'foo',
  57. '-Q': 'bar',
  58. '--flag': None})
  59. self.assertEqual(x.values, ['a', 'b'])
  60. self.assertEqual(x.namespaces.get('jerry,elaine'), {'-c': '5'})
  61. self.assertEqual(x.namespaces.get('kramer'), {'--loglevel': 'DEBUG'})
  62. self.assertEqual(x.passthrough, '-- .disable_rate_limits=1')
  63. class test_multi_args(AppCase):
  64. @patch('socket.gethostname')
  65. def test_parse(self, gethostname):
  66. p = NamespacedOptionParser([
  67. '-c:jerry,elaine', '5',
  68. '--loglevel:kramer=DEBUG',
  69. '--flag',
  70. '--logfile=foo', '-Q', 'bar', 'jerry',
  71. 'elaine', 'kramer',
  72. '--', '.disable_rate_limits=1',
  73. ])
  74. it = multi_args(p, cmd='COMMAND', append='*AP*',
  75. prefix='*P*', suffix='*S*')
  76. names = list(it)
  77. def assert_line_in(name, args):
  78. self.assertIn(name, [tup[0] for tup in names])
  79. argv = None
  80. for item in names:
  81. if item[0] == name:
  82. argv = item[1]
  83. self.assertTrue(argv)
  84. for arg in args:
  85. self.assertIn(arg, argv)
  86. assert_line_in(
  87. '*P*jerry@*S*',
  88. ['COMMAND', '-n *P*jerry@*S*', '-Q bar',
  89. '-c 5', '--flag', '--logfile=foo',
  90. '-- .disable_rate_limits=1', '*AP*'],
  91. )
  92. assert_line_in(
  93. '*P*elaine@*S*',
  94. ['COMMAND', '-n *P*elaine@*S*', '-Q bar',
  95. '-c 5', '--flag', '--logfile=foo',
  96. '-- .disable_rate_limits=1', '*AP*'],
  97. )
  98. assert_line_in(
  99. '*P*kramer@*S*',
  100. ['COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*',
  101. '-Q bar', '--flag', '--logfile=foo',
  102. '-- .disable_rate_limits=1', '*AP*'],
  103. )
  104. expand = names[0][2]
  105. self.assertEqual(expand('%h'), '*P*jerry@*S*')
  106. self.assertEqual(expand('%n'), 'jerry')
  107. names2 = list(multi_args(p, cmd='COMMAND', append='',
  108. prefix='*P*', suffix='*S*'))
  109. self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1')
  110. gethostname.return_value = 'example.com'
  111. p2 = NamespacedOptionParser(['10', '-c:1', '5'])
  112. names3 = list(multi_args(p2, cmd='COMMAND'))
  113. self.assertEqual(len(names3), 10)
  114. self.assertEqual(
  115. names3[0][0:2],
  116. ('celery1@example.com',
  117. ['COMMAND', '-n celery1@example.com', '-c 5', '']),
  118. )
  119. for i, worker in enumerate(names3[1:]):
  120. self.assertEqual(
  121. worker[0:2],
  122. ('celery%s@example.com' % (i + 2),
  123. ['COMMAND', '-n celery%s@example.com' % (i + 2), '']),
  124. )
  125. names4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))
  126. self.assertEqual(len(names4), 10)
  127. self.assertEqual(
  128. names4[0][0:2],
  129. ('celery1@',
  130. ['COMMAND', '-n celery1@', '-c 5', '']),
  131. )
  132. p3 = NamespacedOptionParser(['foo@', '-c:foo', '5'])
  133. names5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))
  134. self.assertEqual(
  135. names5[0][0:2],
  136. ('foo@',
  137. ['COMMAND', '-n foo@', '-c 5', '']),
  138. )
  139. class test_MultiTool(AppCase):
  140. def setup(self):
  141. self.fh = WhateverIO()
  142. self.env = {}
  143. self.t = MultiTool(env=self.env, fh=self.fh)
  144. def test_note(self):
  145. self.t.note('hello world')
  146. self.assertEqual(self.fh.getvalue(), 'hello world\n')
  147. def test_note_quiet(self):
  148. self.t.quiet = True
  149. self.t.note('hello world')
  150. self.assertFalse(self.fh.getvalue())
  151. def test_info(self):
  152. self.t.verbose = True
  153. self.t.info('hello info')
  154. self.assertEqual(self.fh.getvalue(), 'hello info\n')
  155. def test_info_not_verbose(self):
  156. self.t.verbose = False
  157. self.t.info('hello info')
  158. self.assertFalse(self.fh.getvalue())
  159. def test_error(self):
  160. self.t.say = Mock()
  161. self.t.usage = Mock()
  162. self.assertEqual(self.t.error('foo'), 1)
  163. self.t.say.assert_called_with('foo')
  164. self.t.usage.assert_called_with()
  165. self.t.say = Mock()
  166. self.assertEqual(self.t.error(), 1)
  167. self.assertFalse(self.t.say.called)
  168. self.assertEqual(self.t.retcode, 1)
  169. @patch('celery.bin.multi.Popen')
  170. def test_waitexec(self, Popen):
  171. self.t.note = Mock()
  172. pipe = Popen.return_value = Mock()
  173. pipe.wait.return_value = -10
  174. self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 10)
  175. Popen.assert_called_with(['path', '-m', 'foo'], env=self.t.env)
  176. self.t.note.assert_called_with('* Child was terminated by signal 10')
  177. pipe.wait.return_value = 2
  178. self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 2)
  179. self.t.note.assert_called_with(
  180. '* Child terminated with errorcode 2',
  181. )
  182. pipe.wait.return_value = 0
  183. self.assertFalse(self.t.waitexec(['-m', 'foo', 'path']))
  184. def test_nosplash(self):
  185. self.t.nosplash = True
  186. self.t.splash()
  187. self.assertFalse(self.fh.getvalue())
  188. def test_splash(self):
  189. self.t.nosplash = False
  190. self.t.splash()
  191. self.assertIn('celery multi', self.fh.getvalue())
  192. def test_usage(self):
  193. self.t.usage()
  194. self.assertTrue(self.fh.getvalue())
  195. def test_help(self):
  196. self.t.help([])
  197. self.assertIn(doc, self.fh.getvalue())
  198. def test_expand(self):
  199. self.t.expand(['foo%n', 'ask', 'klask', 'dask'])
  200. self.assertEqual(
  201. self.fh.getvalue(), 'fooask\nfooklask\nfoodask\n',
  202. )
  203. def test_restart(self):
  204. stop = self.t._stop_nodes = Mock()
  205. self.t.restart(['jerry', 'george'], 'celery worker')
  206. waitexec = self.t.waitexec = Mock()
  207. self.assertTrue(stop.called)
  208. callback = stop.call_args[1]['callback']
  209. self.assertTrue(callback)
  210. waitexec.return_value = 0
  211. callback('jerry', ['arg'], 13)
  212. waitexec.assert_called_with(['arg'])
  213. self.assertIn('OK', self.fh.getvalue())
  214. self.fh.seek(0)
  215. self.fh.truncate()
  216. waitexec.return_value = 1
  217. callback('jerry', ['arg'], 13)
  218. self.assertIn('FAILED', self.fh.getvalue())
  219. def test_stop(self):
  220. self.t.getpids = Mock()
  221. self.t.getpids.return_value = [2, 3, 4]
  222. self.t.shutdown_nodes = Mock()
  223. self.t.stop(['a', 'b', '-INT'], 'celery worker')
  224. self.t.shutdown_nodes.assert_called_with(
  225. [2, 3, 4], sig=signal.SIGINT, retry=None, callback=None,
  226. )
  227. def test_kill(self):
  228. self.t.getpids = Mock()
  229. self.t.getpids.return_value = [
  230. ('a', None, 10),
  231. ('b', None, 11),
  232. ('c', None, 12)
  233. ]
  234. sig = self.t.signal_node = Mock()
  235. self.t.kill(['a', 'b', 'c'], 'celery worker')
  236. sigs = sig.call_args_list
  237. self.assertEqual(len(sigs), 3)
  238. self.assertEqual(sigs[0][0], ('a', 10, signal.SIGKILL))
  239. self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
  240. self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
  241. def prepare_pidfile_for_getpids(self, Pidfile):
  242. class pids(object):
  243. def __init__(self, path):
  244. self.path = path
  245. def read_pid(self):
  246. try:
  247. return {'foo.pid': 10,
  248. 'bar.pid': 11}[self.path]
  249. except KeyError:
  250. raise ValueError()
  251. Pidfile.side_effect = pids
  252. @patch('celery.bin.multi.Pidfile')
  253. @patch('socket.gethostname')
  254. def test_getpids(self, gethostname, Pidfile):
  255. gethostname.return_value = 'e.com'
  256. self.prepare_pidfile_for_getpids(Pidfile)
  257. callback = Mock()
  258. p = NamespacedOptionParser(['foo', 'bar', 'baz'])
  259. nodes = self.t.getpids(p, 'celery worker', callback=callback)
  260. node_0, node_1 = nodes
  261. self.assertEqual(node_0[0], 'foo@e.com')
  262. self.assertEqual(
  263. sorted(node_0[1]),
  264. sorted(('celery worker', '--pidfile=foo.pid',
  265. '-n foo@e.com', '')),
  266. )
  267. self.assertEqual(node_0[2], 10)
  268. self.assertEqual(node_1[0], 'bar@e.com')
  269. self.assertEqual(
  270. sorted(node_1[1]),
  271. sorted(('celery worker', '--pidfile=bar.pid',
  272. '-n bar@e.com', '')),
  273. )
  274. self.assertEqual(node_1[2], 11)
  275. self.assertTrue(callback.called)
  276. cargs, _ = callback.call_args
  277. self.assertEqual(cargs[0], 'baz@e.com')
  278. self.assertItemsEqual(
  279. cargs[1],
  280. ['celery worker', '--pidfile=baz.pid', '-n baz@e.com', ''],
  281. )
  282. self.assertIsNone(cargs[2])
  283. self.assertIn('DOWN', self.fh.getvalue())
  284. # without callback, should work
  285. nodes = self.t.getpids(p, 'celery worker', callback=None)
  286. @patch('celery.bin.multi.Pidfile')
  287. @patch('socket.gethostname')
  288. @patch('celery.bin.multi.sleep')
  289. def test_shutdown_nodes(self, slepp, gethostname, Pidfile):
  290. gethostname.return_value = 'e.com'
  291. self.prepare_pidfile_for_getpids(Pidfile)
  292. self.assertIsNone(self.t.shutdown_nodes([]))
  293. self.t.signal_node = Mock()
  294. node_alive = self.t.node_alive = Mock()
  295. self.t.node_alive.return_value = False
  296. callback = Mock()
  297. self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=callback)
  298. sigs = sorted(self.t.signal_node.call_args_list)
  299. self.assertEqual(len(sigs), 2)
  300. self.assertIn(
  301. ('foo@e.com', 10, signal.SIGTERM),
  302. [tup[0] for tup in sigs],
  303. )
  304. self.assertIn(
  305. ('bar@e.com', 11, signal.SIGTERM),
  306. [tup[0] for tup in sigs],
  307. )
  308. self.t.signal_node.return_value = False
  309. self.assertTrue(callback.called)
  310. self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=None)
  311. def on_node_alive(pid):
  312. if node_alive.call_count > 4:
  313. return True
  314. return False
  315. self.t.signal_node.return_value = True
  316. self.t.node_alive.side_effect = on_node_alive
  317. self.t.stop(['foo', 'bar', 'baz'], 'celery worker', retry=True)
  318. @patch('os.kill')
  319. def test_node_alive(self, kill):
  320. kill.return_value = True
  321. self.assertTrue(self.t.node_alive(13))
  322. esrch = OSError()
  323. esrch.errno = errno.ESRCH
  324. kill.side_effect = esrch
  325. self.assertFalse(self.t.node_alive(13))
  326. kill.assert_called_with(13, 0)
  327. enoent = OSError()
  328. enoent.errno = errno.ENOENT
  329. kill.side_effect = enoent
  330. with self.assertRaises(OSError):
  331. self.t.node_alive(13)
  332. @patch('os.kill')
  333. def test_signal_node(self, kill):
  334. kill.return_value = True
  335. self.assertTrue(self.t.signal_node('foo', 13, 9))
  336. esrch = OSError()
  337. esrch.errno = errno.ESRCH
  338. kill.side_effect = esrch
  339. self.assertFalse(self.t.signal_node('foo', 13, 9))
  340. kill.assert_called_with(13, 9)
  341. self.assertIn('Could not signal foo', self.fh.getvalue())
  342. enoent = OSError()
  343. enoent.errno = errno.ENOENT
  344. kill.side_effect = enoent
  345. with self.assertRaises(OSError):
  346. self.t.signal_node('foo', 13, 9)
  347. def test_start(self):
  348. self.t.waitexec = Mock()
  349. self.t.waitexec.return_value = 0
  350. self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker'))
  351. self.t.waitexec.return_value = 1
  352. self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker'))
  353. def test_show(self):
  354. self.t.show(['foo', 'bar', 'baz'], 'celery worker')
  355. self.assertTrue(self.fh.getvalue())
  356. @patch('socket.gethostname')
  357. def test_get(self, gethostname):
  358. gethostname.return_value = 'e.com'
  359. self.t.get(['xuzzy@e.com', 'foo', 'bar', 'baz'], 'celery worker')
  360. self.assertFalse(self.fh.getvalue())
  361. self.t.get(['foo@e.com', 'foo', 'bar', 'baz'], 'celery worker')
  362. self.assertTrue(self.fh.getvalue())
  363. @patch('socket.gethostname')
  364. def test_names(self, gethostname):
  365. gethostname.return_value = 'e.com'
  366. self.t.names(['foo', 'bar', 'baz'], 'celery worker')
  367. self.assertIn('foo@e.com\nbar@e.com\nbaz@e.com', self.fh.getvalue())
  368. def test_execute_from_commandline(self):
  369. start = self.t.commands['start'] = Mock()
  370. self.t.error = Mock()
  371. self.t.execute_from_commandline(['multi', 'start', 'foo', 'bar'])
  372. self.assertFalse(self.t.error.called)
  373. start.assert_called_with(['foo', 'bar'], 'celery worker')
  374. self.t.error = Mock()
  375. self.t.execute_from_commandline(['multi', 'frob', 'foo', 'bar'])
  376. self.t.error.assert_called_with('Invalid command: frob')
  377. self.t.error = Mock()
  378. self.t.execute_from_commandline(['multi'])
  379. self.t.error.assert_called_with()
  380. self.t.error = Mock()
  381. self.t.execute_from_commandline(['multi', '-foo'])
  382. self.t.error.assert_called_with()
  383. self.t.execute_from_commandline(
  384. ['multi', 'start', 'foo',
  385. '--nosplash', '--quiet', '-q', '--verbose', '--no-color'],
  386. )
  387. self.assertTrue(self.t.nosplash)
  388. self.assertTrue(self.t.quiet)
  389. self.assertTrue(self.t.verbose)
  390. self.assertTrue(self.t.no_color)
  391. def test_stopwait(self):
  392. self.t._stop_nodes = Mock()
  393. self.t.stopwait(['foo', 'bar', 'baz'], 'celery worker')
  394. self.assertEqual(self.t._stop_nodes.call_args[1]['retry'], 2)
  395. @patch('celery.bin.multi.MultiTool')
  396. def test_main(self, MultiTool):
  397. m = MultiTool.return_value = Mock()
  398. with self.assertRaises(SystemExit):
  399. main()
  400. m.execute_from_commandline.assert_called_with(sys.argv)