celery.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. # -*- coding: utf-8 -*-
  2. """The :program:`celery` umbrella command.
  3. .. program:: celery
  4. .. _preload-options:
  5. Preload Options
  6. ---------------
  7. These options are supported by all commands,
  8. and usually parsed before command-specific arguments.
  9. .. cmdoption:: -A, --app
  10. app instance to use (e.g., ``module.attr_name``)
  11. .. cmdoption:: -b, --broker
  12. URL to broker. Default is ``amqp://guest@localhost//``
  13. .. cmdoption:: --loader
  14. name of custom loader class to use.
  15. .. cmdoption:: --config
  16. Name of the configuration module
  17. .. cmdoption:: -C, --no-color
  18. Disable colors in output.
  19. .. cmdoption:: -q, --quiet
  20. Give less verbose output (behavior depends on the sub command).
  21. .. cmdoption:: --help
  22. Show help and exit.
  23. .. _daemon-options:
  24. Daemon Options
  25. --------------
  26. These options are supported by commands that can detach
  27. into the background (daemon). They will be present
  28. in any command that also has a `--detach` option.
  29. .. cmdoption:: -f, --logfile
  30. Path to log file. If no logfile is specified, `stderr` is used.
  31. .. cmdoption:: --pidfile
  32. Optional file used to store the process pid.
  33. The program won't start if this file already exists
  34. and the pid is still alive.
  35. .. cmdoption:: --uid
  36. User id, or user name of the user to run as after detaching.
  37. .. cmdoption:: --gid
  38. Group id, or group name of the main group to change to after
  39. detaching.
  40. .. cmdoption:: --umask
  41. Effective umask (in octal) of the process after detaching. Inherits
  42. the umask of the parent process by default.
  43. .. cmdoption:: --workdir
  44. Optional directory to change to after detaching.
  45. .. cmdoption:: --executable
  46. Executable to use for the detached process.
  47. ``celery inspect``
  48. ------------------
  49. .. program:: celery inspect
  50. .. cmdoption:: -t, --timeout
  51. Timeout in seconds (float) waiting for reply
  52. .. cmdoption:: -d, --destination
  53. Comma separated list of destination node names.
  54. .. cmdoption:: -j, --json
  55. Use json as output format.
  56. ``celery control``
  57. ------------------
  58. .. program:: celery control
  59. .. cmdoption:: -t, --timeout
  60. Timeout in seconds (float) waiting for reply
  61. .. cmdoption:: -d, --destination
  62. Comma separated list of destination node names.
  63. .. cmdoption:: -j, --json
  64. Use json as output format.
  65. ``celery migrate``
  66. ------------------
  67. .. program:: celery migrate
  68. .. cmdoption:: -n, --limit
  69. Number of tasks to consume (int).
  70. .. cmdoption:: -t, --timeout
  71. Timeout in seconds (float) waiting for tasks.
  72. .. cmdoption:: -a, --ack-messages
  73. Ack messages from source broker.
  74. .. cmdoption:: -T, --tasks
  75. List of task names to filter on.
  76. .. cmdoption:: -Q, --queues
  77. List of queues to migrate.
  78. .. cmdoption:: -F, --forever
  79. Continually migrate tasks until killed.
  80. ``celery upgrade``
  81. ------------------
  82. .. program:: celery upgrade
  83. .. cmdoption:: --django
  84. Upgrade a Django project.
  85. .. cmdoption:: --compat
  86. Maintain backwards compatibility.
  87. .. cmdoption:: --no-backup
  88. Don't backup original files.
  89. ``celery shell``
  90. ----------------
  91. .. program:: celery shell
  92. .. cmdoption:: -I, --ipython
  93. Force :pypi:`iPython` implementation.
  94. .. cmdoption:: -B, --bpython
  95. Force :pypi:`bpython` implementation.
  96. .. cmdoption:: -P, --python
  97. Force default Python shell.
  98. .. cmdoption:: -T, --without-tasks
  99. Don't add tasks to locals.
  100. .. cmdoption:: --eventlet
  101. Use :pypi:`eventlet` monkey patches.
  102. .. cmdoption:: --gevent
  103. Use :pypi:`gevent` monkey patches.
  104. ``celery result``
  105. -----------------
  106. .. program:: celery result
  107. .. cmdoption:: -t, --task
  108. Name of task (if custom backend).
  109. .. cmdoption:: --traceback
  110. Show traceback if any.
  111. ``celery purge``
  112. ----------------
  113. .. program:: celery purge
  114. .. cmdoption:: -f, --force
  115. Don't prompt for verification before deleting messages (DANGEROUS)
  116. ``celery call``
  117. ---------------
  118. .. program:: celery call
  119. .. cmdoption:: -a, --args
  120. Positional arguments (json format).
  121. .. cmdoption:: -k, --kwargs
  122. Keyword arguments (json format).
  123. .. cmdoption:: --eta
  124. Scheduled time in ISO-8601 format.
  125. .. cmdoption:: --countdown
  126. ETA in seconds from now (float/int).
  127. .. cmdoption:: --expires
  128. Expiry time in float/int seconds, or an ISO-8601 date.
  129. .. cmdoption:: --serializer
  130. Specify serializer to use (default is json).
  131. .. cmdoption:: --queue
  132. Destination queue.
  133. .. cmdoption:: --exchange
  134. Destination exchange (defaults to the queue exchange).
  135. .. cmdoption:: --routing-key
  136. Destination routing key (defaults to the queue routing key).
  137. """
  138. from __future__ import absolute_import, print_function, unicode_literals
  139. import numbers
  140. import sys
  141. from functools import partial
  142. # Import commands from other modules
  143. from celery.bin.amqp import amqp
  144. # Cannot use relative imports here due to a Windows issue (#1111).
  145. from celery.bin.base import Command, Extensions
  146. from celery.bin.beat import beat
  147. from celery.bin.call import call
  148. from celery.bin.control import _RemoteControl # noqa
  149. from celery.bin.control import control, inspect, status
  150. from celery.bin.events import events
  151. from celery.bin.graph import graph
  152. from celery.bin.list import list_
  153. from celery.bin.logtool import logtool
  154. from celery.bin.migrate import migrate
  155. from celery.bin.purge import purge
  156. from celery.bin.result import result
  157. from celery.bin.shell import shell
  158. from celery.bin.upgrade import upgrade
  159. from celery.bin.worker import worker
  160. from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
  161. from celery.utils import term, text
# Names exported by a star-import of this module.
__all__ = ('CeleryCommand', 'main')

# Footer template printed by the ``help`` command after the parser's own
# help output.  ``{commands}`` receives the formatted command listing and
# ``{prog_name}`` the program name; both are filled in with ``str.format``.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
{commands}
---- -- - - --------- -- - -------------- --- ------------
Type '{prog_name} <command> --help' for help using a specific command.
"""
  169. command_classes = [
  170. ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
  171. ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
  172. ('Utils',
  173. ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
  174. None),
  175. ('Debugging', ['report', 'logtool'], 'red'),
  176. ]
  177. def determine_exit_status(ret):
  178. if isinstance(ret, numbers.Integral):
  179. return ret
  180. return EX_OK if ret else EX_FAILURE
def main(argv=None):
    """Start celery umbrella command.

    :param argv: Argument list handed to
        ``CeleryCommand.execute_from_commandline``; that method falls
        back to ``sys.argv`` when this is ``None``.

    ``CeleryCommand.execute_from_commandline`` finishes by calling
    ``sys.exit``, so this function normally ends with ``SystemExit``
    instead of returning.  A ``KeyboardInterrupt`` that reaches this
    level is deliberately swallowed.
    """
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    try:
        if __name__ != '__main__':  # pragma: no cover
            # Alias this module as ``__main__`` when it was imported
            # rather than executed directly.
            sys.modules['__main__'] = sys.modules[__name__]
        cmd = CeleryCommand()
        # Concurrency patching is requested before billiard is imported.
        cmd.maybe_patch_concurrency()
        from billiard import freeze_support
        freeze_support()
        cmd.execute_from_commandline(argv)
    except KeyboardInterrupt:
        # Intentional: exit quietly when interrupted by the user.
        pass
  196. class multi(Command):
  197. """Start multiple worker instances."""
  198. respects_app_option = False
  199. def run_from_argv(self, prog_name, argv, command=None):
  200. from celery.bin.multi import MultiTool
  201. cmd = MultiTool(quiet=self.quiet, no_color=self.no_color)
  202. return cmd.execute_from_commandline([command] + argv)
  203. class help(Command):
  204. """Show help screen and exit."""
  205. def usage(self, command):
  206. return '%(prog)s <command> [options] {0.args}'.format(self)
  207. def run(self, *args, **kwargs):
  208. self.parser.print_help()
  209. self.out(HELP.format(
  210. prog_name=self.prog_name,
  211. commands=CeleryCommand.list_commands(
  212. colored=self.colored, app=self.app),
  213. ))
  214. return EX_USAGE
  215. class report(Command):
  216. """Shows information useful to include in bug-reports."""
  217. def __init__(self, *args, **kwargs):
  218. """Custom initialization for report command.
  219. We need this custom initialization to make sure that
  220. everything is loaded when running a report.
  221. There has been some issues when printing Django's
  222. settings because Django is not properly setup when
  223. running the report.
  224. """
  225. super(report, self).__init__(*args, **kwargs)
  226. self.app.loader.import_default_modules()
  227. def run(self, *args, **kwargs):
  228. self.out(self.app.bugreport())
  229. return EX_OK
  230. class CeleryCommand(Command):
  231. """Base class for commands."""
  232. commands = {
  233. 'amqp': amqp,
  234. 'beat': beat,
  235. 'call': call,
  236. 'control': control,
  237. 'events': events,
  238. 'graph': graph,
  239. 'help': help,
  240. 'inspect': inspect,
  241. 'list': list_,
  242. 'logtool': logtool,
  243. 'migrate': migrate,
  244. 'multi': multi,
  245. 'purge': purge,
  246. 'report': report,
  247. 'result': result,
  248. 'shell': shell,
  249. 'status': status,
  250. 'upgrade': upgrade,
  251. 'worker': worker,
  252. }
  253. ext_fmt = '{self.namespace}.commands'
  254. enable_config_from_cmdline = True
  255. prog_name = 'celery'
  256. namespace = 'celery'
  257. @classmethod
  258. def register_command(cls, fun, name=None):
  259. cls.commands[name or fun.__name__] = fun
  260. return fun
  261. def execute(self, command, argv=None):
  262. try:
  263. cls = self.commands[command]
  264. except KeyError:
  265. cls, argv = self.commands['help'], ['help']
  266. cls = self.commands.get(command) or self.commands['help']
  267. try:
  268. return cls(
  269. app=self.app, on_error=self.on_error,
  270. no_color=self.no_color, quiet=self.quiet,
  271. on_usage_error=partial(self.on_usage_error, command=command),
  272. ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  273. except self.UsageError as exc:
  274. self.on_usage_error(exc)
  275. return exc.status
  276. except self.Error as exc:
  277. self.on_error(exc)
  278. return exc.status
  279. def on_usage_error(self, exc, command=None):
  280. if command:
  281. helps = '{self.prog_name} {command} --help'
  282. else:
  283. helps = '{self.prog_name} --help'
  284. self.error(self.colored.magenta('Error: {0}'.format(exc)))
  285. self.error("""Please try '{0}'""".format(helps.format(
  286. self=self, command=command,
  287. )))
  288. def _relocate_args_from_start(self, argv, index=0):
  289. if argv:
  290. rest = []
  291. while index < len(argv):
  292. value = argv[index]
  293. if value.startswith('--'):
  294. rest.append(value)
  295. elif value.startswith('-'):
  296. # we eat the next argument even though we don't know
  297. # if this option takes an argument or not.
  298. # instead we'll assume what's the command name in the
  299. # return statements below.
  300. try:
  301. nxt = argv[index + 1]
  302. if nxt.startswith('-'):
  303. # is another option
  304. rest.append(value)
  305. else:
  306. # is (maybe) a value for this option
  307. rest.extend([value, nxt])
  308. index += 1
  309. except IndexError: # pragma: no cover
  310. rest.append(value)
  311. break
  312. else:
  313. break
  314. index += 1
  315. if argv[index:]: # pragma: no cover
  316. # if there are more arguments left then divide and swap
  317. # we assume the first argument in argv[i:] is the command
  318. # name.
  319. return argv[index:] + rest
  320. # if there are no more arguments then the last arg in rest'
  321. # must be the command.
  322. [rest.pop()] + rest
  323. return []
  324. def prepare_prog_name(self, name):
  325. if name == '__main__.py':
  326. return sys.modules['__main__'].__file__
  327. return name
  328. def handle_argv(self, prog_name, argv, **kwargs):
  329. self.prog_name = self.prepare_prog_name(prog_name)
  330. argv = self._relocate_args_from_start(argv)
  331. _, argv = self.prepare_args(None, argv)
  332. try:
  333. command = argv[0]
  334. except IndexError:
  335. command, argv = 'help', ['help']
  336. return self.execute(command, argv)
  337. def execute_from_commandline(self, argv=None):
  338. argv = sys.argv if argv is None else argv
  339. if 'multi' in argv[1:3]: # Issue 1008
  340. self.respects_app_option = False
  341. try:
  342. sys.exit(determine_exit_status(
  343. super(CeleryCommand, self).execute_from_commandline(argv)))
  344. except KeyboardInterrupt:
  345. sys.exit(EX_FAILURE)
  346. @classmethod
  347. def get_command_info(cls, command, indent=0,
  348. color=None, colored=None, app=None):
  349. colored = term.colored() if colored is None else colored
  350. colored = colored.names[color] if color else lambda x: x
  351. obj = cls.commands[command]
  352. cmd = 'celery {0}'.format(colored(command))
  353. if obj.leaf:
  354. return '|' + text.indent(cmd, indent)
  355. return text.join([
  356. ' ',
  357. '|' + text.indent('{0} --help'.format(cmd), indent),
  358. obj.list_commands(indent, 'celery {0}'.format(command), colored,
  359. app=app),
  360. ])
  361. @classmethod
  362. def list_commands(cls, indent=0, colored=None, app=None):
  363. colored = term.colored() if colored is None else colored
  364. white = colored.white
  365. ret = []
  366. for command_cls, commands, color in command_classes:
  367. ret.extend([
  368. text.indent('+ {0}: '.format(white(command_cls)), indent),
  369. '\n'.join(
  370. cls.get_command_info(
  371. command, indent + 4, color, colored, app=app)
  372. for command in commands),
  373. ''
  374. ])
  375. return '\n'.join(ret).strip()
  376. def with_pool_option(self, argv):
  377. if len(argv) > 1 and 'worker' in argv[0:3]:
  378. # this command supports custom pools
  379. # that may have to be loaded as early as possible.
  380. return (['-P'], ['--pool'])
  381. def on_concurrency_setup(self):
  382. self.load_extension_commands()
  383. def load_extension_commands(self):
  384. names = Extensions(self.ext_fmt.format(self=self),
  385. self.register_command).load()
  386. if names:
  387. command_classes.append(('Extensions', names, 'magenta'))
# Run the umbrella command when this module is executed as a script.
if __name__ == '__main__':  # pragma: no cover
    main()