celery.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery` umbrella command.
  4. .. program:: celery
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. import anyjson
  8. import os
  9. import sys
  10. from importlib import import_module
  11. from celery.five import string_t, values
  12. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  13. from celery.utils import term
  14. from celery.utils import text
  15. from celery.utils.timeutils import maybe_iso8601
  16. # Cannot use relative imports here due to a Windows issue (#1111).
  17. from celery.bin.base import Command, Error, Option, Extensions
  18. # Import commands from other modules
  19. from celery.bin.amqp import amqp
  20. from celery.bin.beat import beat
  21. from celery.bin.events import events
  22. from celery.bin.graph import graph
  23. from celery.bin.worker import worker
  # Footer template printed by the ``help`` command; ``{commands}`` receives
  # the rendered command listing and ``{prog_name}`` the program name.
  HELP = (
      '\n'
      '---- -- - - ---- Commands- -------------- --- ------------\n'
      '{commands}\n'
      '---- -- - - --------- -- - -------------- --- ------------\n'
      "Type '{prog_name} <command> --help' for help using a specific command.\n"
  )

  # One progress line per migrated task (used by the ``migrate`` command).
  MIGRATE_PROGRESS_FMT = (
      'Migrating task {state.count}/{state.strtotal}: '
      '{body[task]}[{body[id]}]'
  )

  # Any non-empty value of the C_DEBUG environment variable enables the
  # extra 'Debug' command group below.
  DEBUG = os.environ.get('C_DEBUG', False)

  # (group title, command names, colour name or None) triples, in the order
  # the help screen lists them.
  command_classes = [
      ('Main',
       ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'],
       'green'),
      ('Remote Control',
       ['status', 'inspect', 'control'],
       'blue'),
      ('Utils',
       ['purge', 'list', 'migrate', 'call', 'result', 'report'],
       None),
  ]
  if DEBUG:
      command_classes.append(('Debug', ['graph'], 'red'))
  def determine_exit_status(ret):
      """Translate a command's return value into a process exit status.

      Integer results are passed through untouched.  Any other value is
      collapsed to ``EX_OK`` when truthy and ``EX_FAILURE`` when falsy.

      NOTE(review): ``bool`` is a subclass of ``int``, so ``True``/``False``
      take the pass-through branch and are returned as-is -- confirm that
      this is the intended handling for boolean results.
      """
      if not isinstance(ret, int):
          return EX_OK if ret else EX_FAILURE
      return ret
  def main(argv=None):
      """Run the :program:`celery` umbrella command.

      Creates a :class:`CeleryCommand`, lets it patch concurrency, calls
      billiard's ``freeze_support`` and then executes the command line in
      ``argv``.  A ``KeyboardInterrupt`` raised anywhere in that sequence
      is swallowed.
      """
      try:
          # Fix for setuptools generated scripts, so that it will
          # work with multiprocessing fork emulation.
          # (see multiprocessing.forking.get_preparation_data())
          if __name__ != '__main__':  # pragma: no cover
              sys.modules['__main__'] = sys.modules[__name__]

          umbrella = CeleryCommand()
          umbrella.maybe_patch_concurrency()

          from billiard import freeze_support
          freeze_support()

          umbrella.execute_from_commandline(argv)
      except KeyboardInterrupt:
          pass
  class multi(Command):
      """Start multiple worker instances."""
      respects_app_option = False

      def get_options(self):
          # This command declares no options of its own; the raw argv is
          # handed to MultiTool in run_from_argv.
          return ()

      def run_from_argv(self, prog_name, argv):
          """Delegate the whole command line to ``celery.bin.multi``."""
          from celery.bin.multi import MultiTool

          tool = MultiTool()
          return tool.execute_from_commandline(argv, prog_name)
  class list_(Command):
      """Get info from broker.

      Examples::

          celery list bindings

      NOTE: For RabbitMQ the management plugin is required.
      """
      args = '[bindings]'

      def list_bindings(self, management):
          """Print a queue / exchange / routing-key table of all bindings.

          Raises :exc:`Error` when ``management.get_bindings()`` is not
          implemented.
          """
          try:
              bindings = management.get_bindings()
          except NotImplementedError:
              raise Error('Your transport cannot list bindings.')

          def emit_row(queue, exchange, routing_key):
              # Two left-aligned 28-character columns, then the routing key.
              self.out('{0:<28} {1:<28} {2}'.format(
                  queue, exchange, routing_key))

          rule = '-' * 16
          emit_row('Queue', 'Exchange', 'Routing Key')
          emit_row(rule, rule, rule)
          for binding in bindings:
              emit_row(binding['destination'],
                       binding['source'],
                       binding['routing_key'])

      def run(self, what=None, *_, **kw):
          """Dispatch to the handler for topic ``what``.

          Raises :exc:`Error` if no topic is given or it is unknown.
          """
          handlers = {'bindings': self.list_bindings}
          available = ', '.join(handlers)
          if not what:
              raise Error('You must specify one of {0}'.format(available))
          if what not in handlers:
              raise Error('unknown topic {0!r} (choose one of: {1})'.format(
                  what, available))

          handler = handlers[what]
          with self.app.connection() as conn:
              # Declare the task consumer on this connection before
              # querying the broker.
              self.app.amqp.TaskConsumer(conn).declare()
              handler(conn.manager)
  class call(Command):
      """Call a task by name.

      Examples::

          celery call tasks.add --args='[2, 2]'
          celery call tasks.add --args='[2, 2]' --countdown=10
      """
      args = '<task_name>'
      option_list = Command.option_list + (
          Option('--args', '-a', help='positional arguments (json).'),
          Option('--kwargs', '-k', help='keyword arguments (json).'),
          Option('--eta', help='scheduled time (ISO-8601).'),
          Option('--countdown', type='float',
                 help='eta in seconds from now (float/int).'),
          Option('--expires', help='expiry time (ISO-8601/float/int).'),
          Option('--serializer', default='json', help='defaults to json.'),
          Option('--queue', help='custom queue name.'),
          Option('--exchange', help='custom exchange name.'),
          Option('--routing-key', help='custom routing key.'),
      )

      def run(self, name, *_, **kw):
          """Send task ``name`` and print the id of the resulting result.

          ``args`` and ``kwargs`` options given as strings are decoded
          with ``anyjson``.  ``expires`` is first tried as a float and,
          failing that, handed to ``maybe_iso8601``; any ``TypeError`` or
          ``ValueError`` raised by that second attempt propagates.
          """
          # Positional args.
          args = kw.get('args') or ()
          if isinstance(args, string_t):
              args = anyjson.loads(args)

          # Keyword args.
          kwargs = kw.get('kwargs') or {}
          if isinstance(kwargs, string_t):
              kwargs = anyjson.loads(kwargs)

          # Expires can be int/float ...
          expires = kw.get('expires') or None
          try:
              expires = float(expires)
          except (TypeError, ValueError):
              # ... or a string describing an ISO 8601 datetime.  Errors
              # raised here are left to propagate; the previous
              # ``except ...: raise`` wrapper around this call was a no-op.
              expires = maybe_iso8601(expires)

          res = self.app.send_task(name, args=args, kwargs=kwargs,
                                   countdown=kw.get('countdown'),
                                   serializer=kw.get('serializer'),
                                   queue=kw.get('queue'),
                                   exchange=kw.get('exchange'),
                                   routing_key=kw.get('routing_key'),
                                   eta=maybe_iso8601(kw.get('eta')),
                                   expires=expires)
          self.out(res.id)
  class purge(Command):
      """Erase all messages from all known task queues.

      WARNING: There is no undo operation for this command.
      """
      fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
      fmt_empty = 'No messages purged from {qnum} {queues}'

      def run(self, *args, **kwargs):
          """Purge via ``app.control.purge()`` and report the counts."""
          queue_count = len(self.app.amqp.queues)
          purged = self.app.control.purge()

          if purged:
              template = self.fmt_purged
          else:
              template = self.fmt_empty

          summary = template.format(
              mnum=purged,
              qnum=queue_count,
              messages=text.pluralize(purged, 'message'),
              queues=text.pluralize(queue_count, 'queue'),
          )
          self.out(summary)
  class result(Command):
      """Gives the return value for a given task id.

      Examples::

          celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
          celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
          celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
      """
      args = '<task_id>'
      option_list = Command.option_list + (
          Option('--task', '-t', help='name of task (if custom backend)'),
          Option('--traceback', action='store_true',
                 help='show traceback instead'),
      )

      def run(self, task_id, *args, **kwargs):
          """Print the result of ``task_id``, or its traceback.

          When ``task`` is given, that task's own ``AsyncResult`` class is
          used instead of the app-wide one.
          """
          make_result = self.app.AsyncResult
          task_name = kwargs.get('task')
          want_traceback = kwargs.get('traceback', False)

          if task_name:
              make_result = self.app.tasks[task_name].AsyncResult
          async_result = make_result(task_id)

          value = (async_result.traceback if want_traceback
                   else async_result.get())
          # pretty() returns a sequence; its second item is what is printed.
          self.out(self.pretty(value)[1])
  class _RemoteControl(Command):
      """Shared base for commands that send a named method to workers.

      Subclasses set ``name`` and ``choices`` and override :meth:`call`.
      ``choices`` maps each method name to a two-item tuple: item 0 is the
      timeout used when ``--timeout`` is not given, item 1 is the help
      text shown in the command listing.
      """
      name = None
      choices = None
      leaf = False
      option_list = Command.option_list + (
          Option('--timeout', '-t', type='float',
                 help='Timeout in seconds (float) waiting for reply'),
          Option('--destination', '-d',
                 help='Comma separated list of destination node names.'),
      )

      def __init__(self, *args, **kwargs):
          # Strip our own display flags before the base class sees kwargs.
          self.show_body = kwargs.pop('show_body', True)
          self.show_reply = kwargs.pop('show_reply', True)
          super(_RemoteControl, self).__init__(*args, **kwargs)

      @classmethod
      def get_command_info(cls, command, indent=0, prefix='', color=None,
                           help=False):
          """Render the listing entry for one method in ``choices``.

          ``color`` must be callable here; :meth:`list_commands` supplies
          an identity function when the caller passes none.
          """
          help_line = None
          if help:
              help_line = '|' + text.indent(cls.choices[command][1],
                                            indent + 4)
          try:
              # A same-named attribute means the method takes arguments;
              # its docstring is appended as the argument synopsis.
              handler = getattr(cls, command)
              heading = '{0}{1} {2}'.format(
                  prefix, color(command), handler.__doc__)
              return text.join(['|' + text.indent(heading, indent),
                                help_line])
          except AttributeError:
              heading = prefix + str(color(command))
              return text.join(['|' + text.indent(heading, indent),
                                help_line])

      @classmethod
      def list_commands(cls, indent=0, prefix='', color=None, help=False):
          """Render every entry of ``choices``, sorted by name."""
          if not color:
              color = lambda x: x
          prefix = prefix + ' ' if prefix else ''
          entries = (
              cls.get_command_info(method, indent, prefix, color, help)
              for method in sorted(cls.choices)
          )
          return '\n'.join(entries)

      @property
      def epilog(self):
          listing = self.list_commands(indent=4, help=True)
          return '\n'.join(['[Commands]', listing])

      def usage(self, command):
          template = '%prog {0} [options] {1} <command> [arg1 .. argN]'
          return template.format(command, self.args)

      def call(self, *args, **kwargs):
          # Subclasses provide the actual dispatch.
          raise NotImplementedError('get_obj')

      def run(self, *args, **kwargs):
          """Require a method name, then hand over to do_call_method."""
          if args:
              return self.do_call_method(args, **kwargs)
          raise Error('Missing {0.name} method. See --help'.format(self))

      def do_call_method(self, args, **kwargs):
          """Validate ``args[0]`` and invoke its handler.

          Raises :exc:`Error` for ``help``, for unknown methods, and (with
          status ``EX_UNAVAILABLE``) when the handler returns no replies.
          """
          method = args[0]
          if method == 'help':
              raise Error("Did you mean '{0.name} --help'?".format(self))
          if method not in self.choices:
              raise Error('Unknown {0.name} method {1}'.format(self, method))

          destination = kwargs.get('destination')
          # Fall back to the per-method default timeout from ``choices``.
          timeout = kwargs.get('timeout') or self.choices[method][0]
          if destination and isinstance(destination, string_t):
              destination = [node.strip() for node in destination.split(',')]

          # Prefer a same-named method on this class; otherwise use call().
          handler = getattr(self, method, self.call)

          replies = handler(method, *args[1:],
                            timeout=timeout,
                            destination=destination,
                            callback=self.say_remote_command_reply)
          if replies:
              return replies
          raise Error('No nodes replied within time constraint.',
                      status=EX_UNAVAILABLE)

      def say(self, direction, title, body=''):
          """Print ``title`` (and ``body`` if enabled) with a direction marker.

          Nothing is printed for direction ``'<-'`` when ``quiet`` is set.
          """
          colored = self.colored
          if direction == '<-' and self.quiet:
              return

          marker = ''
          if not self.quiet:
              marker = colored.bold(colored.white(direction), ' ') or ''
          self.out(colored.reset(marker, title))
          if body and self.show_body:
              self.out(body)
  class inspect(_RemoteControl):
      """Inspect the worker at runtime.

      Availability: RabbitMQ (amqp), Redis, and MongoDB transports.

      Examples::

          celery inspect active --timeout=5
          celery inspect scheduled -d worker1.example.com
          celery inspect revoked -d w1.e.com,w2.e.com
      """
      name = 'inspect'
      # method name -> (default timeout, help text)
      choices = {
          'active': (1.0, 'dump active tasks (being processed)'),
          'active_queues': (1.0, 'dump queues being consumed from'),
          'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
          'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
          'stats': (1.0, 'dump worker statistics'),
          'revoked': (1.0, 'dump of revoked task ids'),
          'registered': (1.0, 'dump of registered tasks'),
          'ping': (0.2, 'ping worker(s)'),
          'clock': (1.0, 'get value of logical clock'),
          'conf': (1.0, 'dump worker configuration'),
          'report': (1.0, 'get bugreport info'),
      }

      def call(self, method, *args, **options):
          """Invoke ``method`` on ``app.control.inspect(**options)``."""
          inspector = self.app.control.inspect(**options)
          bound = getattr(inspector, method)
          return bound(*args)
  class control(_RemoteControl):
      """Workers remote control.

      Availability: RabbitMQ (amqp), Redis, and MongoDB transports.

      Examples::

          celery control enable_events --timeout=5
          celery control -d worker1.example.com enable_events
          celery control -d w1.e.com,w2.e.com enable_events

          celery control -d w1.e.com add_consumer queue_name
          celery control -d w1.e.com cancel_consumer queue_name

          celery control -d w1.e.com add_consumer queue exchange direct rkey
      """
      name = 'control'
      # method name -> (default timeout, help text)
      choices = {
          'enable_events': (1.0, 'tell worker(s) to enable events'),
          'disable_events': (1.0, 'tell worker(s) to disable events'),
          'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
          'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
          'rate_limit': (
              1.0, 'tell worker(s) to modify the rate limit for a task type'),
          'time_limit': (
              1.0, 'tell worker(s) to modify the time limit for a task type.'),
          'autoscale': (1.0, 'change autoscale settings'),
          'pool_grow': (1.0, 'start more pool processes'),
          'pool_shrink': (1.0, 'use less pool processes'),
      }

      # The one-line docstrings on the handlers below are argument
      # synopses: _RemoteControl.get_command_info prints them next to the
      # command name, so they are kept verbatim.

      def call(self, method, *args, **options):
          bound = getattr(self.app.control, method)
          return bound(*args, retry=True, **options)

      def pool_grow(self, method, n=1, **kwargs):
          """[N=1]"""
          return self.call(method, n, **kwargs)

      def pool_shrink(self, method, n=1, **kwargs):
          """[N=1]"""
          return self.call(method, n, **kwargs)

      def autoscale(self, method, max=None, min=None, **kwargs):
          """[max] [min]"""
          return self.call(method, max, min, **kwargs)

      def rate_limit(self, method, task_name, rate_limit, **kwargs):
          """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
          return self.call(method, task_name, rate_limit,
                           reply=True, **kwargs)

      def time_limit(self, method, task_name, soft, hard=None, **kwargs):
          """<task_name> <soft_secs> [hard_secs]"""
          return self.call(method, task_name, soft, hard,
                           reply=True, **kwargs)

      def add_consumer(self, method, queue, exchange=None,
                       exchange_type='direct', routing_key=None, **kwargs):
          """<queue> [exchange [type [routing_key]]]"""
          return self.call(method, queue, exchange, exchange_type,
                           routing_key, reply=True, **kwargs)

      def cancel_consumer(self, method, queue, **kwargs):
          """<queue>"""
          return self.call(method, queue, reply=True, **kwargs)
  class status(Command):
      """Show list of workers that are online."""
      option_list = inspect.option_list

      def run(self, *args, **kwargs):
          """Ping the workers via ``inspect`` and print how many replied.

          Raises :exc:`Error` with status ``EX_UNAVAILABLE`` when the ping
          yields no replies.
          """
          pinger = inspect(
              app=self.app,
              no_color=kwargs.get('no_color', False),
              stdout=self.stdout,
              stderr=self.stderr,
              show_reply=False,
              show_body=False,
              quiet=True,
          )
          replies = pinger.run('ping', **kwargs)
          if not replies:
              raise Error('No nodes replied within time constraint',
                          status=EX_UNAVAILABLE)

          if kwargs.get('quiet', False):
              return
          nodecount = len(replies)
          self.out('\n{0} {1} online.'.format(
              nodecount, text.pluralize(nodecount, 'node')))
  class migrate(Command):
      """Migrate tasks from one broker to another.

      Examples::

          celery migrate redis://localhost amqp://guest@localhost//
          celery migrate django:// redis://localhost

      NOTE: This command is experimental, make sure you have
            a backup of the tasks before you continue.
      """
      args = '<source_url> <dest_url>'
      option_list = Command.option_list + (
          Option('--limit', '-n', type='int',
                 help='Number of tasks to consume (int)'),
          Option('--timeout', '-t', type='float', default=1.0,
                 help='Timeout in seconds (float) waiting for tasks'),
          Option('--ack-messages', '-a', action='store_true',
                 help='Ack messages from source broker.'),
          Option('--tasks', '-T',
                 help='List of task names to filter on.'),
          Option('--queues', '-Q',
                 help='List of queues to migrate.'),
          Option('--forever', '-F', action='store_true',
                 help='Continually migrate tasks until killed.'),
      )
      progress_fmt = MIGRATE_PROGRESS_FMT

      def on_migrate_task(self, state, body, message):
          # Progress callback handed to migrate_tasks; ``message`` is unused.
          self.out(self.progress_fmt.format(state=state, body=body))

      def run(self, *args, **kwargs):
          """Migrate tasks between the two broker URLs in ``args``.

          Anything other than exactly two positional arguments shows the
          command's ``--help`` output and exits.
          """
          if len(args) != 2:
              # this never exits due to OptionParser.parse_options
              self.run_from_argv(self.prog_name, ['migrate', '--help'])
              raise SystemExit()

          from kombu import Connection
          from celery.contrib.migrate import migrate_tasks

          source_url, dest_url = args
          migrate_tasks(Connection(source_url),
                        Connection(dest_url),
                        callback=self.on_migrate_task,
                        **kwargs)
  class shell(Command):  # pragma: no cover
      """Start shell session with convenient access to celery symbols.

      The following symbols will be added to the main globals:

          - celery:  the current application.
          - chord, group, chain, chunks,
            xmap, xstarmap subtask, Task
          - all registered tasks.

      Example Session:

      .. code-block:: bash

          $ celery shell

          >>> celery
          <Celery default:0x1012d9fd0>
          >>> add
          <@task: tasks.add>
          >>> add.delay(2, 2)
          <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
      """
      option_list = Command.option_list + (
          Option('--ipython', '-I',
                 action='store_true', dest='force_ipython',
                 help='force iPython.'),
          Option('--bpython', '-B',
                 action='store_true', dest='force_bpython',
                 help='force bpython.'),
          Option('--python', '-P',
                 action='store_true', dest='force_python',
                 help='force default Python shell.'),
          Option('--without-tasks', '-T', action='store_true',
                 help="don't add tasks to locals."),
          Option('--eventlet', action='store_true',
                 help='use eventlet.'),
          Option('--gevent', action='store_true', help='use gevent.'),
      )

      def run(self, force_ipython=False, force_bpython=False,
              force_python=False, without_tasks=False, eventlet=False,
              gevent=False, **kwargs):
          """Build the shell namespace in ``self.locals`` and start a shell.

          The force flags are checked in the order python, bpython,
          ipython; with none set the shell is auto-detected.
          """
          sys.path.insert(0, os.getcwd())
          if eventlet:
              import_module('celery.concurrency.eventlet')
          if gevent:
              import_module('celery.concurrency.gevent')

          import celery
          import celery.task.base
          self.app.loader.import_default_modules()

          namespace = {
              'celery': self.app,
              'Task': celery.Task,
              'chord': celery.chord,
              'group': celery.group,
              'chain': celery.chain,
              'chunks': celery.chunks,
              'xmap': celery.xmap,
              'xstarmap': celery.xstarmap,
              'subtask': celery.subtask,
          }
          self.locals = namespace

          if not without_tasks:
              # Expose every registered task except those whose name
              # starts with 'celery.'.
              for task in values(self.app.tasks):
                  if not task.name.startswith('celery.'):
                      namespace[task.__name__] = task

          if force_python:
              return self.invoke_fallback_shell()
          if force_bpython:
              return self.invoke_bpython_shell()
          if force_ipython:
              return self.invoke_ipython_shell()
          return self.invoke_default_shell()

      def invoke_default_shell(self):
          """Use IPython if importable, else bpython, else the plain shell."""
          try:
              import IPython  # noqa
          except ImportError:
              pass
          else:
              return self.invoke_ipython_shell()

          try:
              import bpython  # noqa
          except ImportError:
              return self.invoke_fallback_shell()
          return self.invoke_bpython_shell()

      def invoke_fallback_shell(self):
          """Start ``code.interact``, with tab completion if readline exists."""
          import code
          try:
              import readline
          except ImportError:
              pass
          else:
              import rlcompleter
              completer = rlcompleter.Completer(self.locals)
              readline.set_completer(completer.complete)
              readline.parse_and_bind('tab:complete')
          code.interact(local=self.locals)

      def invoke_ipython_shell(self):
          # The mainloop call stays inside the try block: an ImportError
          # raised anywhere in it also selects the legacy code path.
          try:
              from IPython.frontend.terminal import embed
              embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
          except ImportError:  # ipython < 0.11
              from IPython.Shell import IPShell
              IPShell(argv=[], user_ns=self.locals).mainloop()

      def invoke_bpython_shell(self):
          import bpython
          bpython.embed(self.locals)
  class help(Command):
      """Show help screen and exit."""

      def usage(self, command):
          template = '%prog <command> [options] {0.args}'
          return template.format(self)

      def run(self, *args, **kwargs):
          """Print parser help plus the command overview; return EX_USAGE."""
          self.parser.print_help()
          overview = HELP.format(
              prog_name=self.prog_name,
              commands=CeleryCommand.list_commands(),
          )
          self.out(overview)
          return EX_USAGE
  class report(Command):
      """Shows information useful to include in bugreports."""

      def run(self, *args, **kwargs):
          """Print ``app.bugreport()`` and return ``EX_OK``."""
          bugreport = self.app.bugreport()
          self.out(bugreport)
          return EX_OK
  class CeleryCommand(Command):
      """The umbrella command: finds the sub-command in argv and runs it."""
      namespace = 'celery'
      ext_fmt = '{self.namespace}.commands'
      # Sub-command name -> command class.
      commands = {
          'amqp': amqp,
          'beat': beat,
          'call': call,
          'control': control,
          'events': events,
          'graph': graph,
          'help': help,
          'inspect': inspect,
          'list': list_,
          'migrate': migrate,
          'multi': multi,
          'purge': purge,
          'report': report,
          'result': result,
          'shell': shell,
          'status': status,
          'worker': worker,
      }
      enable_config_from_cmdline = True
      prog_name = 'celery'

      @classmethod
      def register_command(cls, fun, name=None):
          """Add ``fun`` to ``commands`` under ``name`` (default: its
          ``__name__``) and return it, so it can be used as a decorator."""
          cls.commands[name or fun.__name__] = fun
          return fun

      def execute(self, command, argv=None):
          """Run sub-command ``command`` with ``argv``.

          ``argv[0]`` is passed on as the command name and ``argv[1:]`` as
          its arguments.  Unknown commands are replaced by ``help``, and an
          :exc:`Error` raised by the command re-runs ``help``.
          """
          # Single lookup; the original looked the command up twice, which
          # made the first assignment dead code.
          cls = self.commands.get(command)
          if cls is None:
              cls, argv = self.commands['help'], ['help']
          elif argv is None:
              # The advertised default used to crash on ``argv[1:]``.
              argv = [command]
          try:
              return cls(app=self.app).run_from_argv(
                  self.prog_name, argv[1:], command=argv[0])
          except Error:
              return self.execute('help', argv)

      def remove_options_at_beginning(self, argv, index=0):
          """Return ``argv`` from the first non-option word onwards.

          A ``--long`` word is skipped on its own; a ``-s`` word is skipped
          together with the word after it.  Returns ``[]`` when ``argv``
          is empty or holds nothing but options.
          """
          if argv:
              while index < len(argv):
                  value = argv[index]
                  if value.startswith('--'):
                      pass
                  elif value.startswith('-'):
                      # Also skip the word following a short option.
                      index += 1
                  else:
                      return argv[index:]
                  index += 1
          return []

      def handle_argv(self, prog_name, argv):
          """Strip leading options, then execute the command that follows
          (``help`` when nothing is left)."""
          self.prog_name = prog_name
          argv = self.remove_options_at_beginning(argv)
          _, argv = self.prepare_args(None, argv)
          try:
              command = argv[0]
          except IndexError:
              command, argv = 'help', ['help']
          return self.execute(command, argv)

      def execute_from_commandline(self, argv=None):
          """Run the command line and exit the process with its status.

          Always ends in ``sys.exit``; a ``KeyboardInterrupt`` exits with
          ``EX_FAILURE``.
          """
          argv = sys.argv if argv is None else argv
          if 'multi' in argv[1:3]:  # Issue 1008
              self.respects_app_option = False
          try:
              sys.exit(determine_exit_status(
                  super(CeleryCommand, self).execute_from_commandline(argv)))
          except KeyboardInterrupt:
              sys.exit(EX_FAILURE)

      @classmethod
      def get_command_info(cls, command, indent=0, color=None):
          """Render the help-screen entry for one sub-command.

          Commands whose class has a true ``leaf`` attribute get a single
          line; others get a ``--help`` line plus their own listing.
          """
          colored = term.colored().names[color] if color else lambda x: x
          obj = cls.commands[command]
          cmd = 'celery {0}'.format(colored(command))
          if obj.leaf:
              return '|' + text.indent(cmd, indent)
          return text.join([
              ' ',
              '|' + text.indent('{0} --help'.format(cmd), indent),
              obj.list_commands(indent, 'celery {0}'.format(command), colored),
          ])

      @classmethod
      def list_commands(cls, indent=0):
          """Render every group in ``command_classes`` for the help screen."""
          white = term.colored().white
          ret = []
          for group, commands, color in command_classes:
              ret.extend([
                  text.indent('+ {0}: '.format(white(group)), indent),
                  '\n'.join(cls.get_command_info(command, indent + 4, color)
                            for command in commands),
                  '',
              ])
          return '\n'.join(ret).strip()

      def with_pool_option(self, argv):
          """Return the pool option names for ``worker``; otherwise None."""
          if len(argv) > 1 and argv[1] == 'worker':
              # this command supports custom pools
              # that may have to be loaded as early as possible.
              return (['-P'], ['--pool'])

      def on_concurrency_setup(self):
          self.load_extension_commands()

      def load_extension_commands(self):
          """Load extension commands and list them in an 'Extensions' group."""
          names = Extensions(self.ext_fmt.format(self=self),
                             self.register_command).load()
          if names:
              command_classes.append(('Extensions', names, 'magenta'))
  def command(*args, **kwargs):
      """Deprecated: Use classmethod :meth:`CeleryCommand.register_command`
      instead.

      With a positional argument, registers ``args[0]`` and returns the
      result; without one, returns the registration function itself.
      """
      register = CeleryCommand.register_command
      if args:
          return register(args[0])
      return register
  # Script entry point: run the umbrella command when executed directly.
  if __name__ == '__main__':  # pragma: no cover
      main()