celery.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery` umbrella command.
  4. .. program:: celery
  5. """
  6. from __future__ import absolute_import, print_function
  7. import anyjson
  8. import warnings
  9. import sys
  10. from future_builtins import map
  11. from importlib import import_module
  12. from pprint import pformat
  13. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  14. from celery.utils import term
  15. from celery.utils import text
  16. from celery.utils.functional import memoize
  17. from celery.utils.imports import symbol_by_name
  18. from celery.utils.timeutils import maybe_iso8601
  19. from celery.bin.base import Command as BaseCommand, Option
  20. HELP = """
  21. ---- -- - - ---- Commands- -------------- --- ------------
  22. {commands}
  23. ---- -- - - --------- -- - -------------- --- ------------
  24. Type '{prog_name} <command> --help' for help using a specific command.
  25. """
  26. MIGRATE_PROGRESS_FMT = """\
  27. Migrating task {state.count}/{state.strtotal}: \
  28. {body[task]}[{body[id]}]\
  29. """
  30. commands = {}
  31. command_classes = [
  32. ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
  33. ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
  34. ('Utils', ['purge', 'list', 'migrate', 'call', 'result', 'report'], None),
  35. ]
  36. @memoize()
  37. def _get_extension_classes():
  38. extensions = []
  39. command_classes.append(('Extensions', extensions, 'magenta'))
  40. return extensions
  41. class Error(Exception):
  42. def __init__(self, reason, status=EX_FAILURE):
  43. self.reason = reason
  44. self.status = status
  45. super(Error, self).__init__(reason, status)
  46. def __str__(self):
  47. return self.reason
  48. def command(*args, **kwargs):
  49. def _register(fun):
  50. commands[kwargs.get('name') or fun.__name__] = fun
  51. return fun
  52. return _register(args[0]) if args else _register
  53. def load_extension_commands(namespace='celery.commands'):
  54. try:
  55. from pkg_resources import iter_entry_points
  56. except ImportError:
  57. return
  58. for ep in iter_entry_points(namespace):
  59. sym = ':'.join([ep.module_name, ep.attrs[0]])
  60. try:
  61. cls = symbol_by_name(sym)
  62. except (ImportError, SyntaxError) as exc:
  63. warnings.warn(
  64. 'Cannot load extension {0!r}: {1!r}'.format(sym, exc))
  65. else:
  66. _get_extension_classes().append(ep.name)
  67. command(cls, name=ep.name)
  68. class Command(BaseCommand):
  69. help = ''
  70. args = ''
  71. prog_name = 'celery'
  72. show_body = True
  73. show_reply = True
  74. option_list = (
  75. Option('--quiet', '-q', action='store_true'),
  76. Option('--no-color', '-C', action='store_true'),
  77. )
  78. def __init__(self, app=None, no_color=False, stdout=sys.stdout,
  79. stderr=sys.stderr, show_reply=True):
  80. super(Command, self).__init__(app=app)
  81. self.colored = term.colored(enabled=not no_color)
  82. self.stdout = stdout
  83. self.stderr = stderr
  84. self.quiet = False
  85. if show_reply is not None:
  86. self.show_reply = show_reply
  87. def __call__(self, *args, **kwargs):
  88. try:
  89. ret = self.run(*args, **kwargs)
  90. except Error as exc:
  91. self.error(self.colored.red('Error: {0!r}'.format(exc)))
  92. return exc.status
  93. return ret if ret is not None else EX_OK
  94. def show_help(self, command):
  95. self.run_from_argv(self.prog_name, [command, '--help'])
  96. return EX_USAGE
  97. def error(self, s):
  98. self.out(s, fh=self.stderr)
  99. def out(self, s, fh=None):
  100. print(s, file=fh or self.stdout)
  101. def run_from_argv(self, prog_name, argv):
  102. self.prog_name = prog_name
  103. self.command = argv[0]
  104. self.arglist = argv[1:]
  105. self.parser = self.create_parser(self.prog_name, self.command)
  106. options, args = self.prepare_args(
  107. *self.parser.parse_args(self.arglist))
  108. self.colored = term.colored(enabled=not options['no_color'])
  109. self.quiet = options.get('quiet', False)
  110. self.show_body = options.get('show_body', True)
  111. return self(*args, **options)
  112. def usage(self, command):
  113. return '%%prog {0} [options] {self.args}'.format(command, self=self)
  114. def prettify_list(self, n):
  115. c = self.colored
  116. if not n:
  117. return '- empty -'
  118. return '\n'.join(str(c.reset(c.white('*'), ' {0}'.format(item)))
  119. for item in n)
  120. def prettify_dict_ok_error(self, n):
  121. c = self.colored
  122. try:
  123. return (c.green('OK'),
  124. text.indent(self.prettify(n['ok'])[1], 4))
  125. except KeyError:
  126. pass
  127. return (c.red('ERROR'),
  128. text.indent(self.prettify(n['error'])[1], 4))
  129. def say_remote_command_reply(self, replies):
  130. c = self.colored
  131. node = iter(replies).next() # <-- take first.
  132. reply = replies[node]
  133. status, preply = self.prettify(reply)
  134. self.say_chat('->', c.cyan(node, ': ') + status,
  135. text.indent(preply, 4) if self.show_reply else '')
  136. def prettify(self, n):
  137. OK = str(self.colored.green('OK'))
  138. if isinstance(n, list):
  139. return OK, self.prettify_list(n)
  140. if isinstance(n, dict):
  141. if 'ok' in n or 'error' in n:
  142. return self.prettify_dict_ok_error(n)
  143. if isinstance(n, basestring):
  144. return OK, unicode(n)
  145. return OK, pformat(n)
  146. def say_chat(self, direction, title, body=''):
  147. c = self.colored
  148. if direction == '<-' and self.quiet:
  149. return
  150. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  151. self.out(c.reset(dirstr, title))
  152. if body and self.show_body:
  153. self.out(body)
  154. @property
  155. def description(self):
  156. return self.__doc__
  157. class Delegate(Command):
  158. def __init__(self, *args, **kwargs):
  159. super(Delegate, self).__init__(*args, **kwargs)
  160. self.target = symbol_by_name(self.Command)(app=self.app)
  161. self.args = self.target.args
  162. def get_options(self):
  163. return self.option_list + self.target.get_options()
  164. def create_parser(self, prog_name, command):
  165. parser = super(Delegate, self).create_parser(prog_name, command)
  166. return self.target.prepare_parser(parser)
  167. def run(self, *args, **kwargs):
  168. self.target.check_args(args)
  169. return self.target.run(*args, **kwargs)
  170. @command
  171. class multi(Command):
  172. """Start multiple worker instances."""
  173. def get_options(self):
  174. return ()
  175. def run_from_argv(self, prog_name, argv):
  176. from celery.bin.celeryd_multi import MultiTool
  177. return MultiTool().execute_from_commandline(argv, prog_name)
  178. @command
  179. class worker(Delegate):
  180. """Start worker instance.
  181. Examples::
  182. celery worker --app=proj -l info
  183. celery worker -A proj -l info -Q hipri,lopri
  184. celery worker -A proj --concurrency=4
  185. celery worker -A proj --concurrency=1000 -P eventlet
  186. celery worker --autoscale=10,0
  187. """
  188. Command = 'celery.bin.celeryd:WorkerCommand'
  189. @command
  190. class events(Delegate):
  191. """Event-stream utilities.
  192. Commands::
  193. celery events --app=proj
  194. start graphical monitor (requires curses)
  195. celery events -d --app=proj
  196. dump events to screen.
  197. celery events -b amqp://
  198. celery events -C <camera> [options]
  199. run snapshot camera.
  200. Examples::
  201. celery events
  202. celery events -d
  203. celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
  204. """
  205. Command = 'celery.bin.celeryev:EvCommand'
  206. @command
  207. class beat(Delegate):
  208. """Start the celerybeat periodic task scheduler.
  209. Examples::
  210. celery beat -l info
  211. celery beat -s /var/run/celerybeat/schedule --detach
  212. celery beat -S djcelery.schedulers.DatabaseScheduler
  213. """
  214. Command = 'celery.bin.celerybeat:BeatCommand'
  215. @command
  216. class amqp(Delegate):
  217. """AMQP Administration Shell.
  218. Also works for non-amqp transports.
  219. Examples::
  220. celery amqp
  221. start shell mode
  222. celery amqp help
  223. show list of commands
  224. celery amqp exchange.delete name
  225. celery amqp queue.delete queue
  226. celery amqp queue.delete queue yes yes
  227. """
  228. Command = 'celery.bin.camqadm:AMQPAdminCommand'
  229. @command(name='list')
  230. class list_(Command):
  231. """Get info from broker.
  232. Examples::
  233. celery list bindings
  234. NOTE: For RabbitMQ the management plugin is required.
  235. """
  236. args = '[bindings]'
  237. def list_bindings(self, management):
  238. try:
  239. bindings = management.get_bindings()
  240. except NotImplementedError:
  241. raise Error('Your transport cannot list bindings.')
  242. fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
  243. fmt('Queue', 'Exchange', 'Routing Key')
  244. fmt('-' * 16, '-' * 16, '-' * 16)
  245. for b in bindings:
  246. fmt(b['destination'], b['source'], b['routing_key'])
  247. def run(self, what=None, *_, **kw):
  248. topics = {'bindings': self.list_bindings}
  249. available = ', '.join(topics)
  250. if not what:
  251. raise Error('You must specify one of {0}'.format(available))
  252. if what not in topics:
  253. raise Error('unknown topic {0!r} (choose one of: {1})'.format(
  254. what, available))
  255. with self.app.connection() as conn:
  256. self.app.amqp.TaskConsumer(conn).declare()
  257. topics[what](conn.manager)
  258. @command
  259. class call(Command):
  260. """Call a task by name.
  261. Examples::
  262. celery call tasks.add --args='[2, 2]'
  263. celery call tasks.add --args='[2, 2]' --countdown=10
  264. """
  265. args = '<task_name>'
  266. option_list = Command.option_list + (
  267. Option('--args', '-a', help='positional arguments (json).'),
  268. Option('--kwargs', '-k', help='keyword arguments (json).'),
  269. Option('--eta', help='scheduled time (ISO-8601).'),
  270. Option('--countdown', type='float',
  271. help='eta in seconds from now (float/int).'),
  272. Option('--expires', help='expiry time (ISO-8601/float/int).'),
  273. Option('--serializer', default='json', help='defaults to json.'),
  274. Option('--queue', help='custom queue name.'),
  275. Option('--exchange', help='custom exchange name.'),
  276. Option('--routing-key', help='custom routing key.'),
  277. )
  278. def run(self, name, *_, **kw):
  279. # Positional args.
  280. args = kw.get('args') or ()
  281. if isinstance(args, basestring):
  282. args = anyjson.loads(args)
  283. # Keyword args.
  284. kwargs = kw.get('kwargs') or {}
  285. if isinstance(kwargs, basestring):
  286. kwargs = anyjson.loads(kwargs)
  287. # Expires can be int/float.
  288. expires = kw.get('expires') or None
  289. try:
  290. expires = float(expires)
  291. except (TypeError, ValueError):
  292. # or a string describing an ISO 8601 datetime.
  293. try:
  294. expires = maybe_iso8601(expires)
  295. except (TypeError, ValueError):
  296. raise
  297. res = self.app.send_task(name, args=args, kwargs=kwargs,
  298. countdown=kw.get('countdown'),
  299. serializer=kw.get('serializer'),
  300. queue=kw.get('queue'),
  301. exchange=kw.get('exchange'),
  302. routing_key=kw.get('routing_key'),
  303. eta=maybe_iso8601(kw.get('eta')),
  304. expires=expires)
  305. self.out(res.id)
  306. @command
  307. class purge(Command):
  308. """Erase all messages from all known task queues.
  309. WARNING: There is no undo operation for this command.
  310. """
  311. fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
  312. fmt_empty = 'No messages purged from {qnum} {queues}'
  313. def run(self, *args, **kwargs):
  314. queues = len(self.app.amqp.queues)
  315. messages = self.app.control.purge()
  316. fmt = self.fmt_purged if messages else self.fmt_empty
  317. self.out(fmt.format(
  318. mnum=messages, qnum=queues,
  319. messages=text.pluralize(messages, 'message'),
  320. queues=text.pluralize(queues, 'queue')))
  321. @command
  322. class result(Command):
  323. """Gives the return value for a given task id.
  324. Examples::
  325. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
  326. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
  327. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
  328. """
  329. args = '<task_id>'
  330. option_list = Command.option_list + (
  331. Option('--task', '-t', help='name of task (if custom backend)'),
  332. Option('--traceback', action='store_true',
  333. help='show traceback instead'),
  334. )
  335. def run(self, task_id, *args, **kwargs):
  336. result_cls = self.app.AsyncResult
  337. task = kwargs.get('task')
  338. traceback = kwargs.get('traceback', False)
  339. if task:
  340. result_cls = self.app.tasks[task].AsyncResult
  341. result = result_cls(task_id)
  342. if traceback:
  343. value = result.traceback
  344. else:
  345. value = result.get()
  346. self.out(self.prettify(value)[1])
  347. class _RemoteControl(Command):
  348. name = None
  349. choices = None
  350. leaf = False
  351. option_list = Command.option_list + (
  352. Option('--timeout', '-t', type='float',
  353. help='Timeout in seconds (float) waiting for reply'),
  354. Option('--destination', '-d',
  355. help='Comma separated list of destination node names.'))
  356. @classmethod
  357. def get_command_info(self, command, indent=0, prefix='', color=None,
  358. help=False):
  359. if help:
  360. help = '|' + text.indent(self.choices[command][1], indent + 4)
  361. else:
  362. help = None
  363. try:
  364. # see if it uses args.
  365. meth = getattr(self, command)
  366. return text.join([
  367. '|' + text.indent('{0}{1} {2}'.format(prefix, color(command),
  368. meth.__doc__), indent), help,
  369. ])
  370. except AttributeError:
  371. return text.join([
  372. '|' + text.indent(prefix + str(color(command)), indent), help,
  373. ])
  374. @classmethod
  375. def list_commands(self, indent=0, prefix='', color=None, help=False):
  376. color = color if color else lambda x: x
  377. prefix = prefix + ' ' if prefix else ''
  378. return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
  379. for c in sorted(self.choices))
  380. @property
  381. def epilog(self):
  382. return '\n'.join([
  383. '[Commands]',
  384. self.list_commands(indent=4, help=True)
  385. ])
  386. def usage(self, command):
  387. return '%%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
  388. command, self.args)
  389. def call(self, *args, **kwargs):
  390. raise NotImplementedError('get_obj')
  391. def run(self, *args, **kwargs):
  392. if not args:
  393. raise Error('Missing {0.name} method. See --help'.format(self))
  394. return self.do_call_method(args, **kwargs)
  395. def do_call_method(self, args, **kwargs):
  396. method = args[0]
  397. if method == 'help':
  398. raise Error("Did you mean '{0.name} --help'?".format(self))
  399. if method not in self.choices:
  400. raise Error('Unknown {0.name} method {1}'.format(self, method))
  401. destination = kwargs.get('destination')
  402. timeout = kwargs.get('timeout') or self.choices[method][0]
  403. if destination and isinstance(destination, basestring):
  404. destination = list(map(str.strip, destination.split(',')))
  405. try:
  406. handler = getattr(self, method)
  407. except AttributeError:
  408. handler = self.call
  409. replies = handler(method, *args[1:], timeout=timeout,
  410. destination=destination,
  411. callback=self.say_remote_command_reply)
  412. if not replies:
  413. raise Error('No nodes replied within time constraint.',
  414. status=EX_UNAVAILABLE)
  415. return replies
  416. def say(self, direction, title, body=''):
  417. c = self.colored
  418. if direction == '<-' and self.quiet:
  419. return
  420. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  421. self.out(c.reset(dirstr, title))
  422. if body and self.show_body:
  423. self.out(body)
  424. @command
  425. class inspect(_RemoteControl):
  426. """Inspect the worker at runtime.
  427. Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
  428. Examples::
  429. celery inspect active --timeout=5
  430. celery inspect scheduled -d worker1.example.com
  431. celery inspect revoked -d w1.e.com,w2.e.com
  432. """
  433. name = 'inspect'
  434. choices = {
  435. 'active': (1.0, 'dump active tasks (being processed)'),
  436. 'active_queues': (1.0, 'dump queues being consumed from'),
  437. 'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
  438. 'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
  439. 'stats': (1.0, 'dump worker statistics'),
  440. 'revoked': (1.0, 'dump of revoked task ids'),
  441. 'registered': (1.0, 'dump of registered tasks'),
  442. 'ping': (0.2, 'ping worker(s)'),
  443. 'report': (1.0, 'get bugreport info')
  444. }
  445. def call(self, method, *args, **options):
  446. i = self.app.control.inspect(**options)
  447. return getattr(i, method)(*args)
  448. @command
  449. class control(_RemoteControl):
  450. """Workers remote control.
  451. Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
  452. Examples::
  453. celery control enable_events --timeout=5
  454. celery control -d worker1.example.com enable_events
  455. celery control -d w1.e.com,w2.e.com enable_events
  456. celery control -d w1.e.com add_consumer queue_name
  457. celery control -d w1.e.com cancel_consumer queue_name
  458. celery control -d w1.e.com add_consumer queue exchange direct rkey
  459. """
  460. name = 'control'
  461. choices = {
  462. 'enable_events': (1.0, 'tell worker(s) to enable events'),
  463. 'disable_events': (1.0, 'tell worker(s) to disable events'),
  464. 'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
  465. 'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
  466. 'rate_limit': (1.0,
  467. 'tell worker(s) to modify the rate limit for a task type'),
  468. 'time_limit': (1.0,
  469. 'tell worker(s) to modify the time limit for a task type.'),
  470. 'autoscale': (1.0, 'change autoscale settings'),
  471. 'pool_grow': (1.0, 'start more pool processes'),
  472. 'pool_shrink': (1.0, 'use less pool processes'),
  473. }
  474. def call(self, method, *args, **options):
  475. return getattr(self.app.control, method)(*args, retry=True, **options)
  476. def pool_grow(self, method, n=1, **kwargs):
  477. """[N=1]"""
  478. return self.call(method, n, **kwargs)
  479. def pool_shrink(self, method, n=1, **kwargs):
  480. """[N=1]"""
  481. return self.call(method, n, **kwargs)
  482. def autoscale(self, method, max=None, min=None, **kwargs):
  483. """[max] [min]"""
  484. return self.call(method, max, min, **kwargs)
  485. def rate_limit(self, method, task_name, rate_limit, **kwargs):
  486. """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
  487. return self.call(method, task_name, rate_limit, reply=True, **kwargs)
  488. def time_limit(self, method, task_name, soft, hard=None, **kwargs):
  489. """<task_name> <soft_secs> [hard_secs]"""
  490. return self.call(method, task_name, soft, hard, reply=True, **kwargs)
  491. def add_consumer(self, method, queue, exchange=None,
  492. exchange_type='direct', routing_key=None, **kwargs):
  493. """<queue> [exchange [type [routing_key]]]"""
  494. return self.call(method, queue, exchange,
  495. exchange_type, routing_key, reply=True, **kwargs)
  496. def cancel_consumer(self, method, queue, **kwargs):
  497. """<queue>"""
  498. return self.call(method, queue, reply=True, **kwargs)
  499. @command
  500. class status(Command):
  501. """Show list of workers that are online."""
  502. option_list = inspect.option_list
  503. def run(self, *args, **kwargs):
  504. replies = inspect(app=self.app,
  505. no_color=kwargs.get('no_color', False),
  506. stdout=self.stdout, stderr=self.stderr,
  507. show_reply=False) \
  508. .run('ping', **dict(kwargs, quiet=True, show_body=False))
  509. if not replies:
  510. raise Error('No nodes replied within time constraint',
  511. status=EX_UNAVAILABLE)
  512. nodecount = len(replies)
  513. if not kwargs.get('quiet', False):
  514. self.out('\n{0} {1} online.'.format(
  515. nodecount, text.pluralize(nodecount, 'node')))
  516. @command
  517. class migrate(Command):
  518. """Migrate tasks from one broker to another.
  519. Examples::
  520. celery migrate redis://localhost amqp://guest@localhost//
  521. celery migrate django:// redis://localhost
  522. NOTE: This command is experimental, make sure you have
  523. a backup of the tasks before you continue.
  524. """
  525. args = '<source_url> <dest_url>'
  526. option_list = Command.option_list + (
  527. Option('--limit', '-n', type='int',
  528. help='Number of tasks to consume (int)'),
  529. Option('--timeout', '-t', type='float', default=1.0,
  530. help='Timeout in seconds (float) waiting for tasks'),
  531. Option('--ack-messages', '-a', action='store_true',
  532. help='Ack messages from source broker.'),
  533. Option('--tasks', '-T',
  534. help='List of task names to filter on.'),
  535. Option('--queues', '-Q',
  536. help='List of queues to migrate.'),
  537. Option('--forever', '-F', action='store_true',
  538. help='Continually migrate tasks until killed.'),
  539. )
  540. progress_fmt = MIGRATE_PROGRESS_FMT
  541. def on_migrate_task(self, state, body, message):
  542. self.out(self.progress_fmt.format(state=state, body=body))
  543. def run(self, *args, **kwargs):
  544. if len(args) != 2:
  545. return self.show_help('migrate')
  546. from kombu import Connection
  547. from celery.contrib.migrate import migrate_tasks
  548. migrate_tasks(Connection(args[0]),
  549. Connection(args[1]),
  550. callback=self.on_migrate_task,
  551. **kwargs)
  552. @command
  553. class shell(Command): # pragma: no cover
  554. """Start shell session with convenient access to celery symbols.
  555. The following symbols will be added to the main globals:
  556. - celery: the current application.
  557. - chord, group, chain, chunks,
  558. xmap, xstarmap subtask, Task
  559. - all registered tasks.
  560. Example Session:
  561. .. code-block:: bash
  562. $ celery shell
  563. >>> celery
  564. <Celery default:0x1012d9fd0>
  565. >>> add
  566. <@task: tasks.add>
  567. >>> add.delay(2, 2)
  568. <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
  569. """
  570. option_list = Command.option_list + (
  571. Option('--ipython', '-I',
  572. action='store_true', dest='force_ipython',
  573. help='force iPython.'),
  574. Option('--bpython', '-B',
  575. action='store_true', dest='force_bpython',
  576. help='force bpython.'),
  577. Option('--python', '-P',
  578. action='store_true', dest='force_python',
  579. help='force default Python shell.'),
  580. Option('--without-tasks', '-T', action='store_true',
  581. help="don't add tasks to locals."),
  582. Option('--eventlet', action='store_true',
  583. help='use eventlet.'),
  584. Option('--gevent', action='store_true', help='use gevent.'),
  585. )
  586. def run(self, force_ipython=False, force_bpython=False,
  587. force_python=False, without_tasks=False, eventlet=False,
  588. gevent=False, **kwargs):
  589. if eventlet:
  590. import_module('celery.concurrency.eventlet')
  591. if gevent:
  592. import_module('celery.concurrency.gevent')
  593. import celery
  594. import celery.task.base
  595. self.app.loader.import_default_modules()
  596. self.locals = {'celery': self.app,
  597. 'Task': celery.Task,
  598. 'chord': celery.chord,
  599. 'group': celery.group,
  600. 'chain': celery.chain,
  601. 'chunks': celery.chunks,
  602. 'xmap': celery.xmap,
  603. 'xstarmap': celery.xstarmap,
  604. 'subtask': celery.subtask}
  605. if not without_tasks:
  606. self.locals.update(dict((task.__name__, task)
  607. for task in self.app.tasks.itervalues()
  608. if not task.name.startswith('celery.')))
  609. if force_python:
  610. return self.invoke_fallback_shell()
  611. elif force_bpython:
  612. return self.invoke_bpython_shell()
  613. elif force_ipython:
  614. return self.invoke_ipython_shell()
  615. return self.invoke_default_shell()
  616. def invoke_default_shell(self):
  617. try:
  618. import IPython # noqa
  619. except ImportError:
  620. try:
  621. import bpython # noqa
  622. except ImportError:
  623. return self.invoke_fallback_shell()
  624. else:
  625. return self.invoke_bpython_shell()
  626. else:
  627. return self.invoke_ipython_shell()
  628. def invoke_fallback_shell(self):
  629. import code
  630. try:
  631. import readline
  632. except ImportError:
  633. pass
  634. else:
  635. import rlcompleter
  636. readline.set_completer(
  637. rlcompleter.Completer(self.locals).complete)
  638. readline.parse_and_bind('tab:complete')
  639. code.interact(local=self.locals)
  640. def invoke_ipython_shell(self):
  641. try:
  642. from IPython.frontend.terminal import embed
  643. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  644. except ImportError: # ipython < 0.11
  645. from IPython.Shell import IPShell
  646. IPShell(argv=[], user_ns=self.locals).mainloop()
  647. def invoke_bpython_shell(self):
  648. import bpython
  649. bpython.embed(self.locals)
  650. @command
  651. class help(Command):
  652. """Show help screen and exit."""
  653. def usage(self, command):
  654. return '%%prog <command> [options] {0.args}'.format(self)
  655. def run(self, *args, **kwargs):
  656. self.parser.print_help()
  657. self.out(HELP.format(prog_name=self.prog_name,
  658. commands=CeleryCommand.list_commands()))
  659. return EX_USAGE
  660. @command
  661. class report(Command):
  662. """Shows information useful to include in bugreports."""
  663. def run(self, *args, **kwargs):
  664. self.out(self.app.bugreport())
  665. return EX_OK
  666. class CeleryCommand(BaseCommand):
  667. commands = commands
  668. enable_config_from_cmdline = True
  669. prog_name = 'celery'
  670. def execute(self, command, argv=None):
  671. try:
  672. cls = self.commands[command]
  673. except KeyError:
  674. cls, argv = self.commands['help'], ['help']
  675. cls = self.commands.get(command) or self.commands['help']
  676. try:
  677. return cls(app=self.app).run_from_argv(self.prog_name, argv)
  678. except Error:
  679. return self.execute('help', argv)
  680. def remove_options_at_beginning(self, argv, index=0):
  681. if argv:
  682. while index < len(argv):
  683. value = argv[index]
  684. if value.startswith('--'):
  685. pass
  686. elif value.startswith('-'):
  687. index += 1
  688. else:
  689. return argv[index:]
  690. index += 1
  691. return []
  692. def handle_argv(self, prog_name, argv):
  693. self.prog_name = prog_name
  694. argv = self.remove_options_at_beginning(argv)
  695. _, argv = self.prepare_args(None, argv)
  696. try:
  697. command = argv[0]
  698. except IndexError:
  699. command, argv = 'help', ['help']
  700. return self.execute(command, argv)
  701. def execute_from_commandline(self, argv=None):
  702. try:
  703. sys.exit(determine_exit_status(
  704. super(CeleryCommand, self).execute_from_commandline(argv)))
  705. except KeyboardInterrupt:
  706. sys.exit(EX_FAILURE)
  707. @classmethod
  708. def get_command_info(self, command, indent=0, color=None):
  709. colored = term.colored().names[color] if color else lambda x: x
  710. obj = self.commands[command]
  711. cmd = 'celery {0}'.format(colored(command))
  712. if obj.leaf:
  713. return '|' + text.indent(cmd, indent)
  714. return text.join([
  715. ' ',
  716. '|' + text.indent('{0} --help'.format(cmd), indent),
  717. obj.list_commands(indent, 'celery {0}'.format(command), colored),
  718. ])
  719. @classmethod
  720. def list_commands(self, indent=0):
  721. white = term.colored().white
  722. ret = []
  723. for cls, commands, color in command_classes:
  724. ret.extend([
  725. text.indent('+ {0}: '.format(white(cls)), indent),
  726. '\n'.join(self.get_command_info(command, indent + 4, color)
  727. for command in commands),
  728. ''
  729. ])
  730. return '\n'.join(ret).strip()
  731. def with_pool_option(self, argv):
  732. if len(argv) > 1 and argv[1] == 'worker':
  733. # this command supports custom pools
  734. # that may have to be loaded as early as possible.
  735. return (['-P'], ['--pool'])
  736. def on_concurrency_setup(self):
  737. load_extension_commands()
  738. def determine_exit_status(ret):
  739. if isinstance(ret, int):
  740. return ret
  741. return EX_OK if ret else EX_FAILURE
  742. def main(argv=None):
  743. # Fix for setuptools generated scripts, so that it will
  744. # work with multiprocessing fork emulation.
  745. # (see multiprocessing.forking.get_preparation_data())
  746. try:
  747. if __name__ != '__main__': # pragma: no cover
  748. sys.modules['__main__'] = sys.modules[__name__]
  749. cmd = CeleryCommand()
  750. cmd.maybe_patch_concurrency()
  751. from billiard import freeze_support
  752. freeze_support()
  753. cmd.execute_from_commandline(argv)
  754. except KeyboardInterrupt:
  755. pass
  756. if __name__ == '__main__': # pragma: no cover
  757. main()