123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
- # -*- coding: utf-8 -*-
- """Worker remote control command implementations."""
- from __future__ import absolute_import, unicode_literals
- import io
- import tempfile
- from collections import namedtuple
- from billiard.common import TERM_SIGNAME
- from kombu.utils.encoding import safe_repr
- from celery.exceptions import WorkerShutdown
- from celery.five import UserDict, items, string_t, text_t
- from celery.platforms import signals as _signals
- from celery.utils import timeutils
- from celery.utils.functional import maybe_list
- from celery.utils.log import get_logger
- from celery.utils.serialization import jsonify, strtobool
- from . import state as worker_state
- from .request import Request
# Names exported by ``from <module> import *``.
__all__ = ['Panel']

# Task attributes listed by the ``registered`` command when the caller does
# not name any explicitly.
DEFAULT_TASK_INFO_ITEMS = (
    'exchange',
    'routing_key',
    'rate_limit',
)

# Module-level logger shared by all command handlers below.
logger = get_logger(__name__)
# Metadata record stored in ``Panel.meta`` for each registered command.
# The field order matters: ``Panel._register`` fills it positionally.
controller_info_t = namedtuple(
    'controller_info_t',
    [
        'alias',
        'type',
        'visible',
        'default_timeout',
        'help',
        'signature',
        'args',
        'variadic',
    ],
)
def ok(value):
    """Wrap *value* in a success reply of the form ``{'ok': value}``."""
    reply = {'ok': value}
    return reply
def nok(value):
    """Wrap *value* in an error reply of the form ``{'error': value}``."""
    reply = {'error': value}
    return reply
class Panel(UserDict):
    """Global registry of remote control commands.

    The mappings below are class attributes, so registrations are shared
    by every instance of the class.
    """

    data = {}  # global dict: command name (and alias) -> handler.
    meta = {}  # command name -> ``controller_info_t`` record.
    by_alias = {}  # declared here; not written by the methods below.

    @classmethod
    def register(cls, *args, **kwargs):
        """Register a command handler.

        When positional arguments are given, the decorator built from
        ``kwargs`` is applied to them immediately and its result is
        returned.  Otherwise the decorator itself is returned.
        """
        decorator = cls._register(**kwargs)
        if not args:
            return decorator
        return decorator(*args)

    @classmethod
    def _register(cls, name=None, alias=None, type='control',
                  visible=True, default_timeout=1.0, help=None,
                  signature=None, args=None, variadic=None):
        """Build the decorator that records a handler in the registry.

        The handler is stored in ``data`` under ``name`` (defaulting to
        the function's ``__name__``) and also under ``alias`` when one is
        given.  Its metadata is stored in ``meta`` under the command name
        only.
        """

        def _inner(fun):
            command = name or fun.__name__
            # Fall back to the first line of the docstring for help text.
            summary = help
            if not summary:
                docstring = fun.__doc__ or ''
                summary = docstring.strip().partition('\n')[0]
            cls.data[command] = fun
            cls.meta[command] = controller_info_t(
                alias, type, visible, default_timeout,
                summary, signature, args, variadic,
            )
            if alias:
                cls.data[alias] = fun
            return fun

        return _inner
def control_command(**kwargs):
    """Return a decorator registering a handler with ``type='control'``.

    All keyword arguments are forwarded to :meth:`Panel.register`.
    """
    register = Panel.register
    return register(type='control', **kwargs)
def inspect_command(**kwargs):
    """Return a decorator registering a handler with ``type='inspect'``.

    All keyword arguments are forwarded to :meth:`Panel.register`.
    """
    register = Panel.register
    return register(type='inspect', **kwargs)
- # -- App
@inspect_command()
def report(state):
    """Information about Celery installation for bug reports.

    Returns:
        dict: ``{'ok': <result of state.app.bugreport()>}``.
    """
    bugreport = state.app.bugreport()
    return ok(bugreport)
@inspect_command(
    alias='dump_conf',  # XXX < backwards compatible
    signature='[include_defaults=False]',
    args=[('with_defaults', strtobool)],
)
def conf(state, with_defaults=False, **kwargs):
    """List configuration.

    Arguments:
        with_defaults (bool): Forwarded to ``state.app.conf.table``.

    The table is passed through ``jsonify``: keys are filtered with
    ``_wanted_config_key`` and values of unknown type are rendered with
    ``safe_repr``.
    """
    table = state.app.conf.table(with_defaults=with_defaults)
    return jsonify(
        table,
        keyfilter=_wanted_config_key,
        unknown_type_filter=safe_repr,
    )
def _wanted_config_key(key):
    """Return True for string keys that do not start with ``__``."""
    if not isinstance(key, string_t):
        return False
    return not key.startswith('__')
- # -- Task
@inspect_command(
    variadic='ids',
    signature='[id1 [id2 [... [idN]]]]',
)
def query_task(state, ids, **kwargs):
    """Query for task information by id.

    Arguments:
        ids: A single task id or a list of ids (normalised with
            ``maybe_list``).

    Returns:
        dict: Maps each request id that was found to a
        ``(state_name, request.info())`` tuple.  Ids for which no request
        is found are absent from the result.
    """
    found = {}
    for req in _find_requests_by_id(maybe_list(ids)):
        found[req.id] = (_state_of_task(req), req.info())
    return found
def _find_requests_by_id(ids,
                         get_request=worker_state.requests.__getitem__):
    """Yield the request for each id in *ids*, skipping unknown ids.

    Arguments:
        ids: Iterable of task ids.
        get_request: Lookup callable; an id for which it raises
            :exc:`KeyError` is silently skipped.
    """
    for task_id in ids:
        try:
            request = get_request(task_id)
        except KeyError:
            continue
        yield request
def _state_of_task(request,
                   is_active=worker_state.active_requests.__contains__,
                   is_reserved=worker_state.reserved_requests.__contains__):
    """Return ``'active'``, ``'reserved'`` or ``'ready'`` for *request*.

    The active check takes precedence over the reserved check; ``'ready'``
    is the fallback when neither predicate matches.
    """
    checks = (
        ('active', is_active),
        ('reserved', is_reserved),
    )
    for state_name, matches in checks:
        if matches(request):
            return state_name
    return 'ready'
@control_command(
    variadic='task_id',
    signature='[id1 [id2 [... [idN]]]]',
)
def revoke(state, task_id, terminate=False, signal=None, **kwargs):
    """Revoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).

    Returns:
        dict: An ``ok`` reply describing what was flagged or terminated.
    """
    # supports list argument since 3.1
    ids = set(maybe_list(task_id) or [])
    # The ids are always flagged as revoked, whether or not we terminate.
    worker_state.revoked.update(ids)

    if not terminate:
        joined = ', '.join(ids)
        logger.info('Tasks flagged as revoked: %s', joined)
        return ok('tasks {0} flagged as revoked'.format(joined))

    signum = _signals.signum(signal or TERM_SIGNAME)
    wanted = len(ids)
    terminated = set()
    for request in _find_requests_by_id(ids):
        if request.id in terminated:
            continue
        terminated.add(request.id)
        logger.info('Terminating %s (%s)', request.id, signum)
        request.terminate(state.consumer.pool, signal=signum)
        # Stop as soon as every requested id has been handled.
        if len(terminated) >= wanted:
            break

    if terminated:
        return ok('terminate: {0}'.format(', '.join(terminated)))
    return ok('terminate: tasks unknown')
@control_command(
    variadic='task_id',
    args=[('signal', text_t)],
    signature='<signal> [id1 [id2 [... [idN]]]]'
)
def terminate(state, signal, task_id, **kwargs):
    """Terminate task by task id (or list of ids).

    Thin wrapper that calls ``revoke`` with ``terminate=True`` and the
    given *signal*; extra keyword arguments are not forwarded.
    """
    reply = revoke(state, task_id, signal=signal, terminate=True)
    return reply
@control_command(
    args=[('task_name', text_t), ('rate_limit', text_t)],
    signature='<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>',
)
def rate_limit(state, task_name, rate_limit, **kwargs):
    """Tell worker(s) to modify the rate limit for a task by type.

    See Also:
        :attr:`celery.task.base.Task.rate_limit`.

    Arguments:
        task_name (str): Type of task to set rate limit for.
        rate_limit (int, str): New rate limit.

    Returns:
        dict: An ``ok`` reply on success, or an ``error`` reply when the
        rate limit cannot be parsed or the task is unknown.
    """
    # Called only for validation: the parsed value itself is discarded.
    try:
        timeutils.rate(rate_limit)
    except ValueError as exc:
        return nok('Invalid rate limit string: {0!r}'.format(exc))

    try:
        task = state.app.tasks[task_name]
    except KeyError:
        logger.error('Rate limit attempt for unknown task %s',
                     task_name, exc_info=True)
        return nok('unknown task')
    task.rate_limit = rate_limit

    state.consumer.reset_rate_limits()

    if rate_limit:
        logger.info('New rate limit for tasks of type %s: %s.',
                    task_name, rate_limit)
        return ok('new rate limit set successfully')

    # A falsy rate limit is reported as "disabled".
    logger.info('Rate limits disabled for tasks of type %s', task_name)
    return ok('rate limit disabled successfully')
@control_command(
    args=[('task_name', text_t), ('soft', float), ('hard', float)],
    signature='<task_name> <soft_secs> [hard_secs]',
)
def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
    """Tell worker(s) to modify the time limit for task by type.

    Arguments:
        task_name (str): Name of task to change.
        hard (float): Hard time limit.
        soft (float): Soft time limit.

    Returns:
        dict: An ``ok`` reply on success, or an ``error`` reply when the
        task name is not in ``state.app.tasks``.
    """
    try:
        task = state.app.tasks[task_name]
    except KeyError:
        logger.error('Change time limit attempt for unknown task %s',
                     task_name, exc_info=True)
        return nok('unknown task')
    else:
        # Both limits are overwritten, including with ``None``.
        task.soft_time_limit, task.time_limit = soft, hard

    logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
                task_name, soft, hard)
    return ok('time limits set successfully')
- # -- Events
@inspect_command()
def clock(state, **kwargs):
    """Get current logical clock value.

    Returns:
        dict: ``{'clock': state.app.clock.value}``.
    """
    current = state.app.clock.value
    return {'clock': current}
@control_command()
def election(state, id, topic, action=None, **kwargs):
    """Hold election.

    Arguments:
        id (str): Unique election id.
        topic (str): Election topic.
        action (str): Action to take for elected actor.

    Does nothing when ``state.consumer.gossip`` is falsy.
    """
    gossip = state.consumer.gossip
    if not gossip:
        return
    gossip.election(id, topic, action)
@control_command()
def enable_events(state):
    """Tell worker(s) to send task-related events.

    Returns:
        dict: An ``ok`` reply saying whether the ``task`` group was added
        or was already considered enabled.
    """
    groups = state.consumer.event_dispatcher.groups
    # NOTE(review): an empty ``groups`` is reported as "already enabled" --
    # presumably an empty set means no group filtering; confirm.
    if not groups or 'task' in groups:
        return ok('task events already enabled')
    groups.add('task')
    logger.info('Events of group {task} enabled by remote.')
    return ok('task events enabled')
@control_command()
def disable_events(state):
    """Tell worker(s) to stop sending task-related events.

    Returns:
        dict: An ``ok`` reply saying whether the ``task`` group was
        removed or was not present to begin with.
    """
    groups = state.consumer.event_dispatcher.groups
    if 'task' not in groups:
        return ok('task events already disabled')
    groups.discard('task')
    logger.info('Events of group {task} disabled by remote.')
    return ok('task events disabled')
@control_command()
def heartbeat(state):
    """Tell worker(s) to send event heartbeat immediately.

    Sends a ``worker-heartbeat`` event with ``freq=5`` plus the fields of
    ``worker_state.SOFTWARE_INFO``.  Returns nothing.
    """
    logger.debug('Heartbeat requested by remote.')
    state.consumer.event_dispatcher.send(
        'worker-heartbeat',
        freq=5,
        **worker_state.SOFTWARE_INFO
    )
- # -- Worker
@inspect_command(visible=False)
def hello(state, from_node, revoked=None, **kwargs):
    """Request mingle sync-data.

    Arguments:
        from_node (str): Name of the node sending the hello.
        revoked: Optional revoked-task data from the sender; when truthy
            it is merged into ``worker_state.revoked``.

    Returns:
        dict: This worker's revoked data and its advanced logical clock,
        or ``None`` when the hello comes from this node itself.
    """
    # Note that the ``revoked`` parameter shadows the module-level
    # ``revoked`` command inside this function.
    if from_node == state.hostname:
        return None

    logger.info('sync with %s', from_node)
    if revoked:
        worker_state.revoked.update(revoked)
    return {
        'revoked': worker_state.revoked._data,
        'clock': state.app.clock.forward(),
    }
@inspect_command(default_timeout=0.2)
def ping(state, **kwargs):
    """Ping worker(s).

    Returns:
        dict: Always ``{'ok': 'pong'}``.
    """
    reply = ok('pong')
    return reply
@inspect_command()
def stats(state, **kwargs):
    """Request worker statistics/information.

    Returns whatever ``state.consumer.controller.stats()`` returns.
    """
    controller = state.consumer.controller
    return controller.stats()
@inspect_command(alias='dump_schedule')
def scheduled(state, **kwargs):
    """List of currently scheduled ETA/countdown tasks.

    Returns:
        list: The dicts produced by ``_iter_schedule_requests`` for
        ``state.consumer.timer``.
    """
    timer = state.consumer.timer
    return [entry for entry in _iter_schedule_requests(timer)]
def _iter_schedule_requests(timer, Request=Request):
    """Yield a description of each scheduled entry that wraps a request.

    An entry in ``timer.schedule.queue`` is reported only when the first
    positional argument of its ``entry`` is a *Request* instance.  Entries
    with no (or unindexable) args are skipped.

    Yields:
        dict: With keys ``eta`` (ISO 8601 string or ``None``),
        ``priority`` and ``request`` (the result of ``request.info()``).
    """
    for item in timer.schedule.queue:
        try:
            first_arg = item.entry.args[0]
        except (IndexError, TypeError):
            continue
        if not isinstance(first_arg, Request):
            continue
        eta = first_arg.eta
        yield {
            'eta': eta.isoformat() if eta else None,
            'priority': item.priority,
            'request': first_arg.info(),
        }
@inspect_command(alias='dump_reserved')
def reserved(state, **kwargs):
    """List of currently reserved tasks, not including scheduled/active.

    Returns:
        list: ``request.info()`` for every reserved request that is not
        also in the active set; an empty list when there are none.
    """
    reserved_set = state.tset(worker_state.reserved_requests)
    active_set = state.tset(worker_state.active_requests)
    pending = reserved_set - active_set
    return [request.info() for request in pending] if pending else []
@inspect_command(alias='dump_active')
def active(state, **kwargs):
    """List of tasks currently being executed.

    Returns:
        list: ``request.info()`` for each request in
        ``state.tset(worker_state.active_requests)``.
    """
    running = state.tset(worker_state.active_requests)
    return [request.info() for request in running]
@inspect_command(alias='dump_revoked')
def revoked(state, **kwargs):
    """List of revoked task-ids.

    Returns:
        list: The items obtained by iterating ``worker_state.revoked``.
    """
    return [task_id for task_id in worker_state.revoked]
@inspect_command(
    alias='dump_tasks',
    variadic='taskinfoitems',
    signature='[attr1 [attr2 [... [attrN]]]]',
)
def registered(state, taskinfoitems=None, builtins=False, **kwargs):
    """List of registered tasks.

    Arguments:
        taskinfoitems (Sequence[str]): List of task attributes to include.
            Defaults to ``exchange,routing_key,rate_limit``.
        builtins (bool): Also include built-in tasks.

    Returns:
        list: One string per task, sorted by task name: either the bare
        task name, or ``"name [attr=value ...]"`` when at least one of the
        requested attributes is set to something other than ``None``.
    """
    registry = state.app.tasks
    wanted = taskinfoitems or DEFAULT_TASK_INFO_ITEMS

    if builtins:
        names = registry
    else:
        # Tasks whose name starts with ``celery.`` are left out.
        names = (name for name in registry
                 if not name.startswith('celery.'))

    def _extract_info(task):
        fields = {}
        for field in wanted:
            value = getattr(task, field, None)
            if value is not None:
                fields[field] = str(value)
        if not fields:
            return task.name
        info = ['='.join(pair) for pair in items(fields)]
        return '{0} [{1}]'.format(task.name, ' '.join(info))

    return [_extract_info(registry[name]) for name in sorted(names)]
- # -- Debugging
@inspect_command(
    default_timeout=60.0,
    args=[('type', text_t), ('num', int), ('max_depth', int)],
    signature='[object_type=Request] [num=200 [max_depth=10]]',
)
def objgraph(state, num=200, max_depth=10, type='Request'):  # pragma: no cover
    """Create graph of uncollected objects (memory-leak debugging).

    Arguments:
        num (int): Max number of objects to graph.
        max_depth (int): Traverse at most n levels deep.
        type (str): Name of object to graph.  Default is ``"Request"``.

    Returns:
        dict: ``{'filename': <path of the generated .png>}``.  The file is
        created with ``delete=False``, so it is not removed here.

    Raises:
        ImportError: If the ``objgraph`` library is not installed.
    """
    # The local import shadows this function's own name inside the body.
    try:
        import objgraph
    except ImportError:
        raise ImportError('Requires the objgraph library')

    logger.info('Dumping graph for type %r', type)
    tmp = tempfile.NamedTemporaryFile(
        prefix='cobjg', suffix='.png', delete=False,
    )
    with tmp as fh:
        objects = objgraph.by_type(type)[:num]
        objgraph.show_backrefs(
            objects,
            max_depth=max_depth,
            highlight=lambda v: v in objects,
            filename=fh.name,
        )
        path = fh.name
    return {'filename': path}
@inspect_command()
def memsample(state, **kwargs):
    """Sample current RSS memory usage.

    Returns whatever ``celery.utils.debug.sample_mem()`` returns.
    """
    # Imported lazily, only when the command is actually invoked.
    from celery.utils import debug
    return debug.sample_mem()
@inspect_command(
    args=[('samples', int)],
    signature='[n_samples=10]',
)
def memdump(state, samples=10, **kwargs):  # pragma: no cover
    """Dump statistics of previous memsample requests.

    Arguments:
        samples (int): Number of samples to include in the dump.

    Returns:
        str: The text written by ``celery.utils.debug.memdump``.
    """
    # Import the module rather than the function, so the helper does not
    # shadow this command's own name inside the body.
    from celery.utils import debug

    out = io.StringIO()
    # Forward ``samples``: it was previously accepted but ignored, so the
    # helper's own default was always used regardless of the request.
    debug.memdump(samples=samples, file=out)
    return out.getvalue()
- # -- Pool
@control_command(
    args=[('n', int)],
    signature='[N=1]',
)
def pool_grow(state, n=1, **kwargs):
    """Grow pool by n processes/threads.

    The consumer's prefetch count is adjusted by ``+n`` as well.
    """
    consumer = state.consumer
    consumer.pool.grow(n)
    consumer._update_prefetch_count(n)
    return ok('pool will grow')
@control_command(
    args=[('n', int)],
    signature='[N=1]',
)
def pool_shrink(state, n=1, **kwargs):
    """Shrink pool by n processes/threads.

    The consumer's prefetch count is adjusted by ``-n`` as well.
    """
    consumer = state.consumer
    consumer.pool.shrink(n)
    consumer._update_prefetch_count(-n)
    return ok('pool will shrink')
@control_command()
def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
    """Restart execution pool.

    Arguments:
        modules: Forwarded to ``state.consumer.controller.reload``.
        reload: Forwarded to ``state.consumer.controller.reload``.
        reloader: Forwarded to ``state.consumer.controller.reload``.

    Raises:
        ValueError: If the ``worker_pool_restarts`` setting is falsy.
    """
    if not state.app.conf.worker_pool_restarts:
        raise ValueError('Pool restarts not enabled')
    controller = state.consumer.controller
    controller.reload(modules, reload, reloader=reloader)
    return ok('reload started')
@control_command()
def shutdown(state, msg='Got shutdown from remote', **kwargs):
    """Shutdown worker(s).

    Arguments:
        msg (str): Text that is logged as a warning and carried by the
            raised exception.

    Raises:
        WorkerShutdown: Always.
    """
    # Log first, so the reason is recorded before the exception propagates.
    logger.warning(msg)
    raise WorkerShutdown(msg)
- # -- Queues
@control_command(
    args=[
        ('queue', text_t),
        ('exchange', text_t),
        ('exchange_type', text_t),
        ('routing_key', text_t),
    ],
    signature='<queue> [exchange [type [routing_key]]]',
)
def add_consumer(state, queue, exchange=None, exchange_type=None,
                 routing_key=None, **options):
    """Tell worker(s) to consume from task queue by name.

    The call to ``add_task_queue`` is handed to ``consumer.call_soon``
    rather than made directly here.  A falsy *exchange_type* is replaced
    by ``'direct'``.
    """
    consumer = state.consumer
    consumer.call_soon(
        consumer.add_task_queue,
        queue,
        exchange,
        exchange_type or 'direct',
        routing_key,
        **options
    )
    return ok('add consumer {0}'.format(queue))
@control_command(
    args=[('queue', text_t)],
    signature='<queue>',
)
def cancel_consumer(state, queue, **_):
    """Tell worker(s) to stop consuming from task queue by name.

    The call to ``cancel_task_queue`` is handed to ``consumer.call_soon``
    rather than made directly here.
    """
    consumer = state.consumer
    consumer.call_soon(consumer.cancel_task_queue, queue)
    return ok('no longer consuming from {0}'.format(queue))
@inspect_command()
def active_queues(state):
    """List the task queues a worker are currently consuming from.

    Returns:
        list: One ``dict(queue.as_dict(recurse=True))`` per queue of
        ``state.consumer.task_consumer``, or an empty list when there is
        no task consumer.
    """
    task_consumer = state.consumer.task_consumer
    if not task_consumer:
        return []
    return [dict(queue.as_dict(recurse=True))
            for queue in task_consumer.queues]
|