celery.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215
  1. # -*- coding: utf-8 -*-
  2. """The :program:`celery` umbrella command.
  3. .. program:: celery
  4. .. _preload-options:
  5. Preload Options
  6. ---------------
  7. These options are supported by all commands,
  8. and usually parsed before command-specific arguments.
  9. .. cmdoption:: -A, --app
  10. app instance to use (e.g., ``module.attr_name``)
  11. .. cmdoption:: -b, --broker
  12. URL to broker. default is ``amqp://guest@localhost//``
  13. .. cmdoption:: --loader
  14. name of custom loader class to use.
  15. .. cmdoption:: --config
  16. Name of the configuration module
  17. .. cmdoption:: -C, --no-color
  18. Disable colors in output.
  19. .. cmdoption:: -q, --quiet
  20. Give less verbose output (behavior depends on the sub command).
  21. .. cmdoption:: --help
  22. Show help and exit.
  23. .. _daemon-options:
  24. Daemon Options
  25. --------------
  26. These options are supported by commands that can detach
  27. into the background (daemon). They will be present
  28. in any command that also has a `--detach` option.
  29. .. cmdoption:: -f, --logfile
  30. Path to log file. If no logfile is specified, `stderr` is used.
  31. .. cmdoption:: --pidfile
  32. Optional file used to store the process pid.
  33. The program won't start if this file already exists
  34. and the pid is still alive.
  35. .. cmdoption:: --uid
  36. User id, or user name of the user to run as after detaching.
  37. .. cmdoption:: --gid
  38. Group id, or group name of the main group to change to after
  39. detaching.
  40. .. cmdoption:: --umask
  41. Effective umask (in octal) of the process after detaching. Inherits
  42. the umask of the parent process by default.
  43. .. cmdoption:: --workdir
  44. Optional directory to change to after detaching.
  45. .. cmdoption:: --executable
  46. Executable to use for the detached process.
  47. ``celery inspect``
  48. ------------------
  49. .. program:: celery inspect
  50. .. cmdoption:: -t, --timeout
  51. Timeout in seconds (float) waiting for reply
  52. .. cmdoption:: -d, --destination
  53. Comma separated list of destination node names.
  54. .. cmdoption:: -j, --json
  55. Use json as output format.
  56. ``celery control``
  57. ------------------
  58. .. program:: celery control
  59. .. cmdoption:: -t, --timeout
  60. Timeout in seconds (float) waiting for reply
  61. .. cmdoption:: -d, --destination
  62. Comma separated list of destination node names.
  63. .. cmdoption:: -j, --json
  64. Use json as output format.
  65. ``celery migrate``
  66. ------------------
  67. .. program:: celery migrate
  68. .. cmdoption:: -n, --limit
  69. Number of tasks to consume (int).
  70. .. cmdoption:: -t, --timeout
  71. Timeout in seconds (float) waiting for tasks.
  72. .. cmdoption:: -a, --ack-messages
  73. Ack messages from source broker.
  74. .. cmdoption:: -T, --tasks
  75. List of task names to filter on.
  76. .. cmdoption:: -Q, --queues
  77. List of queues to migrate.
  78. .. cmdoption:: -F, --forever
  79. Continually migrate tasks until killed.
  80. ``celery upgrade``
  81. ------------------
  82. .. program:: celery upgrade
  83. .. cmdoption:: --django
  84. Upgrade a Django project.
  85. .. cmdoption:: --compat
  86. Maintain backwards compatibility.
  87. .. cmdoption:: --no-backup
  88. Don't backup original files.
  89. ``celery shell``
  90. ----------------
  91. .. program:: celery shell
  92. .. cmdoption:: -I, --ipython
  93. Force :pypi:`iPython` implementation.
  94. .. cmdoption:: -B, --bpython
  95. Force :pypi:`bpython` implementation.
  96. .. cmdoption:: -P, --python
  97. Force default Python shell.
  98. .. cmdoption:: -T, --without-tasks
  99. Don't add tasks to locals.
  100. .. cmdoption:: --eventlet
  101. Use :pypi:`eventlet` monkey patches.
  102. .. cmdoption:: --gevent
  103. Use :pypi:`gevent` monkey patches.
  104. ``celery result``
  105. -----------------
  106. .. program:: celery result
  107. .. cmdoption:: -t, --task
  108. Name of task (if custom backend).
  109. .. cmdoption:: --traceback
  110. Show traceback if any.
  111. ``celery purge``
  112. ----------------
  113. .. program:: celery purge
  114. .. cmdoption:: -f, --force
  115. Don't prompt for verification before deleting messages (DANGEROUS)
  116. ``celery call``
  117. ---------------
  118. .. program:: celery call
  119. .. cmdoption:: -a, --args
  120. Positional arguments (json format).
  121. .. cmdoption:: -k, --kwargs
  122. Keyword arguments (json format).
  123. .. cmdoption:: --eta
  124. Scheduled time in ISO-8601 format.
  125. .. cmdoption:: --countdown
  126. ETA in seconds from now (float/int).
  127. .. cmdoption:: --expires
  128. Expiry time in float/int seconds, or an ISO-8601 date.
  129. .. cmdoption:: --serializer
  130. Specify serializer to use (default is json).
  131. .. cmdoption:: --queue
  132. Destination queue.
  133. .. cmdoption:: --exchange
  134. Destination exchange (defaults to the queue exchange).
  135. .. cmdoption:: --routing-key
  136. Destination routing key (defaults to the queue routing key).
  137. """
  138. from __future__ import absolute_import, unicode_literals, print_function
  139. import codecs
  140. import numbers
  141. import os
  142. import sys
  143. from functools import partial
  144. from importlib import import_module
  145. from kombu.utils.json import dumps, loads
  146. from kombu.utils.objects import cached_property
  147. from celery.app import defaults
  148. from celery.five import items, keys, string_t, values
  149. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  150. from celery.utils import term
  151. from celery.utils import text
  152. from celery.utils.functional import pass1
  153. from celery.utils.text import str_to_list
  154. from celery.utils.time import maybe_iso8601
  155. # Cannot use relative imports here due to a Windows issue (#1111).
  156. from celery.bin.base import Command, Option, Extensions
  157. # Import commands from other modules
  158. from celery.bin.amqp import amqp
  159. from celery.bin.beat import beat
  160. from celery.bin.events import events
  161. from celery.bin.graph import graph
  162. from celery.bin.logtool import logtool
  163. from celery.bin.worker import worker
# Names exported by a star-import of this module.
__all__ = ['CeleryCommand', 'main']

# Template printed by the ``help`` command after the option parser's own
# help text; ``help.run`` fills in ``{commands}`` and ``{prog_name}``.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
{commands}
---- -- - - --------- -- - -------------- --- ------------
Type '{prog_name} <command> --help' for help using a specific command.
"""

# Progress line emitted by ``migrate.on_migrate_task``; formatted with the
# ``state`` and ``body`` objects passed to that callback.  The trailing
# backslashes join the pieces into a single line without newlines.
MIGRATE_PROGRESS_FMT = """\
Migrating task {state.count}/{state.strtotal}: \
{body[task]}[{body[id]}]\
"""

# Command groups as ``(title, [command names], color name or None)`` tuples.
# NOTE(review): presumably consumed by ``CeleryCommand.list_commands`` to
# group and colorize the command listing -- confirm against that method.
command_classes = [
    ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
    ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
    ('Utils',
     ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
     None),
    ('Debugging', ['report', 'logtool'], 'red'),
]
  183. def determine_exit_status(ret):
  184. if isinstance(ret, numbers.Integral):
  185. return ret
  186. return EX_OK if ret else EX_FAILURE
  187. def main(argv=None):
  188. """Start celery umbrella command."""
  189. # Fix for setuptools generated scripts, so that it will
  190. # work with multiprocessing fork emulation.
  191. # (see multiprocessing.forking.get_preparation_data())
  192. try:
  193. if __name__ != '__main__': # pragma: no cover
  194. sys.modules['__main__'] = sys.modules[__name__]
  195. cmd = CeleryCommand()
  196. cmd.maybe_patch_concurrency()
  197. from billiard import freeze_support
  198. freeze_support()
  199. cmd.execute_from_commandline(argv)
  200. except KeyboardInterrupt:
  201. pass
  202. class multi(Command):
  203. """Start multiple worker instances."""
  204. respects_app_option = False
  205. def get_options(self):
  206. pass
  207. def run_from_argv(self, prog_name, argv, command=None):
  208. from celery.bin.multi import MultiTool
  209. multi = MultiTool(quiet=self.quiet, no_color=self.no_color)
  210. return multi.execute_from_commandline([command] + argv)
  211. class list_(Command):
  212. """Get info from broker.
  213. Note:
  214. For RabbitMQ the management plugin is required.
  215. Example:
  216. .. code-block:: console
  217. $ celery list bindings
  218. """
  219. args = '[bindings]'
  220. def list_bindings(self, management):
  221. try:
  222. bindings = management.get_bindings()
  223. except NotImplementedError:
  224. raise self.Error('Your transport cannot list bindings.')
  225. def fmt(q, e, r):
  226. return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
  227. fmt('Queue', 'Exchange', 'Routing Key')
  228. fmt('-' * 16, '-' * 16, '-' * 16)
  229. for b in bindings:
  230. fmt(b['destination'], b['source'], b['routing_key'])
  231. def run(self, what=None, *_, **kw):
  232. topics = {'bindings': self.list_bindings}
  233. available = ', '.join(topics)
  234. if not what:
  235. raise self.UsageError(
  236. 'You must specify one of {0}'.format(available))
  237. if what not in topics:
  238. raise self.UsageError(
  239. 'unknown topic {0!r} (choose one of: {1})'.format(
  240. what, available))
  241. with self.app.connection() as conn:
  242. self.app.amqp.TaskConsumer(conn).declare()
  243. topics[what](conn.manager)
  244. class call(Command):
  245. """Call a task by name.
  246. Examples:
  247. .. code-block:: console
  248. $ celery call tasks.add --args='[2, 2]'
  249. $ celery call tasks.add --args='[2, 2]' --countdown=10
  250. """
  251. args = '<task_name>'
  252. option_list = Command.option_list + (
  253. Option('--args', '-a', help='positional arguments (json).'),
  254. Option('--kwargs', '-k', help='keyword arguments (json).'),
  255. Option('--eta', help='scheduled time (ISO-8601).'),
  256. Option('--countdown', type='float',
  257. help='eta in seconds from now (float/int).'),
  258. Option('--expires', help='expiry time (ISO-8601/float/int).'),
  259. Option('--serializer', default='json', help='defaults to json.'),
  260. Option('--queue', help='custom queue name.'),
  261. Option('--exchange', help='custom exchange name.'),
  262. Option('--routing-key', help='custom routing key.'),
  263. )
  264. def run(self, name, *_, **kwargs):
  265. self._send_task(name, **kwargs)
  266. def _send_task(self, name, args=None, kwargs=None,
  267. countdown=None, serializer=None,
  268. queue=None, exchange=None, routing_key=None,
  269. eta=None, expires=None):
  270. # arguments
  271. args = loads(args) if isinstance(args, string_t) else args
  272. kwargs = loads(kwargs) if isinstance(kwargs, string_t) else kwargs
  273. # Expires can be int/float.
  274. try:
  275. expires = float(expires)
  276. except (TypeError, ValueError):
  277. # or a string describing an ISO 8601 datetime.
  278. try:
  279. expires = maybe_iso8601(expires)
  280. except (TypeError, ValueError):
  281. raise
  282. # send the task and print the id.
  283. self.out(self.app.send_task(
  284. name,
  285. args=args or (), kwargs=kwargs or {},
  286. countdown=countdown,
  287. serializer=serializer,
  288. queue=queue,
  289. exchange=exchange,
  290. routing_key=routing_key,
  291. eta=maybe_iso8601(eta),
  292. expires=expires,
  293. ).id)
  294. class purge(Command):
  295. """Erase all messages from all known task queues.
  296. Warning:
  297. There's no undo operation for this command.
  298. """
  299. warn_prelude = (
  300. '{warning}: This will remove all tasks from {queues}: {names}.\n'
  301. ' There is no undo for this operation!\n\n'
  302. '(to skip this prompt use the -f option)\n'
  303. )
  304. warn_prompt = 'Are you sure you want to delete all tasks'
  305. fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
  306. fmt_empty = 'No messages purged from {qnum} {queues}'
  307. option_list = Command.option_list + (
  308. Option('--force', '-f', action='store_true',
  309. help="Don't prompt for verification"),
  310. Option('--queues', '-Q', default=[],
  311. help='Comma separated list of queue names to purge.'),
  312. Option('--exclude-queues', '-X', default=[],
  313. help='Comma separated list of queues names not to purge.')
  314. )
  315. def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
  316. queues = set(str_to_list(queues or []))
  317. exclude = set(str_to_list(exclude_queues or []))
  318. names = (queues or set(keys(self.app.amqp.queues))) - exclude
  319. qnum = len(names)
  320. messages = None
  321. if names:
  322. if not force:
  323. self.out(self.warn_prelude.format(
  324. warning=self.colored.red('WARNING'),
  325. queues=text.pluralize(qnum, 'queue'),
  326. names=', '.join(sorted(names)),
  327. ))
  328. if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
  329. return
  330. with self.app.connection_for_write() as conn:
  331. messages = sum(self._purge(conn, queue) for queue in names)
  332. fmt = self.fmt_purged if messages else self.fmt_empty
  333. self.out(fmt.format(
  334. mnum=messages, qnum=qnum,
  335. messages=text.pluralize(messages, 'message'),
  336. queues=text.pluralize(qnum, 'queue')))
  337. def _purge(self, conn, queue):
  338. try:
  339. return conn.default_channel.queue_purge(queue) or 0
  340. except conn.channel_errors:
  341. return 0
  342. class result(Command):
  343. """Gives the return value for a given task id.
  344. Examples:
  345. .. code-block:: console
  346. $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
  347. $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
  348. $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
  349. """
  350. args = '<task_id>'
  351. option_list = Command.option_list + (
  352. Option('--task', '-t', help='name of task (if custom backend)'),
  353. Option('--traceback', action='store_true',
  354. help='show traceback instead'),
  355. )
  356. def run(self, task_id, *args, **kwargs):
  357. result_cls = self.app.AsyncResult
  358. task = kwargs.get('task')
  359. traceback = kwargs.get('traceback', False)
  360. if task:
  361. result_cls = self.app.tasks[task].AsyncResult
  362. result = result_cls(task_id)
  363. if traceback:
  364. value = result.traceback
  365. else:
  366. value = result.get()
  367. self.out(self.pretty(value)[1])
  368. class _RemoteControl(Command):
  369. name = None
  370. leaf = False
  371. control_group = None
  372. option_list = Command.option_list + (
  373. Option('--timeout', '-t', type='float',
  374. help='Timeout in seconds (float) waiting for reply'),
  375. Option('--destination', '-d',
  376. help='Comma separated list of destination node names.'),
  377. Option('--json', '-j', action='store_true',
  378. help='Use json as output format.'),
  379. )
  380. def __init__(self, *args, **kwargs):
  381. self.show_body = kwargs.pop('show_body', True)
  382. self.show_reply = kwargs.pop('show_reply', True)
  383. super(_RemoteControl, self).__init__(*args, **kwargs)
  384. @classmethod
  385. def get_command_info(self, command,
  386. indent=0, prefix='', color=None,
  387. help=False, app=None, choices=None):
  388. if choices is None:
  389. choices = self._choices_by_group(app)
  390. meta = choices[command]
  391. if help:
  392. help = '|' + text.indent(meta.help, indent + 4)
  393. else:
  394. help = None
  395. return text.join([
  396. '|' + text.indent('{0}{1} {2}'.format(
  397. prefix, color(command), meta.signature or ''), indent),
  398. help,
  399. ])
  400. @classmethod
  401. def list_commands(self, indent=0, prefix='',
  402. color=None, help=False, app=None):
  403. choices = self._choices_by_group(app)
  404. color = color if color else lambda x: x
  405. prefix = prefix + ' ' if prefix else ''
  406. return '\n'.join(
  407. self.get_command_info(c, indent, prefix, color, help,
  408. app=app, choices=choices)
  409. for c in sorted(choices))
  410. def usage(self, command):
  411. return '%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
  412. command, self.args)
  413. def call(self, *args, **kwargs):
  414. raise NotImplementedError('call')
  415. def run(self, *args, **kwargs):
  416. if not args:
  417. raise self.UsageError(
  418. 'Missing {0.name} method. See --help'.format(self))
  419. return self.do_call_method(args, **kwargs)
  420. def _ensure_fanout_supported(self):
  421. with self.app.connection_for_write() as conn:
  422. if not conn.supports_exchange_type('fanout'):
  423. raise self.Error(
  424. 'Broadcast not supported by transport {0!r}'.format(
  425. conn.info()['transport']))
  426. def do_call_method(self, args,
  427. timeout=None, destination=None, json=False, **kwargs):
  428. method = args[0]
  429. if method == 'help':
  430. raise self.Error("Did you mean '{0.name} --help'?".format(self))
  431. try:
  432. meta = self.choices[method]
  433. except KeyError:
  434. raise self.UsageError(
  435. 'Unknown {0.name} method {1}'.format(self, method))
  436. self._ensure_fanout_supported()
  437. timeout = timeout or meta.default_timeout
  438. if destination and isinstance(destination, string_t):
  439. destination = [dest.strip() for dest in destination.split(',')]
  440. replies = self.call(
  441. method,
  442. arguments=self.compile_arguments(meta, method, args[1:]),
  443. timeout=timeout,
  444. destination=destination,
  445. callback=None if json else self.say_remote_command_reply,
  446. )
  447. if not replies:
  448. raise self.Error('No nodes replied within time constraint.',
  449. status=EX_UNAVAILABLE)
  450. if json:
  451. self.out(dumps(replies))
  452. return replies
  453. def compile_arguments(self, meta, method, args):
  454. args = list(args)
  455. kw = {}
  456. if meta.args:
  457. kw.update({
  458. k: v for k, v in self._consume_args(meta, method, args)
  459. })
  460. if meta.variadic:
  461. kw.update({meta.variadic: args})
  462. if not kw and args:
  463. raise self.Error(
  464. 'Command {0!r} takes no arguments.'.format(method),
  465. status=EX_USAGE)
  466. return kw or {}
  467. def _consume_args(self, meta, method, args):
  468. i = 0
  469. try:
  470. for i, arg in enumerate(args):
  471. try:
  472. name, typ = meta.args[i]
  473. except IndexError:
  474. if meta.variadic:
  475. break
  476. raise self.Error(
  477. 'Command {0!r} takes arguments: {1}'.format(
  478. method, meta.signature),
  479. status=EX_USAGE)
  480. else:
  481. yield name, typ(arg) if typ is not None else arg
  482. finally:
  483. args[:] = args[i:]
  484. @classmethod
  485. def _choices_by_group(self, app):
  486. from celery.worker.control import Panel
  487. # need to import task modules for custom user-remote control commands.
  488. app.loader.import_default_modules()
  489. return {
  490. name: info for name, info in items(Panel.meta)
  491. if info.type == self.control_group and info.visible
  492. }
  493. @cached_property
  494. def choices(self):
  495. return self._choices_by_group(self.app)
  496. @property
  497. def epilog(self):
  498. return '\n'.join([
  499. '[Commands]',
  500. self.list_commands(indent=4, help=True, app=self.app)
  501. ])
  502. class inspect(_RemoteControl):
  503. """Inspect the worker at runtime.
  504. Availability: RabbitMQ (AMQP) and Redis transports.
  505. Examples:
  506. .. code-block:: console
  507. $ celery inspect active --timeout=5
  508. $ celery inspect scheduled -d worker1@example.com
  509. $ celery inspect revoked -d w1@e.com,w2@e.com
  510. """
  511. name = 'inspect'
  512. control_group = 'inspect'
  513. def call(self, method, arguments, **options):
  514. return self.app.control.inspect(**options)._request(
  515. method, **arguments)
  516. class control(_RemoteControl):
  517. """Workers remote control.
  518. Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
  519. Examples:
  520. .. code-block:: console
  521. $ celery control enable_events --timeout=5
  522. $ celery control -d worker1@example.com enable_events
  523. $ celery control -d w1.e.com,w2.e.com enable_events
  524. $ celery control -d w1.e.com add_consumer queue_name
  525. $ celery control -d w1.e.com cancel_consumer queue_name
  526. $ celery control add_consumer queue exchange direct rkey
  527. """
  528. name = 'control'
  529. control_group = 'control'
  530. def call(self, method, arguments, **options):
  531. return self.app.control.broadcast(
  532. method, arguments=arguments, reply=True, **options)
  533. class status(Command):
  534. """Show list of workers that are online."""
  535. option_list = inspect.option_list
  536. def run(self, *args, **kwargs):
  537. I = inspect(
  538. app=self.app,
  539. no_color=kwargs.get('no_color', False),
  540. stdout=self.stdout, stderr=self.stderr,
  541. show_reply=False, show_body=False, quiet=True,
  542. )
  543. replies = I.run('ping', **kwargs)
  544. if not replies:
  545. raise self.Error('No nodes replied within time constraint',
  546. status=EX_UNAVAILABLE)
  547. nodecount = len(replies)
  548. if not kwargs.get('quiet', False):
  549. self.out('\n{0} {1} online.'.format(
  550. nodecount, text.pluralize(nodecount, 'node')))
  551. class migrate(Command):
  552. """Migrate tasks from one broker to another.
  553. Warning:
  554. This command is experimental, make sure you have a backup of
  555. the tasks before you continue.
  556. Example:
  557. .. code-block:: console
  558. $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
  559. $ celery migrate redis://localhost amqp://guest@localhost//
  560. """
  561. args = '<source_url> <dest_url>'
  562. option_list = Command.option_list + (
  563. Option('--limit', '-n', type='int',
  564. help='Number of tasks to consume (int)'),
  565. Option('--timeout', '-t', type='float', default=1.0,
  566. help='Timeout in seconds (float) waiting for tasks'),
  567. Option('--ack-messages', '-a', action='store_true',
  568. help='Ack messages from source broker.'),
  569. Option('--tasks', '-T',
  570. help='List of task names to filter on.'),
  571. Option('--queues', '-Q',
  572. help='List of queues to migrate.'),
  573. Option('--forever', '-F', action='store_true',
  574. help='Continually migrate tasks until killed.'),
  575. )
  576. progress_fmt = MIGRATE_PROGRESS_FMT
  577. def on_migrate_task(self, state, body, message):
  578. self.out(self.progress_fmt.format(state=state, body=body))
  579. def run(self, source, destination, **kwargs):
  580. from kombu import Connection
  581. from celery.contrib.migrate import migrate_tasks
  582. migrate_tasks(Connection(source),
  583. Connection(destination),
  584. callback=self.on_migrate_task,
  585. **kwargs)
  586. class shell(Command): # pragma: no cover
  587. """Start shell session with convenient access to celery symbols.
  588. The following symbols will be added to the main globals:
  589. - ``celery``: the current application.
  590. - ``chord``, ``group``, ``chain``, ``chunks``,
  591. ``xmap``, ``xstarmap`` ``subtask``, ``Task``
  592. - all registered tasks.
  593. """
  594. option_list = Command.option_list + (
  595. Option('--ipython', '-I',
  596. action='store_true', help='force iPython.'),
  597. Option('--bpython', '-B',
  598. action='store_true', help='force bpython.'),
  599. Option('--python', '-P',
  600. action='store_true', help='force default Python shell.'),
  601. Option('--without-tasks', '-T', action='store_true',
  602. help="don't add tasks to locals."),
  603. Option('--eventlet', action='store_true',
  604. help='use eventlet.'),
  605. Option('--gevent', action='store_true', help='use gevent.'),
  606. )
  607. def run(self, ipython=False, bpython=False,
  608. python=False, without_tasks=False, eventlet=False,
  609. gevent=False, **kwargs):
  610. sys.path.insert(0, os.getcwd())
  611. if eventlet:
  612. import_module('celery.concurrency.eventlet')
  613. if gevent:
  614. import_module('celery.concurrency.gevent')
  615. import celery
  616. import celery.task.base
  617. self.app.loader.import_default_modules()
  618. self.locals = {
  619. 'app': self.app,
  620. 'celery': self.app,
  621. 'Task': celery.Task,
  622. 'chord': celery.chord,
  623. 'group': celery.group,
  624. 'chain': celery.chain,
  625. 'chunks': celery.chunks,
  626. 'xmap': celery.xmap,
  627. 'xstarmap': celery.xstarmap,
  628. 'subtask': celery.subtask,
  629. 'signature': celery.signature,
  630. }
  631. if not without_tasks:
  632. self.locals.update({
  633. task.__name__: task for task in values(self.app.tasks)
  634. if not task.name.startswith('celery.')
  635. })
  636. if python:
  637. return self.invoke_fallback_shell()
  638. elif bpython:
  639. return self.invoke_bpython_shell()
  640. elif ipython:
  641. return self.invoke_ipython_shell()
  642. return self.invoke_default_shell()
  643. def invoke_default_shell(self):
  644. try:
  645. import IPython # noqa
  646. except ImportError:
  647. try:
  648. import bpython # noqa
  649. except ImportError:
  650. return self.invoke_fallback_shell()
  651. else:
  652. return self.invoke_bpython_shell()
  653. else:
  654. return self.invoke_ipython_shell()
  655. def invoke_fallback_shell(self):
  656. import code
  657. try:
  658. import readline
  659. except ImportError:
  660. pass
  661. else:
  662. import rlcompleter
  663. readline.set_completer(
  664. rlcompleter.Completer(self.locals).complete)
  665. readline.parse_and_bind('tab:complete')
  666. code.interact(local=self.locals)
  667. def invoke_ipython_shell(self):
  668. for ip in (self._ipython, self._ipython_pre_10,
  669. self._ipython_terminal, self._ipython_010,
  670. self._no_ipython):
  671. try:
  672. return ip()
  673. except ImportError:
  674. pass
  675. def _ipython(self):
  676. from IPython import start_ipython
  677. start_ipython(argv=[], user_ns=self.locals)
  678. def _ipython_pre_10(self): # pragma: no cover
  679. from IPython.frontend.terminal.ipapp import TerminalIPythonApp
  680. app = TerminalIPythonApp.instance()
  681. app.initialize(argv=[])
  682. app.shell.user_ns.update(self.locals)
  683. app.start()
  684. def _ipython_terminal(self): # pragma: no cover
  685. from IPython.terminal import embed
  686. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  687. def _ipython_010(self): # pragma: no cover
  688. from IPython.Shell import IPShell
  689. IPShell(argv=[], user_ns=self.locals).mainloop()
  690. def _no_ipython(self): # pragma: no cover
  691. raise ImportError('no suitable ipython found')
  692. def invoke_bpython_shell(self):
  693. import bpython
  694. bpython.embed(self.locals)
  695. class upgrade(Command):
  696. """Perform upgrade between versions."""
  697. option_list = Command.option_list + (
  698. Option('--django', action='store_true',
  699. help='Upgrade Django project'),
  700. Option('--compat', action='store_true',
  701. help='Maintain backwards compatibility'),
  702. Option('--no-backup', action='store_true',
  703. help='Dont backup original files'),
  704. )
  705. choices = {'settings'}
  706. def usage(self, command):
  707. return '%prog <command> settings [filename] [options]'
  708. def run(self, *args, **kwargs):
  709. try:
  710. command = args[0]
  711. except IndexError:
  712. raise self.UsageError('missing upgrade type')
  713. if command not in self.choices:
  714. raise self.UsageError('unknown upgrade type: {0}'.format(command))
  715. return getattr(self, command)(*args, **kwargs)
  716. def settings(self, command, filename,
  717. no_backup=False, django=False, compat=False, **kwargs):
  718. lines = self._slurp(filename) if no_backup else self._backup(filename)
  719. keyfilter = self._compat_key if django or compat else pass1
  720. print('processing {0}...'.format(filename), file=self.stderr)
  721. with codecs.open(filename, 'w', 'utf-8') as write_fh:
  722. for line in lines:
  723. write_fh.write(self._to_new_key(line, keyfilter))
  724. def _slurp(self, filename):
  725. with codecs.open(filename, 'r', 'utf-8') as read_fh:
  726. return [line for line in read_fh]
  727. def _backup(self, filename, suffix='.orig'):
  728. lines = []
  729. backup_filename = ''.join([filename, suffix])
  730. print('writing backup to {0}...'.format(backup_filename),
  731. file=self.stderr)
  732. with codecs.open(filename, 'r', 'utf-8') as read_fh:
  733. with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
  734. for line in read_fh:
  735. backup_fh.write(line)
  736. lines.append(line)
  737. return lines
  738. def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
  739. # sort by length to avoid, for example, broker_transport overriding
  740. # broker_transport_options.
  741. for old_key in reversed(sorted(source, key=lambda x: len(x))):
  742. new_line = line.replace(old_key, keyfilter(source[old_key]))
  743. if line != new_line:
  744. return new_line # only one match per line.
  745. return line
  746. def _compat_key(self, key, namespace='CELERY'):
  747. key = key.upper()
  748. if not key.startswith(namespace):
  749. key = '_'.join([namespace, key])
  750. return key
  751. class help(Command):
  752. """Show help screen and exit."""
  753. def usage(self, command):
  754. return '%prog <command> [options] {0.args}'.format(self)
  755. def run(self, *args, **kwargs):
  756. self.parser.print_help()
  757. self.out(HELP.format(
  758. prog_name=self.prog_name,
  759. commands=CeleryCommand.list_commands(
  760. colored=self.colored, app=self.app),
  761. ))
  762. return EX_USAGE
  763. class report(Command):
  764. """Shows information useful to include in bug-reports."""
  765. def run(self, *args, **kwargs):
  766. self.out(self.app.bugreport())
  767. return EX_OK
  768. class CeleryCommand(Command):
  769. """Base class for commands."""
  770. commands = {
  771. 'amqp': amqp,
  772. 'beat': beat,
  773. 'call': call,
  774. 'control': control,
  775. 'events': events,
  776. 'graph': graph,
  777. 'help': help,
  778. 'inspect': inspect,
  779. 'list': list_,
  780. 'logtool': logtool,
  781. 'migrate': migrate,
  782. 'multi': multi,
  783. 'purge': purge,
  784. 'report': report,
  785. 'result': result,
  786. 'shell': shell,
  787. 'status': status,
  788. 'upgrade': upgrade,
  789. 'worker': worker,
  790. }
  791. ext_fmt = '{self.namespace}.commands'
  792. enable_config_from_cmdline = True
  793. prog_name = 'celery'
  794. namespace = 'celery'
  795. @classmethod
  796. def register_command(cls, fun, name=None):
  797. cls.commands[name or fun.__name__] = fun
  798. return fun
  799. def execute(self, command, argv=None):
  800. try:
  801. cls = self.commands[command]
  802. except KeyError:
  803. cls, argv = self.commands['help'], ['help']
  804. cls = self.commands.get(command) or self.commands['help']
  805. try:
  806. return cls(
  807. app=self.app, on_error=self.on_error,
  808. no_color=self.no_color, quiet=self.quiet,
  809. on_usage_error=partial(self.on_usage_error, command=command),
  810. ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  811. except self.UsageError as exc:
  812. self.on_usage_error(exc)
  813. return exc.status
  814. except self.Error as exc:
  815. self.on_error(exc)
  816. return exc.status
  817. def on_usage_error(self, exc, command=None):
  818. if command:
  819. helps = '{self.prog_name} {command} --help'
  820. else:
  821. helps = '{self.prog_name} --help'
  822. self.error(self.colored.magenta('Error: {0}'.format(exc)))
  823. self.error("""Please try '{0}'""".format(helps.format(
  824. self=self, command=command,
  825. )))
  826. def _relocate_args_from_start(self, argv, index=0):
  827. if argv:
  828. rest = []
  829. while index < len(argv):
  830. value = argv[index]
  831. if value.startswith('--'):
  832. rest.append(value)
  833. elif value.startswith('-'):
  834. # we eat the next argument even though we don't know
  835. # if this option takes an argument or not.
  836. # instead we'll assume what's the command name in the
  837. # return statements below.
  838. try:
  839. nxt = argv[index + 1]
  840. if nxt.startswith('-'):
  841. # is another option
  842. rest.append(value)
  843. else:
  844. # is (maybe) a value for this option
  845. rest.extend([value, nxt])
  846. index += 1
  847. except IndexError: # pragma: no cover
  848. rest.append(value)
  849. break
  850. else:
  851. break
  852. index += 1
  853. if argv[index:]: # pragma: no cover
  854. # if there are more arguments left then divide and swap
  855. # we assume the first argument in argv[i:] is the command
  856. # name.
  857. return argv[index:] + rest
  858. # if there are no more arguments then the last arg in rest'
  859. # must be the command.
  860. [rest.pop()] + rest
  861. return []
  862. def prepare_prog_name(self, name):
  863. if name == '__main__.py':
  864. return sys.modules['__main__'].__file__
  865. return name
  866. def handle_argv(self, prog_name, argv):
  867. self.prog_name = self.prepare_prog_name(prog_name)
  868. argv = self._relocate_args_from_start(argv)
  869. _, argv = self.prepare_args(None, argv)
  870. try:
  871. command = argv[0]
  872. except IndexError:
  873. command, argv = 'help', ['help']
  874. return self.execute(command, argv)
  875. def execute_from_commandline(self, argv=None):
  876. argv = sys.argv if argv is None else argv
  877. if 'multi' in argv[1:3]: # Issue 1008
  878. self.respects_app_option = False
  879. try:
  880. sys.exit(determine_exit_status(
  881. super(CeleryCommand, self).execute_from_commandline(argv)))
  882. except KeyboardInterrupt:
  883. sys.exit(EX_FAILURE)
  884. @classmethod
  885. def get_command_info(self, command, indent=0,
  886. color=None, colored=None, app=None):
  887. colored = term.colored() if colored is None else colored
  888. colored = colored.names[color] if color else lambda x: x
  889. obj = self.commands[command]
  890. cmd = 'celery {0}'.format(colored(command))
  891. if obj.leaf:
  892. return '|' + text.indent(cmd, indent)
  893. return text.join([
  894. ' ',
  895. '|' + text.indent('{0} --help'.format(cmd), indent),
  896. obj.list_commands(indent, 'celery {0}'.format(command), colored,
  897. app=app),
  898. ])
  899. @classmethod
  900. def list_commands(self, indent=0, colored=None, app=None):
  901. colored = term.colored() if colored is None else colored
  902. white = colored.white
  903. ret = []
  904. for cls, commands, color in command_classes:
  905. ret.extend([
  906. text.indent('+ {0}: '.format(white(cls)), indent),
  907. '\n'.join(
  908. self.get_command_info(command, indent + 4, color, colored,
  909. app=app)
  910. for command in commands),
  911. ''
  912. ])
  913. return '\n'.join(ret).strip()
  914. def with_pool_option(self, argv):
  915. if len(argv) > 1 and 'worker' in argv[0:3]:
  916. # this command supports custom pools
  917. # that may have to be loaded as early as possible.
  918. return (['-P'], ['--pool'])
  919. def on_concurrency_setup(self):
  920. self.load_extension_commands()
  921. def load_extension_commands(self):
  922. names = Extensions(self.ext_fmt.format(self=self),
  923. self.register_command).load()
  924. if names:
  925. command_classes.append(('Extensions', names, 'magenta'))
  926. def command(*args, **kwargs):
  927. # Deprecated: Use classmethod
  928. # :meth:`CeleryCommand.register_command` instead.
  929. _register = CeleryCommand.register_command
  930. return _register(args[0]) if args else _register
# Call ``main()`` only when this module is executed as a script,
# not when it is imported.
if __name__ == '__main__':  # pragma: no cover
    main()