@@ -5,26 +5,32 @@ from __future__ import absolute_import, unicode_literals
import io
import tempfile
+from collections import namedtuple
from billiard.common import TERM_SIGNAME
from kombu.utils.encoding import safe_repr
from celery.exceptions import WorkerShutdown
-from celery.five import UserDict, items, string_t
+from celery.five import UserDict, items, string_t, text_t
from celery.platforms import signals as _signals
from celery.utils import timeutils
from celery.utils.functional import maybe_list
from celery.utils.log import get_logger
-from celery.utils.serialization import jsonify
+from celery.utils.serialization import jsonify, strtobool
from . import state as worker_state
from .request import Request
-from .state import revoked
__all__ = ['Panel']
DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
logger = get_logger(__name__)
+controller_info_t = namedtuple('controller_info_t', [
+ 'alias', 'type', 'visible', 'default_timeout',
+ 'help', 'signature', 'args', 'variadic',
def ok(value):
return {'ok': value}
@@ -37,22 +43,56 @@ def nok(value):
class Panel(UserDict):
"""Global registry of remote control commands."""
data = dict() # global dict.
+ meta = dict()
+ by_alias = dict()
+ @classmethod
+ def register(cls, *args, **kwargs):
+ if args:
+ return cls._register(**kwargs)(*args)
+ return cls._register(**kwargs)
- def register(cls, method, name=None):
- cls.data[name or method.__name__] = method
- return method
+ def _register(cls, name=None, alias=None, type='control',
+ visible=True, default_timeout=1.0, help=None,
+ signature=None, args=None, variadic=None):
+ def _inner(fun):
+ control_name = name or fun.__name__
+ _help = help or (fun.__doc__ or '').strip().split('\n')[0]
+ cls.data[control_name] = fun
+ cls.meta[control_name] = controller_info_t(
+ alias, type, visible, default_timeout,
+ _help, signature, args, variadic)
+ if alias:
+ cls.data[alias] = fun
+ return fun
+ return _inner
+def control_command(**kwargs):
+ return Panel.register(type='control', **kwargs)
+def inspect_command(**kwargs):
+ return Panel.register(type='inspect', **kwargs)
# -- App
def report(state):
+ """Information about Celery installation for bug reports."""
return ok(state.app.bugreport())
-def dump_conf(state, with_defaults=False, **kwargs):
+ alias='dump_conf', # XXX < backwards compatible
+ signature='[include_defaults=False]',
+ args=[('with_defaults', strtobool)],
+def conf(state, with_defaults=False, **kwargs):
+ """List configuration."""
return jsonify(state.app.conf.table(with_defaults=with_defaults),
@@ -64,7 +104,10 @@ def _wanted_config_key(key):
# -- Task
+ variadic='ids',
+ signature='[id1 [id2 [... [idN]]]]',
def query_task(state, ids, **kwargs):
"""Query for task information by id."""
return {
@@ -92,7 +135,10 @@ def _state_of_task(request,
return 'ready'
+ variadic='task_id',
+ signature='[id1 [id2 [... [idN]]]]',
def revoke(state, task_id, terminate=False, signal=None, **kwargs):
"""Revoke task by task id (or list of ids).
@@ -105,7 +151,7 @@ def revoke(state, task_id, terminate=False, signal=None, **kwargs):
size = len(task_ids)
terminated = set()
- revoked.update(task_ids)
+ worker_state.revoked.update(task_ids)
if terminate:
signum = _signals.signum(signal or TERM_SIGNAME)
for request in _find_requests_by_id(task_ids):
@@ -125,10 +171,22 @@ def revoke(state, task_id, terminate=False, signal=None, **kwargs):
return ok('tasks {0} flagged as revoked'.format(idstr))
+ variadic='task_id',
+ args=[('signal', text_t)],
+ signature='<signal> [id1 [id2 [... [idN]]]]'
+def terminate(state, signal, task_id, **kwargs):
+ """Terminate task by task id (or list of ids)."""
+ return revoke(state, task_id, terminate=True, signal=signal)
+ args=[('task_name', text_t), ('rate_limit', text_t)],
+ signature='<task_name> <rate_limit (e.g. 5/s | 5/m | 5/h)>',
def rate_limit(state, task_name, rate_limit, **kwargs):
- """Tell worker(s) to modify the rate limit for a task by type."""
- """Set new rate limit for a task type.
+ """Tell worker(s) to modify the rate limit for a task by type.
See Also:
@@ -137,7 +195,6 @@ def rate_limit(state, task_name, rate_limit, **kwargs):
task_name (str): Type of task to set rate limit for.
rate_limit (int, str): New rate limit.
except ValueError as exc:
@@ -161,8 +218,18 @@ def rate_limit(state, task_name, rate_limit, **kwargs):
return ok('new rate limit set successfully')
+ args=[('task_name', text_t), ('soft', float), ('hard', float)],
+ signature='<task_name> <soft_secs> [hard_secs]',
def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
+ """Tell worker(s) to modify the time limit for task by type.
+ Arguments:
+ task_name (str): Name of task to change.
+ hard (float): Hard time limit.
+ soft (float): Soft time limit.
+ """
task = state.app.tasks[task_name]
except KeyError:
@@ -181,19 +248,28 @@ def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
# -- Events
def clock(state, **kwargs):
+ """Get current logical clock value."""
return {'clock': state.app.clock.value}
def election(state, id, topic, action=None, **kwargs):
+ """Hold election.
+ Arguments:
+ id (str): Unique election id.
+ topic (str): Election topic.
+ action (str): Action to take for elected actor.
+ """
if state.consumer.gossip:
state.consumer.gossip.election(id, topic, action)
def enable_events(state):
+ """Tell worker(s) to send task-related events."""
dispatcher = state.consumer.event_dispatcher
if dispatcher.groups and 'task' not in dispatcher.groups:
@@ -202,8 +278,9 @@ def enable_events(state):
return ok('task events already enabled')
def disable_events(state):
+ """Tell worker(s) to stop sending task-related events."""
dispatcher = state.consumer.event_dispatcher
if 'task' in dispatcher.groups:
@@ -212,8 +289,9 @@ def disable_events(state):
return ok('task events already disabled')
def heartbeat(state):
+ """Tell worker(s) to send event heartbeat immediately."""
logger.debug('Heartbeat requested by remote.')
dispatcher = state.consumer.event_dispatcher
dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)
@@ -221,8 +299,9 @@ def heartbeat(state):
# -- Worker
def hello(state, from_node, revoked=None, **kwargs):
+ """Request mingle sync-data."""
if from_node != state.hostname:
logger.info('sync with %s', from_node)
if revoked:
@@ -233,22 +312,25 @@ def hello(state, from_node, revoked=None, **kwargs):
def ping(state, **kwargs):
+ """Ping worker(s)."""
return ok('pong')
def stats(state, **kwargs):
+ """Request worker statistics/information."""
return state.consumer.controller.stats()
-def dump_schedule(state, safe=False, **kwargs):
- return list(_iter_schedule_requests(state.consumer.timer, safe=safe))
+def scheduled(state, **kwargs):
+ """List of currently scheduled ETA/countdown tasks."""
+ return list(_iter_schedule_requests(state.consumer.timer))
-def _iter_schedule_requests(timer, safe=False, Request=Request):
+def _iter_schedule_requests(timer, Request=Request):
for waiting in timer.schedule.queue:
arg0 = waiting.entry.args[0]
@@ -259,34 +341,48 @@ def _iter_schedule_requests(timer, safe=False, Request=Request):
yield {
'eta': arg0.eta.isoformat() if arg0.eta else None,
'priority': waiting.priority,
- 'request': arg0.info(safe=safe),
+ 'request': arg0.info(),
-def dump_reserved(state, safe=False, **kwargs):
- reserved = (
+def reserved(state, **kwargs):
+ """List of currently reserved tasks (not including scheduled/active)."""
+ reserved_tasks = (
state.tset(worker_state.reserved_requests) -
- if not reserved:
+ if not reserved_tasks:
return []
- return [request.info(safe=safe) for request in reserved]
+ return [request.info() for request in reserved_tasks]
-def dump_active(state, safe=False, **kwargs):
- return [request.info(safe=safe)
+def active(state, **kwargs):
+ """List of tasks currently being executed."""
+ return [request.info()
for request in state.tset(worker_state.active_requests)]
-def dump_revoked(state, **kwargs):
+def revoked(state, **kwargs):
+ """List of revoked task-ids."""
return list(worker_state.revoked)
-def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
+ alias='dump_tasks',
+ variadic='taskinfoitems',
+ signature='[attr1 [attr2 [... [attrN]]]]',
+def registered(state, taskinfoitems=None, builtins=False, **kwargs):
+ """List of registered tasks.
+ Arguments:
+ taskinfoitems (Sequence[str]): List of task attributes to include.
+ Defaults to ``exchange,routing_key,rate_limit``.
+ builtins (bool): Also include built-in tasks.
+ """
reg = state.app.tasks
taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
@@ -308,8 +404,19 @@ def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
# -- Debugging
+ default_timeout=60.0,
+ args=[('type', text_t), ('num', int), ('max_depth', int)],
+ signature='[object_type=Request] [num=200 [max_depth=10]]',
def objgraph(state, num=200, max_depth=10, type='Request'): # pragma: no cover
+ """Create graph of uncollected objects (memory-leak debugging).
+ Arguments:
+ num (int): Max number of objects to graph.
+ max_depth (int): Traverse at most n levels deep.
+ type (str): Name of object to graph. Default is ``"Request"``.
+ """
import objgraph
except ImportError:
@@ -326,14 +433,19 @@ def objgraph(state, num=200, max_depth=10, type='Request'): # pragma: no cover
return {'filename': fh.name}
-def memsample(state, **kwargs): # pragma: no cover
+def memsample(state, **kwargs):
+ """Sample current RSS memory usage."""
from celery.utils.debug import sample_mem
return sample_mem()
+ args=[('samples', int)],
+ signature='[n_samples=10]',
def memdump(state, samples=10, **kwargs): # pragma: no cover
+ """Dump statistics of previous memsample requests."""
from celery.utils.debug import memdump
out = io.StringIO()
@@ -342,22 +454,31 @@ def memdump(state, samples=10, **kwargs): # pragma: no cover
# -- Pool
+ args=[('n', int)],
+ signature='[N=1]',
def pool_grow(state, n=1, **kwargs):
+ """Grow pool by n processes/threads."""
return ok('pool will grow')
+ args=[('n', int)],
+ signature='[N=1]',
def pool_shrink(state, n=1, **kwargs):
+ """Shrink pool by n processes/threads."""
return ok('pool will shrink')
def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
+ """Restart execution pool."""
if state.app.conf.worker_pool_restarts:
state.consumer.controller.reload(modules, reload, reloader=reloader)
return ok('reload started')
@@ -365,35 +486,48 @@ def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
raise ValueError('Pool restarts not enabled')
def shutdown(state, msg='Got shutdown from remote', **kwargs):
+ """Shutdown worker(s)."""
raise WorkerShutdown(msg)
# -- Queues
+ args=[
+ ('queue', text_t),
+ ('exchange', text_t),
+ ('exchange_type', text_t),
+ ('routing_key', text_t),
+ ],
+ signature='<queue> [exchange [type [routing_key]]]',
def add_consumer(state, queue, exchange=None, exchange_type=None,
routing_key=None, **options):
+ """Tell worker(s) to consume from task queue by name."""
- queue, exchange, exchange_type, routing_key, **options
- )
+ queue, exchange, exchange_type or 'direct', routing_key, **options)
return ok('add consumer {0}'.format(queue))
-def cancel_consumer(state, queue=None, **_):
+ args=[('queue', text_t)],
+ signature='<queue>',
+def cancel_consumer(state, queue, **_):
+ """Tell worker(s) to stop consuming from task queue by name."""
state.consumer.cancel_task_queue, queue,
return ok('no longer consuming from {0}'.format(queue))
def active_queues(state):
- """Return information about the queues a worker consumes from."""
+ """List the task queues a worker are currently consuming from."""
if state.consumer.task_consumer:
return [dict(queue.as_dict(recurse=True))
for queue in state.consumer.task_consumer.queues]