control.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. # -*- coding: utf-8 -*-
  2. """Worker remote control command implementations."""
  3. from __future__ import absolute_import, unicode_literals
  4. import io
  5. import tempfile
  6. from collections import namedtuple
  7. from billiard.common import TERM_SIGNAME
  8. from kombu.utils.encoding import safe_repr
  9. from celery.exceptions import WorkerShutdown
  10. from celery.five import UserDict, items, string_t, text_t
  11. from celery.platforms import signals as _signals
  12. from celery.utils.functional import maybe_list
  13. from celery.utils.log import get_logger
  14. from celery.utils.serialization import jsonify, strtobool
  15. from celery.utils.time import rate
  16. from . import state as worker_state
  17. from .request import Request
  18. __all__ = ('Panel',)
  19. DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
  20. logger = get_logger(__name__)
  21. controller_info_t = namedtuple('controller_info_t', [
  22. 'alias', 'type', 'visible', 'default_timeout',
  23. 'help', 'signature', 'args', 'variadic',
  24. ])
  25. def ok(value):
  26. return {'ok': value}
  27. def nok(value):
  28. return {'error': value}
  29. class Panel(UserDict):
  30. """Global registry of remote control commands."""
  31. data = {} # global dict.
  32. meta = {} # -"-
  33. @classmethod
  34. def register(cls, *args, **kwargs):
  35. if args:
  36. return cls._register(**kwargs)(*args)
  37. return cls._register(**kwargs)
  38. @classmethod
  39. def _register(cls, name=None, alias=None, type='control',
  40. visible=True, default_timeout=1.0, help=None,
  41. signature=None, args=None, variadic=None):
  42. def _inner(fun):
  43. control_name = name or fun.__name__
  44. _help = help or (fun.__doc__ or '').strip().split('\n')[0]
  45. cls.data[control_name] = fun
  46. cls.meta[control_name] = controller_info_t(
  47. alias, type, visible, default_timeout,
  48. _help, signature, args, variadic)
  49. if alias:
  50. cls.data[alias] = fun
  51. return fun
  52. return _inner
  53. def control_command(**kwargs):
  54. return Panel.register(type='control', **kwargs)
  55. def inspect_command(**kwargs):
  56. return Panel.register(type='inspect', **kwargs)
# -- App
  58. @inspect_command()
  59. def report(state):
  60. """Information about Celery installation for bug reports."""
  61. return ok(state.app.bugreport())
  62. @inspect_command(
  63. alias='dump_conf', # XXX < backwards compatible
  64. signature='[include_defaults=False]',
  65. args=[('with_defaults', strtobool)],
  66. )
  67. def conf(state, with_defaults=False, **kwargs):
  68. """List configuration."""
  69. return jsonify(state.app.conf.table(with_defaults=with_defaults),
  70. keyfilter=_wanted_config_key,
  71. unknown_type_filter=safe_repr)
  72. def _wanted_config_key(key):
  73. return isinstance(key, string_t) and not key.startswith('__')
# -- Task
  75. @inspect_command(
  76. variadic='ids',
  77. signature='[id1 [id2 [... [idN]]]]',
  78. )
  79. def query_task(state, ids, **kwargs):
  80. """Query for task information by id."""
  81. return {
  82. req.id: (_state_of_task(req), req.info())
  83. for req in _find_requests_by_id(maybe_list(ids))
  84. }
  85. def _find_requests_by_id(ids,
  86. get_request=worker_state.requests.__getitem__):
  87. for task_id in ids:
  88. try:
  89. yield get_request(task_id)
  90. except KeyError:
  91. pass
  92. def _state_of_task(request,
  93. is_active=worker_state.active_requests.__contains__,
  94. is_reserved=worker_state.reserved_requests.__contains__):
  95. if is_active(request):
  96. return 'active'
  97. elif is_reserved(request):
  98. return 'reserved'
  99. return 'ready'
  100. @control_command(
  101. variadic='task_id',
  102. signature='[id1 [id2 [... [idN]]]]',
  103. )
  104. def revoke(state, task_id, terminate=False, signal=None, **kwargs):
  105. """Revoke task by task id (or list of ids).
  106. Keyword Arguments:
  107. terminate (bool): Also terminate the process if the task is active.
  108. signal (str): Name of signal to use for terminate (e.g., ``KILL``).
  109. """
  110. # pylint: disable=redefined-outer-name
  111. # XXX Note that this redefines `terminate`:
  112. # Outside of this scope that is a function.
  113. # supports list argument since 3.1
  114. task_ids, task_id = set(maybe_list(task_id) or []), None
  115. size = len(task_ids)
  116. terminated = set()
  117. worker_state.revoked.update(task_ids)
  118. if terminate:
  119. signum = _signals.signum(signal or TERM_SIGNAME)
  120. for request in _find_requests_by_id(task_ids):
  121. if request.id not in terminated:
  122. terminated.add(request.id)
  123. logger.info('Terminating %s (%s)', request.id, signum)
  124. request.terminate(state.consumer.pool, signal=signum)
  125. if len(terminated) >= size:
  126. break
  127. if not terminated:
  128. return ok('terminate: tasks unknown')
  129. return ok('terminate: {0}'.format(', '.join(terminated)))
  130. idstr = ', '.join(task_ids)
  131. logger.info('Tasks flagged as revoked: %s', idstr)
  132. return ok('tasks {0} flagged as revoked'.format(idstr))
  133. @control_command(
  134. variadic='task_id',
  135. args=[('signal', text_t)],
  136. signature='<signal> [id1 [id2 [... [idN]]]]'
  137. )
  138. def terminate(state, signal, task_id, **kwargs):
  139. """Terminate task by task id (or list of ids)."""
  140. return revoke(state, task_id, terminate=True, signal=signal)
  141. @control_command(
  142. args=[('task_name', text_t), ('rate_limit', text_t)],
  143. signature='<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>',
  144. )
  145. def rate_limit(state, task_name, rate_limit, **kwargs):
  146. """Tell worker(s) to modify the rate limit for a task by type.
  147. See Also:
  148. :attr:`celery.task.base.Task.rate_limit`.
  149. Arguments:
  150. task_name (str): Type of task to set rate limit for.
  151. rate_limit (int, str): New rate limit.
  152. """
  153. # pylint: disable=redefined-outer-name
  154. # XXX Note that this redefines `terminate`:
  155. # Outside of this scope that is a function.
  156. try:
  157. rate(rate_limit)
  158. except ValueError as exc:
  159. return nok('Invalid rate limit string: {0!r}'.format(exc))
  160. try:
  161. state.app.tasks[task_name].rate_limit = rate_limit
  162. except KeyError:
  163. logger.error('Rate limit attempt for unknown task %s',
  164. task_name, exc_info=True)
  165. return nok('unknown task')
  166. state.consumer.reset_rate_limits()
  167. if not rate_limit:
  168. logger.info('Rate limits disabled for tasks of type %s', task_name)
  169. return ok('rate limit disabled successfully')
  170. logger.info('New rate limit for tasks of type %s: %s.',
  171. task_name, rate_limit)
  172. return ok('new rate limit set successfully')
  173. @control_command(
  174. args=[('task_name', text_t), ('soft', float), ('hard', float)],
  175. signature='<task_name> <soft_secs> [hard_secs]',
  176. )
  177. def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
  178. """Tell worker(s) to modify the time limit for task by type.
  179. Arguments:
  180. task_name (str): Name of task to change.
  181. hard (float): Hard time limit.
  182. soft (float): Soft time limit.
  183. """
  184. try:
  185. task = state.app.tasks[task_name]
  186. except KeyError:
  187. logger.error('Change time limit attempt for unknown task %s',
  188. task_name, exc_info=True)
  189. return nok('unknown task')
  190. task.soft_time_limit = soft
  191. task.time_limit = hard
  192. logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
  193. task_name, soft, hard)
  194. return ok('time limits set successfully')
# -- Events
  196. @inspect_command()
  197. def clock(state, **kwargs):
  198. """Get current logical clock value."""
  199. return {'clock': state.app.clock.value}
  200. @control_command()
  201. def election(state, id, topic, action=None, **kwargs):
  202. """Hold election.
  203. Arguments:
  204. id (str): Unique election id.
  205. topic (str): Election topic.
  206. action (str): Action to take for elected actor.
  207. """
  208. if state.consumer.gossip:
  209. state.consumer.gossip.election(id, topic, action)
  210. @control_command()
  211. def enable_events(state):
  212. """Tell worker(s) to send task-related events."""
  213. dispatcher = state.consumer.event_dispatcher
  214. if dispatcher.groups and 'task' not in dispatcher.groups:
  215. dispatcher.groups.add('task')
  216. logger.info('Events of group {task} enabled by remote.')
  217. return ok('task events enabled')
  218. return ok('task events already enabled')
  219. @control_command()
  220. def disable_events(state):
  221. """Tell worker(s) to stop sending task-related events."""
  222. dispatcher = state.consumer.event_dispatcher
  223. if 'task' in dispatcher.groups:
  224. dispatcher.groups.discard('task')
  225. logger.info('Events of group {task} disabled by remote.')
  226. return ok('task events disabled')
  227. return ok('task events already disabled')
  228. @control_command()
  229. def heartbeat(state):
  230. """Tell worker(s) to send event heartbeat immediately."""
  231. logger.debug('Heartbeat requested by remote.')
  232. dispatcher = state.consumer.event_dispatcher
  233. dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)
# -- Worker
  235. @inspect_command(visible=False)
  236. def hello(state, from_node, revoked=None, **kwargs):
  237. """Request mingle sync-data."""
  238. # pylint: disable=redefined-outer-name
  239. # XXX Note that this redefines `revoked`:
  240. # Outside of this scope that is a function.
  241. if from_node != state.hostname:
  242. logger.info('sync with %s', from_node)
  243. if revoked:
  244. worker_state.revoked.update(revoked)
  245. return {
  246. 'revoked': worker_state.revoked._data,
  247. 'clock': state.app.clock.forward(),
  248. }
  249. @inspect_command(default_timeout=0.2)
  250. def ping(state, **kwargs):
  251. """Ping worker(s)."""
  252. return ok('pong')
  253. @inspect_command()
  254. def stats(state, **kwargs):
  255. """Request worker statistics/information."""
  256. return state.consumer.controller.stats()
  257. @inspect_command(alias='dump_schedule')
  258. def scheduled(state, **kwargs):
  259. """List of currently scheduled ETA/countdown tasks."""
  260. return list(_iter_schedule_requests(state.consumer.timer))
  261. def _iter_schedule_requests(timer):
  262. for waiting in timer.schedule.queue:
  263. try:
  264. arg0 = waiting.entry.args[0]
  265. except (IndexError, TypeError):
  266. continue
  267. else:
  268. if isinstance(arg0, Request):
  269. yield {
  270. 'eta': arg0.eta.isoformat() if arg0.eta else None,
  271. 'priority': waiting.priority,
  272. 'request': arg0.info(),
  273. }
  274. @inspect_command(alias='dump_reserved')
  275. def reserved(state, **kwargs):
  276. """List of currently reserved tasks, not including scheduled/active."""
  277. reserved_tasks = (
  278. state.tset(worker_state.reserved_requests) -
  279. state.tset(worker_state.active_requests)
  280. )
  281. if not reserved_tasks:
  282. return []
  283. return [request.info() for request in reserved_tasks]
  284. @inspect_command(alias='dump_active')
  285. def active(state, **kwargs):
  286. """List of tasks currently being executed."""
  287. return [request.info()
  288. for request in state.tset(worker_state.active_requests)]
  289. @inspect_command(alias='dump_revoked')
  290. def revoked(state, **kwargs):
  291. """List of revoked task-ids."""
  292. return list(worker_state.revoked)
  293. @inspect_command(
  294. alias='dump_tasks',
  295. variadic='taskinfoitems',
  296. signature='[attr1 [attr2 [... [attrN]]]]',
  297. )
  298. def registered(state, taskinfoitems=None, builtins=False, **kwargs):
  299. """List of registered tasks.
  300. Arguments:
  301. taskinfoitems (Sequence[str]): List of task attributes to include.
  302. Defaults to ``exchange,routing_key,rate_limit``.
  303. builtins (bool): Also include built-in tasks.
  304. """
  305. reg = state.app.tasks
  306. taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
  307. tasks = reg if builtins else (
  308. task for task in reg if not task.startswith('celery.'))
  309. def _extract_info(task):
  310. fields = {
  311. field: str(getattr(task, field, None)) for field in taskinfoitems
  312. if getattr(task, field, None) is not None
  313. }
  314. if fields:
  315. info = ['='.join(f) for f in items(fields)]
  316. return '{0} [{1}]'.format(task.name, ' '.join(info))
  317. return task.name
  318. return [_extract_info(reg[task]) for task in sorted(tasks)]
# -- Debugging
  320. @inspect_command(
  321. default_timeout=60.0,
  322. args=[('type', text_t), ('num', int), ('max_depth', int)],
  323. signature='[object_type=Request] [num=200 [max_depth=10]]',
  324. )
  325. def objgraph(state, num=200, max_depth=10, type='Request'): # pragma: no cover
  326. """Create graph of uncollected objects (memory-leak debugging).
  327. Arguments:
  328. num (int): Max number of objects to graph.
  329. max_depth (int): Traverse at most n levels deep.
  330. type (str): Name of object to graph. Default is ``"Request"``.
  331. """
  332. try:
  333. import objgraph as _objgraph
  334. except ImportError:
  335. raise ImportError('Requires the objgraph library')
  336. logger.info('Dumping graph for type %r', type)
  337. with tempfile.NamedTemporaryFile(prefix='cobjg',
  338. suffix='.png', delete=False) as fh:
  339. objects = _objgraph.by_type(type)[:num]
  340. _objgraph.show_backrefs(
  341. objects,
  342. max_depth=max_depth, highlight=lambda v: v in objects,
  343. filename=fh.name,
  344. )
  345. return {'filename': fh.name}
  346. @inspect_command()
  347. def memsample(state, **kwargs):
  348. """Sample current RSS memory usage."""
  349. from celery.utils.debug import sample_mem
  350. return sample_mem()
  351. @inspect_command(
  352. args=[('samples', int)],
  353. signature='[n_samples=10]',
  354. )
  355. def memdump(state, samples=10, **kwargs): # pragma: no cover
  356. """Dump statistics of previous memsample requests."""
  357. from celery.utils import debug
  358. out = io.StringIO()
  359. debug.memdump(file=out)
  360. return out.getvalue()
# -- Pool
  362. @control_command(
  363. args=[('n', int)],
  364. signature='[N=1]',
  365. )
  366. def pool_grow(state, n=1, **kwargs):
  367. """Grow pool by n processes/threads."""
  368. if state.consumer.controller.autoscaler:
  369. state.consumer.controller.autoscaler.force_scale_up(n)
  370. else:
  371. state.consumer.pool.grow(n)
  372. state.consumer._update_prefetch_count(n)
  373. return ok('pool will grow')
  374. @control_command(
  375. args=[('n', int)],
  376. signature='[N=1]',
  377. )
  378. def pool_shrink(state, n=1, **kwargs):
  379. """Shrink pool by n processes/threads."""
  380. if state.consumer.controller.autoscaler:
  381. state.consumer.controller.autoscaler.force_scale_down(n)
  382. else:
  383. state.consumer.pool.shrink(n)
  384. state.consumer._update_prefetch_count(-n)
  385. return ok('pool will shrink')
  386. @control_command()
  387. def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
  388. """Restart execution pool."""
  389. if state.app.conf.worker_pool_restarts:
  390. state.consumer.controller.reload(modules, reload, reloader=reloader)
  391. return ok('reload started')
  392. else:
  393. raise ValueError('Pool restarts not enabled')
  394. @control_command(
  395. args=[('max', int), ('min', int)],
  396. signature='[max [min]]',
  397. )
  398. def autoscale(state, max=None, min=None):
  399. """Modify autoscale settings."""
  400. autoscaler = state.consumer.controller.autoscaler
  401. if autoscaler:
  402. max_, min_ = autoscaler.update(max, min)
  403. return ok('autoscale now max={0} min={1}'.format(max_, min_))
  404. raise ValueError('Autoscale not enabled')
  405. @control_command()
  406. def shutdown(state, msg='Got shutdown from remote', **kwargs):
  407. """Shutdown worker(s)."""
  408. logger.warning(msg)
  409. raise WorkerShutdown(msg)
# -- Queues
  411. @control_command(
  412. args=[
  413. ('queue', text_t),
  414. ('exchange', text_t),
  415. ('exchange_type', text_t),
  416. ('routing_key', text_t),
  417. ],
  418. signature='<queue> [exchange [type [routing_key]]]',
  419. )
  420. def add_consumer(state, queue, exchange=None, exchange_type=None,
  421. routing_key=None, **options):
  422. """Tell worker(s) to consume from task queue by name."""
  423. state.consumer.call_soon(
  424. state.consumer.add_task_queue,
  425. queue, exchange, exchange_type or 'direct', routing_key, **options)
  426. return ok('add consumer {0}'.format(queue))
  427. @control_command(
  428. args=[('queue', text_t)],
  429. signature='<queue>',
  430. )
  431. def cancel_consumer(state, queue, **_):
  432. """Tell worker(s) to stop consuming from task queue by name."""
  433. state.consumer.call_soon(
  434. state.consumer.cancel_task_queue, queue,
  435. )
  436. return ok('no longer consuming from {0}'.format(queue))
  437. @inspect_command()
  438. def active_queues(state):
  439. """List the task queues a worker is currently consuming from."""
  440. if state.consumer.task_consumer:
  441. return [dict(queue.as_dict(recurse=True))
  442. for queue in state.consumer.task_consumer.queues]
  443. return []