control.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. """The ``celery control``, ``. inspect`` and ``. status`` programs."""
  2. from __future__ import absolute_import, unicode_literals
  3. from kombu.utils.json import dumps
  4. from kombu.utils.objects import cached_property
  5. from celery.bin.base import Command
  6. from celery.five import items, string_t
  7. from celery.platforms import EX_UNAVAILABLE, EX_USAGE
  8. from celery.utils import text
  9. class _RemoteControl(Command):
  10. name = None
  11. leaf = False
  12. control_group = None
  13. def __init__(self, *args, **kwargs):
  14. self.show_body = kwargs.pop('show_body', True)
  15. self.show_reply = kwargs.pop('show_reply', True)
  16. super(_RemoteControl, self).__init__(*args, **kwargs)
  17. def add_arguments(self, parser):
  18. group = parser.add_argument_group('Remote Control Options')
  19. group.add_argument(
  20. '--timeout', '-t', type=float,
  21. help='Timeout in seconds (float) waiting for reply',
  22. )
  23. group.add_argument(
  24. '--destination', '-d',
  25. help='Comma separated list of destination node names.')
  26. group.add_argument(
  27. '--json', '-j', action='store_true', default=False,
  28. help='Use json as output format.',
  29. )
  30. @classmethod
  31. def get_command_info(cls, command,
  32. indent=0, prefix='', color=None,
  33. help=False, app=None, choices=None):
  34. if choices is None:
  35. choices = cls._choices_by_group(app)
  36. meta = choices[command]
  37. if help:
  38. help = '|' + text.indent(meta.help, indent + 4)
  39. else:
  40. help = None
  41. return text.join([
  42. '|' + text.indent('{0}{1} {2}'.format(
  43. prefix, color(command), meta.signature or ''), indent),
  44. help,
  45. ])
  46. @classmethod
  47. def list_commands(cls, indent=0, prefix='',
  48. color=None, help=False, app=None):
  49. choices = cls._choices_by_group(app)
  50. color = color if color else lambda x: x
  51. prefix = prefix + ' ' if prefix else ''
  52. return '\n'.join(
  53. cls.get_command_info(c, indent, prefix, color, help,
  54. app=app, choices=choices)
  55. for c in sorted(choices))
  56. def usage(self, command):
  57. return '%(prog)s {0} [options] {1} <command> [arg1 .. argN]'.format(
  58. command, self.args)
  59. def call(self, *args, **kwargs):
  60. raise NotImplementedError('call')
  61. def run(self, *args, **kwargs):
  62. if not args:
  63. raise self.UsageError(
  64. 'Missing {0.name} method. See --help'.format(self))
  65. return self.do_call_method(args, **kwargs)
  66. def _ensure_fanout_supported(self):
  67. with self.app.connection_for_write() as conn:
  68. if not conn.supports_exchange_type('fanout'):
  69. raise self.Error(
  70. 'Broadcast not supported by transport {0!r}'.format(
  71. conn.info()['transport']))
  72. def do_call_method(self, args,
  73. timeout=None, destination=None, json=False, **kwargs):
  74. method = args[0]
  75. if method == 'help':
  76. raise self.Error("Did you mean '{0.name} --help'?".format(self))
  77. try:
  78. meta = self.choices[method]
  79. except KeyError:
  80. raise self.UsageError(
  81. 'Unknown {0.name} method {1}'.format(self, method))
  82. self._ensure_fanout_supported()
  83. timeout = timeout or meta.default_timeout
  84. if destination and isinstance(destination, string_t):
  85. destination = [dest.strip() for dest in destination.split(',')]
  86. replies = self.call(
  87. method,
  88. arguments=self.compile_arguments(meta, method, args[1:]),
  89. timeout=timeout,
  90. destination=destination,
  91. callback=None if json else self.say_remote_command_reply,
  92. )
  93. if not replies:
  94. raise self.Error('No nodes replied within time constraint.',
  95. status=EX_UNAVAILABLE)
  96. if json:
  97. self.out(dumps(replies))
  98. return replies
  99. def compile_arguments(self, meta, method, args):
  100. args = list(args)
  101. kw = {}
  102. if meta.args:
  103. kw.update({
  104. k: v for k, v in self._consume_args(meta, method, args)
  105. })
  106. if meta.variadic:
  107. kw.update({meta.variadic: args})
  108. if not kw and args:
  109. raise self.Error(
  110. 'Command {0!r} takes no arguments.'.format(method),
  111. status=EX_USAGE)
  112. return kw or {}
  113. def _consume_args(self, meta, method, args):
  114. i = 0
  115. try:
  116. for i, arg in enumerate(args):
  117. try:
  118. name, typ = meta.args[i]
  119. except IndexError:
  120. if meta.variadic:
  121. break
  122. raise self.Error(
  123. 'Command {0!r} takes arguments: {1}'.format(
  124. method, meta.signature),
  125. status=EX_USAGE)
  126. else:
  127. yield name, typ(arg) if typ is not None else arg
  128. finally:
  129. args[:] = args[i:]
  130. @classmethod
  131. def _choices_by_group(cls, app):
  132. from celery.worker.control import Panel
  133. # need to import task modules for custom user-remote control commands.
  134. app.loader.import_default_modules()
  135. return {
  136. name: info for name, info in items(Panel.meta)
  137. if info.type == cls.control_group and info.visible
  138. }
  139. @cached_property
  140. def choices(self):
  141. return self._choices_by_group(self.app)
  142. @property
  143. def epilog(self):
  144. return '\n'.join([
  145. '[Commands]',
  146. self.list_commands(indent=4, help=True, app=self.app)
  147. ])
  148. class inspect(_RemoteControl):
  149. """Inspect the worker at runtime.
  150. Availability: RabbitMQ (AMQP) and Redis transports.
  151. Examples:
  152. .. code-block:: console
  153. $ celery inspect active --timeout=5
  154. $ celery inspect scheduled -d worker1@example.com
  155. $ celery inspect revoked -d w1@e.com,w2@e.com
  156. """
  157. name = 'inspect'
  158. control_group = 'inspect'
  159. def call(self, method, arguments, **options):
  160. return self.app.control.inspect(**options)._request(
  161. method, **arguments)
  162. class control(_RemoteControl):
  163. """Workers remote control.
  164. Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
  165. Examples:
  166. .. code-block:: console
  167. $ celery control enable_events --timeout=5
  168. $ celery control -d worker1@example.com enable_events
  169. $ celery control -d w1.e.com,w2.e.com enable_events
  170. $ celery control -d w1.e.com add_consumer queue_name
  171. $ celery control -d w1.e.com cancel_consumer queue_name
  172. $ celery control add_consumer queue exchange direct rkey
  173. """
  174. name = 'control'
  175. control_group = 'control'
  176. def call(self, method, arguments, **options):
  177. return self.app.control.broadcast(
  178. method, arguments=arguments, reply=True, **options)
  179. class status(Command):
  180. """Show list of workers that are online."""
  181. option_list = inspect.option_list
  182. def run(self, *args, **kwargs):
  183. I = inspect(
  184. app=self.app,
  185. no_color=kwargs.get('no_color', False),
  186. stdout=self.stdout, stderr=self.stderr,
  187. show_reply=False, show_body=False, quiet=True,
  188. )
  189. replies = I.run('ping', **kwargs)
  190. if not replies:
  191. raise self.Error('No nodes replied within time constraint',
  192. status=EX_UNAVAILABLE)
  193. nodecount = len(replies)
  194. if not kwargs.get('quiet', False):
  195. self.out('\n{0} {1} online.'.format(
  196. nodecount, text.pluralize(nodecount, 'node')))