celery.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery` umbrella command.
  4. .. program:: celery
  5. """
  6. from __future__ import absolute_import
  7. from __future__ import with_statement
  8. import anyjson
  9. import sys
  10. import warnings
  11. from billiard import freeze_support
  12. from importlib import import_module
  13. from pprint import pformat
  14. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  15. from celery.utils import term
  16. from celery.utils import text
  17. from celery.utils.functional import memoize
  18. from celery.utils.imports import symbol_by_name
  19. from celery.utils.timeutils import maybe_iso8601
  20. from celery.bin.base import Command as BaseCommand, Option
# Footer template printed by the ``help`` command; ``%(commands)s`` and
# ``%(prog_name)s`` are substituted in ``help.run``.
# NOTE(review): blank lines inside this template may have been lost when the
# file was extracted -- confirm against the upstream source.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
%(commands)s
---- -- - - --------- -- - -------------- --- ------------
Type '%(prog_name)s <command> --help' for help using a specific command.
"""

# Registry mapping command name -> command class; filled in by ``command()``.
commands = {}

# Layout of the help screen: (category title, command names, color name).
# A color of ``None`` means the names are printed uncolored.
command_classes = [
    ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
    ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
    ('Utils', ['purge', 'list', 'migrate', 'call', 'result', 'report'], None),
]
  33. @memoize()
  34. def _get_extension_classes():
  35. extensions = []
  36. command_classes.append(('Extensions', extensions, 'magenta'))
  37. return extensions
  38. class Error(Exception):
  39. def __init__(self, reason, status=EX_FAILURE):
  40. self.reason = reason
  41. self.status = status
  42. super(Error, self).__init__(reason, status)
  43. def __str__(self):
  44. return self.reason
  45. def command(fun, name=None, sortpri=0):
  46. commands[name or fun.__name__] = fun
  47. fun.sortpri = sortpri
  48. return fun
  49. def load_extension_commands(namespace='celery.commands'):
  50. try:
  51. from pkg_resources import iter_entry_points
  52. except ImportError:
  53. return
  54. for ep in iter_entry_points(namespace):
  55. _get_extension_classes().append(ep.name)
  56. sym = ':'.join([ep.module_name, ep.attrs[0]])
  57. try:
  58. cls = symbol_by_name(sym)
  59. except (ImportError, SyntaxError), exc:
  60. warnings.warn('Cannot load extension %r: %r' % (sym, exc))
  61. else:
  62. command(cls, name=ep.name)
  63. class Command(BaseCommand):
  64. help = ''
  65. args = ''
  66. prog_name = 'celery'
  67. show_body = True
  68. show_reply = True
  69. option_list = (
  70. Option('--quiet', '-q', action='store_true'),
  71. Option('--no-color', '-C', action='store_true'),
  72. )
  73. def __init__(self, app=None, no_color=False, stdout=sys.stdout,
  74. stderr=sys.stderr, show_reply=True):
  75. super(Command, self).__init__(app=app)
  76. self.colored = term.colored(enabled=not no_color)
  77. self.stdout = stdout
  78. self.stderr = stderr
  79. self.quiet = False
  80. if show_reply is not None:
  81. self.show_reply = show_reply
  82. def __call__(self, *args, **kwargs):
  83. try:
  84. ret = self.run(*args, **kwargs)
  85. except Error, exc:
  86. self.error(self.colored.red('Error: %s' % exc))
  87. return exc.status
  88. return ret if ret is not None else EX_OK
  89. def show_help(self, command):
  90. self.run_from_argv(self.prog_name, [command, '--help'])
  91. return EX_USAGE
  92. def error(self, s):
  93. self.out(s, fh=self.stderr)
  94. def out(self, s, fh=None):
  95. s = str(s)
  96. if not s.endswith('\n'):
  97. s += '\n'
  98. (fh or self.stdout).write(s)
  99. def run_from_argv(self, prog_name, argv):
  100. self.prog_name = prog_name
  101. self.command = argv[0]
  102. self.arglist = argv[1:]
  103. self.parser = self.create_parser(self.prog_name, self.command)
  104. options, args = self.prepare_args(
  105. *self.parser.parse_args(self.arglist))
  106. self.colored = term.colored(enabled=not options['no_color'])
  107. self.quiet = options.get('quiet', False)
  108. self.show_body = options.get('show_body', True)
  109. return self(*args, **options)
  110. def usage(self, command):
  111. return '%%prog %s [options] %s' % (command, self.args)
  112. def prettify_list(self, n):
  113. c = self.colored
  114. if not n:
  115. return '- empty -'
  116. return '\n'.join(str(c.reset(c.white('*'), ' %s' % (item, )))
  117. for item in n)
  118. def prettify_dict_ok_error(self, n):
  119. c = self.colored
  120. try:
  121. return (c.green('OK'),
  122. text.indent(self.prettify(n['ok'])[1], 4))
  123. except KeyError:
  124. pass
  125. return (c.red('ERROR'),
  126. text.indent(self.prettify(n['error'])[1], 4))
  127. def say_remote_command_reply(self, replies):
  128. c = self.colored
  129. node = replies.keys()[0]
  130. reply = replies[node]
  131. status, preply = self.prettify(reply)
  132. self.say_chat('->', c.cyan(node, ': ') + status,
  133. text.indent(preply, 4) if self.show_reply else '')
  134. def prettify(self, n):
  135. OK = str(self.colored.green('OK'))
  136. if isinstance(n, list):
  137. return OK, self.prettify_list(n)
  138. if isinstance(n, dict):
  139. if 'ok' in n or 'error' in n:
  140. return self.prettify_dict_ok_error(n)
  141. if isinstance(n, basestring):
  142. return OK, unicode(n)
  143. return OK, pformat(n)
  144. def say_chat(self, direction, title, body=''):
  145. c = self.colored
  146. if direction == '<-' and self.quiet:
  147. return
  148. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  149. self.out(c.reset(dirstr, title))
  150. if body and self.show_body:
  151. self.out(body)
  152. @property
  153. def description(self):
  154. return self.__doc__
  155. class Delegate(Command):
  156. def __init__(self, *args, **kwargs):
  157. super(Delegate, self).__init__(*args, **kwargs)
  158. self.target = symbol_by_name(self.Command)(app=self.app)
  159. self.args = self.target.args
  160. def get_options(self):
  161. return self.option_list + self.target.get_options()
  162. def create_parser(self, prog_name, command):
  163. parser = super(Delegate, self).create_parser(prog_name, command)
  164. return self.target.prepare_parser(parser)
  165. def run(self, *args, **kwargs):
  166. self.target.check_args(args)
  167. return self.target.run(*args, **kwargs)
  168. class multi(Command):
  169. """Start multiple worker instances."""
  170. def get_options(self):
  171. return ()
  172. def run_from_argv(self, prog_name, argv):
  173. from celery.bin.celeryd_multi import MultiTool
  174. return MultiTool().execute_from_commandline(argv, prog_name)
  175. multi = command(multi)
  176. class worker(Delegate):
  177. """Start worker instance.
  178. Examples::
  179. celery worker --app=proj -l info
  180. celery worker -A proj -l info -Q hipri,lopri
  181. celery worker -A proj --concurrency=4
  182. celery worker -A proj --concurrency=1000 -P eventlet
  183. celery worker --autoscale=10,0
  184. """
  185. Command = 'celery.bin.celeryd:WorkerCommand'
  186. worker = command(worker, sortpri=01)
# Delegates to the event monitor command; the docstring below is what
# ``Command.description`` returns for this command.
class events(Delegate):
    """Event-stream utilities.

    Commands::

        celery events --app=proj
            start graphical monitor (requires curses)
        celery events -d --app=proj
            dump events to screen.
        celery events -b amqp://
        celery events -C <camera> [options]
            run snapshot camera.

    Examples::

        celery events
        celery events -d
        celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
    """
    # Symbol path resolved by ``Delegate.__init__``.
    Command = 'celery.bin.celeryev:EvCommand'
events = command(events, sortpri=10)
# Delegates to the periodic task scheduler command; the docstring below is
# what ``Command.description`` returns for this command.
class beat(Delegate):
    """Start the celerybeat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celerybeat/schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler
    """
    # Symbol path resolved by ``Delegate.__init__``.
    Command = 'celery.bin.celerybeat:BeatCommand'
beat = command(beat, sortpri=20)
# Delegates to the AMQP administration shell; the docstring below is what
# ``Command.description`` returns for this command.
class amqp(Delegate):
    """AMQP Administration Shell.

    Also works for non-amqp transports.

    Examples::

        celery amqp
            start shell mode
        celery amqp help
            show list of commands
        celery amqp exchange.delete name
        celery amqp queue.delete queue
        celery amqp queue.delete queue yes yes
    """
    # Symbol path resolved by ``Delegate.__init__``.
    Command = 'celery.bin.camqadm:AMQPAdminCommand'
amqp = command(amqp, sortpri=30)
  227. class list_(Command):
  228. """Get info from broker.
  229. Examples::
  230. celery list bindings
  231. NOTE: For RabbitMQ the management plugin is required.
  232. """
  233. args = '[bindings]'
  234. def list_bindings(self, management):
  235. try:
  236. bindings = management.get_bindings()
  237. except NotImplementedError:
  238. raise Error('Your transport cannot list bindings.')
  239. fmt = lambda q, e, r: self.out('%s %s %s' % (q.ljust(28),
  240. e.ljust(28), r))
  241. fmt('Queue', 'Exchange', 'Routing Key')
  242. fmt('-' * 16, '-' * 16, '-' * 16)
  243. for b in bindings:
  244. fmt(b['destination'], b['source'], b['routing_key'])
  245. def run(self, what=None, *_, **kw):
  246. topics = {'bindings': self.list_bindings}
  247. available = ', '.join(topics.keys())
  248. if not what:
  249. raise Error('You must specify what to list (%s)' % available)
  250. if what not in topics:
  251. raise Error('unknown topic %r (choose one of: %s)' % (
  252. what, available))
  253. with self.app.connection() as conn:
  254. self.app.amqp.TaskConsumer(conn).declare()
  255. topics[what](conn.manager)
  256. list_ = command(list_, 'list')
  257. class call(Command):
  258. """Call a task by name.
  259. Examples::
  260. celery call tasks.add --args='[2, 2]'
  261. celery call tasks.add --args='[2, 2]' --countdown=10
  262. """
  263. args = '<task_name>'
  264. option_list = Command.option_list + (
  265. Option('--args', '-a', help='positional arguments (json).'),
  266. Option('--kwargs', '-k', help='keyword arguments (json).'),
  267. Option('--eta', help='scheduled time (ISO-8601).'),
  268. Option('--countdown', type='float',
  269. help='eta in seconds from now (float/int).'),
  270. Option('--expires', help='expiry time (ISO-8601/float/int).'),
  271. Option('--serializer', default='json', help='defaults to json.'),
  272. Option('--queue', help='custom queue name.'),
  273. Option('--exchange', help='custom exchange name.'),
  274. Option('--routing-key', help='custom routing key.'),
  275. )
  276. def run(self, name, *_, **kw):
  277. # Positional args.
  278. args = kw.get('args') or ()
  279. if isinstance(args, basestring):
  280. args = anyjson.loads(args)
  281. # Keyword args.
  282. kwargs = kw.get('kwargs') or {}
  283. if isinstance(kwargs, basestring):
  284. kwargs = anyjson.loads(kwargs)
  285. # Expires can be int/float.
  286. expires = kw.get('expires') or None
  287. try:
  288. expires = float(expires)
  289. except (TypeError, ValueError):
  290. # or a string describing an ISO 8601 datetime.
  291. try:
  292. expires = maybe_iso8601(expires)
  293. except (TypeError, ValueError):
  294. raise
  295. res = self.app.send_task(name, args=args, kwargs=kwargs,
  296. countdown=kw.get('countdown'),
  297. serializer=kw.get('serializer'),
  298. queue=kw.get('queue'),
  299. exchange=kw.get('exchange'),
  300. routing_key=kw.get('routing_key'),
  301. eta=maybe_iso8601(kw.get('eta')),
  302. expires=expires)
  303. self.out(res.id)
  304. call = command(call)
  305. class purge(Command):
  306. """Erase all messages from all known task queues.
  307. WARNING: There is no undo operation for this command.
  308. """
  309. def run(self, *args, **kwargs):
  310. queues = len(self.app.amqp.queues.keys())
  311. messages_removed = self.app.control.purge()
  312. if messages_removed:
  313. self.out('Purged %s %s from %s known task %s.' % (
  314. messages_removed, text.pluralize(messages_removed, 'message'),
  315. queues, text.pluralize(queues, 'queue')))
  316. else:
  317. self.out('No messages purged from %s known %s' % (
  318. queues, text.pluralize(queues, 'queue')))
  319. purge = command(purge)
  320. class result(Command):
  321. """Gives the return value for a given task id.
  322. Examples::
  323. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
  324. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
  325. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
  326. """
  327. args = '<task_id>'
  328. option_list = Command.option_list + (
  329. Option('--task', '-t', help='name of task (if custom backend)'),
  330. Option('--traceback', action='store_true',
  331. help='show traceback instead'),
  332. )
  333. def run(self, task_id, *args, **kwargs):
  334. result_cls = self.app.AsyncResult
  335. task = kwargs.get('task')
  336. traceback = kwargs.get('traceback', False)
  337. if task:
  338. result_cls = self.app.tasks[task].AsyncResult
  339. result = result_cls(task_id)
  340. if traceback:
  341. value = result.traceback
  342. else:
  343. value = result.get()
  344. self.out(self.prettify(value)[1])
  345. result = command(result)
  346. class _RemoteControl(Command):
  347. name = None
  348. choices = None
  349. leaf = False
  350. option_list = Command.option_list + (
  351. Option('--timeout', '-t', type='float',
  352. help='Timeout in seconds (float) waiting for reply'),
  353. Option('--destination', '-d',
  354. help='Comma separated list of destination node names.'))
  355. @classmethod
  356. def get_command_info(self, command, indent=0, prefix='', color=None,
  357. help=False):
  358. if help:
  359. help = '|' + text.indent(self.choices[command][1], indent + 4)
  360. else:
  361. help = None
  362. try:
  363. # see if it uses args.
  364. meth = getattr(self, command)
  365. return text.join([
  366. '|' + text.indent('%s%s %s' % (prefix, color(command),
  367. meth.__doc__), indent), help,
  368. ])
  369. except AttributeError:
  370. return text.join([
  371. '|' + text.indent(prefix + str(color(command)), indent), help,
  372. ])
  373. @classmethod
  374. def list_commands(self, indent=0, prefix='', color=None, help=False):
  375. color = color if color else lambda x: x
  376. prefix = prefix + ' ' if prefix else ''
  377. return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
  378. for c in sorted(self.choices))
  379. @property
  380. def epilog(self):
  381. return '\n'.join([
  382. '[Commands]',
  383. self.list_commands(indent=4, help=True)
  384. ])
  385. def usage(self, command):
  386. return '%%prog %s [options] %s <command> [arg1 .. argN]' % (
  387. command, self.args)
  388. def call(self, *args, **kwargs):
  389. raise NotImplementedError('get_obj')
  390. def run(self, *args, **kwargs):
  391. if not args:
  392. raise Error('Missing %s method. See --help' % self.name)
  393. return self.do_call_method(args, **kwargs)
  394. def do_call_method(self, args, **kwargs):
  395. method = args[0]
  396. if method == 'help':
  397. raise Error("Did you mean '%s --help'?" % self.name)
  398. if method not in self.choices:
  399. raise Error('Unknown %s method %s' % (self.name, method))
  400. destination = kwargs.get('destination')
  401. timeout = kwargs.get('timeout') or self.choices[method][0]
  402. if destination and isinstance(destination, basestring):
  403. destination = map(str.strip, destination.split(','))
  404. try:
  405. handler = getattr(self, method)
  406. except AttributeError:
  407. handler = self.call
  408. # XXX Python 2.5 does not support X(*args, foo=1)
  409. kwargs = {"timeout": timeout, "destination": destination,
  410. "callback": self.say_remote_command_reply}
  411. replies = handler(method, *args[1:], **kwargs)
  412. if not replies:
  413. raise Error('No nodes replied within time constraint.',
  414. status=EX_UNAVAILABLE)
  415. return replies
  416. def say(self, direction, title, body=''):
  417. c = self.colored
  418. if direction == '<-' and self.quiet:
  419. return
  420. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  421. self.out(c.reset(dirstr, title))
  422. if body and self.show_body:
  423. self.out(body)
# Remote control command for read-only worker inspection; the docstring
# below is what ``Command.description`` returns for this command.
class inspect(_RemoteControl):
    """Inspect the worker at runtime.

    Availability: RabbitMQ (amqp), Redis, and MongoDB transports.

    Examples::

        celery inspect active --timeout=5
        celery inspect scheduled -d worker1.example.com
        celery inspect revoked -d w1.e.com,w2.e.com
    """
    name = 'inspect'
    # method name -> (default timeout in seconds, help text)
    choices = {
        'active': (1.0, 'dump active tasks (being processed)'),
        'active_queues': (1.0, 'dump queues being consumed from'),
        'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
        'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
        'stats': (1.0, 'dump worker statistics'),
        'revoked': (1.0, 'dump of revoked task ids'),
        'registered': (1.0, 'dump of registered tasks'),
        'ping': (0.2, 'ping worker(s)'),
        'report': (1.0, 'get bugreport info')
    }

    def call(self, method, *args, **options):
        # Build an inspector from the options (timeout, destination,
        # callback) and call the method of the same name on it.
        i = self.app.control.inspect(**options)
        return getattr(i, method)(*args)
inspect = command(inspect)
# Remote control command for changing worker state; the docstring below is
# what ``Command.description`` returns for this command.
class control(_RemoteControl):
    """Workers remote control.

    Availability: RabbitMQ (amqp), Redis, and MongoDB transports.

    Examples::

        celery control enable_events --timeout=5
        celery control -d worker1.example.com enable_events
        celery control -d w1.e.com,w2.e.com enable_events

        celery control -d w1.e.com add_consumer queue_name
        celery control -d w1.e.com cancel_consumer queue_name

        celery control -d w1.e.com add_consumer queue exchange direct rkey
    """
    name = 'control'
    # method name -> (default timeout in seconds, help text)
    choices = {
        'enable_events': (1.0, 'tell worker(s) to enable events'),
        'disable_events': (1.0, 'tell worker(s) to disable events'),
        'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
        'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
        'rate_limit': (1.0,
            'tell worker(s) to modify the rate limit for a task type'),
        'time_limit': (1.0,
            'tell worker(s) to modify the time limit for a task type.'),
        'autoscale': (1.0, 'change autoscale settings'),
        'pool_grow': (1.0, 'start more pool processes'),
        'pool_shrink': (1.0, 'use less pool processes'),
    }

    def call(self, method, *args, **options):
        # Forward to the method of the same name on ``app.control``.
        # XXX Python 2.5 doesn't support X(*args, reply=True, **kwargs)
        # NOTE(review): the comment above speaks of ``reply=True`` but the
        # code injects ``retry=True``; confirm which keyword is intended.
        return getattr(self.app.control, method)(
            *args, **dict(options, retry=True))

    # The handlers below are picked by name in ``do_call_method``.  Their
    # docstrings are runtime text: ``get_command_info`` prints them as the
    # argument summary, so they must stay exactly as written.

    def pool_grow(self, method, n=1, **kwargs):
        """[N=1]"""
        return self.call(method, n, **kwargs)

    def pool_shrink(self, method, n=1, **kwargs):
        """[N=1]"""
        return self.call(method, n, **kwargs)

    def autoscale(self, method, max=None, min=None, **kwargs):
        """[max] [min]"""
        return self.call(method, max, min, **kwargs)

    def rate_limit(self, method, task_name, rate_limit, **kwargs):
        """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
        return self.call(method, task_name, rate_limit, reply=True, **kwargs)

    def time_limit(self, method, task_name, soft, hard=None, **kwargs):
        """<task_name> <soft_secs> [hard_secs]"""
        return self.call(method, task_name, soft, hard, reply=True, **kwargs)

    def add_consumer(self, method, queue, exchange=None,
                     exchange_type='direct', routing_key=None, **kwargs):
        """<queue> [exchange [type [routing_key]]]"""
        return self.call(method, queue, exchange,
                         exchange_type, routing_key, reply=True, **kwargs)

    def cancel_consumer(self, method, queue, **kwargs):
        """<queue>"""
        return self.call(method, queue, reply=True, **kwargs)
control = command(control)
  501. class status(Command):
  502. """Show list of workers that are online."""
  503. option_list = inspect.option_list
  504. def run(self, *args, **kwargs):
  505. replies = inspect(app=self.app,
  506. no_color=kwargs.get('no_color', False),
  507. stdout=self.stdout, stderr=self.stderr,
  508. show_reply=False) \
  509. .run('ping', **dict(kwargs, quiet=True, show_body=False))
  510. if not replies:
  511. raise Error('No nodes replied within time constraint',
  512. status=EX_UNAVAILABLE)
  513. nodecount = len(replies)
  514. if not kwargs.get('quiet', False):
  515. self.out('\n%s %s online.' % (nodecount,
  516. text.pluralize(nodecount, 'node')))
  517. status = command(status)
  518. class migrate(Command):
  519. """Migrate tasks from one broker to another.
  520. Examples::
  521. celery migrate redis://localhost amqp://guest@localhost//
  522. celery migrate django:// redis://localhost
  523. NOTE: This command is experimental, make sure you have
  524. a backup of the tasks before you continue.
  525. """
  526. args = '<source_url> <dest_url>'
  527. option_list = Command.option_list + (
  528. Option('--limit', '-n', type='int',
  529. help='Number of tasks to consume (int)'),
  530. Option('--timeout', '-t', type='float', default=1.0,
  531. help='Timeout in seconds (float) waiting for tasks'),
  532. Option('--ack-messages', '-a', action='store_true',
  533. help='Ack messages from source broker.'),
  534. Option('--tasks', '-T',
  535. help='List of task names to filter on.'),
  536. Option('--queues', '-Q',
  537. help='List of queues to migrate.'),
  538. Option('--forever', '-F', action='store_true',
  539. help='Continually migrate tasks until killed.'),
  540. )
  541. def on_migrate_task(self, state, body, message):
  542. self.out('Migrating task %s/%s: %s[%s]' % (
  543. state.count, state.strtotal, body['task'], body['id']))
  544. def run(self, *args, **kwargs):
  545. if len(args) != 2:
  546. return self.show_help('migrate')
  547. from kombu import Connection
  548. from celery.contrib.migrate import migrate_tasks
  549. migrate_tasks(Connection(args[0]),
  550. Connection(args[1]),
  551. callback=self.on_migrate_task,
  552. **kwargs)
  553. migrate = command(migrate)
  554. class shell(Command): # pragma: no cover
  555. """Start shell session with convenient access to celery symbols.
  556. The following symbols will be added to the main globals:
  557. - celery: the current application.
  558. - chord, group, chain, chunks,
  559. xmap, xstarmap subtask, Task
  560. - all registered tasks.
  561. Example Session::
  562. $ celery shell
  563. >>> celery
  564. <Celery default:0x1012d9fd0>
  565. >>> add
  566. <@task: tasks.add>
  567. >>> add.delay(2, 2)
  568. <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
  569. """
  570. option_list = Command.option_list + (
  571. Option('--ipython', '-I',
  572. action='store_true', dest='force_ipython',
  573. help='force iPython.'),
  574. Option('--bpython', '-B',
  575. action='store_true', dest='force_bpython',
  576. help='force bpython.'),
  577. Option('--python', '-P',
  578. action='store_true', dest='force_python',
  579. help='force default Python shell.'),
  580. Option('--without-tasks', '-T', action='store_true',
  581. help="don't add tasks to locals."),
  582. Option('--eventlet', action='store_true',
  583. help='use eventlet.'),
  584. Option('--gevent', action='store_true', help='use gevent.'),
  585. )
  586. def run(self, force_ipython=False, force_bpython=False,
  587. force_python=False, without_tasks=False, eventlet=False,
  588. gevent=False, **kwargs):
  589. if eventlet:
  590. import_module('celery.concurrency.eventlet')
  591. if gevent:
  592. import_module('celery.concurrency.gevent')
  593. import celery
  594. import celery.task.base
  595. self.app.loader.import_default_modules()
  596. self.locals = {'celery': self.app,
  597. 'Task': celery.Task,
  598. 'chord': celery.chord,
  599. 'group': celery.group,
  600. 'chain': celery.chain,
  601. 'chunks': celery.chunks,
  602. 'xmap': celery.xmap,
  603. 'xstarmap': celery.xstarmap,
  604. 'subtask': celery.subtask}
  605. if not without_tasks:
  606. self.locals.update(dict((task.__name__, task)
  607. for task in self.app.tasks.itervalues()
  608. if not task.name.startswith('celery.')))
  609. if force_python:
  610. return self.invoke_fallback_shell()
  611. elif force_bpython:
  612. return self.invoke_bpython_shell()
  613. elif force_ipython:
  614. return self.invoke_ipython_shell()
  615. return self.invoke_default_shell()
  616. def invoke_default_shell(self):
  617. try:
  618. import IPython # noqa
  619. except ImportError:
  620. try:
  621. import bpython # noqa
  622. except ImportError:
  623. return self.invoke_fallback_shell()
  624. else:
  625. return self.invoke_bpython_shell()
  626. else:
  627. return self.invoke_ipython_shell()
  628. def invoke_fallback_shell(self):
  629. import code
  630. try:
  631. import readline
  632. except ImportError:
  633. pass
  634. else:
  635. import rlcompleter
  636. readline.set_completer(
  637. rlcompleter.Completer(self.locals).complete)
  638. readline.parse_and_bind('tab:complete')
  639. code.interact(local=self.locals)
  640. def invoke_ipython_shell(self):
  641. try:
  642. from IPython.frontend.terminal import embed
  643. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  644. except ImportError: # ipython < 0.11
  645. from IPython.Shell import IPShell
  646. IPShell(argv=[], user_ns=self.locals).mainloop()
  647. def invoke_bpython_shell(self):
  648. import bpython
  649. bpython.embed(self.locals)
  650. shell = command(shell)
  651. class help(Command):
  652. """Show help screen and exit."""
  653. def usage(self, command):
  654. return '%%prog <command> [options] %s' % (self.args, )
  655. def run(self, *args, **kwargs):
  656. self.parser.print_help()
  657. self.out(HELP % {'prog_name': self.prog_name,
  658. 'commands': CeleryCommand.list_commands()})
  659. return EX_USAGE
  660. help = command(help)
  661. class report(Command):
  662. """Shows information useful to include in bugreports."""
  663. def run(self, *args, **kwargs):
  664. self.out(self.app.bugreport())
  665. return EX_OK
  666. report = command(report)
  667. class CeleryCommand(BaseCommand):
  668. commands = commands
  669. enable_config_from_cmdline = True
  670. prog_name = 'celery'
  671. def execute(self, command, argv=None):
  672. try:
  673. cls = self.commands[command]
  674. except KeyError:
  675. cls, argv = self.commands['help'], ['help']
  676. cls = self.commands.get(command) or self.commands['help']
  677. try:
  678. return cls(app=self.app).run_from_argv(self.prog_name, argv)
  679. except Error:
  680. return self.execute('help', argv)
  681. def remove_options_at_beginning(self, argv, index=0):
  682. if argv:
  683. while index < len(argv):
  684. value = argv[index]
  685. if value.startswith('--'):
  686. pass
  687. elif value.startswith('-'):
  688. index += 1
  689. else:
  690. return argv[index:]
  691. index += 1
  692. return []
  693. def handle_argv(self, prog_name, argv):
  694. self.prog_name = prog_name
  695. argv = self.remove_options_at_beginning(argv)
  696. _, argv = self.prepare_args(None, argv)
  697. try:
  698. command = argv[0]
  699. except IndexError:
  700. command, argv = 'help', ['help']
  701. return self.execute(command, argv)
  702. def execute_from_commandline(self, argv=None):
  703. try:
  704. sys.exit(determine_exit_status(
  705. super(CeleryCommand, self).execute_from_commandline(argv)))
  706. except KeyboardInterrupt:
  707. sys.exit(EX_FAILURE)
  708. @classmethod
  709. def get_command_info(self, command, indent=0, color=None):
  710. colored = term.colored().names[color] if color else lambda x: x
  711. obj = self.commands[command]
  712. if obj.leaf:
  713. return '|' + text.indent('celery %s' % colored(command), indent)
  714. return text.join([
  715. ' ',
  716. '|' + text.indent('celery %s --help' % colored(command), indent),
  717. obj.list_commands(indent, 'celery %s' % command, colored),
  718. ])
  719. @classmethod
  720. def list_commands(self, indent=0):
  721. white = term.colored().white
  722. ret = []
  723. for cls, commands, color in command_classes:
  724. ret.extend([
  725. text.indent('+ %s: ' % white(cls), indent),
  726. '\n'.join(self.get_command_info(command, indent + 4, color)
  727. for command in commands),
  728. ''
  729. ])
  730. return '\n'.join(ret).strip()
  731. def with_pool_option(self, argv):
  732. if len(argv) > 1 and argv[1] == 'worker':
  733. # this command supports custom pools
  734. # that may have to be loaded as early as possible.
  735. return (['-P'], ['--pool'])
  736. def on_concurrency_setup(self):
  737. load_extension_commands()
  738. def determine_exit_status(ret):
  739. if isinstance(ret, int):
  740. return ret
  741. return EX_OK if ret else EX_FAILURE
def main():
    """Entry point of the ``celery`` command-line program.

    Runs ``CeleryCommand().execute_from_commandline()``; a
    ``KeyboardInterrupt`` reaching this level is silently ignored.
    """
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    try:
        if __name__ != '__main__':  # pragma: no cover
            # Make this module importable as ``__main__``; this must
            # happen before ``freeze_support()`` is called.
            sys.modules['__main__'] = sys.modules[__name__]
        freeze_support()
        CeleryCommand().execute_from_commandline()
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':  # pragma: no cover
    main()