test_multi.py 16 KB


  1. from __future__ import absolute_import
  2. import errno
  3. import signal
  4. import sys
  5. from mock import Mock, patch
  6. from celery.bin.multi import (
  7. main,
  8. MultiTool,
  9. findsig,
  10. abbreviations,
  11. parse_ns_range,
  12. format_opt,
  13. quote,
  14. NamespacedOptionParser,
  15. multi_args,
  16. __doc__ as doc,
  17. )
  18. from celery.tests.utils import Case, WhateverIO
  19. class test_functions(Case):
  20. def test_findsig(self):
  21. self.assertEqual(findsig(['a', 'b', 'c', '-1']), 1)
  22. self.assertEqual(findsig(['--foo=1', '-9']), 9)
  23. self.assertEqual(findsig(['-INT']), signal.SIGINT)
  24. self.assertEqual(findsig([]), signal.SIGTERM)
  25. self.assertEqual(findsig(['-s']), signal.SIGTERM)
  26. self.assertEqual(findsig(['-log']), signal.SIGTERM)
  27. def test_abbreviations(self):
  28. expander = abbreviations({'%s': 'START',
  29. '%x': 'STOP'})
  30. self.assertEqual(expander('foo%s'), 'fooSTART')
  31. self.assertEqual(expander('foo%x'), 'fooSTOP')
  32. self.assertEqual(expander('foo%y'), 'foo%y')
  33. self.assertIsNone(expander(None))
  34. def test_parse_ns_range(self):
  35. self.assertEqual(parse_ns_range('1-3', True), ['1', '2', '3'])
  36. self.assertEqual(parse_ns_range('1-3', False), ['1-3'])
  37. self.assertEqual(parse_ns_range('1-3,10,11,20', True),
  38. ['1', '2', '3', '10', '11', '20'])
  39. def test_format_opt(self):
  40. self.assertEqual(format_opt('--foo', None), '--foo')
  41. self.assertEqual(format_opt('-c', 1), '-c 1')
  42. self.assertEqual(format_opt('--log', 'foo'), '--log=foo')
  43. def test_quote(self):
  44. self.assertEqual(quote("the 'quick"), "'the '\\''quick'")
  45. class test_NamespacedOptionParser(Case):
  46. def test_parse(self):
  47. x = NamespacedOptionParser(['-c:1,3', '4'])
  48. self.assertEqual(x.namespaces.get('1,3'), {'-c': '4'})
  49. x = NamespacedOptionParser(['-c:jerry,elaine', '5',
  50. '--loglevel:kramer=DEBUG',
  51. '--flag',
  52. '--logfile=foo', '-Q', 'bar', 'a', 'b',
  53. '--', '.disable_rate_limits=1'])
  54. self.assertEqual(x.options, {'--logfile': 'foo',
  55. '-Q': 'bar',
  56. '--flag': None})
  57. self.assertEqual(x.values, ['a', 'b'])
  58. self.assertEqual(x.namespaces.get('jerry,elaine'), {'-c': '5'})
  59. self.assertEqual(x.namespaces.get('kramer'), {'--loglevel': 'DEBUG'})
  60. self.assertEqual(x.passthrough, '-- .disable_rate_limits=1')
  61. class test_multi_args(Case):
  62. @patch('socket.gethostname')
  63. def test_parse(self, gethostname):
  64. p = NamespacedOptionParser(['-c:jerry,elaine', '5',
  65. '--loglevel:kramer=DEBUG',
  66. '--flag',
  67. '--logfile=foo', '-Q', 'bar', 'jerry',
  68. 'elaine', 'kramer',
  69. '--', '.disable_rate_limits=1'])
  70. it = multi_args(p, cmd='COMMAND', append='*AP*',
  71. prefix='*P*', suffix='*S*')
  72. names = list(it)
  73. def assert_line_in(name, args):
  74. self.assertIn(name, [tup[0] for tup in names])
  75. argv = None
  76. for item in names:
  77. if item[0] == name:
  78. argv = item[1]
  79. self.assertTrue(argv)
  80. for arg in args:
  81. self.assertIn(arg, argv)
  82. assert_line_in('*P*jerry@*S*',
  83. [
  84. 'COMMAND', '-n *P*jerry@*S*', '-Q bar',
  85. '-c 5', '--flag', '--logfile=foo',
  86. '-- .disable_rate_limits=1', '*AP*',
  87. ]
  88. )
  89. assert_line_in('*P*elaine@*S*',
  90. [
  91. 'COMMAND', '-n *P*elaine@*S*', '-Q bar',
  92. '-c 5', '--flag', '--logfile=foo',
  93. '-- .disable_rate_limits=1', '*AP*',
  94. ]
  95. )
  96. assert_line_in('*P*kramer@*S*',
  97. [
  98. 'COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*',
  99. '-Q bar', '--flag', '--logfile=foo',
  100. '-- .disable_rate_limits=1', '*AP*',
  101. ]
  102. )
  103. expand = names[0][2]
  104. self.assertEqual(expand('%h'), '*P*jerry@*S*')
  105. self.assertEqual(expand('%n'), 'jerry')
  106. names2 = list(multi_args(p, cmd='COMMAND', append='',
  107. prefix='*P*', suffix='*S*'))
  108. self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1')
  109. gethostname.return_value = 'example.com'
  110. p2 = NamespacedOptionParser(['10', '-c:1', '5'])
  111. names3 = list(multi_args(p2, cmd='COMMAND'))
  112. self.assertEqual(len(names3), 10)
  113. self.assertEqual(names3[0][0:2], ('celery1@example.com',
  114. ['COMMAND', '-n celery1@example.com', '-c 5', '']))
  115. for i, worker in enumerate(names3[1:]):
  116. self.assertEqual(worker[0:2], ('celery%s@example.com' % (i + 2),
  117. ['COMMAND', '-n celery%s@example.com' % (i + 2), '']))
  118. names4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))
  119. self.assertEqual(len(names4), 10)
  120. self.assertEqual(names4[0][0:2], ('celery1@',
  121. ['COMMAND', '-n celery1@', '-c 5', '']))
  122. p3 = NamespacedOptionParser(['foo@', '-c:foo', '5'])
  123. names5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))
  124. self.assertEqual(names5[0][0:2], ('foo@',
  125. ['COMMAND', '-n foo@', '-c 5', '']))
  126. class test_MultiTool(Case):
  127. def setUp(self):
  128. self.fh = WhateverIO()
  129. self.env = {}
  130. self.t = MultiTool(env=self.env, fh=self.fh)
  131. def test_note(self):
  132. self.t.note('hello world')
  133. self.assertEqual(self.fh.getvalue(), 'hello world\n')
  134. def test_note_quiet(self):
  135. self.t.quiet = True
  136. self.t.note('hello world')
  137. self.assertFalse(self.fh.getvalue())
  138. def test_info(self):
  139. self.t.verbose = True
  140. self.t.info('hello info')
  141. self.assertEqual(self.fh.getvalue(), 'hello info\n')
  142. def test_info_not_verbose(self):
  143. self.t.verbose = False
  144. self.t.info('hello info')
  145. self.assertFalse(self.fh.getvalue())
  146. def test_error(self):
  147. self.t.say = Mock()
  148. self.t.usage = Mock()
  149. self.assertEqual(self.t.error('foo'), 1)
  150. self.t.say.assert_called_with('foo')
  151. self.t.usage.assert_called_with()
  152. self.t.say = Mock()
  153. self.assertEqual(self.t.error(), 1)
  154. self.assertFalse(self.t.say.called)
  155. self.assertEqual(self.t.retcode, 1)
  156. @patch('celery.bin.multi.Popen')
  157. def test_waitexec(self, Popen):
  158. self.t.note = Mock()
  159. pipe = Popen.return_value = Mock()
  160. pipe.wait.return_value = -10
  161. self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 10)
  162. Popen.assert_called_with(['path', '-m', 'foo'], env=self.t.env)
  163. self.t.note.assert_called_with('* Child was terminated by signal 10')
  164. pipe.wait.return_value = 2
  165. self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 2)
  166. self.t.note.assert_called_with(
  167. '* Child terminated with errorcode 2')
  168. pipe.wait.return_value = 0
  169. self.assertFalse(self.t.waitexec(['-m', 'foo', 'path']))
  170. def test_nosplash(self):
  171. self.t.nosplash = True
  172. self.t.splash()
  173. self.assertFalse(self.fh.getvalue())
  174. def test_splash(self):
  175. self.t.nosplash = False
  176. self.t.splash()
  177. self.assertIn('celery multi', self.fh.getvalue())
  178. def test_usage(self):
  179. self.t.usage()
  180. self.assertTrue(self.fh.getvalue())
  181. def test_help(self):
  182. self.t.help([])
  183. self.assertIn(doc, self.fh.getvalue())
  184. def test_expand(self):
  185. self.t.expand(['foo%n', 'ask', 'klask', 'dask'])
  186. self.assertEqual(self.fh.getvalue(),
  187. 'fooask\nfooklask\nfoodask\n')
  188. def test_restart(self):
  189. stop = self.t._stop_nodes = Mock()
  190. self.t.restart(['jerry', 'george'], 'celery worker')
  191. waitexec = self.t.waitexec = Mock()
  192. self.assertTrue(stop.called)
  193. callback = stop.call_args[1]['callback']
  194. self.assertTrue(callback)
  195. waitexec.return_value = 0
  196. callback('jerry', ['arg'], 13)
  197. waitexec.assert_called_with(['arg'])
  198. self.assertIn('OK', self.fh.getvalue())
  199. self.fh.seek(0)
  200. self.fh.truncate()
  201. waitexec.return_value = 1
  202. callback('jerry', ['arg'], 13)
  203. self.assertIn('FAILED', self.fh.getvalue())
  204. def test_stop(self):
  205. self.t.getpids = Mock()
  206. self.t.getpids.return_value = [2, 3, 4]
  207. self.t.shutdown_nodes = Mock()
  208. self.t.stop(['a', 'b', '-INT'], 'celery worker')
  209. self.t.shutdown_nodes.assert_called_with(
  210. [2, 3, 4], sig=signal.SIGINT, retry=None, callback=None,
  211. )
  212. def test_kill(self):
  213. self.t.getpids = Mock()
  214. self.t.getpids.return_value = [
  215. ('a', None, 10),
  216. ('b', None, 11),
  217. ('c', None, 12)
  218. ]
  219. sig = self.t.signal_node = Mock()
  220. self.t.kill(['a', 'b', 'c'], 'celery worker')
  221. sigs = sig.call_args_list
  222. self.assertEqual(len(sigs), 3)
  223. self.assertEqual(sigs[0][0], ('a', 10, signal.SIGKILL))
  224. self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
  225. self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
  226. def prepare_pidfile_for_getpids(self, Pidfile):
  227. class pids(object):
  228. def __init__(self, path):
  229. self.path = path
  230. def read_pid(self):
  231. try:
  232. return {'foo.pid': 10,
  233. 'bar.pid': 11}[self.path]
  234. except KeyError:
  235. raise ValueError()
  236. Pidfile.side_effect = pids
  237. @patch('celery.bin.multi.Pidfile')
  238. @patch('socket.gethostname')
  239. def test_getpids(self, gethostname, Pidfile):
  240. gethostname.return_value = 'e.com'
  241. self.prepare_pidfile_for_getpids(Pidfile)
  242. callback = Mock()
  243. p = NamespacedOptionParser(['foo', 'bar', 'baz'])
  244. nodes = self.t.getpids(p, 'celery worker', callback=callback)
  245. node_0, node_1 = nodes
  246. self.assertEqual(node_0[0], 'foo@e.com')
  247. self.assertEqual(sorted(node_0[1]),
  248. sorted(('celery worker', '--pidfile=foo.pid',
  249. '-n foo@e.com', '')))
  250. self.assertEqual(node_0[2], 10)
  251. self.assertEqual(node_1[0], 'bar@e.com')
  252. self.assertEqual(sorted(node_1[1]),
  253. sorted(('celery worker', '--pidfile=bar.pid',
  254. '-n bar@e.com', '')))
  255. self.assertEqual(node_1[2], 11)
  256. self.assertTrue(callback.called)
  257. cargs, _ = callback.call_args
  258. self.assertEqual(cargs[0], 'baz@e.com')
  259. self.assertItemsEqual(cargs[1],
  260. ['celery worker', '--pidfile=baz.pid', '-n baz@e.com', ''],
  261. )
  262. self.assertIsNone(cargs[2])
  263. self.assertIn('DOWN', self.fh.getvalue())
  264. # without callback, should work
  265. nodes = self.t.getpids(p, 'celery worker', callback=None)
  266. @patch('celery.bin.multi.Pidfile')
  267. @patch('socket.gethostname')
  268. @patch('celery.bin.multi.sleep')
  269. def test_shutdown_nodes(self, slepp, gethostname, Pidfile):
  270. gethostname.return_value = 'e.com'
  271. self.prepare_pidfile_for_getpids(Pidfile)
  272. self.assertIsNone(self.t.shutdown_nodes([]))
  273. self.t.signal_node = Mock()
  274. node_alive = self.t.node_alive = Mock()
  275. self.t.node_alive.return_value = False
  276. callback = Mock()
  277. self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=callback)
  278. sigs = sorted(self.t.signal_node.call_args_list)
  279. self.assertEqual(len(sigs), 2)
  280. self.assertIn(('foo@e.com', 10, signal.SIGTERM),
  281. [tup[0] for tup in sigs])
  282. self.assertIn(('bar@e.com', 11, signal.SIGTERM),
  283. [tup[0] for tup in sigs])
  284. self.t.signal_node.return_value = False
  285. self.assertTrue(callback.called)
  286. self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=None)
  287. def on_node_alive(pid):
  288. if node_alive.call_count > 4:
  289. return True
  290. return False
  291. self.t.signal_node.return_value = True
  292. self.t.node_alive.side_effect = on_node_alive
  293. self.t.stop(['foo', 'bar', 'baz'], 'celery worker', retry=True)
  294. @patch('os.kill')
  295. def test_node_alive(self, kill):
  296. kill.return_value = True
  297. self.assertTrue(self.t.node_alive(13))
  298. esrch = OSError()
  299. esrch.errno = errno.ESRCH
  300. kill.side_effect = esrch
  301. self.assertFalse(self.t.node_alive(13))
  302. kill.assert_called_with(13, 0)
  303. enoent = OSError()
  304. enoent.errno = errno.ENOENT
  305. kill.side_effect = enoent
  306. with self.assertRaises(OSError):
  307. self.t.node_alive(13)
  308. @patch('os.kill')
  309. def test_signal_node(self, kill):
  310. kill.return_value = True
  311. self.assertTrue(self.t.signal_node('foo', 13, 9))
  312. esrch = OSError()
  313. esrch.errno = errno.ESRCH
  314. kill.side_effect = esrch
  315. self.assertFalse(self.t.signal_node('foo', 13, 9))
  316. kill.assert_called_with(13, 9)
  317. self.assertIn('Could not signal foo', self.fh.getvalue())
  318. enoent = OSError()
  319. enoent.errno = errno.ENOENT
  320. kill.side_effect = enoent
  321. with self.assertRaises(OSError):
  322. self.t.signal_node('foo', 13, 9)
  323. def test_start(self):
  324. self.t.waitexec = Mock()
  325. self.t.waitexec.return_value = 0
  326. self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker'))
  327. self.t.waitexec.return_value = 1
  328. self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker'))
  329. def test_show(self):
  330. self.t.show(['foo', 'bar', 'baz'], 'celery worker')
  331. self.assertTrue(self.fh.getvalue())
  332. @patch('socket.gethostname')
  333. def test_get(self, gethostname):
  334. gethostname.return_value = 'e.com'
  335. self.t.get(['xuzzy@e.com', 'foo', 'bar', 'baz'], 'celery worker')
  336. self.assertFalse(self.fh.getvalue())
  337. self.t.get(['foo@e.com', 'foo', 'bar', 'baz'], 'celery worker')
  338. self.assertTrue(self.fh.getvalue())
  339. @patch('socket.gethostname')
  340. def test_names(self, gethostname):
  341. gethostname.return_value = 'e.com'
  342. self.t.names(['foo', 'bar', 'baz'], 'celery worker')
  343. self.assertIn('foo@e.com\nbar@e.com\nbaz@e.com', self.fh.getvalue())
  344. def test_execute_from_commandline(self):
  345. start = self.t.commands['start'] = Mock()
  346. self.t.error = Mock()
  347. self.t.execute_from_commandline(['multi', 'start', 'foo', 'bar'])
  348. self.assertFalse(self.t.error.called)
  349. start.assert_called_with(['foo', 'bar'], 'celery worker')
  350. self.t.error = Mock()
  351. self.t.execute_from_commandline(['multi', 'frob', 'foo', 'bar'])
  352. self.t.error.assert_called_with('Invalid command: frob')
  353. self.t.error = Mock()
  354. self.t.execute_from_commandline(['multi'])
  355. self.t.error.assert_called_with()
  356. self.t.error = Mock()
  357. self.t.execute_from_commandline(['multi', '-foo'])
  358. self.t.error.assert_called_with()
  359. self.t.execute_from_commandline(['multi', 'start', 'foo',
  360. '--nosplash', '--quiet', '-q', '--verbose', '--no-color'])
  361. self.assertTrue(self.t.nosplash)
  362. self.assertTrue(self.t.quiet)
  363. self.assertTrue(self.t.verbose)
  364. self.assertTrue(self.t.no_color)
  365. def test_stopwait(self):
  366. self.t._stop_nodes = Mock()
  367. self.t.stopwait(['foo', 'bar', 'baz'], 'celery worker')
  368. self.assertEqual(self.t._stop_nodes.call_args[1]['retry'], 2)
  369. @patch('celery.bin.multi.MultiTool')
  370. def test_main(self, MultiTool):
  371. m = MultiTool.return_value = Mock()
  372. with self.assertRaises(SystemExit):
  373. main()
  374. m.execute_from_commandline.assert_called_with(sys.argv)