12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180 |
- # -*- coding: utf-8 -*-
- """
- The :program:`celery` umbrella command.
- .. program:: celery
- .. _preload-options:
- Preload Options
- ---------------
- These options are supported by all commands,
- and usually parsed before command-specific arguments.
- .. cmdoption:: -A, --app
- app instance to use (e.g. ``module.attr_name``)
- .. cmdoption:: -b, --broker
- URL to broker. default is ``amqp://guest@localhost//``
- .. cmdoption:: --loader
- name of custom loader class to use.
- .. cmdoption:: --config
- Name of the configuration module
- .. cmdoption:: -C, --no-color
- Disable colors in output.
- .. cmdoption:: -q, --quiet
- Give less verbose output (behavior depends on the sub command).
- .. cmdoption:: --help
- Show help and exit.
- .. _daemon-options:
- Daemon Options
- --------------
- These options are supported by commands that can detach
- into the background (daemon). They will be present
- in any command that also has a `--detach` option.
- .. cmdoption:: -f, --logfile
- Path to log file. If no logfile is specified, `stderr` is used.
- .. cmdoption:: --pidfile
- Optional file used to store the process pid.
- The program will not start if this file already exists
- and the pid is still alive.
- .. cmdoption:: --uid
- User id, or user name of the user to run as after detaching.
- .. cmdoption:: --gid
- Group id, or group name of the main group to change to after
- detaching.
- .. cmdoption:: --umask
- Effective umask (in octal) of the process after detaching. Inherits
- the umask of the parent process by default.
- .. cmdoption:: --workdir
- Optional directory to change to after detaching.
- .. cmdoption:: --executable
- Executable to use for the detached process.
- ``celery inspect``
- ------------------
- .. program:: celery inspect
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for reply
- .. cmdoption:: -d, --destination
- Comma separated list of destination node names.
- .. cmdoption:: -j, --json
- Use json as output format.
- ``celery control``
- ------------------
- .. program:: celery control
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for reply
- .. cmdoption:: -d, --destination
- Comma separated list of destination node names.
- .. cmdoption:: -j, --json
- Use json as output format.
- ``celery migrate``
- ------------------
- .. program:: celery migrate
- .. cmdoption:: -n, --limit
- Number of tasks to consume (int).
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for tasks.
- .. cmdoption:: -a, --ack-messages
- Ack messages from source broker.
- .. cmdoption:: -T, --tasks
- List of task names to filter on.
- .. cmdoption:: -Q, --queues
- List of queues to migrate.
- .. cmdoption:: -F, --forever
- Continually migrate tasks until killed.
- ``celery upgrade``
- ------------------
- .. program:: celery upgrade
- .. cmdoption:: --django
- Upgrade a Django project.
- .. cmdoption:: --compat
- Maintain backwards compatibility.
- .. cmdoption:: --no-backup
- Don't backup original files.
- ``celery shell``
- ----------------
- .. program:: celery shell
- .. cmdoption:: -I, --ipython
- Force :pypi:`iPython` implementation.
- .. cmdoption:: -B, --bpython
- Force :pypi:`bpython` implementation.
- .. cmdoption:: -P, --python
- Force default Python shell.
- .. cmdoption:: -T, --without-tasks
- Don't add tasks to locals.
- .. cmdoption:: --eventlet
- Use :pypi:`eventlet` monkey patches.
- .. cmdoption:: --gevent
- Use :pypi:`gevent` monkey patches.
- ``celery result``
- -----------------
- .. program:: celery result
- .. cmdoption:: -t, --task
- Name of task (if custom backend).
- .. cmdoption:: --traceback
- Show traceback if any.
- ``celery purge``
- ----------------
- .. program:: celery purge
- .. cmdoption:: -f, --force
- Don't prompt for verification before deleting messages (DANGEROUS)
- ``celery call``
- ---------------
- .. program:: celery call
- .. cmdoption:: -a, --args
- Positional arguments (json format).
- .. cmdoption:: -k, --kwargs
- Keyword arguments (json format).
- .. cmdoption:: --eta
- Scheduled time in ISO-8601 format.
- .. cmdoption:: --countdown
- ETA in seconds from now (float/int).
- .. cmdoption:: --expires
- Expiry time in float/int seconds, or an ISO-8601 date.
- .. cmdoption:: --serializer
- Specify serializer to use (default is json).
- .. cmdoption:: --queue
- Destination queue.
- .. cmdoption:: --exchange
- Destination exchange (defaults to the queue exchange).
- .. cmdoption:: --routing-key
- Destination routing key (defaults to the queue routing key).
- """
- from __future__ import absolute_import, unicode_literals, print_function
- import codecs
- import numbers
- import os
- import sys
- from functools import partial
- from importlib import import_module
- from kombu.utils import json
- from celery.app import defaults
- from celery.five import string_t, values
- from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
- from celery.utils import term
- from celery.utils import text
- from celery.utils.functional import pass1
- from celery.utils.timeutils import maybe_iso8601
- # Cannot use relative imports here due to a Windows issue (#1111).
- from celery.bin.base import Command, Option, Extensions
- # Import commands from other modules
- from celery.bin.amqp import amqp
- from celery.bin.beat import beat
- from celery.bin.events import events
- from celery.bin.graph import graph
- from celery.bin.logtool import logtool
- from celery.bin.worker import worker
# Names exported by a star-import of this module.
__all__ = ['CeleryCommand', 'main']

# Template for the umbrella help screen.  The ``help`` command fills in
# ``{commands}`` (the grouped command listing) and ``{prog_name}``.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
{commands}
---- -- - - --------- -- - -------------- --- ------------
Type '{prog_name} <command> --help' for help using a specific command.
"""

# Progress line used by ``migrate.on_migrate_task``; formatted with the
# ``state`` and ``body`` objects passed to that callback.
MIGRATE_PROGRESS_FMT = """\
Migrating task {state.count}/{state.strtotal}: \
{body[task]}[{body[id]}]\
"""

# ``(group title, command names, color name)`` triples consumed by
# ``CeleryCommand.list_commands``.  ``CeleryCommand.load_extension_commands``
# appends an 'Extensions' group to this list at runtime.
command_classes = [
    ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
    ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
    ('Utils',
     ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
     None),
    ('Debugging', ['report', 'logtool'], 'red'),
]
def determine_exit_status(ret):
    """Translate a command's return value into a process exit status.

    Integral values are handed back unchanged.  Anything else is reduced
    to its truthiness: truthy maps to ``EX_OK``, falsy to ``EX_FAILURE``.
    """
    if not isinstance(ret, numbers.Integral):
        return EX_FAILURE if not ret else EX_OK
    return ret
def main(argv=None):
    """Entry point for the :program:`celery` umbrella command.

    :param argv: Argument list forwarded to
        :meth:`CeleryCommand.execute_from_commandline`, which falls back
        to ``sys.argv`` when this is ``None``.
    """
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    try:
        if __name__ != '__main__':  # pragma: no cover
            sys.modules['__main__'] = sys.modules[__name__]
        cmd = CeleryCommand()
        cmd.maybe_patch_concurrency()
        # NOTE(review): billiard is imported only after
        # maybe_patch_concurrency() has run; presumably the order matters
        # for monkey patching -- confirm before moving this import.
        from billiard import freeze_support
        freeze_support()
        cmd.execute_from_commandline(argv)
    except KeyboardInterrupt:
        # An interrupt that reaches this level is swallowed so that
        # main() returns without a traceback.
        pass
class multi(Command):
    """Start multiple worker instances."""

    # Consulted (and also forced to False) by
    # CeleryCommand.execute_from_commandline when 'multi' is on the
    # command line.
    respects_app_option = False

    def get_options(self):
        # Declares no options of its own: argument parsing is delegated
        # wholesale to MultiTool in run_from_argv().
        return None

    def run_from_argv(self, prog_name, argv, command=None):
        # Imported lazily, only when the sub-command is actually used.
        from celery.bin.multi import MultiTool

        tool = MultiTool(quiet=self.quiet, no_color=self.no_color)
        # MultiTool receives the command name as the first argv element.
        full_argv = [command] + argv
        return tool.execute_from_commandline(full_argv, prog_name)
class list_(Command):
    """Get info from broker.

    Examples::

        celery list bindings

    NOTE: For RabbitMQ the management plugin is required.
    """

    args = '[bindings]'

    def list_bindings(self, management):
        # Print a three-column table (queue, exchange, routing key) of the
        # bindings reported by the transport's management interface.
        try:
            rows = management.get_bindings()
        except NotImplementedError:
            raise self.Error('Your transport cannot list bindings.')

        row_fmt = '{0:<28} {1:<28} {2}'
        rule = '-' * 16
        self.out(row_fmt.format('Queue', 'Exchange', 'Routing Key'))
        self.out(row_fmt.format(rule, rule, rule))
        for row in rows:
            self.out(row_fmt.format(
                row['destination'], row['source'], row['routing_key']))

    def run(self, what=None, *_, **kw):
        # Dispatch table: topic name -> bound method that prints it.
        handlers = {'bindings': self.list_bindings}
        choices = ', '.join(handlers)

        if not what:
            raise self.UsageError(
                'You must specify one of {0}'.format(choices))
        if what not in handlers:
            raise self.UsageError(
                'unknown topic {0!r} (choose one of: {1})'.format(
                    what, choices))

        with self.app.connection() as conn:
            # Declare the task consumer before querying the manager.
            self.app.amqp.TaskConsumer(conn).declare()
            handlers[what](conn.manager)
class call(Command):
    """Call a task by name.

    Examples::

        celery call tasks.add --args='[2, 2]'
        celery call tasks.add --args='[2, 2]' --countdown=10
    """

    args = '<task_name>'
    option_list = Command.option_list + (
        Option('--args', '-a', help='positional arguments (json).'),
        Option('--kwargs', '-k', help='keyword arguments (json).'),
        Option('--eta', help='scheduled time (ISO-8601).'),
        Option('--countdown', type='float',
               help='eta in seconds from now (float/int).'),
        Option('--expires', help='expiry time (ISO-8601/float/int).'),
        Option('--serializer', default='json', help='defaults to json.'),
        Option('--queue', help='custom queue name.'),
        Option('--exchange', help='custom exchange name.'),
        Option('--routing-key', help='custom routing key.'),
    )

    def run(self, name, *_, **kw):
        # Positional arguments: a JSON string is decoded, anything else is
        # used as given (empty tuple when absent).
        task_args = kw.get('args') or ()
        if isinstance(task_args, string_t):
            task_args = json.loads(task_args)

        # Keyword arguments follow the same rule with a dict default.
        task_kwargs = kw.get('kwargs') or {}
        if isinstance(task_kwargs, string_t):
            task_kwargs = json.loads(task_kwargs)

        # ``expires`` is tried as a number of seconds first; when that
        # conversion fails it is handed to maybe_iso8601(), whose own
        # errors propagate to the caller.
        expires = kw.get('expires') or None
        try:
            expires = float(expires)
        except (TypeError, ValueError):
            expires = maybe_iso8601(expires)

        send_options = dict(
            countdown=kw.get('countdown'),
            serializer=kw.get('serializer'),
            queue=kw.get('queue'),
            exchange=kw.get('exchange'),
            routing_key=kw.get('routing_key'),
            eta=maybe_iso8601(kw.get('eta')),
            expires=expires,
        )
        res = self.app.send_task(
            name, args=task_args, kwargs=task_kwargs, **send_options)
        # The id of the newly sent task is the only output.
        self.out(res.id)
class purge(Command):
    """Erase all messages from all known task queues.

    WARNING: There is no undo operation for this command.
    """

    warn_prelude = (
        '{warning}: This will remove all tasks from {queues}: {names}.\n'
        '         There is no undo for this operation!\n\n'
        '(to skip this prompt use the -f option)\n'
    )
    warn_prompt = 'Are you sure you want to delete all tasks'
    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
    fmt_empty = 'No messages purged from {qnum} {queues}'

    option_list = Command.option_list + (
        Option('--force', '-f', action='store_true',
               help='Do not prompt for verification'),
    )

    def run(self, force=False, **kwargs):
        queue_names = sorted(self.app.amqp.queues.keys())
        qnum = len(queue_names)

        if not force:
            # Show what is about to be deleted and require an explicit
            # 'yes'; any other answer aborts without purging.
            prelude = self.warn_prelude.format(
                warning=self.colored.red('WARNING'),
                queues=text.pluralize(qnum, 'queue'),
                names=', '.join(queue_names),
            )
            self.out(prelude)
            answer = self.ask(self.warn_prompt, ('yes', 'no'), 'no')
            if answer != 'yes':
                return

        purged = self.app.control.purge()
        # Pick the summary template depending on whether anything was
        # actually removed.
        if purged:
            template = self.fmt_purged
        else:
            template = self.fmt_empty
        self.out(template.format(
            mnum=purged,
            qnum=qnum,
            messages=text.pluralize(purged, 'message'),
            queues=text.pluralize(qnum, 'queue'),
        ))
class result(Command):
    """Gives the return value for a given task id.

    Examples::

        celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
        celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
        celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
    """

    args = '<task_id>'
    option_list = Command.option_list + (
        Option('--task', '-t', help='name of task (if custom backend)'),
        Option('--traceback', action='store_true',
               help='show traceback instead'),
    )

    def run(self, task_id, *args, **kwargs):
        # Start from the app-wide result class; a named task overrides it
        # with that task's own AsyncResult.
        result_cls = self.app.AsyncResult
        task_name = kwargs.get('task')
        want_traceback = kwargs.get('traceback', False)
        if task_name:
            result_cls = self.app.tasks[task_name].AsyncResult

        res = result_cls(task_id)
        value = res.traceback if want_traceback else res.get()
        # pretty() returns a pair; only its second element is printed.
        self.out(self.pretty(value)[1])
class _RemoteControl(Command):
    """Shared implementation for commands that call a method on workers.

    Subclasses set :attr:`name` and :attr:`choices` and implement
    :meth:`call`.  A subclass may also define a method named after an
    entry in :attr:`choices`; that method is then used as the handler
    instead of :meth:`call`, and its docstring is shown next to the
    command name in the help listing.
    """

    # Command name used in error messages; set by subclasses.
    name = None
    # Maps method name -> (default timeout in seconds, help text).
    choices = None
    # Read by CeleryCommand.get_command_info to decide whether the
    # sub-command listing should be expanded.
    leaf = False

    option_list = Command.option_list + (
        Option('--timeout', '-t', type='float',
               help='Timeout in seconds (float) waiting for reply'),
        Option('--destination', '-d',
               help='Comma separated list of destination node names.'),
        Option('--json', '-j', action='store_true',
               help='Use json as output format.'),
    )

    def __init__(self, *args, **kwargs):
        # Strip the two display flags before delegating to Command.
        self.show_body = kwargs.pop('show_body', True)
        self.show_reply = kwargs.pop('show_reply', True)
        super(_RemoteControl, self).__init__(*args, **kwargs)

    @classmethod
    def get_command_info(self, command,
                         indent=0, prefix='', color=None, help=False):
        """Return the help line(s) for a single entry of :attr:`choices`.

        :param command: Key in :attr:`choices`.
        :param indent: Indentation passed to ``text.indent``.
        :param prefix: Text placed directly in front of the command name.
        :param color: Callable applied to the command name; identity
            when omitted.
        :param help: When true, the help text from :attr:`choices` is
            added on a second line.
        """
        # Default to "no coloring" so the method is usable on its own;
        # previously a missing ``color`` crashed with a TypeError.
        color = color if color else lambda x: x
        if help:
            help = '|' + text.indent(self.choices[command][1], indent + 4)
        else:
            help = None

        # A handler method named after the command documents its
        # arguments in its docstring.
        try:
            signature = getattr(self, command).__doc__
        except AttributeError:
            signature = None

        label = '{0}{1}'.format(prefix, color(command))
        if signature:
            # Only append a signature that exists: a handler without a
            # docstring used to be rendered with the literal text 'None'.
            label = '{0} {1}'.format(label, signature)
        return text.join(['|' + text.indent(label, indent), help])

    @classmethod
    def list_commands(self, indent=0, prefix='', color=None, help=False):
        """Return the help lines for all :attr:`choices`, sorted by name."""
        color = color if color else lambda x: x
        prefix = prefix + ' ' if prefix else ''
        return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
                         for c in sorted(self.choices))

    @property
    def epilog(self):
        return '\n'.join([
            '[Commands]',
            self.list_commands(indent=4, help=True)
        ])

    def usage(self, command):
        return '%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
            command, self.args)

    def call(self, *args, **kwargs):
        """Perform the remote call; must be provided by subclasses."""
        raise NotImplementedError('call')

    def run(self, *args, **kwargs):
        if not args:
            raise self.UsageError(
                'Missing {0.name} method. See --help'.format(self))
        return self.do_call_method(args, **kwargs)

    def do_call_method(self, args, **kwargs):
        """Validate ``args[0]`` as a method name and invoke its handler.

        The handler is called as ``handler(method, *args[1:], ...)``,
        i.e. the method name is always its first positional argument.

        :raises Error: for ``help``, for SQL transports, and when no
            node replies (the latter with status ``EX_UNAVAILABLE``).
        :raises UsageError: when the method is not in :attr:`choices`.
        :returns: The replies produced by the handler.
        """
        method = args[0]
        if method == 'help':
            raise self.Error("Did you mean '{0.name} --help'?".format(self))
        if method not in self.choices:
            raise self.UsageError(
                'Unknown {0.name} method {1}'.format(self, method))

        if self.app.connection_for_write().transport.driver_type == 'sql':
            raise self.Error('Broadcast not supported by SQL broker transport')

        output_json = kwargs.get('json')
        destination = kwargs.get('destination')
        # Fall back to the per-method default timeout from ``choices``.
        timeout = kwargs.get('timeout') or self.choices[method][0]
        if destination and isinstance(destination, string_t):
            destination = [dest.strip() for dest in destination.split(',')]

        handler = getattr(self, method, self.call)

        # With JSON output the replies are dumped once at the end, so no
        # per-reply callback is installed.
        callback = None if output_json else self.say_remote_command_reply

        replies = handler(method, *args[1:], timeout=timeout,
                          destination=destination,
                          callback=callback)
        if not replies:
            raise self.Error('No nodes replied within time constraint.',
                             status=EX_UNAVAILABLE)
        if output_json:
            self.out(json.dumps(replies))
        return replies
class inspect(_RemoteControl):
    """Inspect the worker at runtime.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.

    Examples::

        celery inspect active --timeout=5
        celery inspect scheduled -d worker1@example.com
        celery inspect revoked -d w1@e.com,w2@e.com
    """

    name = 'inspect'
    # Method name -> (default timeout in seconds, help text).
    choices = {
        'active': (1.0, 'dump active tasks (being processed)'),
        'active_queues': (1.0, 'dump queues being consumed from'),
        'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
        'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
        'stats': (1.0, 'dump worker statistics'),
        'revoked': (1.0, 'dump of revoked task ids'),
        'registered': (1.0, 'dump of registered tasks'),
        'ping': (0.2, 'ping worker(s)'),
        'clock': (1.0, 'get value of logical clock'),
        'conf': (1.0, 'dump worker configuration'),
        'report': (1.0, 'get bugreport info'),
        'memsample': (1.0, 'sample memory (requires psutil)'),
        'memdump': (1.0, 'dump memory samples (requires psutil)'),
        'objgraph': (60.0, 'create object graph (requires objgraph)'),
    }

    def call(self, method, *args, **options):
        # ``options`` configure the inspect client; ``args`` go to the
        # inspected method itself.
        i = self.app.control.inspect(**options)
        return getattr(i, method)(*args)

    @staticmethod
    def _user_arg(method, first, rest, default):
        # The dispatcher (_RemoteControl.do_call_method) always passes the
        # method name as the first positional argument.  The handlers
        # below do not declare a ``method`` parameter, so the name lands
        # in their first real parameter; skip it here.
        if first == method:
            return rest[0] if rest else default
        return first

    # NOTE: the docstrings of the two handlers below are printed after the
    # command name in the help listing, so they hold the argument
    # signature only.

    def objgraph(self, type_='Request', *args, **kwargs):
        """[object_type=Request]"""
        type_ = self._user_arg('objgraph', type_, args, 'Request')
        return self.call('objgraph', type_, **kwargs)

    def conf(self, with_defaults=False, *args, **kwargs):
        """[include_defaults=False]"""
        # NOTE(review): a value given on the command line arrives as a
        # string, so e.g. 'false' is still truthy -- confirm whether it
        # should be parsed as a boolean.
        with_defaults = self._user_arg('conf', with_defaults, args, False)
        return self.call('conf', with_defaults, **kwargs)
class control(_RemoteControl):
    """Workers remote control.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.

    Examples::

        celery control enable_events --timeout=5
        celery control -d worker1@example.com enable_events
        celery control -d w1.e.com,w2.e.com enable_events

        celery control -d w1.e.com add_consumer queue_name
        celery control -d w1.e.com cancel_consumer queue_name

        celery control -d w1.e.com add_consumer queue exchange direct rkey
    """

    name = 'control'
    # Method name -> (default timeout in seconds, help text).
    choices = {
        'enable_events': (1.0, 'tell worker(s) to enable events'),
        'disable_events': (1.0, 'tell worker(s) to disable events'),
        'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
        'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
        'rate_limit': (
            1.0, 'tell worker(s) to modify the rate limit for a task type'),
        'time_limit': (
            1.0, 'tell worker(s) to modify the time limit for a task type.'),
        'autoscale': (1.0, 'change autoscale settings'),
        'pool_grow': (1.0, 'start more pool processes'),
        'pool_shrink': (1.0, 'use less pool processes'),
    }

    def call(self, method, *args, **options):
        # Every control call asks for replies.
        return getattr(self.app.control, method)(*args, reply=True, **options)

    @staticmethod
    def _maybe(convert, value):
        # Convert an optional command-line argument, letting an omitted
        # one (None) through instead of crashing in int()/float().
        return None if value is None else convert(value)

    # NOTE: the docstrings of the handlers below are printed after the
    # command name in the help listing (see
    # _RemoteControl.get_command_info), so they must stay argument
    # signatures.

    def pool_grow(self, method, n=1, **kwargs):
        """[N=1]"""
        return self.call(method, int(n), **kwargs)

    def pool_shrink(self, method, n=1, **kwargs):
        """[N=1]"""
        return self.call(method, int(n), **kwargs)

    def autoscale(self, method, max=None, min=None, **kwargs):
        """[max] [min]"""
        # Both values are documented as optional; an omitted one is
        # forwarded as None (presumably "leave unchanged" on the worker
        # side -- confirm against the autoscaler).
        return self.call(method,
                         self._maybe(int, max), self._maybe(int, min),
                         **kwargs)

    def rate_limit(self, method, task_name, rate_limit, **kwargs):
        """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
        return self.call(method, task_name, rate_limit, **kwargs)

    def time_limit(self, method, task_name, soft, hard=None, **kwargs):
        """<task_name> <soft_secs> [hard_secs]"""
        # ``hard`` is optional: only convert it when it was supplied.
        return self.call(method, task_name,
                         float(soft), self._maybe(float, hard), **kwargs)

    def add_consumer(self, method, queue, exchange=None,
                     exchange_type='direct', routing_key=None, **kwargs):
        """<queue> [exchange [type [routing_key]]]"""
        return self.call(method, queue, exchange,
                         exchange_type, routing_key, **kwargs)

    def cancel_consumer(self, method, queue, **kwargs):
        """<queue>"""
        return self.call(method, queue, **kwargs)
class status(Command):
    """Show list of workers that are online."""

    # Accepts exactly the options of the ``inspect`` command.
    option_list = inspect.option_list

    def run(self, *args, **kwargs):
        # Ping through a silent ``inspect`` command that shares this
        # command's app and output streams.
        pinger = inspect(
            app=self.app,
            no_color=kwargs.get('no_color', False),
            stdout=self.stdout,
            stderr=self.stderr,
            show_reply=False,
            show_body=False,
            quiet=True,
        )
        replies = pinger.run('ping', **kwargs)
        if not replies:
            raise self.Error('No nodes replied within time constraint',
                             status=EX_UNAVAILABLE)

        nodecount = len(replies)
        if kwargs.get('quiet', False):
            return
        # Summary line: number of workers that answered the ping.
        summary = '\n{0} {1} online.'.format(
            nodecount, text.pluralize(nodecount, 'node'))
        self.out(summary)
class migrate(Command):
    """Migrate tasks from one broker to another.

    Examples:

    .. code-block:: console

        $ celery migrate redis://localhost amqp://guest@localhost//
        $ celery migrate django:// redis://localhost

    NOTE: This command is experimental, make sure you have
    a backup of the tasks before you continue.
    """

    args = '<source_url> <dest_url>'
    option_list = Command.option_list + (
        Option('--limit', '-n', type='int',
               help='Number of tasks to consume (int)'),
        Option('--timeout', '-t', type='float', default=1.0,
               help='Timeout in seconds (float) waiting for tasks'),
        Option('--ack-messages', '-a', action='store_true',
               help='Ack messages from source broker.'),
        Option('--tasks', '-T',
               help='List of task names to filter on.'),
        Option('--queues', '-Q',
               help='List of queues to migrate.'),
        Option('--forever', '-F', action='store_true',
               help='Continually migrate tasks until killed.'),
    )
    # Template for the per-task progress line.
    progress_fmt = MIGRATE_PROGRESS_FMT

    def on_migrate_task(self, state, body, message):
        # Callback given to migrate_tasks(); prints one progress line.
        line = self.progress_fmt.format(state=state, body=body)
        self.out(line)

    def run(self, source, destination, **kwargs):
        # Imported lazily, only when the command runs.
        from kombu import Connection
        from celery.contrib.migrate import migrate_tasks

        source_conn = Connection(source)
        dest_conn = Connection(destination)
        # All remaining options are forwarded untouched.
        migrate_tasks(source_conn, dest_conn,
                      callback=self.on_migrate_task, **kwargs)
class shell(Command):  # pragma: no cover
    """Start shell session with convenient access to celery symbols.

    The following symbols will be added to the main globals:

        - celery:  the current application.
        - chord, group, chain, chunks,
          xmap, xstarmap, subtask, Task
        - all registered tasks.
    """

    option_list = Command.option_list + (
        Option('--ipython', '-I',
               action='store_true', dest='force_ipython',
               help='force iPython.'),
        Option('--bpython', '-B',
               action='store_true', dest='force_bpython',
               help='force bpython.'),
        Option('--python', '-P',
               action='store_true', dest='force_python',
               help='force default Python shell.'),
        Option('--without-tasks', '-T', action='store_true',
               help="don't add tasks to locals."),
        Option('--eventlet', action='store_true',
               help='use eventlet.'),
        Option('--gevent', action='store_true', help='use gevent.'),
    )

    def run(self, force_ipython=False, force_bpython=False,
            force_python=False, without_tasks=False, eventlet=False,
            gevent=False, **kwargs):
        """Build the shell namespace and start the selected shell.

        The ``force_*`` flags are checked in the order python, bpython,
        ipython; without any of them the shell is auto-detected.
        """
        # Make modules in the current directory importable.
        sys.path.insert(0, os.getcwd())
        if eventlet:
            import_module('celery.concurrency.eventlet')
        if gevent:
            import_module('celery.concurrency.gevent')
        import celery
        import celery.task.base
        self.app.loader.import_default_modules()

        # Namespace handed to whichever shell gets started.
        self.locals = {'app': self.app,
                       'celery': self.app,
                       'Task': celery.Task,
                       'chord': celery.chord,
                       'group': celery.group,
                       'chain': celery.chain,
                       'chunks': celery.chunks,
                       'xmap': celery.xmap,
                       'xstarmap': celery.xstarmap,
                       'subtask': celery.subtask,
                       'signature': celery.signature}

        if not without_tasks:
            # Expose every registered task except the built-in
            # ``celery.*`` ones, keyed by the task's ``__name__``.
            self.locals.update({
                task.__name__: task for task in values(self.app.tasks)
                if not task.name.startswith('celery.')
            })

        if force_python:
            return self.invoke_fallback_shell()
        elif force_bpython:
            return self.invoke_bpython_shell()
        elif force_ipython:
            return self.invoke_ipython_shell()
        return self.invoke_default_shell()

    def invoke_default_shell(self):
        """Start IPython if importable, else bpython, else plain Python."""
        try:
            import IPython  # noqa
        except ImportError:
            try:
                import bpython  # noqa
            except ImportError:
                return self.invoke_fallback_shell()
            else:
                return self.invoke_bpython_shell()
        else:
            return self.invoke_ipython_shell()

    def invoke_fallback_shell(self):
        """Start the standard interactive interpreter."""
        import code
        try:
            import readline
        except ImportError:
            pass
        else:
            # Tab completion over the shell namespace when readline is
            # available.
            import rlcompleter
            readline.set_completer(
                rlcompleter.Completer(self.locals).complete)
            readline.parse_and_bind('tab:complete')
        code.interact(local=self.locals)

    def invoke_ipython_shell(self):
        """Start IPython through the first entry point that imports.

        :raises ImportError: when none of the known entry points can be
            imported.
        """
        for ip in (self._ipython, self._ipython_pre_10,
                   self._ipython_terminal, self._ipython_010):
            try:
                return ip()
            except ImportError:
                pass
        # Called outside the loop on purpose: inside it the ImportError
        # raised here was swallowed like the others, so a forced IPython
        # shell failed without any message.
        return self._no_ipython()

    def _ipython(self):
        from IPython import start_ipython
        start_ipython(argv=[], user_ns=self.locals)

    def _ipython_pre_10(self):  # pragma: no cover
        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
        app = TerminalIPythonApp.instance()
        app.initialize(argv=[])
        app.shell.user_ns.update(self.locals)
        app.start()

    def _ipython_terminal(self):  # pragma: no cover
        from IPython.terminal import embed
        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()

    def _ipython_010(self):  # pragma: no cover
        from IPython.Shell import IPShell
        IPShell(argv=[], user_ns=self.locals).mainloop()

    def _no_ipython(self):  # pragma: no cover
        raise ImportError('no suitable ipython found')

    def invoke_bpython_shell(self):
        """Start bpython with the shell namespace."""
        import bpython
        bpython.embed(self.locals)
class upgrade(Command):
    """Perform upgrade between versions."""

    option_list = Command.option_list + (
        Option('--django', action='store_true',
               help='Upgrade Django project'),
        Option('--compat', action='store_true',
               help='Maintain backwards compatibility'),
        Option('--no-backup', action='store_true',
               help='Dont backup original files'),
    )
    # Names of the upgrade types; each one is a method on this class.
    choices = {'settings'}

    def usage(self, command):
        return '%prog <command> settings [filename] [options]'

    def run(self, *args, **kwargs):
        # The first positional argument selects the upgrade type.
        if not args:
            raise self.UsageError('missing upgrade type')
        command = args[0]
        if command not in self.choices:
            raise self.UsageError('unknown upgrade type: {0}'.format(command))
        handler = getattr(self, command)
        return handler(*args, **kwargs)

    def settings(self, command, filename,
                 no_backup=False, django=False, compat=False, **kwargs):
        # Rewrite old setting names in ``filename`` in place.  Unless
        # ``no_backup`` is set the original content is copied to a
        # backup file first.
        if no_backup:
            lines = self._slurp(filename)
        else:
            lines = self._backup(filename)
        keyfilter = self._compat_key if (django or compat) else pass1
        print('processing {0}...'.format(filename), file=self.stderr)
        with codecs.open(filename, 'w', 'utf-8') as write_fh:
            for original_line in lines:
                write_fh.write(self._to_new_key(original_line, keyfilter))

    def _slurp(self, filename):
        # Return the lines of ``filename`` as a list.
        with codecs.open(filename, 'r', 'utf-8') as read_fh:
            return list(read_fh)

    def _backup(self, filename, suffix='.orig'):
        # Copy ``filename`` to ``filename + suffix`` line by line and
        # return the lines that were read.
        backup_filename = filename + suffix
        print('writing backup to {0}...'.format(backup_filename),
              file=self.stderr)
        collected = []
        with codecs.open(filename, 'r', 'utf-8') as read_fh, \
                codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
            for line in read_fh:
                backup_fh.write(line)
                collected.append(line)
        return collected

    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
        # sort by length to avoid e.g. broker_transport overriding
        # broker_transport_options.
        longest_first = sorted(source, key=len)[::-1]
        for old_key in longest_first:
            candidate = line.replace(old_key, keyfilter(source[old_key]))
            if candidate != line:
                # only one match per line.
                return candidate
        return line

    def _compat_key(self, key, namespace='CELERY'):
        # Upper-case the key and make sure it starts with ``namespace``.
        upper = key.upper()
        if upper.startswith(namespace):
            return upper
        return '_'.join([namespace, upper])
class help(Command):
    """Show help screen and exit."""

    def usage(self, command):
        return '%prog <command> [options] {0.args}'.format(self)

    def run(self, *args, **kwargs):
        # Option help from the parser first, then the grouped listing of
        # every sub-command.
        self.parser.print_help()
        screen = HELP.format(
            prog_name=self.prog_name,
            commands=CeleryCommand.list_commands(colored=self.colored),
        )
        self.out(screen)
        # Always reports a usage error status to the caller.
        return EX_USAGE
class report(Command):
    """Shows information useful to include in bug-reports."""

    def run(self, *args, **kwargs):
        # Print whatever the app's bugreport() produces.
        bug_report = self.app.bugreport()
        self.out(bug_report)
        return EX_OK
class CeleryCommand(Command):
    """The umbrella command: locates and runs the requested sub-command."""

    # Template for the extension namespace handed to ``Extensions``.
    ext_fmt = '{self.namespace}.commands'
    # Registry mapping sub-command name -> command class.
    commands = {
        'amqp': amqp,
        'beat': beat,
        'call': call,
        'control': control,
        'events': events,
        'graph': graph,
        'help': help,
        'inspect': inspect,
        'list': list_,
        'logtool': logtool,
        'migrate': migrate,
        'multi': multi,
        'purge': purge,
        'report': report,
        'result': result,
        'shell': shell,
        'status': status,
        'upgrade': upgrade,
        'worker': worker,
    }
    enable_config_from_cmdline = True
    prog_name = 'celery'
    namespace = 'celery'

    @classmethod
    def register_command(cls, fun, name=None):
        """Add ``fun`` to :attr:`commands` and return it.

        The key is ``name`` when given, otherwise ``fun.__name__``.
        Returning ``fun`` makes the method usable as a decorator.
        """
        cls.commands[name or fun.__name__] = fun
        return fun

    def execute(self, command, argv=None):
        """Instantiate and run the sub-command named ``command``.

        An unknown name runs the ``help`` command instead.  ``Error`` and
        ``UsageError`` raised by the sub-command are reported and
        converted to their exit status.
        """
        try:
            cls = self.commands[command]
        except KeyError:
            # Unknown command: show help.  (A second, redundant registry
            # lookup that used to follow this block has been removed.)
            cls, argv = self.commands['help'], ['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

    def on_usage_error(self, exc, command=None):
        """Print the error and a hint about the matching ``--help``."""
        if command:
            helps = '{self.prog_name} {command} --help'
        else:
            helps = '{self.prog_name} --help'
        self.error(self.colored.magenta('Error: {0}'.format(exc)))
        self.error("""Please try '{0}'""".format(helps.format(
            self=self, command=command,
        )))

    def _relocate_args_from_start(self, argv, index=0):
        """Move options that precede the command name behind it.

        Leading ``--long`` options, and ``-s`` options together with the
        value that may follow them, are collected and appended after the
        remaining arguments.  Returns ``[]`` when nothing is left after
        the leading options, i.e. when no command name was found.
        """
        if argv:
            rest = []
            while index < len(argv):
                value = argv[index]
                if value.startswith('--'):
                    rest.append(value)
                elif value.startswith('-'):
                    # we eat the next argument even though we don't know
                    # if this option takes an argument or not.
                    # instead we will assume what is the command name in the
                    # return statements below.
                    try:
                        nxt = argv[index + 1]
                        if nxt.startswith('-'):
                            # is another option
                            rest.append(value)
                        else:
                            # is (maybe) a value for this option
                            rest.extend([value, nxt])
                            index += 1
                    except IndexError:  # pragma: no cover
                        rest.append(value)
                        break
                else:
                    break
                index += 1
            if argv[index:]:  # pragma: no cover
                # if there are more arguments left then divide and swap
                # we assume the first argument in argv[i:] is the command
                # name.
                return argv[index:] + rest
            # NOTE(review): the original had a discarded expression
            # ``[rest.pop()] + rest`` here (its comment suggests a missing
            # ``return``).  It never affected the result, so it has been
            # dropped; the function still returns [] in this case.
        return []

    def prepare_prog_name(self, name):
        # When started as ``__main__.py`` use the file path of the
        # ``__main__`` module as the program name.
        if name == '__main__.py':
            return sys.modules['__main__'].__file__
        return name

    def handle_argv(self, prog_name, argv):
        """Normalize ``argv`` and run the sub-command it names.

        Falls back to ``help`` when no command name is present.
        """
        self.prog_name = self.prepare_prog_name(prog_name)
        argv = self._relocate_args_from_start(argv)
        _, argv = self.prepare_args(None, argv)
        try:
            command = argv[0]
        except IndexError:
            command, argv = 'help', ['help']
        return self.execute(command, argv)

    def execute_from_commandline(self, argv=None):
        """Run the command line and exit the process with its status.

        :param argv: Full argument vector; defaults to ``sys.argv``.
        """
        argv = sys.argv if argv is None else argv
        if 'multi' in argv[1:3]:  # Issue 1008
            self.respects_app_option = False
        try:
            sys.exit(determine_exit_status(
                super(CeleryCommand, self).execute_from_commandline(argv)))
        except KeyboardInterrupt:
            sys.exit(EX_FAILURE)

    @classmethod
    def get_command_info(self, command, indent=0, color=None, colored=None):
        """Return the help listing entry for one sub-command.

        Commands whose class has a true ``leaf`` attribute produce a
        single line; others also list their own sub-commands.
        """
        colored = term.colored() if colored is None else colored
        # From here on ``colored`` is a callable applied to the name.
        colored = colored.names[color] if color else lambda x: x
        obj = self.commands[command]
        cmd = 'celery {0}'.format(colored(command))
        if obj.leaf:
            return '|' + text.indent(cmd, indent)
        return text.join([
            ' ',
            '|' + text.indent('{0} --help'.format(cmd), indent),
            obj.list_commands(indent, 'celery {0}'.format(command), colored),
        ])

    @classmethod
    def list_commands(self, indent=0, colored=None):
        """Return the grouped listing of all commands in ``command_classes``."""
        colored = term.colored() if colored is None else colored
        white = colored.white
        ret = []
        for cls, commands, color in command_classes:
            ret.extend([
                text.indent('+ {0}: '.format(white(cls)), indent),
                '\n'.join(
                    self.get_command_info(command, indent + 4, color, colored)
                    for command in commands),
                ''
            ])
        return '\n'.join(ret).strip()

    def with_pool_option(self, argv):
        # Returns the pool option names only when 'worker' appears among
        # the first three arguments; otherwise implicitly None.
        if len(argv) > 1 and 'worker' in argv[0:3]:
            # this command supports custom pools
            # that may have to be loaded as early as possible.
            return (['-P'], ['--pool'])

    def on_concurrency_setup(self):
        self.load_extension_commands()

    def load_extension_commands(self):
        """Register extension commands and add them to the help listing."""
        names = Extensions(self.ext_fmt.format(self=self),
                           self.register_command).load()
        if names:
            command_classes.append(('Extensions', names, 'magenta'))
def command(*args, **kwargs):
    """Deprecated alias for :meth:`CeleryCommand.register_command`.

    With a positional argument, registers it and returns the result of
    the registration; called without one, returns the registration
    method itself.  Keyword arguments are accepted and ignored.
    """
    register = CeleryCommand.register_command
    if not args:
        return register
    return register(args[0])
# Run the umbrella command when this module is executed as a script.
if __name__ == '__main__':  # pragma: no cover
    main()
|