celery.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery` umbrella command.
  4. .. program:: celery
  5. """
  6. from __future__ import absolute_import
  7. from __future__ import with_statement
  8. import anyjson
  9. import heapq
  10. import os
  11. import sys
  12. import warnings
  13. from importlib import import_module
  14. from pprint import pformat
  15. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  16. from celery.utils import term
  17. from celery.utils import text
  18. from celery.utils.functional import memoize
  19. from celery.utils.imports import symbol_by_name
  20. from celery.utils.timeutils import maybe_iso8601
  21. from celery.bin.base import Command as BaseCommand, Option
  22. HELP = """
  23. ---- -- - - ---- Commands- -------------- --- ------------
  24. %(commands)s
  25. ---- -- - - --------- -- - -------------- --- ------------
  26. Type '%(prog_name)s <command> --help' for help using a specific command.
  27. """
  28. commands = {}
  29. command_classes = [
  30. ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
  31. ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
  32. ('Utils', ['purge', 'list', 'migrate', 'call', 'result', 'report'], None),
  33. ]
  34. @memoize()
  35. def _get_extension_classes():
  36. extensions = []
  37. command_classes.append(('Extensions', extensions, 'magenta'))
  38. return extensions
  39. class Error(Exception):
  40. def __init__(self, reason, status=EX_FAILURE):
  41. self.reason = reason
  42. self.status = status
  43. super(Error, self).__init__(reason, status)
  44. def __str__(self):
  45. return self.reason
  46. def command(fun, name=None, sortpri=0):
  47. commands[name or fun.__name__] = fun
  48. fun.sortpri = sortpri
  49. return fun
  50. def load_extension_commands(namespace='celery.commands'):
  51. try:
  52. from pkg_resources import iter_entry_points
  53. except ImportError:
  54. return
  55. for ep in iter_entry_points(namespace):
  56. sym = ':'.join([ep.module_name, ep.attrs[0]])
  57. try:
  58. cls = symbol_by_name(sym)
  59. except (ImportError, SyntaxError), exc:
  60. warnings.warn('Cannot load extension %r: %r' % (sym, exc))
  61. else:
  62. heapq.heappush(_get_extension_classes(), ep.name)
  63. command(cls, name=ep.name)
  64. class Command(BaseCommand):
  65. help = ''
  66. args = ''
  67. prog_name = 'celery'
  68. show_body = True
  69. show_reply = True
  70. option_list = (
  71. Option('--quiet', '-q', action='store_true'),
  72. Option('--no-color', '-C', action='store_true', default=None),
  73. )
  74. def __init__(self, app=None, no_color=False, stdout=sys.stdout,
  75. stderr=sys.stderr, show_reply=True):
  76. super(Command, self).__init__(app=app)
  77. self.colored = term.colored(enabled=not no_color)
  78. self.stdout = stdout
  79. self.stderr = stderr
  80. self.quiet = False
  81. if show_reply is not None:
  82. self.show_reply = show_reply
  83. def __call__(self, *args, **kwargs):
  84. try:
  85. ret = self.run(*args, **kwargs)
  86. except Error, exc:
  87. self.error(self.colored.red('Error: %s' % exc))
  88. return exc.status
  89. return ret if ret is not None else EX_OK
  90. def show_help(self, command):
  91. self.run_from_argv(self.prog_name, [command, '--help'])
  92. return EX_USAGE
  93. def error(self, s):
  94. self.out(s, fh=self.stderr)
  95. def out(self, s, fh=None):
  96. s = str(s)
  97. if not s.endswith('\n'):
  98. s += '\n'
  99. (fh or self.stdout).write(s)
  100. def run_from_argv(self, prog_name, argv):
  101. self.prog_name = prog_name
  102. self.command = argv[0]
  103. self.arglist = argv[1:]
  104. self.parser = self.create_parser(self.prog_name, self.command)
  105. options, args = self.prepare_args(
  106. *self.parser.parse_args(self.arglist))
  107. self.colored = term.colored(enabled=not options['no_color'])
  108. self.quiet = options.get('quiet', False)
  109. self.show_body = options.get('show_body', True)
  110. return self(*args, **options)
  111. def usage(self, command):
  112. return '%%prog %s [options] %s' % (command, self.args)
  113. def prettify_list(self, n):
  114. c = self.colored
  115. if not n:
  116. return '- empty -'
  117. return '\n'.join(str(c.reset(c.white('*'), ' %s' % (item, )))
  118. for item in n)
  119. def prettify_dict_ok_error(self, n):
  120. c = self.colored
  121. try:
  122. return (c.green('OK'),
  123. text.indent(self.prettify(n['ok'])[1], 4))
  124. except KeyError:
  125. pass
  126. return (c.red('ERROR'),
  127. text.indent(self.prettify(n['error'])[1], 4))
  128. def say_remote_command_reply(self, replies):
  129. c = self.colored
  130. node = iter(replies).next() # <-- take first.
  131. reply = replies[node]
  132. status, preply = self.prettify(reply)
  133. self.say_chat('->', c.cyan(node, ': ') + status,
  134. text.indent(preply, 4) if self.show_reply else '')
  135. def prettify(self, n):
  136. OK = str(self.colored.green('OK'))
  137. if isinstance(n, list):
  138. return OK, self.prettify_list(n)
  139. if isinstance(n, dict):
  140. if 'ok' in n or 'error' in n:
  141. return self.prettify_dict_ok_error(n)
  142. if isinstance(n, basestring):
  143. return OK, unicode(n)
  144. return OK, pformat(n)
  145. def say_chat(self, direction, title, body=''):
  146. c = self.colored
  147. if direction == '<-' and self.quiet:
  148. return
  149. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  150. self.out(c.reset(dirstr, title))
  151. if body and self.show_body:
  152. self.out(body)
  153. @property
  154. def description(self):
  155. return self.__doc__
  156. class Delegate(Command):
  157. def __init__(self, *args, **kwargs):
  158. super(Delegate, self).__init__(*args, **kwargs)
  159. self.target = symbol_by_name(self.Command)(app=self.app)
  160. self.args = self.target.args
  161. def get_options(self):
  162. return self.option_list + self.target.get_options()
  163. def create_parser(self, prog_name, command):
  164. parser = super(Delegate, self).create_parser(prog_name, command)
  165. return self.target.prepare_parser(parser)
  166. def run(self, *args, **kwargs):
  167. self.target.check_args(args)
  168. return self.target.run(*args, **kwargs)
  169. class multi(Command):
  170. """Start multiple worker instances."""
  171. respects_app_option = False
  172. def get_options(self):
  173. return ()
  174. def run_from_argv(self, prog_name, argv):
  175. from celery.bin.celeryd_multi import MultiTool
  176. return MultiTool().execute_from_commandline(argv, prog_name)
  177. multi = command(multi)
  178. class worker(Delegate):
  179. """Start worker instance.
  180. Examples::
  181. celery worker --app=proj -l info
  182. celery worker -A proj -l info -Q hipri,lopri
  183. celery worker -A proj --concurrency=4
  184. celery worker -A proj --concurrency=1000 -P eventlet
  185. celery worker --autoscale=10,0
  186. """
  187. Command = 'celery.bin.celeryd:WorkerCommand'
  188. worker = command(worker, sortpri=01)
  189. class events(Delegate):
  190. """Event-stream utilities.
  191. Commands::
  192. celery events --app=proj
  193. start graphical monitor (requires curses)
  194. celery events -d --app=proj
  195. dump events to screen.
  196. celery events -b amqp://
  197. celery events -C <camera> [options]
  198. run snapshot camera.
  199. Examples::
  200. celery events
  201. celery events -d
  202. celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
  203. """
  204. Command = 'celery.bin.celeryev:EvCommand'
  205. events = command(events, sortpri=10)
  206. class beat(Delegate):
  207. """Start the celerybeat periodic task scheduler.
  208. Examples::
  209. celery beat -l info
  210. celery beat -s /var/run/celerybeat/schedule --detach
  211. celery beat -S djcelery.schedulers.DatabaseScheduler
  212. """
  213. Command = 'celery.bin.celerybeat:BeatCommand'
  214. beat = command(beat, sortpri=20)
  215. class amqp(Delegate):
  216. """AMQP Administration Shell.
  217. Also works for non-amqp transports.
  218. Examples::
  219. celery amqp
  220. start shell mode
  221. celery amqp help
  222. show list of commands
  223. celery amqp exchange.delete name
  224. celery amqp queue.delete queue
  225. celery amqp queue.delete queue yes yes
  226. """
  227. Command = 'celery.bin.camqadm:AMQPAdminCommand'
  228. amqp = command(amqp, sortpri=30)
  229. class list_(Command):
  230. """Get info from broker.
  231. Examples::
  232. celery list bindings
  233. NOTE: For RabbitMQ the management plugin is required.
  234. """
  235. args = '[bindings]'
  236. def list_bindings(self, management):
  237. try:
  238. bindings = management.get_bindings()
  239. except NotImplementedError:
  240. raise Error('Your transport cannot list bindings.')
  241. fmt = lambda q, e, r: self.out('%s %s %s' % (q.ljust(28),
  242. e.ljust(28), r))
  243. fmt('Queue', 'Exchange', 'Routing Key')
  244. fmt('-' * 16, '-' * 16, '-' * 16)
  245. for b in bindings:
  246. fmt(b['destination'], b['source'], b['routing_key'])
  247. def run(self, what=None, *_, **kw):
  248. topics = {'bindings': self.list_bindings}
  249. available = ', '.join(topics)
  250. if not what:
  251. raise Error('You must specify what to list (%s)' % available)
  252. if what not in topics:
  253. raise Error('unknown topic %r (choose one of: %s)' % (
  254. what, available))
  255. with self.app.connection() as conn:
  256. self.app.amqp.TaskConsumer(conn).declare()
  257. topics[what](conn.manager)
  258. list_ = command(list_, 'list')
  259. class call(Command):
  260. """Call a task by name.
  261. Examples::
  262. celery call tasks.add --args='[2, 2]'
  263. celery call tasks.add --args='[2, 2]' --countdown=10
  264. """
  265. args = '<task_name>'
  266. option_list = Command.option_list + (
  267. Option('--args', '-a', help='positional arguments (json).'),
  268. Option('--kwargs', '-k', help='keyword arguments (json).'),
  269. Option('--eta', help='scheduled time (ISO-8601).'),
  270. Option('--countdown', type='float',
  271. help='eta in seconds from now (float/int).'),
  272. Option('--expires', help='expiry time (ISO-8601/float/int).'),
  273. Option('--serializer', default='json', help='defaults to json.'),
  274. Option('--queue', help='custom queue name.'),
  275. Option('--exchange', help='custom exchange name.'),
  276. Option('--routing-key', help='custom routing key.'),
  277. )
  278. def run(self, name, *_, **kw):
  279. # Positional args.
  280. args = kw.get('args') or ()
  281. if isinstance(args, basestring):
  282. args = anyjson.loads(args)
  283. # Keyword args.
  284. kwargs = kw.get('kwargs') or {}
  285. if isinstance(kwargs, basestring):
  286. kwargs = anyjson.loads(kwargs)
  287. # Expires can be int/float.
  288. expires = kw.get('expires') or None
  289. try:
  290. expires = float(expires)
  291. except (TypeError, ValueError):
  292. # or a string describing an ISO 8601 datetime.
  293. try:
  294. expires = maybe_iso8601(expires)
  295. except (TypeError, ValueError):
  296. raise
  297. res = self.app.send_task(name, args=args, kwargs=kwargs,
  298. countdown=kw.get('countdown'),
  299. serializer=kw.get('serializer'),
  300. queue=kw.get('queue'),
  301. exchange=kw.get('exchange'),
  302. routing_key=kw.get('routing_key'),
  303. eta=maybe_iso8601(kw.get('eta')),
  304. expires=expires)
  305. self.out(res.id)
  306. call = command(call)
  307. class purge(Command):
  308. """Erase all messages from all known task queues.
  309. WARNING: There is no undo operation for this command.
  310. """
  311. def run(self, *args, **kwargs):
  312. queues = len(self.app.amqp.queues)
  313. messages_removed = self.app.control.purge()
  314. if messages_removed:
  315. self.out('Purged %s %s from %s known task %s.' % (
  316. messages_removed, text.pluralize(messages_removed, 'message'),
  317. queues, text.pluralize(queues, 'queue')))
  318. else:
  319. self.out('No messages purged from %s known %s' % (
  320. queues, text.pluralize(queues, 'queue')))
  321. purge = command(purge)
  322. class result(Command):
  323. """Gives the return value for a given task id.
  324. Examples::
  325. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
  326. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
  327. celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
  328. """
  329. args = '<task_id>'
  330. option_list = Command.option_list + (
  331. Option('--task', '-t', help='name of task (if custom backend)'),
  332. Option('--traceback', action='store_true',
  333. help='show traceback instead'),
  334. )
  335. def run(self, task_id, *args, **kwargs):
  336. result_cls = self.app.AsyncResult
  337. task = kwargs.get('task')
  338. traceback = kwargs.get('traceback', False)
  339. if task:
  340. result_cls = self.app.tasks[task].AsyncResult
  341. result = result_cls(task_id)
  342. if traceback:
  343. value = result.traceback
  344. else:
  345. value = result.get()
  346. self.out(self.prettify(value)[1])
  347. result = command(result)
  348. class _RemoteControl(Command):
  349. name = None
  350. choices = None
  351. leaf = False
  352. option_list = Command.option_list + (
  353. Option('--timeout', '-t', type='float',
  354. help='Timeout in seconds (float) waiting for reply'),
  355. Option('--destination', '-d',
  356. help='Comma separated list of destination node names.'))
  357. @classmethod
  358. def get_command_info(self, command,
  359. indent=0, prefix='', color=None, help=False):
  360. if help:
  361. help = '|' + text.indent(self.choices[command][1], indent + 4)
  362. else:
  363. help = None
  364. try:
  365. # see if it uses args.
  366. meth = getattr(self, command)
  367. return text.join([
  368. '|' + text.indent('%s%s %s' % (prefix, color(command),
  369. meth.__doc__), indent), help,
  370. ])
  371. except AttributeError:
  372. return text.join([
  373. '|' + text.indent(prefix + str(color(command)), indent), help,
  374. ])
  375. @classmethod
  376. def list_commands(self, indent=0, prefix='', color=None, help=False):
  377. color = color if color else lambda x: x
  378. prefix = prefix + ' ' if prefix else ''
  379. return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
  380. for c in sorted(self.choices))
  381. @property
  382. def epilog(self):
  383. return '\n'.join([
  384. '[Commands]',
  385. self.list_commands(indent=4, help=True)
  386. ])
  387. def usage(self, command):
  388. return '%%prog %s [options] %s <command> [arg1 .. argN]' % (
  389. command, self.args)
  390. def call(self, *args, **kwargs):
  391. raise NotImplementedError('get_obj')
  392. def run(self, *args, **kwargs):
  393. if not args:
  394. raise Error('Missing %s method. See --help' % self.name)
  395. return self.do_call_method(args, **kwargs)
  396. def do_call_method(self, args, **kwargs):
  397. method = args[0]
  398. if method == 'help':
  399. raise Error("Did you mean '%s --help'?" % self.name)
  400. if method not in self.choices:
  401. raise Error('Unknown %s method %s' % (self.name, method))
  402. destination = kwargs.get('destination')
  403. timeout = kwargs.get('timeout') or self.choices[method][0]
  404. if destination and isinstance(destination, basestring):
  405. destination = [v.strip() for v in destination.split(',')]
  406. try:
  407. handler = getattr(self, method)
  408. except AttributeError:
  409. handler = self.call
  410. # XXX Python 2.5 does not support X(*args, foo=1)
  411. kwargs = {"timeout": timeout, "destination": destination,
  412. "callback": self.say_remote_command_reply}
  413. replies = handler(method, *args[1:], **kwargs)
  414. if not replies:
  415. raise Error('No nodes replied within time constraint.',
  416. status=EX_UNAVAILABLE)
  417. return replies
  418. def say(self, direction, title, body=''):
  419. c = self.colored
  420. if direction == '<-' and self.quiet:
  421. return
  422. dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
  423. self.out(c.reset(dirstr, title))
  424. if body and self.show_body:
  425. self.out(body)
  426. class inspect(_RemoteControl):
  427. """Inspect the worker at runtime.
  428. Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
  429. Examples::
  430. celery inspect active --timeout=5
  431. celery inspect scheduled -d worker1.example.com
  432. celery inspect revoked -d w1.e.com,w2.e.com
  433. """
  434. name = 'inspect'
  435. choices = {
  436. 'active': (1.0, 'dump active tasks (being processed)'),
  437. 'active_queues': (1.0, 'dump queues being consumed from'),
  438. 'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
  439. 'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
  440. 'stats': (1.0, 'dump worker statistics'),
  441. 'revoked': (1.0, 'dump of revoked task ids'),
  442. 'registered': (1.0, 'dump of registered tasks'),
  443. 'ping': (0.2, 'ping worker(s)'),
  444. 'report': (1.0, 'get bugreport info')
  445. }
  446. def call(self, method, *args, **options):
  447. i = self.app.control.inspect(**options)
  448. return getattr(i, method)(*args)
  449. inspect = command(inspect)
  450. class control(_RemoteControl):
  451. """Workers remote control.
  452. Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
  453. Examples::
  454. celery control enable_events --timeout=5
  455. celery control -d worker1.example.com enable_events
  456. celery control -d w1.e.com,w2.e.com enable_events
  457. celery control -d w1.e.com add_consumer queue_name
  458. celery control -d w1.e.com cancel_consumer queue_name
  459. celery control -d w1.e.com add_consumer queue exchange direct rkey
  460. """
  461. name = 'control'
  462. choices = {
  463. 'enable_events': (1.0, 'tell worker(s) to enable events'),
  464. 'disable_events': (1.0, 'tell worker(s) to disable events'),
  465. 'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
  466. 'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
  467. 'rate_limit': (
  468. 1.0, 'tell worker(s) to modify the rate limit for a task type'),
  469. 'time_limit': (
  470. 1.0, 'tell worker(s) to modify the time limit for a task type.'),
  471. 'autoscale': (1.0, 'change autoscale settings'),
  472. 'pool_grow': (1.0, 'start more pool processes'),
  473. 'pool_shrink': (1.0, 'use less pool processes'),
  474. }
  475. def call(self, method, *args, **options):
  476. # XXX Python 2.5 doesn't support X(*args, reply=True, **kwargs)
  477. return getattr(self.app.control, method)(
  478. *args, **dict(options, retry=True))
  479. def pool_grow(self, method, n=1, **kwargs):
  480. """[N=1]"""
  481. return self.call(method, n, **kwargs)
  482. def pool_shrink(self, method, n=1, **kwargs):
  483. """[N=1]"""
  484. return self.call(method, n, **kwargs)
  485. def autoscale(self, method, max=None, min=None, **kwargs):
  486. """[max] [min]"""
  487. return self.call(method, max, min, **kwargs)
  488. def rate_limit(self, method, task_name, rate_limit, **kwargs):
  489. """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
  490. return self.call(method, task_name, rate_limit, reply=True, **kwargs)
  491. def time_limit(self, method, task_name, soft, hard=None, **kwargs):
  492. """<task_name> <soft_secs> [hard_secs]"""
  493. return self.call(method, task_name, soft, hard, reply=True, **kwargs)
  494. def add_consumer(self, method, queue, exchange=None,
  495. exchange_type='direct', routing_key=None, **kwargs):
  496. """<queue> [exchange [type [routing_key]]]"""
  497. return self.call(method, queue, exchange,
  498. exchange_type, routing_key, reply=True, **kwargs)
  499. def cancel_consumer(self, method, queue, **kwargs):
  500. """<queue>"""
  501. return self.call(method, queue, reply=True, **kwargs)
  502. control = command(control)
  503. class status(Command):
  504. """Show list of workers that are online."""
  505. option_list = inspect.option_list
  506. def run(self, *args, **kwargs):
  507. replies = inspect(
  508. app=self.app,
  509. no_color=kwargs.get('no_color', False),
  510. stdout=self.stdout, stderr=self.stderr,
  511. show_reply=False).run(
  512. 'ping', **dict(kwargs, quiet=True, show_body=False))
  513. if not replies:
  514. raise Error('No nodes replied within time constraint',
  515. status=EX_UNAVAILABLE)
  516. nodecount = len(replies)
  517. if not kwargs.get('quiet', False):
  518. self.out('\n%s %s online.' % (nodecount,
  519. text.pluralize(nodecount, 'node')))
  520. status = command(status)
  521. class migrate(Command):
  522. """Migrate tasks from one broker to another.
  523. Examples::
  524. celery migrate redis://localhost amqp://guest@localhost//
  525. celery migrate django:// redis://localhost
  526. NOTE: This command is experimental, make sure you have
  527. a backup of the tasks before you continue.
  528. """
  529. args = '<source_url> <dest_url>'
  530. option_list = Command.option_list + (
  531. Option('--limit', '-n', type='int',
  532. help='Number of tasks to consume (int)'),
  533. Option('--timeout', '-t', type='float', default=1.0,
  534. help='Timeout in seconds (float) waiting for tasks'),
  535. Option('--ack-messages', '-a', action='store_true',
  536. help='Ack messages from source broker.'),
  537. Option('--tasks', '-T',
  538. help='List of task names to filter on.'),
  539. Option('--queues', '-Q',
  540. help='List of queues to migrate.'),
  541. Option('--forever', '-F', action='store_true',
  542. help='Continually migrate tasks until killed.'),
  543. )
  544. def on_migrate_task(self, state, body, message):
  545. self.out('Migrating task %s/%s: %s[%s]' % (
  546. state.count, state.strtotal, body['task'], body['id']))
  547. def run(self, *args, **kwargs):
  548. if len(args) != 2:
  549. return self.show_help('migrate')
  550. from kombu import Connection
  551. from celery.contrib.migrate import migrate_tasks
  552. migrate_tasks(Connection(args[0]),
  553. Connection(args[1]),
  554. callback=self.on_migrate_task,
  555. **kwargs)
  556. migrate = command(migrate)
  557. class shell(Command): # pragma: no cover
  558. """Start shell session with convenient access to celery symbols.
  559. The following symbols will be added to the main globals:
  560. - celery: the current application.
  561. - chord, group, chain, chunks,
  562. xmap, xstarmap subtask, Task
  563. - all registered tasks.
  564. Example Session:
  565. .. code-block:: bash
  566. $ celery shell
  567. >>> celery
  568. <Celery default:0x1012d9fd0>
  569. >>> add
  570. <@task: tasks.add>
  571. >>> add.delay(2, 2)
  572. <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
  573. """
  574. option_list = Command.option_list + (
  575. Option('--ipython', '-I',
  576. action='store_true', dest='force_ipython',
  577. help='force iPython.'),
  578. Option('--bpython', '-B',
  579. action='store_true', dest='force_bpython',
  580. help='force bpython.'),
  581. Option('--python', '-P',
  582. action='store_true', dest='force_python',
  583. help='force default Python shell.'),
  584. Option('--without-tasks', '-T', action='store_true',
  585. help="don't add tasks to locals."),
  586. Option('--eventlet', action='store_true',
  587. help='use eventlet.'),
  588. Option('--gevent', action='store_true', help='use gevent.'),
  589. )
  590. def run(self, force_ipython=False, force_bpython=False,
  591. force_python=False, without_tasks=False, eventlet=False,
  592. gevent=False, **kwargs):
  593. sys.path.insert(0, os.getcwd())
  594. if eventlet:
  595. import_module('celery.concurrency.eventlet')
  596. if gevent:
  597. import_module('celery.concurrency.gevent')
  598. import celery
  599. import celery.task.base
  600. self.app.loader.import_default_modules()
  601. self.locals = {'celery': self.app,
  602. 'Task': celery.Task,
  603. 'chord': celery.chord,
  604. 'group': celery.group,
  605. 'chain': celery.chain,
  606. 'chunks': celery.chunks,
  607. 'xmap': celery.xmap,
  608. 'xstarmap': celery.xstarmap,
  609. 'subtask': celery.subtask}
  610. if not without_tasks:
  611. self.locals.update(dict(
  612. (task.__name__, task) for task in self.app.tasks.itervalues()
  613. if not task.name.startswith('celery.')),
  614. )
  615. if force_python:
  616. return self.invoke_fallback_shell()
  617. elif force_bpython:
  618. return self.invoke_bpython_shell()
  619. elif force_ipython:
  620. return self.invoke_ipython_shell()
  621. return self.invoke_default_shell()
  622. def invoke_default_shell(self):
  623. try:
  624. import IPython # noqa
  625. except ImportError:
  626. try:
  627. import bpython # noqa
  628. except ImportError:
  629. return self.invoke_fallback_shell()
  630. else:
  631. return self.invoke_bpython_shell()
  632. else:
  633. return self.invoke_ipython_shell()
  634. def invoke_fallback_shell(self):
  635. import code
  636. try:
  637. import readline
  638. except ImportError:
  639. pass
  640. else:
  641. import rlcompleter
  642. readline.set_completer(
  643. rlcompleter.Completer(self.locals).complete)
  644. readline.parse_and_bind('tab:complete')
  645. code.interact(local=self.locals)
  646. def invoke_ipython_shell(self):
  647. try:
  648. from IPython.frontend.terminal import embed
  649. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  650. except ImportError: # ipython < 0.11
  651. from IPython.Shell import IPShell
  652. IPShell(argv=[], user_ns=self.locals).mainloop()
  653. def invoke_bpython_shell(self):
  654. import bpython
  655. bpython.embed(self.locals)
  656. shell = command(shell)
  657. class help(Command):
  658. """Show help screen and exit."""
  659. def usage(self, command):
  660. return '%%prog <command> [options] %s' % (self.args, )
  661. def run(self, *args, **kwargs):
  662. self.parser.print_help()
  663. self.out(HELP % {'prog_name': self.prog_name,
  664. 'commands': CeleryCommand.list_commands()})
  665. return EX_USAGE
  666. help = command(help)
  667. class report(Command):
  668. """Shows information useful to include in bugreports."""
  669. def run(self, *args, **kwargs):
  670. self.out(self.app.bugreport())
  671. return EX_OK
  672. report = command(report)
  673. class CeleryCommand(BaseCommand):
  674. commands = commands
  675. enable_config_from_cmdline = True
  676. prog_name = 'celery'
  677. def execute(self, command, argv=None):
  678. try:
  679. cls = self.commands[command]
  680. except KeyError:
  681. cls, argv = self.commands['help'], ['help']
  682. cls = self.commands.get(command) or self.commands['help']
  683. try:
  684. return cls(app=self.app).run_from_argv(self.prog_name, argv)
  685. except Error:
  686. return self.execute('help', argv)
  687. def remove_options_at_beginning(self, argv, index=0):
  688. if argv:
  689. while index < len(argv):
  690. value = argv[index]
  691. if value.startswith('--'):
  692. pass
  693. elif value.startswith('-'):
  694. index += 1
  695. else:
  696. return argv[index:]
  697. index += 1
  698. return []
  699. def handle_argv(self, prog_name, argv):
  700. self.prog_name = prog_name
  701. argv = self.remove_options_at_beginning(argv)
  702. _, argv = self.prepare_args(None, argv)
  703. try:
  704. command = argv[0]
  705. except IndexError:
  706. command, argv = 'help', ['help']
  707. return self.execute(command, argv)
  708. def execute_from_commandline(self, argv=None):
  709. argv = sys.argv if argv is None else argv
  710. if 'multi' in argv[1:3]: # Issue 1008
  711. self.respects_app_option = False
  712. try:
  713. sys.exit(determine_exit_status(
  714. super(CeleryCommand, self).execute_from_commandline(argv)))
  715. except KeyboardInterrupt:
  716. sys.exit(EX_FAILURE)
  717. @classmethod
  718. def get_command_info(self, command, indent=0, color=None):
  719. colored = term.colored().names[color] if color else lambda x: x
  720. obj = self.commands[command]
  721. if obj.leaf:
  722. return '|' + text.indent('celery %s' % colored(command), indent)
  723. return text.join([
  724. ' ',
  725. '|' + text.indent('celery %s --help' % colored(command), indent),
  726. obj.list_commands(indent, 'celery %s' % command, colored),
  727. ])
  728. @classmethod
  729. def list_commands(self, indent=0):
  730. white = term.colored().white
  731. ret = []
  732. for cls, commands, color in command_classes:
  733. ret.extend([
  734. text.indent('+ %s: ' % white(cls), indent),
  735. '\n'.join(self.get_command_info(command, indent + 4, color)
  736. for command in commands),
  737. ''
  738. ])
  739. return '\n'.join(ret).strip()
  740. def with_pool_option(self, argv):
  741. if len(argv) > 1 and argv[1] == 'worker':
  742. # this command supports custom pools
  743. # that may have to be loaded as early as possible.
  744. return (['-P'], ['--pool'])
  745. def on_concurrency_setup(self):
  746. load_extension_commands()
  747. def determine_exit_status(ret):
  748. if isinstance(ret, int):
  749. return ret
  750. return EX_OK if ret else EX_FAILURE
  751. def main(argv=None):
  752. # Fix for setuptools generated scripts, so that it will
  753. # work with multiprocessing fork emulation.
  754. # (see multiprocessing.forking.get_preparation_data())
  755. try:
  756. if __name__ != '__main__': # pragma: no cover
  757. sys.modules['__main__'] = sys.modules[__name__]
  758. cmd = CeleryCommand()
  759. cmd.maybe_patch_concurrency()
  760. from billiard import freeze_support
  761. freeze_support()
  762. cmd.execute_from_commandline(argv)
  763. except KeyboardInterrupt:
  764. pass
  765. if __name__ == '__main__': # pragma: no cover
  766. main()