celery.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery` umbrella command.
  4. .. program:: celery
  5. """
  6. from __future__ import absolute_import
  7. from __future__ import with_statement
  8. import anyjson
  9. import sys
  10. from billiard import freeze_support
  11. from importlib import import_module
  12. from pprint import pformat
  13. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  14. from celery.utils import term
  15. from celery.utils import text
  16. from celery.utils.imports import symbol_by_name
  17. from celery.utils.timeutils import maybe_iso8601
  18. from celery.bin.base import Command as BaseCommand, Option
# Template for the help screen printed by the ``help`` command:
# ``%(commands)s`` receives the rendered command listing and
# ``%(prog_name)s`` the name of the program.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
%(commands)s
---- -- - - --------- -- - -------------- --- ------------
Type '%(prog_name)s <command> --help' for help using a specific command.
"""

# Registry mapping command name -> command class; populated by ``command()``.
commands = {}

# Layout of the help listing, one entry per section:
# (section title, command names in display order, color name or None).
command_classes = (
    ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
    ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
    ('Utils', ['purge', 'list', 'migrate', 'call', 'result', 'report'], None),
)
  31. class Error(Exception):
  32. def __init__(self, reason, status=EX_FAILURE):
  33. self.reason = reason
  34. self.status = status
  35. super(Error, self).__init__(reason, status)
  36. def __str__(self):
  37. return self.reason
  38. def command(fun, name=None, sortpri=0):
  39. commands[name or fun.__name__] = fun
  40. fun.sortpri = sortpri
  41. return fun
  42. class Command(BaseCommand):
  43. help = ''
  44. args = ''
  45. prog_name = 'celery'
  46. show_body = True
  47. leaf = True
  48. show_reply = True
  49. option_list = (
  50. Option('--quiet', '-q', action='store_true'),
  51. Option('--no-color', '-C', action='store_true'),
  52. )
  53. def __init__(self, app=None, no_color=False, stdout=sys.stdout,
  54. stderr=sys.stderr, show_reply=True):
  55. super(Command, self).__init__(app=app)
  56. self.colored = term.colored(enabled=not no_color)
  57. self.stdout = stdout
  58. self.stderr = stderr
  59. self.quiet = False
  60. if show_reply is not None:
  61. self.show_reply = show_reply
  62. def __call__(self, *args, **kwargs):
  63. try:
  64. ret = self.run(*args, **kwargs)
  65. except Error, exc:
  66. self.error(self.colored.red('Error: %s' % exc))
  67. return exc.status
  68. return ret if ret is not None else EX_OK
  69. def show_help(self, command):
  70. self.run_from_argv(self.prog_name, [command, '--help'])
  71. return EX_USAGE
  72. def error(self, s):
  73. self.out(s, fh=self.stderr)
  74. def out(self, s, fh=None):
  75. s = str(s)
  76. if not s.endswith('\n'):
  77. s += '\n'
  78. (fh or self.stdout).write(s)
  79. def run_from_argv(self, prog_name, argv):
  80. self.prog_name = prog_name
  81. self.command = argv[0]
  82. self.arglist = argv[1:]
  83. self.parser = self.create_parser(self.prog_name, self.command)
  84. options, args = self.prepare_args(
  85. *self.parser.parse_args(self.arglist))
  86. self.colored = term.colored(enabled=not options['no_color'])
  87. self.quiet = options.get('quiet', False)
  88. self.show_body = options.get('show_body', True)
  89. return self(*args, **options)
  90. def usage(self, command):
  91. return '%%prog %s [options] %s' % (command, self.args)
  92. def prettify_list(self, n):
  93. c = self.colored
  94. if not n:
  95. return '- empty -'
  96. return '\n'.join(str(c.reset(c.white('*'), ' %s' % (item, )))
  97. for item in n)
  98. def prettify_dict_ok_error(self, n):
  99. c = self.colored
  100. try:
  101. return (c.green('OK'),
  102. text.indent(self.prettify(n['ok'])[1], 4))
  103. except KeyError:
  104. pass
  105. return (c.red('ERROR'),
  106. text.indent(self.prettify(n['error'])[1], 4))
  107. def say_remote_command_reply(self, replies):
  108. c = self.colored
  109. node = replies.keys()[0]
  110. reply = replies[node]
  111. status, preply = self.prettify(reply)
  112. self.say_chat('->', c.cyan(node, ': ') + status,
  113. text.indent(preply, 4) if self.show_reply else '')
  114. def prettify(self, n):
  115. OK = str(self.colored.green('OK'))
  116. if isinstance(n, list):
  117. return OK, self.prettify_list(n)
  118. if isinstance(n, dict):
  119. if 'ok' in n or 'error' in n:
  120. return self.prettify_dict_ok_error(n)
  121. if isinstance(n, basestring):
  122. return OK, unicode(n)
  123. return OK, pformat(n)
  124. def say_chat(self, direction, title, body=''):
  125. c = self.colored
  126. if direction == '<-' and self.quiet:
  127. return
  128. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  129. self.out(c.reset(dirstr, title))
  130. if body and self.show_body:
  131. self.out(body)
  132. @property
  133. def description(self):
  134. return self.__doc__
  135. class Delegate(Command):
  136. def __init__(self, *args, **kwargs):
  137. super(Delegate, self).__init__(*args, **kwargs)
  138. self.target = symbol_by_name(self.Command)(app=self.app)
  139. self.args = self.target.args
  140. def get_options(self):
  141. return self.option_list + self.target.get_options()
  142. def create_parser(self, prog_name, command):
  143. parser = super(Delegate, self).create_parser(prog_name, command)
  144. return self.target.prepare_parser(parser)
  145. def run(self, *args, **kwargs):
  146. self.target.check_args(args)
  147. return self.target.run(*args, **kwargs)
  148. class multi(Command):
  149. """Start multiple worker instances."""
  150. def get_options(self):
  151. return ()
  152. def run_from_argv(self, prog_name, argv):
  153. from celery.bin.celeryd_multi import MultiTool
  154. return MultiTool().execute_from_commandline(argv, prog_name)
  155. multi = command(multi)
  156. class worker(Delegate):
  157. """Start worker instance.
  158. Examples::
  159. celery worker --app=proj -l info
  160. celery worker -A proj -l info -Q hipri,lopri
  161. celery worker -A proj --concurrency=4
  162. celery worker -A proj --concurrency=1000 -P eventlet
  163. celery worker --autoscale=10,0
  164. """
  165. Command = 'celery.bin.celeryd:WorkerCommand'
  166. worker = command(worker, sortpri=01)
class events(Delegate):
    """Event-stream utilities.

    Commands::

        celery events --app=proj
            start graphical monitor (requires curses)
        celery events -d --app=proj
            dump events to screen.
        celery events -b amqp://
        celery events -C <camera> [options]
            run snapshot camera.

    Examples::

        celery events
        celery events -d
        celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
    """
    # Dotted path of the command class the work is delegated to
    # (resolved by ``Delegate.__init__`` through ``symbol_by_name``).
    Command = 'celery.bin.celeryev:EvCommand'
events = command(events, sortpri=10)
class beat(Delegate):
    """Start the celerybeat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celerybeat/schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler
    """
    # Dotted path of the command class the work is delegated to
    # (resolved by ``Delegate.__init__`` through ``symbol_by_name``).
    Command = 'celery.bin.celerybeat:BeatCommand'
beat = command(beat, sortpri=20)
class amqp(Delegate):
    """AMQP Administration Shell.

    Also works for non-amqp transports.

    Examples::

        celery amqp
            start shell mode
        celery amqp help
            show list of commands
        celery amqp exchange.delete name
        celery amqp queue.delete queue
        celery amqp queue.delete queue yes yes
    """
    # Dotted path of the command class the work is delegated to
    # (resolved by ``Delegate.__init__`` through ``symbol_by_name``).
    Command = 'celery.bin.camqadm:AMQPAdminCommand'
amqp = command(amqp, sortpri=30)
  207. class list_(Command):
  208. """Get info from broker.
  209. Examples::
  210. celery list bindings
  211. NOTE: For RabbitMQ the management plugin is required.
  212. """
  213. args = '[bindings]'
  214. def list_bindings(self, management):
  215. try:
  216. bindings = management.get_bindings()
  217. except NotImplementedError:
  218. raise Error('Your transport cannot list bindings.')
  219. fmt = lambda q, e, r: self.out('%s %s %s' % (q.ljust(28),
  220. e.ljust(28), r))
  221. fmt('Queue', 'Exchange', 'Routing Key')
  222. fmt('-' * 16, '-' * 16, '-' * 16)
  223. for b in bindings:
  224. fmt(b['destination'], b['source'], b['routing_key'])
  225. def run(self, what=None, *_, **kw):
  226. topics = {'bindings': self.list_bindings}
  227. available = ', '.join(topics.keys())
  228. if not what:
  229. raise Error('You must specify what to list (%s)' % available)
  230. if what not in topics:
  231. raise Error('unknown topic %r (choose one of: %s)' % (
  232. what, available))
  233. with self.app.connection() as conn:
  234. self.app.amqp.TaskConsumer(conn).declare()
  235. topics[what](conn.manager)
  236. list_ = command(list_, 'list')
  237. class call(Command):
  238. """Call a task by name.
  239. Examples::
  240. celery call tasks.add --args='[2, 2]'
  241. celery call tasks.add --args='[2, 2]' --countdown=10
  242. """
  243. args = '<task_name>'
  244. option_list = Command.option_list + (
  245. Option('--args', '-a', help='positional arguments (json).'),
  246. Option('--kwargs', '-k', help='keyword arguments (json).'),
  247. Option('--eta', help='scheduled time (ISO-8601).'),
  248. Option('--countdown', type='float',
  249. help='eta in seconds from now (float/int).'),
  250. Option('--expires', help='expiry time (ISO-8601/float/int).'),
  251. Option('--serializer', default='json', help='defaults to json.'),
  252. Option('--queue', help='custom queue name.'),
  253. Option('--exchange', help='custom exchange name.'),
  254. Option('--routing-key', help='custom routing key.'),
  255. )
  256. def run(self, name, *_, **kw):
  257. # Positional args.
  258. args = kw.get('args') or ()
  259. if isinstance(args, basestring):
  260. args = anyjson.loads(args)
  261. # Keyword args.
  262. kwargs = kw.get('kwargs') or {}
  263. if isinstance(kwargs, basestring):
  264. kwargs = anyjson.loads(kwargs)
  265. # Expires can be int/float.
  266. expires = kw.get('expires') or None
  267. try:
  268. expires = float(expires)
  269. except (TypeError, ValueError):
  270. # or a string describing an ISO 8601 datetime.
  271. try:
  272. expires = maybe_iso8601(expires)
  273. except (TypeError, ValueError):
  274. raise
  275. res = self.app.send_task(name, args=args, kwargs=kwargs,
  276. countdown=kw.get('countdown'),
  277. serializer=kw.get('serializer'),
  278. queue=kw.get('queue'),
  279. exchange=kw.get('exchange'),
  280. routing_key=kw.get('routing_key'),
  281. eta=maybe_iso8601(kw.get('eta')),
  282. expires=expires)
  283. self.out(res.id)
  284. call = command(call)
  285. class purge(Command):
  286. """Erase all messages from all known task queues.
  287. WARNING: There is no undo operation for this command.
  288. """
  289. def run(self, *args, **kwargs):
  290. queues = len(self.app.amqp.queues.keys())
  291. messages_removed = self.app.control.purge()
  292. if messages_removed:
  293. self.out('Purged %s %s from %s known task %s.' % (
  294. messages_removed, text.pluralize(messages_removed, 'message'),
  295. queues, text.pluralize(queues, 'queue')))
  296. else:
  297. self.out('No messages purged from %s known %s' % (
  298. queues, text.pluralize(queues, 'queue')))
  299. purge = command(purge)
  300. class result(Command):
  301. """Gives the return value for a given task id.
  302. Examples::
  303. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
  304. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
  305. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
  306. """
  307. args = '<task_id>'
  308. option_list = Command.option_list + (
  309. Option('--task', '-t', help='name of task (if custom backend)'),
  310. Option('--traceback', action='store_true',
  311. help='show traceback instead'),
  312. )
  313. def run(self, task_id, *args, **kwargs):
  314. result_cls = self.app.AsyncResult
  315. task = kwargs.get('task')
  316. traceback = kwargs.get('traceback', False)
  317. if task:
  318. result_cls = self.app.tasks[task].AsyncResult
  319. result = result_cls(task_id)
  320. if traceback:
  321. value = result.traceback
  322. else:
  323. value = result.get()
  324. self.out(self.prettify(value)[1])
  325. result = command(result)
  326. class _RemoteControl(Command):
  327. name = None
  328. choices = None
  329. leaf = False
  330. option_list = Command.option_list + (
  331. Option('--timeout', '-t', type='float',
  332. help='Timeout in seconds (float) waiting for reply'),
  333. Option('--destination', '-d',
  334. help='Comma separated list of destination node names.'))
  335. @classmethod
  336. def get_command_info(self, command, indent=0, prefix='', color=None,
  337. help=False):
  338. if help:
  339. help = '|' + text.indent(self.choices[command][1], indent + 4)
  340. else:
  341. help = None
  342. try:
  343. # see if it uses args.
  344. meth = getattr(self, command)
  345. return text.join([
  346. '|' + text.indent('%s%s %s' % (prefix, color(command),
  347. meth.__doc__), indent), help,
  348. ])
  349. except AttributeError:
  350. return text.join([
  351. '|' + text.indent(prefix + str(color(command)), indent), help,
  352. ])
  353. @classmethod
  354. def list_commands(self, indent=0, prefix='', color=None, help=False):
  355. color = color if color else lambda x: x
  356. prefix = prefix + ' ' if prefix else ''
  357. return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
  358. for c in sorted(self.choices))
  359. @property
  360. def epilog(self):
  361. return '\n'.join([
  362. '[Commands]',
  363. self.list_commands(indent=4, help=True)
  364. ])
  365. def usage(self, command):
  366. return '%%prog %s [options] %s <command> [arg1 .. argN]' % (
  367. command, self.args)
  368. def call(self, *args, **kwargs):
  369. raise NotImplementedError('get_obj')
  370. def run(self, *args, **kwargs):
  371. if not args:
  372. raise Error('Missing %s method. See --help' % self.name)
  373. return self.do_call_method(args, **kwargs)
  374. def do_call_method(self, args, **kwargs):
  375. method = args[0]
  376. if method == 'help':
  377. raise Error("Did you mean '%s --help'?" % self.name)
  378. if method not in self.choices:
  379. raise Error('Unknown %s method %s' % (self.name, method))
  380. destination = kwargs.get('destination')
  381. timeout = kwargs.get('timeout') or self.choices[method][0]
  382. if destination and isinstance(destination, basestring):
  383. destination = map(str.strip, destination.split(','))
  384. try:
  385. handler = getattr(self, method)
  386. except AttributeError:
  387. handler = self.call
  388. # XXX Python 2.5 does not support X(*args, foo=1)
  389. kwargs = {"timeout": timeout, "destination": destination,
  390. "callback": self.say_remote_command_reply}
  391. replies = handler(method, *args[1:], **kwargs)
  392. if not replies:
  393. raise Error('No nodes replied within time constraint.',
  394. status=EX_UNAVAILABLE)
  395. return replies
  396. def say(self, direction, title, body=''):
  397. c = self.colored
  398. if direction == '<-' and self.quiet:
  399. return
  400. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  401. self.out(c.reset(dirstr, title))
  402. if body and self.show_body:
  403. self.out(body)
  404. class inspect(_RemoteControl):
  405. """Inspect the worker at runtime.
  406. Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
  407. Examples::
  408. celery inspect active --timeout=5
  409. celery inspect scheduled -d worker1.example.com
  410. celery inspect revoked -d w1.e.com,w2.e.com
  411. """
  412. name = 'inspect'
  413. choices = {
  414. 'active': (1.0, 'dump active tasks (being processed)'),
  415. 'active_queues': (1.0, 'dump queues being consumed from'),
  416. 'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
  417. 'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
  418. 'stats': (1.0, 'dump worker statistics'),
  419. 'revoked': (1.0, 'dump of revoked task ids'),
  420. 'registered': (1.0, 'dump of registered tasks'),
  421. 'ping': (0.2, 'ping worker(s)'),
  422. 'report': (1.0, 'get bugreport info')
  423. }
  424. def call(self, method, *args, **options):
  425. i = self.app.control.inspect(**options)
  426. return getattr(i, method)(*args)
  427. inspect = command(inspect)
  428. class control(_RemoteControl):
  429. """Workers remote control.
  430. Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
  431. Examples::
  432. celery control enable_events --timeout=5
  433. celery control -d worker1.example.com enable_events
  434. celery control -d w1.e.com,w2.e.com enable_events
  435. celery control -d w1.e.com add_consumer queue_name
  436. celery control -d w1.e.com cancel_consumer queue_name
  437. celery control -d w1.e.com add_consumer queue exchange direct rkey
  438. """
  439. name = 'control'
  440. choices = {
  441. 'enable_events': (1.0, 'tell worker(s) to enable events'),
  442. 'disable_events': (1.0, 'tell worker(s) to disable events'),
  443. 'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
  444. 'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
  445. 'rate_limit': (1.0,
  446. 'tell worker(s) to modify the rate limit for a task type'),
  447. 'time_limit': (1.0,
  448. 'tell worker(s) to modify the time limit for a task type.'),
  449. 'autoscale': (1.0, 'change autoscale settings'),
  450. 'pool_grow': (1.0, 'start more pool processes'),
  451. 'pool_shrink': (1.0, 'use less pool processes'),
  452. }
  453. def call(self, method, *args, **options):
  454. # XXX Python 2.5 doesn't support X(*args, reply=True, **kwargs)
  455. return getattr(self.app.control, method)(
  456. *args, **dict(options, retry=True))
  457. def pool_grow(self, method, n=1, **kwargs):
  458. """[N=1]"""
  459. return self.call(method, n, **kwargs)
  460. def pool_shrink(self, method, n=1, **kwargs):
  461. """[N=1]"""
  462. return self.call(method, n, **kwargs)
  463. def autoscale(self, method, max=None, min=None, **kwargs):
  464. """[max] [min]"""
  465. return self.call(method, max, min, **kwargs)
  466. def rate_limit(self, method, task_name, rate_limit, **kwargs):
  467. """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
  468. return self.call(method, task_name, rate_limit, reply=True, **kwargs)
  469. def time_limit(self, method, task_name, soft, hard=None, **kwargs):
  470. """<task_name> <soft_secs> [hard_secs]"""
  471. return self.call(method, task_name, soft, hard, reply=True, **kwargs)
  472. def add_consumer(self, method, queue, exchange=None,
  473. exchange_type='direct', routing_key=None, **kwargs):
  474. """<queue> [exchange [type [routing_key]]]"""
  475. return self.call(method, queue, exchange,
  476. exchange_type, routing_key, reply=True, **kwargs)
  477. def cancel_consumer(self, method, queue, **kwargs):
  478. """<queue>"""
  479. return self.call(method, queue, reply=True, **kwargs)
  480. control = command(control)
  481. class status(Command):
  482. """Show list of workers that are online."""
  483. option_list = inspect.option_list
  484. def run(self, *args, **kwargs):
  485. replies = inspect(app=self.app,
  486. no_color=kwargs.get('no_color', False),
  487. stdout=self.stdout, stderr=self.stderr,
  488. show_reply=False) \
  489. .run('ping', **dict(kwargs, quiet=True, show_body=False))
  490. if not replies:
  491. raise Error('No nodes replied within time constraint',
  492. status=EX_UNAVAILABLE)
  493. nodecount = len(replies)
  494. if not kwargs.get('quiet', False):
  495. self.out('\n%s %s online.' % (nodecount,
  496. text.pluralize(nodecount, 'node')))
  497. status = command(status)
  498. class migrate(Command):
  499. """Migrate tasks from one broker to another.
  500. Examples::
  501. celery migrate redis://localhost amqp://guest@localhost//
  502. celery migrate django:// redis://localhost
  503. NOTE: This command is experimental, make sure you have
  504. a backup of the tasks before you continue.
  505. """
  506. args = '<source_url> <dest_url>'
  507. option_list = Command.option_list + (
  508. Option('--limit', '-n', type='int',
  509. help='Number of tasks to consume (int)'),
  510. Option('--timeout', '-t', type='float', default=1.0,
  511. help='Timeout in seconds (float) waiting for tasks'),
  512. Option('--ack-messages', '-a', action='store_true',
  513. help='Ack messages from source broker.'),
  514. Option('--tasks', '-T',
  515. help='List of task names to filter on.'),
  516. Option('--queues', '-Q',
  517. help='List of queues to migrate.'),
  518. Option('--forever', '-F', action='store_true',
  519. help='Continually migrate tasks until killed.'),
  520. )
  521. def on_migrate_task(self, state, body, message):
  522. self.out('Migrating task %s/%s: %s[%s]' % (
  523. state.count, state.strtotal, body['task'], body['id']))
  524. def run(self, *args, **kwargs):
  525. if len(args) != 2:
  526. return self.show_help('migrate')
  527. from kombu import Connection
  528. from celery.contrib.migrate import migrate_tasks
  529. migrate_tasks(Connection(args[0]),
  530. Connection(args[1]),
  531. callback=self.on_migrate_task,
  532. **kwargs)
  533. migrate = command(migrate)
  534. class shell(Command): # pragma: no cover
  535. """Start shell session with convenient access to celery symbols.
  536. The following symbols will be added to the main globals:
  537. - celery: the current application.
  538. - chord, ichord, group, igroup, chain,
  539. chunks, xmap, xstarmap subtask, Task
  540. - all registered tasks.
  541. Example Session::
  542. $ celery shell
  543. >>> celery
  544. <Celery default:0x1012d9fd0>
  545. >>> add
  546. <@task: tasks.add>
  547. >>> add.delay(2, 2)
  548. <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
  549. """
  550. option_list = Command.option_list + (
  551. Option('--ipython', '-I',
  552. action='store_true', dest='force_ipython',
  553. help='force iPython.'),
  554. Option('--bpython', '-B',
  555. action='store_true', dest='force_bpython',
  556. help='force bpython.'),
  557. Option('--python', '-P',
  558. action='store_true', dest='force_python',
  559. help='force default Python shell.'),
  560. Option('--without-tasks', '-T', action='store_true',
  561. help="don't add tasks to locals."),
  562. Option('--eventlet', action='store_true',
  563. help='use eventlet.'),
  564. Option('--gevent', action='store_true', help='use gevent.'),
  565. )
  566. def run(self, force_ipython=False, force_bpython=False,
  567. force_python=False, without_tasks=False, eventlet=False,
  568. gevent=False, **kwargs):
  569. if eventlet:
  570. import_module('celery.concurrency.eventlet')
  571. if gevent:
  572. import_module('celery.concurrency.gevent')
  573. import celery
  574. import celery.task.base
  575. self.app.loader.import_default_modules()
  576. self.locals = {'celery': self.app,
  577. 'Task': celery.Task,
  578. 'chord': celery.chord,
  579. 'ichord': celery.ichord,
  580. 'group': celery.group,
  581. 'igroup': celery.igroup,
  582. 'chain': celery.chain,
  583. 'chunks': celery.chunks,
  584. 'xmap': celery.xmap,
  585. 'xstarmap': celery.xstarmap,
  586. 'subtask': celery.subtask}
  587. if not without_tasks:
  588. self.locals.update(dict((task.__name__, task)
  589. for task in self.app.tasks.itervalues()
  590. if not task.name.startswith('celery.')))
  591. if force_python:
  592. return self.invoke_fallback_shell()
  593. elif force_bpython:
  594. return self.invoke_bpython_shell()
  595. elif force_ipython:
  596. return self.invoke_ipython_shell()
  597. return self.invoke_default_shell()
  598. def invoke_default_shell(self):
  599. try:
  600. import IPython # noqa
  601. except ImportError:
  602. try:
  603. import bpython # noqa
  604. except ImportError:
  605. return self.invoke_fallback_shell()
  606. else:
  607. return self.invoke_bpython_shell()
  608. else:
  609. return self.invoke_ipython_shell()
  610. def invoke_fallback_shell(self):
  611. import code
  612. try:
  613. import readline
  614. except ImportError:
  615. pass
  616. else:
  617. import rlcompleter
  618. readline.set_completer(
  619. rlcompleter.Completer(self.locals).complete)
  620. readline.parse_and_bind('tab:complete')
  621. code.interact(local=self.locals)
  622. def invoke_ipython_shell(self):
  623. try:
  624. from IPython.frontend.terminal import embed
  625. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  626. except ImportError: # ipython < 0.11
  627. from IPython.Shell import IPShell
  628. IPShell(argv=[], user_ns=self.locals).mainloop()
  629. def invoke_bpython_shell(self):
  630. import bpython
  631. bpython.embed(self.locals)
  632. shell = command(shell)
  633. class help(Command):
  634. """Show help screen and exit."""
  635. def usage(self, command):
  636. return '%%prog <command> [options] %s' % (self.args, )
  637. def run(self, *args, **kwargs):
  638. self.parser.print_help()
  639. self.out(HELP % {'prog_name': self.prog_name,
  640. 'commands': CeleryCommand.list_commands()})
  641. return EX_USAGE
  642. help = command(help)
  643. class report(Command):
  644. """Shows information useful to include in bugreports."""
  645. def run(self, *args, **kwargs):
  646. self.out(self.app.bugreport())
  647. return EX_OK
  648. report = command(report)
  649. class CeleryCommand(BaseCommand):
  650. commands = commands
  651. enable_config_from_cmdline = True
  652. prog_name = 'celery'
  653. def execute(self, command, argv=None):
  654. try:
  655. cls = self.commands[command]
  656. except KeyError:
  657. cls, argv = self.commands['help'], ['help']
  658. cls = self.commands.get(command) or self.commands['help']
  659. try:
  660. return cls(app=self.app).run_from_argv(self.prog_name, argv)
  661. except Error:
  662. return self.execute('help', argv)
  663. def remove_options_at_beginning(self, argv, index=0):
  664. if argv:
  665. while index < len(argv):
  666. value = argv[index]
  667. if value.startswith('--'):
  668. pass
  669. elif value.startswith('-'):
  670. index += 1
  671. else:
  672. return argv[index:]
  673. index += 1
  674. return []
  675. def handle_argv(self, prog_name, argv):
  676. self.prog_name = prog_name
  677. argv = self.remove_options_at_beginning(argv)
  678. _, argv = self.prepare_args(None, argv)
  679. try:
  680. command = argv[0]
  681. except IndexError:
  682. command, argv = 'help', ['help']
  683. return self.execute(command, argv)
  684. def execute_from_commandline(self, argv=None):
  685. try:
  686. sys.exit(determine_exit_status(
  687. super(CeleryCommand, self).execute_from_commandline(argv)))
  688. except KeyboardInterrupt:
  689. sys.exit(EX_FAILURE)
  690. @classmethod
  691. def get_command_info(self, command, indent=0, color=None):
  692. colored = term.colored().names[color] if color else lambda x: x
  693. obj = self.commands[command]
  694. if obj.leaf:
  695. return '|' + text.indent('celery %s' % colored(command), indent)
  696. return text.join([
  697. ' ',
  698. '|' + text.indent('celery %s --help' % colored(command), indent),
  699. obj.list_commands(indent, 'celery %s' % command, colored),
  700. ])
  701. @classmethod
  702. def list_commands(self, indent=0):
  703. white = term.colored().white
  704. ret = []
  705. for cls, commands, color in command_classes:
  706. ret.extend([
  707. text.indent('+ %s: ' % white(cls), indent),
  708. '\n'.join(self.get_command_info(command, indent + 4, color)
  709. for command in commands),
  710. ''
  711. ])
  712. return '\n'.join(ret).strip()
  713. def determine_exit_status(ret):
  714. if isinstance(ret, int):
  715. return ret
  716. return EX_OK if ret else EX_FAILURE
def main():
    """Console entry point: run the umbrella command from ``sys.argv``."""
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    if __name__ != '__main__':  # pragma: no cover
        # Register this module under the ``__main__`` name as well.
        sys.modules['__main__'] = sys.modules[__name__]
    freeze_support()
    CeleryCommand().execute_from_commandline()
# Allow running this module directly as a script.
if __name__ == '__main__':  # pragma: no cover
    main()