12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202 |
- """
- The :program:`celery` umbrella command.
- .. program:: celery
- .. _preload-options:
- Preload Options
- ---------------
- These options are supported by all commands,
- and usually parsed before command-specific arguments.
- .. cmdoption:: -A, --app
- app instance to use (e.g. ``module.attr_name``)
- .. cmdoption:: -b, --broker
- URL to broker. default is ``amqp://guest@localhost//``
- .. cmdoption:: --loader
- name of custom loader class to use.
- .. cmdoption:: --config
- Name of the configuration module
- .. cmdoption:: -C, --no-color
- Disable colors in output.
- .. cmdoption:: -q, --quiet
- Give less verbose output (behavior depends on the sub command).
- .. cmdoption:: --help
- Show help and exit.
- .. _daemon-options:
- Daemon Options
- --------------
- These options are supported by commands that can detach
- into the background (daemon). They will be present
- in any command that also has a `--detach` option.
- .. cmdoption:: -f, --logfile
- Path to log file. If no logfile is specified, `stderr` is used.
- .. cmdoption:: --pidfile
- Optional file used to store the process pid.
- The program will not start if this file already exists
- and the pid is still alive.
- .. cmdoption:: --uid
- User id, or user name of the user to run as after detaching.
- .. cmdoption:: --gid
- Group id, or group name of the main group to change to after
- detaching.
- .. cmdoption:: --umask
- Effective umask (in octal) of the process after detaching. Inherits
- the umask of the parent process by default.
- .. cmdoption:: --workdir
- Optional directory to change to after detaching.
- .. cmdoption:: --executable
- Executable to use for the detached process.
- ``celery inspect``
- ------------------
- .. program:: celery inspect
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for reply
- .. cmdoption:: -d, --destination
- Comma separated list of destination node names.
- .. cmdoption:: -j, --json
- Use json as output format.
- ``celery control``
- ------------------
- .. program:: celery control
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for reply
- .. cmdoption:: -d, --destination
- Comma separated list of destination node names.
- .. cmdoption:: -j, --json
- Use json as output format.
- ``celery migrate``
- ------------------
- .. program:: celery migrate
- .. cmdoption:: -n, --limit
- Number of tasks to consume (int).
- .. cmdoption:: -t, --timeout
- Timeout in seconds (float) waiting for tasks.
- .. cmdoption:: -a, --ack-messages
- Ack messages from source broker.
- .. cmdoption:: -T, --tasks
- List of task names to filter on.
- .. cmdoption:: -Q, --queues
- List of queues to migrate.
- .. cmdoption:: -F, --forever
- Continually migrate tasks until killed.
- ``celery upgrade``
- ------------------
- .. program:: celery upgrade
- .. cmdoption:: --django
- Upgrade a Django project.
- .. cmdoption:: --compat
- Maintain backwards compatibility.
- .. cmdoption:: --no-backup
- Don't backup original files.
- ``celery shell``
- ----------------
- .. program:: celery shell
- .. cmdoption:: -I, --ipython
- Force :pypi:`iPython` implementation.
- .. cmdoption:: -B, --bpython
- Force :pypi:`bpython` implementation.
- .. cmdoption:: -P, --python
- Force default Python shell.
- .. cmdoption:: -T, --without-tasks
- Don't add tasks to locals.
- .. cmdoption:: --eventlet
- Use :pypi:`eventlet` monkey patches.
- .. cmdoption:: --gevent
- Use :pypi:`gevent` monkey patches.
- ``celery result``
- -----------------
- .. program:: celery result
- .. cmdoption:: -t, --task
- Name of task (if custom backend).
- .. cmdoption:: --traceback
- Show traceback if any.
- ``celery purge``
- ----------------
- .. program:: celery purge
- .. cmdoption:: -f, --force
- Don't prompt for verification before deleting messages (DANGEROUS)
- ``celery call``
- ---------------
- .. program:: celery call
- .. cmdoption:: -a, --args
- Positional arguments (json format).
- .. cmdoption:: -k, --kwargs
- Keyword arguments (json format).
- .. cmdoption:: --eta
- Scheduled time in ISO-8601 format.
- .. cmdoption:: --countdown
- ETA in seconds from now (float/int).
- .. cmdoption:: --expires
- Expiry time in float/int seconds, or an ISO-8601 date.
- .. cmdoption:: --serializer
- Specify serializer to use (default is json).
- .. cmdoption:: --queue
- Destination queue.
- .. cmdoption:: --exchange
- Destination exchange (defaults to the queue exchange).
- .. cmdoption:: --routing-key
- Destination routing key (defaults to the queue routing key).
- """
- from __future__ import absolute_import, unicode_literals, print_function
- import codecs
- import numbers
- import os
- import sys
- from functools import partial
- from importlib import import_module
- from kombu.utils import json
- from celery.app import defaults
- from celery.five import keys, string_t, values
- from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
- from celery.utils import term
- from celery.utils import text
- from celery.utils.functional import pass1
- from celery.utils.text import str_to_list
- from celery.utils.timeutils import maybe_iso8601
- from celery.bin.base import Command, Option, Extensions
- from celery.bin.amqp import amqp
- from celery.bin.beat import beat
- from celery.bin.events import events
- from celery.bin.graph import graph
- from celery.bin.logtool import logtool
- from celery.bin.worker import worker
- __all__ = ['CeleryCommand', 'main']
# Footer template printed by the ``help`` command: ``{commands}`` receives the
# rendered command listing and ``{prog_name}`` the program name.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
{commands}
---- -- - - --------- -- - -------------- --- ------------
Type '{prog_name} <command> --help' for help using a specific command.
"""

# Progress line written by ``migrate.on_migrate_task`` for each task; the
# trailing backslashes join the pieces into a single line.
MIGRATE_PROGRESS_FMT = """\
Migrating task {state.count}/{state.strtotal}: \
{body[task]}[{body[id]}]\
"""

# Help-screen sections as ``(title, command names, color name)`` tuples.
# This must remain a list: ``CeleryCommand.load_extension_commands`` appends
# an 'Extensions' section to it at runtime.
command_classes = [
    (
        'Main',
        ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'],
        'green',
    ),
    (
        'Remote Control',
        ['status', 'inspect', 'control'],
        'blue',
    ),
    (
        'Utils',
        ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
        None,
    ),
    (
        'Debugging',
        ['report', 'logtool'],
        'red',
    ),
]
def determine_exit_status(ret):
    """Convert a command's return value into a process exit status.

    Integral values (including booleans) are returned unchanged.  Any other
    value maps to ``EX_OK`` when truthy and ``EX_FAILURE`` when falsy.
    """
    if not isinstance(ret, numbers.Integral):
        ret = EX_OK if ret else EX_FAILURE
    return ret
def main(argv=None):
    """Run the ``celery`` umbrella command.

    :param argv: argument vector handed to
        :meth:`CeleryCommand.execute_from_commandline`; ``None`` makes that
        method fall back to :data:`sys.argv`.

    A :exc:`KeyboardInterrupt` escaping the command is swallowed silently.
    """
    try:
        # When imported rather than executed, register this module as
        # ``__main__``.  NOTE(review): presumably needed by child processes
        # that re-import ``__main__`` -- confirm.
        if __name__ != '__main__':
            sys.modules['__main__'] = sys.modules[__name__]

        program = CeleryCommand()
        program.maybe_patch_concurrency()

        # Kept after ``maybe_patch_concurrency`` exactly as in the original
        # statement order.
        from billiard import freeze_support
        freeze_support()

        program.execute_from_commandline(argv)
    except KeyboardInterrupt:
        pass
class multi(Command):
    """Start multiple worker instances."""

    # Flag consumed by the Command base class (not visible in this file).
    respects_app_option = False

    def get_options(self):
        # No options are declared here: ``run_from_argv`` hands the raw
        # argument list to ``MultiTool``.
        return None

    def run_from_argv(self, prog_name, argv, command=None):
        """Delegate to :class:`celery.bin.multi.MultiTool`.

        The sub-command name is put back at the front of ``argv`` before
        the tool is invoked; the tool's return value is passed through.
        """
        from celery.bin.multi import MultiTool

        tool = MultiTool(quiet=self.quiet, no_color=self.no_color)
        full_argv = [command] + argv
        return tool.execute_from_commandline(full_argv, prog_name)
class list_(Command):
    """Get info from broker.

    Examples::

        celery list bindings

    NOTE: For RabbitMQ the management plugin is required.
    """

    args = '[bindings]'

    def list_bindings(self, management):
        """Print a queue / exchange / routing-key table of all bindings.

        :param management: object exposing ``get_bindings()``.
        :raises self.Error: if ``get_bindings()`` is not implemented.
        """
        try:
            bindings = management.get_bindings()
        except NotImplementedError:
            raise self.Error('Your transport cannot list bindings.')

        row = '{0:<28} {1:<28} {2}'
        rule = '-' * 16
        emit = self.out

        emit(row.format('Queue', 'Exchange', 'Routing Key'))
        emit(row.format(rule, rule, rule))
        for binding in bindings:
            emit(row.format(
                binding['destination'],
                binding['source'],
                binding['routing_key'],
            ))

    def run(self, what=None, *_, **kw):
        """Dispatch to the lister registered for topic ``what``.

        :raises self.UsageError: if ``what`` is missing or unknown.
        """
        topics = {'bindings': self.list_bindings}
        available = ', '.join(topics)

        if not what:
            message = 'You must specify one of {0}'.format(available)
            raise self.UsageError(message)
        if what not in topics:
            message = 'unknown topic {0!r} (choose one of: {1})'.format(
                what, available)
            raise self.UsageError(message)

        lister = topics[what]
        with self.app.connection() as conn:
            # Declare the task consumer first, as the original does, so the
            # listing is produced after the declaration.
            self.app.amqp.TaskConsumer(conn).declare()
            lister(conn.manager)
class call(Command):
    """Call a task by name.

    Examples::

        celery call tasks.add --args='[2, 2]'
        celery call tasks.add --args='[2, 2]' --countdown=10
    """

    args = '<task_name>'
    option_list = Command.option_list + (
        Option('--args', '-a', help='positional arguments (json).'),
        Option('--kwargs', '-k', help='keyword arguments (json).'),
        Option('--eta', help='scheduled time (ISO-8601).'),
        Option('--countdown', type='float',
               help='eta in seconds from now (float/int).'),
        Option('--expires', help='expiry time (ISO-8601/float/int).'),
        Option('--serializer', default='json', help='defaults to json.'),
        Option('--queue', help='custom queue name.'),
        Option('--exchange', help='custom exchange name.'),
        Option('--routing-key', help='custom routing key.'),
    )

    @staticmethod
    def _from_json(value, default):
        # Falsy values become ``default``; strings are decoded as JSON;
        # anything else is passed through untouched.
        value = value or default
        if isinstance(value, string_t):
            return json.loads(value)
        return value

    def run(self, name, *_, **kw):
        """Send task ``name`` and print the id of the resulting task.

        ``args``/``kwargs`` may be given as JSON strings.  ``expires`` is
        first tried as a number and otherwise handed to
        ``maybe_iso8601``, whose errors propagate to the caller.
        """
        task_args = self._from_json(kw.get('args'), ())
        task_kwargs = self._from_json(kw.get('kwargs'), {})

        expires = kw.get('expires') or None
        try:
            expires = float(expires)
        except (TypeError, ValueError):
            # Not numeric (or not given at all): treat it as a datetime.
            expires = maybe_iso8601(expires)

        eta = maybe_iso8601(kw.get('eta'))
        res = self.app.send_task(
            name,
            args=task_args,
            kwargs=task_kwargs,
            countdown=kw.get('countdown'),
            serializer=kw.get('serializer'),
            queue=kw.get('queue'),
            exchange=kw.get('exchange'),
            routing_key=kw.get('routing_key'),
            eta=eta,
            expires=expires,
        )
        self.out(res.id)
class purge(Command):
    """Erase all messages from all known task queues.

    WARNING: There is no undo operation for this command.
    """

    warn_prelude = (
        '{warning}: This will remove all tasks from {queues}: {names}.\n'
        '         There is no undo for this operation!\n\n'
        '(to skip this prompt use the -f option)\n'
    )
    warn_prompt = 'Are you sure you want to delete all tasks'

    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
    fmt_empty = 'No messages purged from {qnum} {queues}'

    option_list = Command.option_list + (
        Option('--force', '-f', action='store_true',
               help='Do not prompt for verification'),
        Option('--queues', '-Q', default=[],
               help='Comma separated list of queue names to purge.'),
        Option('--exclude-queues', '-X', default=[],
               help='Comma separated list of queues names not to purge.')
    )

    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
        """Purge the selected queues and report how many messages went.

        :param force: skip the interactive confirmation.
        :param queues: queue names to purge; when empty, every queue in
            ``self.app.amqp.queues`` is used.
        :param exclude_queues: queue names removed from the selection.

        Returns early, printing no summary, when the user does not answer
        ``yes`` to the confirmation prompt.
        """
        selected = set(str_to_list(queues or []))
        excluded = set(str_to_list(exclude_queues or []))
        if not selected:
            selected = set(keys(self.app.amqp.queues))
        names = selected - excluded
        qnum = len(names)

        messages = None
        if names:
            if not force:
                self.out(self.warn_prelude.format(
                    warning=self.colored.red('WARNING'),
                    queues=text.pluralize(qnum, 'queue'),
                    names=', '.join(sorted(names)),
                ))
                answer = self.ask(self.warn_prompt, ('yes', 'no'), 'no')
                if answer != 'yes':
                    return
            with self.app.connection_for_write() as conn:
                messages = sum(self._purge(conn, queue) for queue in names)

        if messages:
            fmt = self.fmt_purged
        else:
            fmt = self.fmt_empty
        self.out(fmt.format(
            mnum=messages, qnum=qnum,
            messages=text.pluralize(messages, 'message'),
            queues=text.pluralize(qnum, 'queue'),
        ))

    def _purge(self, conn, queue):
        """Purge one queue; channel errors and falsy counts yield 0."""
        try:
            purged = conn.default_channel.queue_purge(queue)
        except conn.channel_errors:
            return 0
        return purged or 0
class result(Command):
    """Gives the return value for a given task id.

    Examples::

        celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
        celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
        celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
    """

    args = '<task_id>'
    option_list = Command.option_list + (
        Option('--task', '-t', help='name of task (if custom backend)'),
        Option('--traceback', action='store_true',
               help='show traceback instead'),
    )

    def run(self, task_id, *args, **kwargs):
        """Print the result, or the traceback, stored for ``task_id``.

        When ``task`` is given, that task's own ``AsyncResult`` class is
        used instead of the application-wide one.
        """
        task_name = kwargs.get('task')
        want_traceback = kwargs.get('traceback', False)

        result_cls = self.app.AsyncResult
        if task_name:
            result_cls = self.app.tasks[task_name].AsyncResult

        async_result = result_cls(task_id)
        value = (async_result.traceback if want_traceback
                 else async_result.get())

        # ``pretty`` returns a sequence; its second item is what is printed.
        formatted = self.pretty(value)
        self.out(formatted[1])
class _RemoteControl(Command):
    # Shared base of the ``inspect`` and ``control`` commands.  Subclasses
    # set ``name`` and ``choices`` (method name -> (default timeout, help
    # text)) and implement ``call``.
    name = None
    choices = None
    leaf = False

    option_list = Command.option_list + (
        Option('--timeout', '-t', type='float',
               help='Timeout in seconds (float) waiting for reply'),
        Option('--destination', '-d',
               help='Comma separated list of destination node names.'),
        Option('--json', '-j', action='store_true',
               help='Use json as output format.'),
    )

    def __init__(self, *args, **kwargs):
        # Pop the two display flags before the base class sees ``kwargs``.
        self.show_body = kwargs.pop('show_body', True)
        self.show_reply = kwargs.pop('show_reply', True)
        super(_RemoteControl, self).__init__(*args, **kwargs)

    @classmethod
    def get_command_info(cls, command,
                         indent=0, prefix='', color=None, help=False):
        # Render the listing entry for one remote method: a '|' line holding
        # prefix, colored name and (if the class has an attribute named like
        # the method) that attribute's ``__doc__``; plus an optional indented
        # help line taken from ``choices``.
        help_line = None
        if help:
            help_line = '|' + text.indent(cls.choices[command][1], indent + 4)

        try:
            handler = getattr(cls, command)
            summary = '{0}{1} {2}'.format(
                prefix, color(command), handler.__doc__)
            return text.join(['|' + text.indent(summary, indent), help_line])
        except AttributeError:
            # No attribute named after the method: print the bare name.
            summary = prefix + str(color(command))
            return text.join(['|' + text.indent(summary, indent), help_line])

    @classmethod
    def list_commands(cls, indent=0, prefix='', color=None, help=False):
        # One ``get_command_info`` entry per method in ``choices``, sorted
        # by name and joined with newlines.
        paint = color if color else lambda x: x
        lead = prefix + ' ' if prefix else ''
        entries = [
            cls.get_command_info(method, indent, lead, paint, help)
            for method in sorted(cls.choices)
        ]
        return '\n'.join(entries)

    @property
    def epilog(self):
        # '[Commands]' heading followed by the method list with help lines.
        listing = self.list_commands(indent=4, help=True)
        return '\n'.join(['[Commands]', listing])

    def usage(self, command):
        template = '%prog {0} [options] {1} <command> [arg1 .. argN]'
        return template.format(command, self.args)

    def call(self, *args, **kwargs):
        # Subclasses provide the actual transport call.
        raise NotImplementedError('call')

    def run(self, *args, **kwargs):
        # The first positional argument names the remote method.
        if args:
            return self.do_call_method(args, **kwargs)
        raise self.UsageError(
            'Missing {0.name} method. See --help'.format(self))

    def do_call_method(self, args, **kwargs):
        # Validate the method name, resolve timeout/destination, invoke the
        # handler and return its replies.  Raises ``self.Error`` /
        # ``self.UsageError`` for bad input and ``self.Error`` with status
        # EX_UNAVAILABLE when nothing replied.
        method = args[0]
        if method == 'help':
            raise self.Error("Did you mean '{0.name} --help'?".format(self))
        if method not in self.choices:
            raise self.UsageError(
                'Unknown {0.name} method {1}'.format(self, method))

        if self.app.connection_for_write().transport.driver_type == 'sql':
            raise self.Error('Broadcast not supported by SQL broker transport')

        output_json = kwargs.get('json')
        destination = kwargs.get('destination')
        # A missing or falsy timeout falls back to the per-method default.
        timeout = kwargs.get('timeout') or self.choices[method][0]
        if destination and isinstance(destination, string_t):
            destination = [dest.strip() for dest in destination.split(',')]

        # A method of the same name on this class takes precedence over the
        # generic ``call``.
        handler = getattr(self, method, self.call)

        # With --json the replies are printed once at the end instead of
        # through the reply callback.
        callback = None if output_json else self.say_remote_command_reply

        extra_args = args[1:]
        replies = handler(method, *extra_args,
                          timeout=timeout,
                          destination=destination,
                          callback=callback)
        if not replies:
            raise self.Error('No nodes replied within time constraint.',
                             status=EX_UNAVAILABLE)
        if output_json:
            self.out(json.dumps(replies))
        return replies
class inspect(_RemoteControl):
    """Inspect the worker at runtime.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.

    Examples::

        celery inspect active --timeout=5
        celery inspect scheduled -d worker1@example.com
        celery inspect revoked -d w1@e.com,w2@e.com
    """

    name = 'inspect'
    # method name -> (default timeout in seconds, help text)
    choices = {
        'active': (1.0, 'dump active tasks (being processed)'),
        'active_queues': (1.0, 'dump queues being consumed from'),
        'clock': (1.0, 'get value of logical clock'),
        'conf': (1.0, 'dump worker configuration'),
        'memdump': (1.0, 'dump memory samples (requires psutil)'),
        'memsample': (1.0, 'sample memory (requires psutil)'),
        'objgraph': (60.0, 'create object graph (requires objgraph)'),
        'ping': (0.2, 'ping worker(s)'),
        'query_task': (1.0, 'query for task information by id'),
        'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
        'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
        'stats': (1.0, 'dump worker statistics'),
        'registered': (1.0, 'dump of registered tasks'),
        'report': (1.0, 'get bugreport info'),
        'revoked': (1.0, 'dump of revoked task ids'),
    }

    @staticmethod
    def _without_command_name(name, args):
        # ``_RemoteControl.do_call_method`` invokes every handler as
        # ``handler(method, *user_args, ...)``.  The handlers below have no
        # ``method`` parameter, so the command's own name would otherwise be
        # taken for the first user argument.  Drop it when present; direct
        # callers that pass only real arguments are unaffected.
        if args and args[0] == name:
            return args[1:]
        return args

    def call(self, method, *args, **options):
        # ``options`` (timeout, destination, callback) configure the
        # inspector; ``args`` go to the inspector method itself.
        i = self.app.control.inspect(**options)
        return getattr(i, method)(*args)

    # NOTE: the three handlers below intentionally have no docstrings:
    # ``get_command_info`` prints ``__doc__`` of these methods in the help
    # listing, so adding one would change that output.

    def objgraph(self, type_='Request', *args, **kwargs):
        # Previously ``celery inspect objgraph`` graphed a type literally
        # named 'objgraph' because the command name landed in ``type_``.
        user_args = self._without_command_name('objgraph', (type_,) + args)
        type_ = user_args[0] if user_args else 'Request'
        return self.call('objgraph', type_, **kwargs)

    def conf(self, with_defaults=False, *args, **kwargs):
        # Previously ``with_defaults`` was always the truthy string 'conf'
        # when invoked from the command line.
        user_args = self._without_command_name(
            'conf', (with_defaults,) + args)
        with_defaults = user_args[0] if user_args else False
        return self.call('conf', with_defaults, **kwargs)

    def query_task(self, *ids, **options):
        # Previously the string 'query_task' was sent as the first task id.
        ids = self._without_command_name('query_task', ids)
        return self.call('query_task', ids, **options)
class control(_RemoteControl):
    """Workers remote control.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.

    Examples::

        celery control enable_events --timeout=5
        celery control -d worker1@example.com enable_events
        celery control -d w1.e.com,w2.e.com enable_events

        celery control -d w1.e.com add_consumer queue_name
        celery control -d w1.e.com cancel_consumer queue_name

        celery control -d w1.e.com add_consumer queue exchange direct rkey
    """

    name = 'control'
    # method name -> (default timeout in seconds, help text)
    choices = {
        'enable_events': (1.0, 'tell worker(s) to enable events'),
        'disable_events': (1.0, 'tell worker(s) to disable events'),
        'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
        'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
        'rate_limit': (
            1.0, 'tell worker(s) to modify the rate limit for a task type'),
        'time_limit': (
            1.0, 'tell worker(s) to modify the time limit for a task type.'),
        'autoscale': (1.0, 'change autoscale settings'),
        'pool_grow': (1.0, 'start more pool processes'),
        'pool_shrink': (1.0, 'use less pool processes'),
    }

    @staticmethod
    def _optional(convert, value):
        # Apply ``convert`` unless the argument was omitted (``None``), so
        # optional command-line arguments no longer crash the conversion.
        return None if value is None else convert(value)

    def call(self, method, *args, **options):
        # Always request replies so the caller can report on them.
        return getattr(self.app.control, method)(*args, reply=True, **options)

    # NOTE: the docstrings of the handlers below are the usage strings
    # printed by ``get_command_info``; they are runtime text and are kept
    # verbatim.  Further documentation lives in comments.

    def pool_grow(self, method, n=1, **kwargs):
        """[N=1]"""
        return self.call(method, int(n), **kwargs)

    def pool_shrink(self, method, n=1, **kwargs):
        """[N=1]"""
        return self.call(method, int(n), **kwargs)

    def autoscale(self, method, max=None, min=None, **kwargs):
        """[max] [min]"""
        # Both values are optional per the usage string; ``int(None)`` used
        # to raise TypeError when either was left out.  An omitted value is
        # forwarded as None.
        # NOTE(review): confirm the worker-side handler accepts None.
        return self.call(method,
                         self._optional(int, max),
                         self._optional(int, min),
                         **kwargs)

    def rate_limit(self, method, task_name, rate_limit, **kwargs):
        """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
        return self.call(method, task_name, rate_limit, **kwargs)

    def time_limit(self, method, task_name, soft, hard=None, **kwargs):
        """<task_name> <soft_secs> [hard_secs]"""
        # ``hard`` is optional per the usage string; ``float(None)`` used to
        # raise TypeError when only the soft limit was given.
        return self.call(method, task_name,
                         float(soft), self._optional(float, hard),
                         **kwargs)

    def add_consumer(self, method, queue, exchange=None,
                     exchange_type='direct', routing_key=None, **kwargs):
        """<queue> [exchange [type [routing_key]]]"""
        return self.call(method, queue, exchange,
                         exchange_type, routing_key, **kwargs)

    def cancel_consumer(self, method, queue, **kwargs):
        """<queue>"""
        return self.call(method, queue, **kwargs)
class status(Command):
    """Show list of workers that are online."""

    option_list = inspect.option_list

    def run(self, *args, **kwargs):
        """Ping the workers and print how many nodes answered.

        :raises self.Error: with status ``EX_UNAVAILABLE`` when no node
            replied.
        """
        pinger = inspect(
            app=self.app,
            no_color=kwargs.get('no_color', False),
            stdout=self.stdout,
            stderr=self.stderr,
            show_reply=False,
            show_body=False,
            quiet=True,
        )
        replies = pinger.run('ping', **kwargs)
        if not replies:
            raise self.Error('No nodes replied within time constraint',
                             status=EX_UNAVAILABLE)

        if kwargs.get('quiet', False):
            return
        nodecount = len(replies)
        summary = '\n{0} {1} online.'.format(
            nodecount, text.pluralize(nodecount, 'node'))
        self.out(summary)
class migrate(Command):
    """Migrate tasks from one broker to another.

    Examples:

    .. code-block:: console

        $ celery migrate redis://localhost amqp://guest@localhost//
        $ celery migrate django:// redis://localhost

    NOTE: This command is experimental, make sure you have
          a backup of the tasks before you continue.
    """

    args = '<source_url> <dest_url>'
    option_list = Command.option_list + (
        Option('--limit', '-n', type='int',
               help='Number of tasks to consume (int)'),
        Option('--timeout', '-t', type='float', default=1.0,
               help='Timeout in seconds (float) waiting for tasks'),
        Option('--ack-messages', '-a', action='store_true',
               help='Ack messages from source broker.'),
        Option('--tasks', '-T',
               help='List of task names to filter on.'),
        Option('--queues', '-Q',
               help='List of queues to migrate.'),
        Option('--forever', '-F', action='store_true',
               help='Continually migrate tasks until killed.'),
    )
    progress_fmt = MIGRATE_PROGRESS_FMT

    def on_migrate_task(self, state, body, message):
        """Callback given to ``migrate_tasks``: print one progress line."""
        line = self.progress_fmt.format(state=state, body=body)
        self.out(line)

    def run(self, source, destination, **kwargs):
        """Move tasks from broker URL ``source`` to ``destination``.

        All remaining keyword arguments are forwarded unchanged to
        ``migrate_tasks``.
        """
        from kombu import Connection
        from celery.contrib.migrate import migrate_tasks

        source_conn = Connection(source)
        dest_conn = Connection(destination)
        migrate_tasks(source_conn, dest_conn,
                      callback=self.on_migrate_task,
                      **kwargs)
class shell(Command):
    """Start shell session with convenient access to celery symbols.

    The following symbols will be added to the main globals:

        - celery: the current application.
        - chord, group, chain, chunks,
          xmap, xstarmap subtask, Task
        - all registered tasks.
    """

    option_list = Command.option_list + (
        Option('--ipython', '-I',
               action='store_true', dest='force_ipython',
               help='force iPython.'),
        Option('--bpython', '-B',
               action='store_true', dest='force_bpython',
               help='force bpython.'),
        Option('--python', '-P',
               action='store_true', dest='force_python',
               help='force default Python shell.'),
        Option('--without-tasks', '-T', action='store_true',
               help="don't add tasks to locals."),
        Option('--eventlet', action='store_true',
               help='use eventlet.'),
        Option('--gevent', action='store_true', help='use gevent.'),
    )

    def run(self, force_ipython=False, force_bpython=False,
            force_python=False, without_tasks=False, eventlet=False,
            gevent=False, **kwargs):
        """Build the shell namespace and start the selected shell.

        A forced shell wins in the order python, bpython, ipython;
        otherwise ``invoke_default_shell`` picks one.
        """
        # Make modules in the current directory importable.
        sys.path.insert(0, os.getcwd())

        # Importing these modules is done purely for its side effects.
        if eventlet:
            import_module('celery.concurrency.eventlet')
        if gevent:
            import_module('celery.concurrency.gevent')

        import celery
        import celery.task.base
        self.app.loader.import_default_modules()

        namespace = {
            'app': self.app,
            'celery': self.app,
            'Task': celery.Task,
            'chord': celery.chord,
            'group': celery.group,
            'chain': celery.chain,
            'chunks': celery.chunks,
            'xmap': celery.xmap,
            'xstarmap': celery.xstarmap,
            'subtask': celery.subtask,
            'signature': celery.signature,
        }
        self.locals = namespace

        if not without_tasks:
            # Expose every task outside the ``celery.`` namespace under its
            # ``__name__``.
            for task in values(self.app.tasks):
                if not task.name.startswith('celery.'):
                    namespace[task.__name__] = task

        forced_shells = (
            (force_python, self.invoke_fallback_shell),
            (force_bpython, self.invoke_bpython_shell),
            (force_ipython, self.invoke_ipython_shell),
        )
        for forced, invoke in forced_shells:
            if forced:
                return invoke()
        return self.invoke_default_shell()

    def invoke_default_shell(self):
        """Prefer IPython, then bpython, then the plain Python shell."""
        try:
            import IPython  # noqa -- imported only to probe availability
        except ImportError:
            pass
        else:
            return self.invoke_ipython_shell()

        try:
            import bpython  # noqa -- imported only to probe availability
        except ImportError:
            return self.invoke_fallback_shell()
        return self.invoke_bpython_shell()

    def invoke_fallback_shell(self):
        """Start :func:`code.interact`, with tab completion when
        :mod:`readline` is importable."""
        import code
        try:
            import readline
        except ImportError:
            readline = None

        if readline is not None:
            import rlcompleter
            completer = rlcompleter.Completer(self.locals)
            readline.set_completer(completer.complete)
            readline.parse_and_bind('tab:complete')
        code.interact(local=self.locals)

    def invoke_ipython_shell(self):
        """Try each IPython launcher in turn until one imports.

        Returns ``None`` when none can be imported: the final
        ``_no_ipython`` entry raises ImportError, which is swallowed here
        like the others.
        """
        launchers = (
            self._ipython,
            self._ipython_pre_10,
            self._ipython_terminal,
            self._ipython_010,
            self._no_ipython,
        )
        for launch in launchers:
            try:
                return launch()
            except ImportError:
                continue

    def _ipython(self):
        from IPython import start_ipython
        start_ipython(argv=[], user_ns=self.locals)

    def _ipython_pre_10(self):
        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
        ipapp = TerminalIPythonApp.instance()
        ipapp.initialize(argv=[])
        ipapp.shell.user_ns.update(self.locals)
        ipapp.start()

    def _ipython_terminal(self):
        from IPython.terminal import embed
        terminal = embed.TerminalInteractiveShell(user_ns=self.locals)
        terminal.mainloop()

    def _ipython_010(self):
        from IPython.Shell import IPShell
        ipshell = IPShell(argv=[], user_ns=self.locals)
        ipshell.mainloop()

    def _no_ipython(self):
        raise ImportError('no suitable ipython found')

    def invoke_bpython_shell(self):
        """Embed a bpython shell using the prepared namespace."""
        import bpython
        bpython.embed(self.locals)
class upgrade(Command):
    """Perform upgrade between versions."""

    option_list = Command.option_list + (
        Option('--django', action='store_true',
               help='Upgrade Django project'),
        Option('--compat', action='store_true',
               help='Maintain backwards compatibility'),
        Option('--no-backup', action='store_true',
               help='Dont backup original files'),
    )
    # Upgrade types accepted by ``run``; each names a method of this class.
    choices = {'settings'}

    def usage(self, command):
        return '%prog <command> settings [filename] [options]'

    def run(self, *args, **kwargs):
        """Dispatch to the method named by the first positional argument.

        :raises self.UsageError: if the upgrade type is missing or unknown.
        """
        if not args:
            raise self.UsageError('missing upgrade type')
        command = args[0]
        if command not in self.choices:
            raise self.UsageError('unknown upgrade type: {0}'.format(command))
        handler = getattr(self, command)
        # The upgrade type itself is passed along as first argument.
        return handler(*args, **kwargs)

    def settings(self, command, filename,
                 no_backup=False, django=False, compat=False, **kwargs):
        """Rewrite ``filename`` in place, replacing old setting names.

        Unless ``no_backup`` is set the original content is first copied to
        ``<filename>.orig``.  With ``django`` or ``compat`` the new names
        are passed through ``_compat_key``.
        """
        if no_backup:
            lines = self._slurp(filename)
        else:
            lines = self._backup(filename)

        if django or compat:
            keyfilter = self._compat_key
        else:
            keyfilter = pass1

        print('processing {0}...'.format(filename), file=self.stderr)
        with codecs.open(filename, 'w', 'utf-8') as write_fh:
            for line in lines:
                write_fh.write(self._to_new_key(line, keyfilter))

    def _slurp(self, filename):
        """Return the lines of ``filename`` (decoded as UTF-8) as a list."""
        with codecs.open(filename, 'r', 'utf-8') as read_fh:
            return list(read_fh)

    def _backup(self, filename, suffix='.orig'):
        """Copy ``filename`` to ``filename + suffix``; return its lines."""
        backup_filename = filename + suffix
        print('writing backup to {0}...'.format(backup_filename),
              file=self.stderr)

        lines = []
        with codecs.open(filename, 'r', 'utf-8') as read_fh, \
                codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
            for line in read_fh:
                backup_fh.write(line)
                lines.append(line)
        return lines

    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
        """Replace the first matching old key in ``line`` with its new name.

        Longer keys are tried first; the first key whose replacement
        changes the line wins.  Unmatched lines are returned unchanged.
        """
        by_length = sorted(source, key=len)
        by_length.reverse()
        for old_key in by_length:
            replaced = line.replace(old_key, keyfilter(source[old_key]))
            if replaced != line:
                return replaced
        return line

    def _compat_key(self, key, namespace='CELERY'):
        """Upper-case ``key`` and prefix it with ``namespace`` + '_' unless
        it already starts with ``namespace``."""
        upper = key.upper()
        if upper.startswith(namespace):
            return upper
        return '_'.join([namespace, upper])
class help(Command):
    """Show help screen and exit."""

    def usage(self, command):
        template = '%prog <command> [options] {0.args}'
        return template.format(self)

    def run(self, *args, **kwargs):
        """Print the parser help plus the command overview.

        Always returns ``EX_USAGE``.
        """
        self.parser.print_help()
        overview = HELP.format(
            prog_name=self.prog_name,
            commands=CeleryCommand.list_commands(colored=self.colored),
        )
        self.out(overview)
        return EX_USAGE
class report(Command):
    """Shows information useful to include in bug-reports."""

    def run(self, *args, **kwargs):
        """Print ``self.app.bugreport()`` and return ``EX_OK``."""
        bugreport = self.app.bugreport()
        self.out(bugreport)
        return EX_OK
class CeleryCommand(Command):
    # The umbrella command: maps the first non-option argument to one of the
    # registered sub-commands.  (Documented with comments rather than a class
    # docstring because the original class has none and the base class is not
    # visible here -- adding ``__doc__`` might alter generated help text.)

    # Template for the extensions namespace passed to ``Extensions``.
    ext_fmt = '{self.namespace}.commands'

    # Sub-command name -> command class.
    commands = {
        'amqp': amqp,
        'beat': beat,
        'call': call,
        'control': control,
        'events': events,
        'graph': graph,
        'help': help,
        'inspect': inspect,
        'list': list_,
        'logtool': logtool,
        'migrate': migrate,
        'multi': multi,
        'purge': purge,
        'report': report,
        'result': result,
        'shell': shell,
        'status': status,
        'upgrade': upgrade,
        'worker': worker,
    }
    enable_config_from_cmdline = True
    prog_name = 'celery'
    namespace = 'celery'

    @classmethod
    def register_command(cls, fun, name=None):
        """Register ``fun`` as a sub-command and return it.

        :param name: command name; defaults to ``fun.__name__``.
        """
        cls.commands[name or fun.__name__] = fun
        return fun

    def execute(self, command, argv=None):
        """Instantiate and run the sub-command named ``command``.

        Unknown names fall back to the ``help`` command.  ``UsageError``
        and ``Error`` raised by the sub-command are reported through the
        matching handler and converted to their ``status`` value.
        """
        try:
            command_cls = self.commands[command]
        except KeyError:
            command_cls, argv = self.commands['help'], ['help']
        # (A second, redundant lookup of the same class was removed here.)

        try:
            instance = command_cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            )
            return instance.run_from_argv(
                self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

    def on_usage_error(self, exc, command=None):
        """Print ``exc`` and a hint pointing at the relevant ``--help``."""
        if command:
            helps = '{self.prog_name} {command} --help'
        else:
            helps = '{self.prog_name} --help'
        self.error(self.colored.magenta('Error: {0}'.format(exc)))
        self.error("""Please try '{0}'""".format(helps.format(
            self=self, command=command,
        )))

    def _relocate_args_from_start(self, argv, index=0):
        """Move options that precede the command name to the end of argv.

        ``['-l', 'debug', 'worker', '-c', '3']`` becomes
        ``['worker', '-c', '3', '-l', 'debug']``.  A short option is assumed
        to take the following word as its value unless that word is itself
        an option.  Returns ``[]`` for empty/``None`` input.

        Fixes relative to the previous implementation:

        * when *every* argument looked like an option (or option value) the
          rotated list was computed but never returned, so ``[]`` came back;
          the last collected word is now returned first, as the command;
        * a trailing short option with nothing after it was emitted twice.
        """
        if not argv:
            return []

        rest = []
        while index < len(argv):
            value = argv[index]
            if value.startswith('--'):
                rest.append(value)
            elif value.startswith('-'):
                # We cannot know whether this option takes a value, so the
                # next word is consumed as one unless it is another option.
                try:
                    nxt = argv[index + 1]
                except IndexError:
                    # Last word: consume it so it is not counted again as a
                    # leftover argument below.
                    rest.append(value)
                    index += 1
                    break
                if nxt.startswith('-'):
                    rest.append(value)
                else:
                    rest.extend([value, nxt])
                    index += 1
            else:
                # First word that is neither option nor option value.
                break
            index += 1

        remaining = argv[index:]
        if remaining:
            # The first remaining word is taken to be the command name.
            return remaining + rest
        if not rest:
            # Only reachable when ``index`` started past the end of argv.
            return []
        # Nothing left over: the last collected word must be the command.
        command = rest.pop()
        return [command] + rest

    def prepare_prog_name(self, name):
        """Return ``name``, or the ``__main__`` module's file path when the
        program was started as ``__main__.py``."""
        if name == '__main__.py':
            return sys.modules['__main__'].__file__
        return name

    def handle_argv(self, prog_name, argv):
        """Normalise ``argv`` and execute the command it names.

        An empty argument list runs the ``help`` command.
        """
        self.prog_name = self.prepare_prog_name(prog_name)
        argv = self._relocate_args_from_start(argv)
        _, argv = self.prepare_args(None, argv)
        try:
            command = argv[0]
        except IndexError:
            command, argv = 'help', ['help']
        return self.execute(command, argv)

    def execute_from_commandline(self, argv=None):
        """Run the command line and exit the process with its status.

        ``KeyboardInterrupt`` exits with ``EX_FAILURE``.
        """
        argv = sys.argv if argv is None else argv
        # 'multi' as one of the first two arguments after the program name
        # turns off ``respects_app_option`` for this instance.
        if 'multi' in argv[1:3]:
            self.respects_app_option = False
        try:
            sys.exit(determine_exit_status(
                super(CeleryCommand, self).execute_from_commandline(argv)))
        except KeyboardInterrupt:
            sys.exit(EX_FAILURE)

    @classmethod
    def get_command_info(cls, command, indent=0, color=None, colored=None):
        """Render the help-listing entry for one sub-command.

        Leaf commands yield a single line; other commands additionally
        list their own sub-commands via ``list_commands``.
        """
        palette = term.colored() if colored is None else colored
        paint = palette.names[color] if color else lambda x: x
        obj = cls.commands[command]
        cmd = 'celery {0}'.format(paint(command))
        if obj.leaf:
            return '|' + text.indent(cmd, indent)
        return text.join([
            ' ',
            '|' + text.indent('{0} --help'.format(cmd), indent),
            obj.list_commands(indent, 'celery {0}'.format(command), paint),
        ])

    @classmethod
    def list_commands(cls, indent=0, colored=None):
        """Render every section of ``command_classes`` as help text."""
        colored = term.colored() if colored is None else colored
        white = colored.white
        ret = []
        for section, commands, color in command_classes:
            ret.extend([
                text.indent('+ {0}: '.format(white(section)), indent),
                '\n'.join(
                    cls.get_command_info(command, indent + 4, color, colored)
                    for command in commands),
                ''
            ])
        return '\n'.join(ret).strip()

    def with_pool_option(self, argv):
        """Return the pool option names when ``argv`` selects the worker.

        Returns ``(['-P'], ['--pool'])`` if 'worker' is among the first
        three arguments (and there is more than one argument), else
        ``None``.
        """
        if len(argv) > 1 and 'worker' in argv[0:3]:
            return (['-P'], ['--pool'])

    def on_concurrency_setup(self):
        self.load_extension_commands()

    def load_extension_commands(self):
        """Load extension commands and add them to the help listing.

        Appends an 'Extensions' section to the module-level
        ``command_classes`` list when any were loaded.
        """
        names = Extensions(self.ext_fmt.format(self=self),
                           self.register_command).load()
        if names:
            command_classes.append(('Extensions', names, 'magenta'))
def command(*args, **kwargs):
    """Deprecated: Use classmethod :meth:`CeleryCommand.register_command`
    instead.

    With a positional argument, registers it and returns the result; with
    none, returns the registration function itself.  Keyword arguments are
    accepted but ignored.
    """
    register = CeleryCommand.register_command
    if args:
        return register(args[0])
    return register
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()
|