|
@@ -12,6 +12,7 @@ import anyjson
|
|
|
import os
|
|
|
import sys
|
|
|
|
|
|
+from functools import partial
|
|
|
from importlib import import_module
|
|
|
|
|
|
from celery.five import string_t, values
|
|
@@ -21,7 +22,7 @@ from celery.utils import text
|
|
|
from celery.utils.timeutils import maybe_iso8601
|
|
|
|
|
|
# Cannot use relative imports here due to a Windows issue (#1111).
|
|
|
-from celery.bin.base import Command, Error, Option, Extensions
|
|
|
+from celery.bin.base import Command, Option, Extensions
|
|
|
|
|
|
# Import commands from other modules
|
|
|
from celery.bin.amqp import amqp
|
|
@@ -63,11 +64,6 @@ def determine_exit_status(ret):
|
|
|
return EX_OK if ret else EX_FAILURE
|
|
|
|
|
|
|
|
|
-def ensure_broadcast_supported(app):
|
|
|
- if app.connection().transport.driver_type == 'sql':
|
|
|
- raise Error('SQL broker transports does not support broadcast')
|
|
|
-
|
|
|
-
|
|
|
def main(argv=None):
|
|
|
# Fix for setuptools generated scripts, so that it will
|
|
|
# work with multiprocessing fork emulation.
|
|
@@ -114,7 +110,7 @@ class list_(Command):
|
|
|
try:
|
|
|
bindings = management.get_bindings()
|
|
|
except NotImplementedError:
|
|
|
- raise Error('Your transport cannot list bindings.')
|
|
|
+ raise self.Error('Your transport cannot list bindings.')
|
|
|
|
|
|
fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
|
|
|
fmt('Queue', 'Exchange', 'Routing Key')
|
|
@@ -126,10 +122,12 @@ class list_(Command):
|
|
|
topics = {'bindings': self.list_bindings}
|
|
|
available = ', '.join(topics)
|
|
|
if not what:
|
|
|
- raise Error('You must specify one of {0}'.format(available))
|
|
|
+ raise self.UsageError(
|
|
|
+ 'You must specify one of {0}'.format(available))
|
|
|
if what not in topics:
|
|
|
- raise Error('unknown topic {0!r} (choose one of: {1})'.format(
|
|
|
- what, available))
|
|
|
+ raise self.UsageError(
|
|
|
+ 'unknown topic {0!r} (choose one of: {1})'.format(
|
|
|
+ what, available))
|
|
|
with self.app.connection() as conn:
|
|
|
self.app.amqp.TaskConsumer(conn).declare()
|
|
|
topics[what](conn.manager)
|
|
@@ -300,19 +298,20 @@ class _RemoteControl(Command):
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
if not args:
|
|
|
- raise Error('Missing {0.name} method. See --help'.format(self))
|
|
|
+ raise self.UsageError(
|
|
|
+ 'Missing {0.name} method. See --help'.format(self))
|
|
|
return self.do_call_method(args, **kwargs)
|
|
|
|
|
|
def do_call_method(self, args, **kwargs):
|
|
|
method = args[0]
|
|
|
if method == 'help':
|
|
|
- raise Error("Did you mean '{0.name} --help'?".format(self))
|
|
|
+ raise self.Error("Did you mean '{0.name} --help'?".format(self))
|
|
|
if method not in self.choices:
|
|
|
- raise Error('Unknown {0.name} method {1}'.format(self, method))
|
|
|
-
|
|
|
- ensure_broadcast_supported(self.app)
|
|
|
+ raise self.UsageError(
|
|
|
+ 'Unknown {0.name} method {1}'.format(self, method))
|
|
|
|
|
|
- ensure_broadcast_supported(self.app)
|
|
|
+ if self.app.connection().transport.driver_type == 'sql':
|
|
|
+ raise self.Error('Broadcast not supported by SQL broker transport')
|
|
|
|
|
|
destination = kwargs.get('destination')
|
|
|
timeout = kwargs.get('timeout') or self.choices[method][0]
|
|
@@ -328,8 +327,8 @@ class _RemoteControl(Command):
|
|
|
destination=destination,
|
|
|
callback=self.say_remote_command_reply)
|
|
|
if not replies:
|
|
|
- raise Error('No nodes replied within time constraint.',
|
|
|
- status=EX_UNAVAILABLE)
|
|
|
+ raise self.Error('No nodes replied within time constraint.',
|
|
|
+ status=EX_UNAVAILABLE)
|
|
|
return replies
|
|
|
|
|
|
def say(self, direction, title, body=''):
|
|
@@ -453,8 +452,8 @@ class status(Command):
|
|
|
)
|
|
|
replies = I.run('ping', **kwargs)
|
|
|
if not replies:
|
|
|
- raise Error('No nodes replied within time constraint',
|
|
|
- status=EX_UNAVAILABLE)
|
|
|
+ raise self.Error('No nodes replied within time constraint',
|
|
|
+ status=EX_UNAVAILABLE)
|
|
|
nodecount = len(replies)
|
|
|
if not kwargs.get('quiet', False):
|
|
|
self.out('\n{0} {1} online.'.format(
|
|
@@ -492,11 +491,7 @@ class migrate(Command):
|
|
|
def on_migrate_task(self, state, body, message):
|
|
|
self.out(self.progress_fmt.format(state=state, body=body))
|
|
|
|
|
|
- def run(self, *args, **kwargs):
|
|
|
- if len(args) != 2:
|
|
|
- # this never exits due to OptionParser.parse_options
|
|
|
- self.run_from_argv(self.prog_name, ['migrate', '--help'])
|
|
|
- raise SystemExit()
|
|
|
+ def run(self, source, destination, **kwargs):
|
|
|
from kombu import Connection
|
|
|
from celery.contrib.migrate import migrate_tasks
|
|
|
|
|
@@ -680,12 +675,26 @@ class CeleryCommand(Command):
|
|
|
cls, argv = self.commands['help'], ['help']
|
|
|
cls = self.commands.get(command) or self.commands['help']
|
|
|
try:
|
|
|
- return cls(app=self.app).run_from_argv(
|
|
|
- self.prog_name, argv[1:], command=argv[0],
|
|
|
- )
|
|
|
- except (TypeError, Error), exc:
|
|
|
- raise
|
|
|
- return self.execute('help', argv)
|
|
|
+ return cls(
|
|
|
+ app=self.app, on_error=self.on_error,
|
|
|
+ on_usage_error=partial(self.on_usage_error, command=command),
|
|
|
+ ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
|
|
|
+ except self.UsageError as exc:
|
|
|
+ self.on_usage_error(exc)
|
|
|
+ return exc.status
|
|
|
+ except self.Error as exc:
|
|
|
+ self.on_error(exc)
|
|
|
+ return exc.status
|
|
|
+
|
|
|
+ def on_usage_error(self, exc, command=None):
|
|
|
+ if command:
|
|
|
+ helps = '{self.prog_name} {command} --help'
|
|
|
+ else:
|
|
|
+ helps = '{self.prog_name} --help'
|
|
|
+ self.error(self.colored.magenta("Error: {0}".format(exc)))
|
|
|
+ self.error("""Please try '{0}'""".format(helps.format(
|
|
|
+ self=self, command=command,
|
|
|
+ )))
|
|
|
|
|
|
def remove_options_at_beginning(self, argv, index=0):
|
|
|
if argv:
|