12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215 |
- # -*- coding: utf-8 -*-
- """The :program:`celery` umbrella command.
- .. program:: celery
- .. _preload-options:
- Preload Options
- ---------------
- These options are supported by all commands,
- and usually parsed before command-specific arguments.
- .. cmdoption:: -A, --app
- app instance to use (e.g., ``module.attr_name``)
- .. cmdoption:: -b, --broker
- URL to broker. default is ``amqp://guest@localhost//``
- .. cmdoption:: --loader
- name of custom loader class to use.
- .. cmdoption:: --config
- Name of the configuration module
- .. cmdoption:: -C, --no-color
- Disable colors in output.
- .. cmdoption:: -q, --quiet
- Give less verbose output (behavior depends on the sub command).
- .. cmdoption:: --help
- Show help and exit.
- .. _daemon-options:
- Daemon Options
- --------------
- These options are supported by commands that can detach
- into the background (daemon). They will be present
- in any command that also has a `--detach` option.
- .. cmdoption:: -f, --logfile
- Path to log file. If no logfile is specified, `stderr` is used.
- .. cmdoption:: --pidfile
- Optional file used to store the process pid.
- The program won't start if this file already exists
- and the pid is still alive.
- .. cmdoption:: --uid
- User id, or user name of the user to run as after detaching.
- .. cmdoption:: --gid
- Group id, or group name of the main group to change to after
- detaching.
- .. cmdoption:: --umask
- Effective umask (in octal) of the process after detaching. Inherits
- the umask of the parent process by default.
- .. cmdoption:: --workdir
- Optional directory to change to after detaching.
- .. cmdoption:: --executable
- Executable to use for the detached process.
- ``celery inspect``
- ------------------
- .. program:: celery inspect
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for reply
- .. cmdoption:: -d, --destination
- Comma separated list of destination node names.
- .. cmdoption:: -j, --json
- Use json as output format.
- ``celery control``
- ------------------
- .. program:: celery control
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for reply
- .. cmdoption:: -d, --destination
- Comma separated list of destination node names.
- .. cmdoption:: -j, --json
- Use json as output format.
- ``celery migrate``
- ------------------
- .. program:: celery migrate
- .. cmdoption:: -n, --limit
- Number of tasks to consume (int).
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for tasks.
- .. cmdoption:: -a, --ack-messages
- Ack messages from source broker.
- .. cmdoption:: -T, --tasks
- List of task names to filter on.
- .. cmdoption:: -Q, --queues
- List of queues to migrate.
- .. cmdoption:: -F, --forever
- Continually migrate tasks until killed.
- ``celery upgrade``
- ------------------
- .. program:: celery upgrade
- .. cmdoption:: --django
- Upgrade a Django project.
- .. cmdoption:: --compat
- Maintain backwards compatibility.
- .. cmdoption:: --no-backup
- Don't backup original files.
- ``celery shell``
- ----------------
- .. program:: celery shell
- .. cmdoption:: -I, --ipython
- Force :pypi:`iPython` implementation.
- .. cmdoption:: -B, --bpython
- Force :pypi:`bpython` implementation.
- .. cmdoption:: -P, --python
- Force default Python shell.
- .. cmdoption:: -T, --without-tasks
- Don't add tasks to locals.
- .. cmdoption:: --eventlet
- Use :pypi:`eventlet` monkey patches.
- .. cmdoption:: --gevent
- Use :pypi:`gevent` monkey patches.
- ``celery result``
- -----------------
- .. program:: celery result
- .. cmdoption:: -t, --task
- Name of task (if custom backend).
- .. cmdoption:: --traceback
- Show traceback if any.
- ``celery purge``
- ----------------
- .. program:: celery purge
- .. cmdoption:: -f, --force
- Don't prompt for verification before deleting messages (DANGEROUS)
- ``celery call``
- ---------------
- .. program:: celery call
- .. cmdoption:: -a, --args
- Positional arguments (json format).
- .. cmdoption:: -k, --kwargs
- Keyword arguments (json format).
- .. cmdoption:: --eta
- Scheduled time in ISO-8601 format.
- .. cmdoption:: --countdown
- ETA in seconds from now (float/int).
- .. cmdoption:: --expires
- Expiry time in float/int seconds, or an ISO-8601 date.
- .. cmdoption:: --serializer
- Specify serializer to use (default is json).
- .. cmdoption:: --queue
- Destination queue.
- .. cmdoption:: --exchange
- Destination exchange (defaults to the queue exchange).
- .. cmdoption:: --routing-key
- Destination routing key (defaults to the queue routing key).
- """
- from __future__ import absolute_import, unicode_literals, print_function
- import codecs
- import numbers
- import os
- import sys
- from functools import partial
- from importlib import import_module
- from kombu.utils.json import dumps, loads
- from kombu.utils.objects import cached_property
- from celery.app import defaults
- from celery.five import items, keys, string_t, values
- from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
- from celery.utils import term
- from celery.utils import text
- from celery.utils.functional import pass1
- from celery.utils.text import str_to_list
- from celery.utils.time import maybe_iso8601
- # Cannot use relative imports here due to a Windows issue (#1111).
- from celery.bin.base import Command, Option, Extensions
- # Import commands from other modules
- from celery.bin.amqp import amqp
- from celery.bin.beat import beat
- from celery.bin.events import events
- from celery.bin.graph import graph
- from celery.bin.logtool import logtool
- from celery.bin.worker import worker
# Names exported by a star-import of this module.
__all__ = ['CeleryCommand', 'main']

# Template for the command overview printed by the ``help`` command;
# it is formatted with the ``commands`` and ``prog_name`` fields.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
{commands}
---- -- - - --------- -- - -------------- --- ------------
Type '{prog_name} <command> --help' for help using a specific command.
"""

# Progress line written by ``migrate.on_migrate_task``; it is formatted
# with the ``state`` and ``body`` fields.
MIGRATE_PROGRESS_FMT = """\
Migrating task {state.count}/{state.strtotal}: \
{body[task]}[{body[id]}]\
"""

# ``(group title, [command names], color name or None)`` entries used by
# ``CeleryCommand.list_commands`` to render the grouped command overview.
# ``CeleryCommand.load_extension_commands`` may append to this list.
command_classes = [
    ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
    ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
    ('Utils',
     ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
     None),
    ('Debugging', ['report', 'logtool'], 'red'),
]
def determine_exit_status(ret):
    """Translate a command's return value into a process exit status.

    Integral values (including booleans) are returned unchanged.  Any other
    value maps to ``EX_OK`` when truthy and to ``EX_FAILURE`` when falsy.
    """
    if not isinstance(ret, numbers.Integral):
        return EX_OK if ret else EX_FAILURE
    return ret
def main(argv=None):
    """Start celery umbrella command.

    ``argv`` is handed to ``CeleryCommand.execute_from_commandline``.
    A ``KeyboardInterrupt`` raised anywhere during startup is swallowed.
    """
    try:
        # Fix for setuptools generated scripts, so that it will
        # work with multiprocessing fork emulation.
        # (see multiprocessing.forking.get_preparation_data())
        if __name__ != '__main__':  # pragma: no cover
            sys.modules['__main__'] = sys.modules[__name__]

        umbrella = CeleryCommand()
        # Concurrency patching happens before billiard is imported.
        umbrella.maybe_patch_concurrency()

        from billiard import freeze_support
        freeze_support()

        umbrella.execute_from_commandline(argv)
    except KeyboardInterrupt:
        pass
class multi(Command):
    """Start multiple worker instances."""

    # This command does not go through the app-option handling.
    respects_app_option = False

    def get_options(self):
        """Return no options; all arguments are forwarded to ``MultiTool``."""
        return None

    def run_from_argv(self, prog_name, argv, command=None):
        """Delegate to ``MultiTool`` with ``command`` prepended to ``argv``."""
        from celery.bin.multi import MultiTool

        tool = MultiTool(quiet=self.quiet, no_color=self.no_color)
        forwarded = [command] + argv
        return tool.execute_from_commandline(forwarded)
class list_(Command):
    """Get info from broker.

    Note:
        For RabbitMQ the management plugin is required.

    Example:
        .. code-block:: console

            $ celery list bindings
    """

    args = '[bindings]'

    def list_bindings(self, management):
        """Print a table of bindings (queue, exchange, routing key).

        Raises ``self.Error`` when ``management.get_bindings()`` raises
        ``NotImplementedError``.
        """
        try:
            bindings = management.get_bindings()
        except NotImplementedError:
            raise self.Error('Your transport cannot list bindings.')

        row_template = '{0:<28} {1:<28} {2}'

        def emit(queue, exchange, routing_key):
            return self.out(row_template.format(queue, exchange, routing_key))

        rule = '-' * 16
        emit('Queue', 'Exchange', 'Routing Key')
        emit(rule, rule, rule)
        for binding in bindings:
            emit(binding['destination'],
                 binding['source'],
                 binding['routing_key'])

    def run(self, what=None, *_, **kw):
        """Dispatch to the handler for topic ``what``.

        Raises ``self.UsageError`` when ``what`` is missing or unknown.
        """
        handlers = {'bindings': self.list_bindings}
        available = ', '.join(handlers)

        if not what:
            raise self.UsageError(
                'You must specify one of {0}'.format(available))

        handler = handlers.get(what)
        if handler is None:
            raise self.UsageError(
                'unknown topic {0!r} (choose one of: {1})'.format(
                    what, available))

        with self.app.connection() as conn:
            # Declare the task consumer before the handler queries
            # the broker.
            self.app.amqp.TaskConsumer(conn).declare()
            handler(conn.manager)
class call(Command):
    """Call a task by name.

    Examples:
        .. code-block:: console

            $ celery call tasks.add --args='[2, 2]'
            $ celery call tasks.add --args='[2, 2]' --countdown=10
    """

    args = '<task_name>'

    option_list = Command.option_list + (
        Option('--args', '-a', help='positional arguments (json).'),
        Option('--kwargs', '-k', help='keyword arguments (json).'),
        Option('--eta', help='scheduled time (ISO-8601).'),
        Option('--countdown', type='float',
               help='eta in seconds from now (float/int).'),
        Option('--expires', help='expiry time (ISO-8601/float/int).'),
        Option('--serializer', default='json', help='defaults to json.'),
        Option('--queue', help='custom queue name.'),
        Option('--exchange', help='custom exchange name.'),
        Option('--routing-key', help='custom routing key.'),
    )

    def run(self, name, *_, **kwargs):
        """Send the task called ``name``; extra positionals are ignored."""
        self._send_task(name, **kwargs)

    def _send_task(self, name, args=None, kwargs=None,
                   countdown=None, serializer=None,
                   queue=None, exchange=None, routing_key=None,
                   eta=None, expires=None):
        """Send task ``name`` through ``self.app`` and print its id.

        ``args`` and ``kwargs`` are decoded with ``loads`` when given as
        strings.  ``expires`` is used as a float when it converts to one,
        otherwise it is passed through ``maybe_iso8601``.
        """
        if isinstance(args, string_t):
            args = loads(args)
        if isinstance(kwargs, string_t):
            kwargs = loads(kwargs)

        # Expires can be int/float, or a string describing
        # an ISO 8601 datetime.
        try:
            expires = float(expires)
        except (TypeError, ValueError):
            expires = maybe_iso8601(expires)

        # Send the task and print the id.
        async_result = self.app.send_task(
            name,
            args=args or (), kwargs=kwargs or {},
            countdown=countdown,
            serializer=serializer,
            queue=queue,
            exchange=exchange,
            routing_key=routing_key,
            eta=maybe_iso8601(eta),
            expires=expires,
        )
        self.out(async_result.id)
class purge(Command):
    """Erase all messages from all known task queues.

    Warning:
        There's no undo operation for this command.
    """

    # Shown (followed by a yes/no prompt) unless --force is given.
    warn_prelude = (
        '{warning}: This will remove all tasks from {queues}: {names}.\n'
        ' There is no undo for this operation!\n\n'
        '(to skip this prompt use the -f option)\n'
    )
    warn_prompt = 'Are you sure you want to delete all tasks'

    # Summary lines printed once purging is done.
    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
    fmt_empty = 'No messages purged from {qnum} {queues}'

    option_list = Command.option_list + (
        Option('--force', '-f', action='store_true',
               help="Don't prompt for verification"),
        Option('--queues', '-Q', default=[],
               help='Comma separated list of queue names to purge.'),
        Option('--exclude-queues', '-X', default=[],
               help='Comma separated list of queues names not to purge.')
    )

    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
        """Purge the selected queues and print a summary.

        The selection is ``queues`` (or every queue in
        ``self.app.amqp.queues`` when none are given) minus
        ``exclude_queues``.  Unless ``force`` is set the user must confirm
        with ``yes``; any other answer returns without purging.
        """
        wanted = set(str_to_list(queues or []))
        excluded = set(str_to_list(exclude_queues or []))
        if not wanted:
            wanted = set(keys(self.app.amqp.queues))
        names = wanted - excluded
        qnum = len(names)

        messages = None
        if names:
            if not force:
                self.out(self.warn_prelude.format(
                    warning=self.colored.red('WARNING'),
                    queues=text.pluralize(qnum, 'queue'),
                    names=', '.join(sorted(names)),
                ))
                answer = self.ask(self.warn_prompt, ('yes', 'no'), 'no')
                if answer != 'yes':
                    return
            with self.app.connection_for_write() as conn:
                messages = sum(self._purge(conn, queue) for queue in names)

        template = self.fmt_purged if messages else self.fmt_empty
        self.out(template.format(
            mnum=messages, qnum=qnum,
            messages=text.pluralize(messages, 'message'),
            queues=text.pluralize(qnum, 'queue')))

    def _purge(self, conn, queue):
        """Purge ``queue`` and return the reported message count.

        Returns 0 when the purge returns a falsy value or raises one of
        ``conn.channel_errors``.
        """
        try:
            purged = conn.default_channel.queue_purge(queue)
        except conn.channel_errors:
            return 0
        return purged or 0
class result(Command):
    """Gives the return value for a given task id.

    Examples:
        .. code-block:: console

            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
    """

    args = '<task_id>'

    option_list = Command.option_list + (
        Option('--task', '-t', help='name of task (if custom backend)'),
        Option('--traceback', action='store_true',
               help='show traceback instead'),
    )

    def run(self, task_id, *args, **kwargs):
        """Print the result, or the traceback, of task ``task_id``.

        When the ``task`` keyword names a task, that task's ``AsyncResult``
        class is used instead of the app default.
        """
        result_cls = self.app.AsyncResult
        task_name = kwargs.get('task')
        want_traceback = kwargs.get('traceback', False)

        if task_name:
            result_cls = self.app.tasks[task_name].AsyncResult
        async_result = result_cls(task_id)

        value = async_result.traceback if want_traceback \
            else async_result.get()
        # ``pretty`` returns a sequence; the printable text is element 1.
        self.out(self.pretty(value)[1])
class _RemoteControl(Command):
    # Shared implementation for the ``inspect`` and ``control`` commands.
    # Subclasses set ``name`` and ``control_group`` and implement ``call``.
    # (Documented with comments rather than a docstring so that
    # ``__doc__`` stays unset, exactly as before.)

    name = None
    leaf = False
    control_group = None

    option_list = Command.option_list + (
        Option('--timeout', '-t', type='float',
               help='Timeout in seconds (float) waiting for reply'),
        Option('--destination', '-d',
               help='Comma separated list of destination node names.'),
        Option('--json', '-j', action='store_true',
               help='Use json as output format.'),
    )

    def __init__(self, *args, **kwargs):
        """Pop ``show_body``/``show_reply`` and initialise the base class."""
        self.show_body = kwargs.pop('show_body', True)
        self.show_reply = kwargs.pop('show_reply', True)
        super(_RemoteControl, self).__init__(*args, **kwargs)

    @classmethod
    def get_command_info(cls, command,
                         indent=0, prefix='', color=None,
                         help=False, app=None, choices=None):
        """Return the overview entry for one remote control ``command``.

        ``color`` must be callable; ``list_commands`` always supplies one.
        When ``help`` is true the command's help text is appended.
        """
        if choices is None:
            choices = cls._choices_by_group(app)
        meta = choices[command]

        detail = None
        if help:
            detail = '|' + text.indent(meta.help, indent + 4)

        headline = '{0}{1} {2}'.format(
            prefix, color(command), meta.signature or '')
        return text.join(['|' + text.indent(headline, indent), detail])

    @classmethod
    def list_commands(cls, indent=0, prefix='',
                      color=None, help=False, app=None):
        """Return the overview of all commands in this control group."""
        choices = cls._choices_by_group(app)
        if not color:
            color = lambda x: x  # noqa: E731 -- identity "colorizer"
        if prefix:
            prefix = prefix + ' '
        else:
            prefix = ''

        entries = (
            cls.get_command_info(name, indent, prefix, color, help,
                                 app=app, choices=choices)
            for name in sorted(choices)
        )
        return '\n'.join(entries)

    def usage(self, command):
        """Return the usage line for ``command``."""
        template = '%prog {0} [options] {1} <command> [arg1 .. argN]'
        return template.format(command, self.args)

    def call(self, *args, **kwargs):
        """Send the remote control request; implemented by subclasses."""
        raise NotImplementedError('call')

    def run(self, *args, **kwargs):
        """Call the method named by the first positional argument.

        Raises ``self.UsageError`` when no method name was given.
        """
        if args:
            return self.do_call_method(args, **kwargs)
        raise self.UsageError(
            'Missing {0.name} method. See --help'.format(self))

    def _ensure_fanout_supported(self):
        """Raise ``self.Error`` if the transport lacks fanout exchanges."""
        with self.app.connection_for_write() as conn:
            if conn.supports_exchange_type('fanout'):
                return
            raise self.Error(
                'Broadcast not supported by transport {0!r}'.format(
                    conn.info()['transport']))

    def do_call_method(self, args,
                       timeout=None, destination=None, json=False, **kwargs):
        """Validate ``args[0]``, send the request and return the replies.

        Raises:
            self.Error: when the method is ``help``, or when no replies
                arrive (status ``EX_UNAVAILABLE``).
            self.UsageError: when the method is unknown.
        """
        method = args[0]
        if method == 'help':
            raise self.Error("Did you mean '{0.name} --help'?".format(self))
        try:
            meta = self.choices[method]
        except KeyError:
            raise self.UsageError(
                'Unknown {0.name} method {1}'.format(self, method))

        self._ensure_fanout_supported()

        if not timeout:
            timeout = meta.default_timeout
        if destination and isinstance(destination, string_t):
            destination = [dest.strip() for dest in destination.split(',')]

        # With --json the replies are dumped in one go at the end, so no
        # per-reply callback is installed.
        reply_callback = None if json else self.say_remote_command_reply
        replies = self.call(
            method,
            arguments=self.compile_arguments(meta, method, args[1:]),
            timeout=timeout,
            destination=destination,
            callback=reply_callback,
        )
        if not replies:
            raise self.Error('No nodes replied within time constraint.',
                             status=EX_UNAVAILABLE)
        if json:
            self.out(dumps(replies))
        return replies

    def compile_arguments(self, meta, method, args):
        """Build the keyword arguments for ``method`` from positional args.

        Raises ``self.Error`` (status ``EX_USAGE``) when arguments were
        given but no keyword arguments could be built from them.
        """
        args = list(args)
        kw = {}
        if meta.args:
            # ``_consume_args`` trims ``args`` in place once exhausted.
            kw.update(dict(self._consume_args(meta, method, args)))
        if meta.variadic:
            kw[meta.variadic] = args
        if args and not kw:
            raise self.Error(
                'Command {0!r} takes no arguments.'.format(method),
                status=EX_USAGE)
        return kw or {}

    def _consume_args(self, meta, method, args):
        """Yield ``(name, value)`` pairs for the declared arguments.

        Each value is converted with the declared type when one is set.
        On exit ``args`` is trimmed in place to ``args[i:]``, where ``i``
        is the last index reached.
        """
        i = 0
        try:
            for i, arg in enumerate(args):
                try:
                    name, typ = meta.args[i]
                except IndexError:
                    # More values than declared arguments.
                    if meta.variadic:
                        break
                    raise self.Error(
                        'Command {0!r} takes arguments: {1}'.format(
                            method, meta.signature),
                        status=EX_USAGE)
                else:
                    yield name, typ(arg) if typ is not None else arg
        finally:
            args[:] = args[i:]

    @classmethod
    def _choices_by_group(cls, app):
        """Return the visible ``Panel.meta`` entries for this group."""
        from celery.worker.control import Panel

        # need to import task modules for custom user-remote control commands.
        app.loader.import_default_modules()

        selected = {}
        for name, info in items(Panel.meta):
            if info.type == cls.control_group and info.visible:
                selected[name] = info
        return selected

    @cached_property
    def choices(self):
        """Commands available in this control group (computed once)."""
        return self._choices_by_group(self.app)

    @property
    def epilog(self):
        """A ``[Commands]`` header followed by the command listing."""
        listing = self.list_commands(indent=4, help=True, app=self.app)
        return '\n'.join(['[Commands]', listing])
class inspect(_RemoteControl):
    """Inspect the worker at runtime.

    Availability: RabbitMQ (AMQP) and Redis transports.

    Examples:
        .. code-block:: console

            $ celery inspect active --timeout=5
            $ celery inspect scheduled -d worker1@example.com
            $ celery inspect revoked -d w1@e.com,w2@e.com
    """

    name = 'inspect'
    control_group = 'inspect'

    def call(self, method, arguments, **options):
        """Send inspect request ``method`` and return the result."""
        inspector = self.app.control.inspect(**options)
        return inspector._request(method, **arguments)
class control(_RemoteControl):
    """Workers remote control.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.

    Examples:
        .. code-block:: console

            $ celery control enable_events --timeout=5
            $ celery control -d worker1@example.com enable_events
            $ celery control -d w1.e.com,w2.e.com enable_events

            $ celery control -d w1.e.com add_consumer queue_name
            $ celery control -d w1.e.com cancel_consumer queue_name

            $ celery control add_consumer queue exchange direct rkey
    """

    name = 'control'
    control_group = 'control'

    def call(self, method, arguments, **options):
        """Broadcast ``method`` with ``reply=True`` and return the result."""
        broadcast = self.app.control.broadcast
        return broadcast(method, arguments=arguments, reply=True, **options)
class status(Command):
    """Show list of workers that are online."""

    # Same options as ``inspect``.
    option_list = inspect.option_list

    def run(self, *args, **kwargs):
        """Ping the workers and print how many nodes replied.

        Raises ``self.Error`` (status ``EX_UNAVAILABLE``) when no replies
        come back.  The summary line is omitted when ``quiet`` is set.
        """
        pinger = inspect(
            app=self.app,
            no_color=kwargs.get('no_color', False),
            stdout=self.stdout, stderr=self.stderr,
            show_reply=False, show_body=False, quiet=True,
        )
        replies = pinger.run('ping', **kwargs)
        if not replies:
            raise self.Error('No nodes replied within time constraint',
                             status=EX_UNAVAILABLE)

        nodecount = len(replies)
        if kwargs.get('quiet', False):
            return
        self.out('\n{0} {1} online.'.format(
            nodecount, text.pluralize(nodecount, 'node')))
class migrate(Command):
    """Migrate tasks from one broker to another.

    Warning:
        This command is experimental, make sure you have a backup of
        the tasks before you continue.

    Example:
        .. code-block:: console

            $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
            $ celery migrate redis://localhost amqp://guest@localhost//
    """

    args = '<source_url> <dest_url>'

    option_list = Command.option_list + (
        Option('--limit', '-n', type='int',
               help='Number of tasks to consume (int)'),
        Option('--timeout', '-t', type='float', default=1.0,
               help='Timeout in seconds (float) waiting for tasks'),
        Option('--ack-messages', '-a', action='store_true',
               help='Ack messages from source broker.'),
        Option('--tasks', '-T',
               help='List of task names to filter on.'),
        Option('--queues', '-Q',
               help='List of queues to migrate.'),
        Option('--forever', '-F', action='store_true',
               help='Continually migrate tasks until killed.'),
    )

    progress_fmt = MIGRATE_PROGRESS_FMT

    def on_migrate_task(self, state, body, message):
        """Print a progress line; used as the ``migrate_tasks`` callback."""
        progress = self.progress_fmt.format(state=state, body=body)
        self.out(progress)

    def run(self, source, destination, **kwargs):
        """Migrate tasks from broker URL ``source`` to ``destination``.

        Remaining keyword arguments are forwarded to ``migrate_tasks``.
        """
        from kombu import Connection
        from celery.contrib.migrate import migrate_tasks

        source_conn = Connection(source)
        dest_conn = Connection(destination)
        migrate_tasks(source_conn, dest_conn,
                      callback=self.on_migrate_task,
                      **kwargs)
class shell(Command):  # pragma: no cover
    """Start shell session with convenient access to celery symbols.

    The following symbols will be added to the main globals:

        - ``celery``:  the current application.
        - ``chord``, ``group``, ``chain``, ``chunks``,
          ``xmap``, ``xstarmap`` ``subtask``, ``Task``
        - all registered tasks.
    """

    option_list = Command.option_list + (
        Option('--ipython', '-I',
               action='store_true', help='force iPython.'),
        Option('--bpython', '-B',
               action='store_true', help='force bpython.'),
        Option('--python', '-P',
               action='store_true', help='force default Python shell.'),
        Option('--without-tasks', '-T', action='store_true',
               help="don't add tasks to locals."),
        Option('--eventlet', action='store_true',
               help='use eventlet.'),
        Option('--gevent', action='store_true', help='use gevent.'),
    )

    def run(self, ipython=False, bpython=False,
            python=False, without_tasks=False, eventlet=False,
            gevent=False, **kwargs):
        """Build the shell namespace and start the selected shell.

        Precedence of the forcing flags is ``python``, ``bpython``,
        ``ipython``; with none set the shell is auto-detected.
        """
        sys.path.insert(0, os.getcwd())
        if eventlet:
            import_module('celery.concurrency.eventlet')
        if gevent:
            import_module('celery.concurrency.gevent')

        import celery
        import celery.task.base
        self.app.loader.import_default_modules()

        namespace = {
            'app': self.app,
            'celery': self.app,
            'Task': celery.Task,
            'chord': celery.chord,
            'group': celery.group,
            'chain': celery.chain,
            'chunks': celery.chunks,
            'xmap': celery.xmap,
            'xstarmap': celery.xstarmap,
            'subtask': celery.subtask,
            'signature': celery.signature,
        }
        self.locals = namespace

        if not without_tasks:
            # Expose every task whose name is outside the ``celery.``
            # namespace.
            for task in values(self.app.tasks):
                if not task.name.startswith('celery.'):
                    namespace[task.__name__] = task

        if python:
            return self.invoke_fallback_shell()
        if bpython:
            return self.invoke_bpython_shell()
        if ipython:
            return self.invoke_ipython_shell()
        return self.invoke_default_shell()

    def invoke_default_shell(self):
        """Start IPython if importable, else bpython, else plain Python."""
        try:
            import IPython  # noqa
        except ImportError:
            pass
        else:
            return self.invoke_ipython_shell()

        try:
            import bpython  # noqa
        except ImportError:
            return self.invoke_fallback_shell()
        return self.invoke_bpython_shell()

    def invoke_fallback_shell(self):
        """Start the standard ``code.interact`` console.

        Tab completion over the shell namespace is enabled when
        ``readline`` can be imported.
        """
        import code
        try:
            import readline
        except ImportError:
            pass
        else:
            import rlcompleter
            completer = rlcompleter.Completer(self.locals)
            readline.set_completer(completer.complete)
            readline.parse_and_bind('tab:complete')
        code.interact(local=self.locals)

    def invoke_ipython_shell(self):
        """Try each IPython entry point in turn until one imports.

        ``_no_ipython`` is last and always raises ``ImportError``, so the
        method returns ``None`` when no entry point is usable.
        """
        launchers = (
            self._ipython,
            self._ipython_pre_10,
            self._ipython_terminal,
            self._ipython_010,
            self._no_ipython,
        )
        for launch in launchers:
            try:
                return launch()
            except ImportError:
                continue

    def _ipython(self):
        """Start IPython through ``IPython.start_ipython``."""
        from IPython import start_ipython
        start_ipython(argv=[], user_ns=self.locals)

    def _ipython_pre_10(self):  # pragma: no cover
        """Start IPython through ``IPython.frontend`` (older layout)."""
        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
        ipapp = TerminalIPythonApp.instance()
        ipapp.initialize(argv=[])
        ipapp.shell.user_ns.update(self.locals)
        ipapp.start()

    def _ipython_terminal(self):  # pragma: no cover
        """Start IPython through ``IPython.terminal.embed``."""
        from IPython.terminal import embed
        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()

    def _ipython_010(self):  # pragma: no cover
        """Start IPython through ``IPython.Shell.IPShell``."""
        from IPython.Shell import IPShell
        IPShell(argv=[], user_ns=self.locals).mainloop()

    def _no_ipython(self):  # pragma: no cover
        """Terminal entry of the launcher chain: always raises."""
        raise ImportError('no suitable ipython found')

    def invoke_bpython_shell(self):
        """Start an embedded bpython session with the shell namespace."""
        import bpython
        bpython.embed(self.locals)
class upgrade(Command):
    """Perform upgrade between versions."""

    option_list = Command.option_list + (
        Option('--django', action='store_true',
               help='Upgrade Django project'),
        Option('--compat', action='store_true',
               help='Maintain backwards compatibility'),
        Option('--no-backup', action='store_true',
               help="Don't backup original files"),
    )

    # Upgrade types accepted as the first positional argument; each one
    # is implemented by the method of the same name.
    choices = {'settings'}

    def usage(self, command):
        """Return the usage line for this command."""
        return '%prog <command> settings [filename] [options]'

    def run(self, *args, **kwargs):
        """Dispatch to the upgrade type named by the first argument.

        Raises ``self.UsageError`` when the upgrade type is missing or is
        not one of ``choices``.
        """
        try:
            command = args[0]
        except IndexError:
            raise self.UsageError('missing upgrade type')
        if command not in self.choices:
            raise self.UsageError('unknown upgrade type: {0}'.format(command))
        return getattr(self, command)(*args, **kwargs)

    def settings(self, command, filename=None,
                 no_backup=False, django=False, compat=False, **kwargs):
        """Rewrite old setting names in ``filename`` to their new names.

        A ``<filename>.orig`` backup is written first unless ``no_backup``
        is set.  With ``django`` or ``compat`` the new names are
        upper-cased and given the ``CELERY`` namespace prefix.

        Raises ``self.UsageError`` when no filename was given.
        """
        if filename is None:
            # Report a usage error instead of failing with a TypeError
            # for the missing positional argument.
            raise self.UsageError('missing settings filename to upgrade')

        lines = self._slurp(filename) if no_backup else self._backup(filename)
        keyfilter = self._compat_key if django or compat else pass1
        print('processing {0}...'.format(filename), file=self.stderr)
        # The file is fully read (and backed up) before being reopened
        # for writing, which truncates it.
        with codecs.open(filename, 'w', 'utf-8') as write_fh:
            for line in lines:
                write_fh.write(self._to_new_key(line, keyfilter))

    def _slurp(self, filename):
        """Return all lines of ``filename``, decoded as UTF-8."""
        with codecs.open(filename, 'r', 'utf-8') as read_fh:
            return list(read_fh)

    def _backup(self, filename, suffix='.orig'):
        """Copy ``filename`` to ``filename + suffix``; return its lines."""
        lines = []
        backup_filename = ''.join([filename, suffix])
        print('writing backup to {0}...'.format(backup_filename),
              file=self.stderr)
        with codecs.open(filename, 'r', 'utf-8') as read_fh:
            with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
                for line in read_fh:
                    backup_fh.write(line)
                    lines.append(line)
        return lines

    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
        """Return ``line`` with the first matching old key replaced.

        ``source`` maps old keys to new keys and ``keyfilter`` is applied
        to the new key before substitution.
        """
        # sort by length to avoid, for example, broker_transport overriding
        # broker_transport_options.
        for old_key in reversed(sorted(source, key=len)):
            new_line = line.replace(old_key, keyfilter(source[old_key]))
            if line != new_line:
                return new_line  # only one match per line.
        return line

    def _compat_key(self, key, namespace='CELERY'):
        """Upper-case ``key`` and prefix it with ``namespace`` if needed."""
        key = key.upper()
        if not key.startswith(namespace):
            key = '_'.join([namespace, key])
        return key
class help(Command):
    """Show help screen and exit."""

    def usage(self, command):
        """Return the usage line for this command."""
        template = '%prog <command> [options] {0.args}'
        return template.format(self)

    def run(self, *args, **kwargs):
        """Print parser help and the command overview.

        Always returns ``EX_USAGE``.
        """
        self.parser.print_help()
        overview = HELP.format(
            prog_name=self.prog_name,
            commands=CeleryCommand.list_commands(
                colored=self.colored, app=self.app),
        )
        self.out(overview)
        return EX_USAGE
class report(Command):
    """Shows information useful to include in bug-reports."""

    def run(self, *args, **kwargs):
        """Print ``self.app.bugreport()`` and return ``EX_OK``."""
        bug_report = self.app.bugreport()
        self.out(bug_report)
        return EX_OK
class CeleryCommand(Command):
    """Base class for commands."""

    # Umbrella command: maps sub-command names to the classes
    # implementing them and dispatches ``celery <command> ...`` to them.
    commands = {
        'amqp': amqp,
        'beat': beat,
        'call': call,
        'control': control,
        'events': events,
        'graph': graph,
        'help': help,
        'inspect': inspect,
        'list': list_,
        'logtool': logtool,
        'migrate': migrate,
        'multi': multi,
        'purge': purge,
        'report': report,
        'result': result,
        'shell': shell,
        'status': status,
        'upgrade': upgrade,
        'worker': worker,
    }
    # Formatted with ``self`` and passed to ``Extensions`` when loading
    # extension commands.
    ext_fmt = '{self.namespace}.commands'
    enable_config_from_cmdline = True
    prog_name = 'celery'
    namespace = 'celery'

    @classmethod
    def register_command(cls, fun, name=None):
        """Register ``fun`` as a sub-command and return it.

        The command is stored under ``name``, or ``fun.__name__`` when no
        name is given.
        """
        cls.commands[name or fun.__name__] = fun
        return fun

    def execute(self, command, argv=None):
        """Instantiate and run the sub-command named ``command``.

        Unknown commands fall back to ``help``.  ``argv[0]`` is passed on
        as the command name and ``argv[1:]`` as its arguments; when
        ``argv`` is omitted it defaults to ``[command]``.

        Returns the sub-command's return value, or the ``status`` of a
        ``UsageError``/``Error`` it raised (after reporting it).
        """
        cls = self.commands.get(command)
        if not cls:
            cls, argv = self.commands['help'], ['help']
        elif argv is None:
            # The signature advertises ``argv=None``; without this the
            # slicing below raises TypeError.
            argv = [command]
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

    def on_usage_error(self, exc, command=None):
        """Print the usage error and a hint on which ``--help`` to run."""
        if command:
            helps = '{self.prog_name} {command} --help'
        else:
            helps = '{self.prog_name} --help'
        self.error(self.colored.magenta('Error: {0}'.format(exc)))
        self.error("""Please try '{0}'""".format(helps.format(
            self=self, command=command,
        )))

    def _relocate_args_from_start(self, argv, index=0):
        """Move leading options behind the command name.

        Leading ``--long`` options and ``-s`` short options are collected
        so that the command name ends up first in the returned list.
        A short option followed by a non-option is assumed to take that
        argument as its value.

        Returns a new list; ``[]`` when ``argv`` is empty.
        """
        if not argv:
            return []

        rest = []
        while index < len(argv):
            value = argv[index]
            if value.startswith('--'):
                rest.append(value)
            elif value.startswith('-'):
                # we eat the next argument even though we don't know
                # if this option takes an argument or not.
                # instead we'll assume what's the command name in the
                # return statements below.
                has_next = index + 1 < len(argv)
                if has_next and not argv[index + 1].startswith('-'):
                    # is (maybe) a value for this option
                    rest.extend([value, argv[index + 1]])
                    index += 1
                else:
                    # followed by another option, or last argument
                    rest.append(value)
            else:
                break
            index += 1

        if argv[index:]:  # pragma: no cover
            # if there are more arguments left then divide and swap
            # we assume the first argument in argv[i:] is the command
            # name.
            return argv[index:] + rest
        # if there are no more arguments then the last arg in rest
        # must be the command (e.g. ``celery -q worker``).
        # ``rest`` is never empty here: the loop ran at least once and
        # every pass that does not ``break`` appends to it.
        return [rest.pop()] + rest

    def prepare_prog_name(self, name):
        """Return the program name, resolving ``__main__.py`` to its path."""
        if name == '__main__.py':
            return sys.modules['__main__'].__file__
        return name

    def handle_argv(self, prog_name, argv):
        """Work out the sub-command from ``argv`` and execute it.

        Falls back to ``help`` when no command is left after the options
        have been processed.
        """
        self.prog_name = self.prepare_prog_name(prog_name)
        argv = self._relocate_args_from_start(argv)
        _, argv = self.prepare_args(None, argv)
        try:
            command = argv[0]
        except IndexError:
            command, argv = 'help', ['help']
        return self.execute(command, argv)

    def execute_from_commandline(self, argv=None):
        """Run the command line and exit the process with its status.

        Always ends in ``sys.exit``; a ``KeyboardInterrupt`` exits with
        ``EX_FAILURE``.
        """
        argv = sys.argv if argv is None else argv
        if 'multi' in argv[1:3]:  # Issue 1008
            self.respects_app_option = False
        try:
            sys.exit(determine_exit_status(
                super(CeleryCommand, self).execute_from_commandline(argv)))
        except KeyboardInterrupt:
            sys.exit(EX_FAILURE)

    @classmethod
    def get_command_info(cls, command, indent=0,
                         color=None, colored=None, app=None):
        """Return the overview entry for sub-command ``command``.

        Leaf commands produce a single line; other commands produce a
        ``--help`` line followed by their own ``list_commands`` output.
        """
        colored = term.colored() if colored is None else colored
        colored = colored.names[color] if color else lambda x: x
        obj = cls.commands[command]
        cmd = 'celery {0}'.format(colored(command))
        if obj.leaf:
            return '|' + text.indent(cmd, indent)
        return text.join([
            ' ',
            '|' + text.indent('{0} --help'.format(cmd), indent),
            obj.list_commands(indent, 'celery {0}'.format(command), colored,
                              app=app),
        ])

    @classmethod
    def list_commands(cls, indent=0, colored=None, app=None):
        """Return the overview of all commands grouped by category."""
        colored = term.colored() if colored is None else colored
        white = colored.white
        ret = []
        for group, commands, color in command_classes:
            ret.extend([
                text.indent('+ {0}: '.format(white(group)), indent),
                '\n'.join(
                    cls.get_command_info(command, indent + 4, color, colored,
                                         app=app)
                    for command in commands),
                ''
            ])
        return '\n'.join(ret).strip()

    def with_pool_option(self, argv):
        """Return the pool option names if ``worker`` is being invoked.

        Returns ``(['-P'], ['--pool'])`` when ``argv`` has more than one
        item and ``'worker'`` is among the first three, else ``None``.
        """
        if len(argv) > 1 and 'worker' in argv[0:3]:
            # this command supports custom pools
            # that may have to be loaded as early as possible.
            return (['-P'], ['--pool'])

    def on_concurrency_setup(self):
        """Load extension commands."""
        self.load_extension_commands()

    def load_extension_commands(self):
        """Load extension commands and list them under ``Extensions``."""
        names = Extensions(self.ext_fmt.format(self=self),
                           self.register_command).load()
        if names:
            command_classes.append(('Extensions', names, 'magenta'))
def command(*args, **kwargs):
    """Register a command class (deprecated decorator).

    Deprecated: Use classmethod
    :meth:`CeleryCommand.register_command` instead.

    Called with a positional argument it registers that object and
    returns it; called without one it returns the registering function.
    Keyword arguments are ignored.
    """
    register = CeleryCommand.register_command
    if args:
        return register(args[0])
    return register
# Run the umbrella command when this module is executed as a script.
if __name__ == '__main__':  # pragma: no cover
    main()
|