control.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.control
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Remote control commands.
  6. """
  7. from __future__ import absolute_import
  8. import io
  9. import tempfile
  10. from kombu.utils.encoding import safe_repr
  11. from celery.exceptions import WorkerShutdown
  12. from celery.five import UserDict, items, string_t
  13. from celery.platforms import signals as _signals
  14. from celery.utils import timeutils
  15. from celery.utils.functional import maybe_list
  16. from celery.utils.log import get_logger
  17. from celery.utils import jsonify
  18. from . import state as worker_state
  19. from .state import revoked
  20. from .job import Request
  21. __all__ = ['Panel']
  22. DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
  23. logger = get_logger(__name__)
  24. class Panel(UserDict):
  25. data = dict() # Global registry.
  26. @classmethod
  27. def register(cls, method, name=None):
  28. cls.data[name or method.__name__] = method
  29. return method
  30. def _find_requests_by_id(ids, requests):
  31. found, total = 0, len(ids)
  32. for request in requests:
  33. if request.id in ids:
  34. yield request
  35. found += 1
  36. if found >= total:
  37. break
  38. @Panel.register
  39. def query_task(state, ids, **kwargs):
  40. ids = maybe_list(ids)
  41. def reqinfo(state, req):
  42. return state, req.info()
  43. reqs = dict((req.id, ('reserved', req.info()))
  44. for req in _find_requests_by_id(
  45. ids, worker_state.reserved_requests))
  46. reqs.update(dict(
  47. (req.id, ('active', req.info()))
  48. for req in _find_requests_by_id(
  49. ids, worker_state.active_requests,
  50. )
  51. ))
  52. return reqs
  53. @Panel.register
  54. def revoke(state, task_id, terminate=False, signal=None, **kwargs):
  55. """Revoke task by task id."""
  56. # supports list argument since 3.1
  57. task_ids, task_id = set(maybe_list(task_id) or []), None
  58. size = len(task_ids)
  59. terminated = set()
  60. revoked.update(task_ids)
  61. if terminate:
  62. signum = _signals.signum(signal or 'TERM')
  63. # reserved_requests changes size during iteration
  64. # so need to consume the items first, then terminate after.
  65. requests = set(_find_requests_by_id(
  66. task_ids,
  67. worker_state.reserved_requests,
  68. ))
  69. for request in requests:
  70. if request.id not in terminated:
  71. terminated.add(request.id)
  72. logger.info('Terminating %s (%s)', request.id, signum)
  73. request.terminate(state.consumer.pool, signal=signum)
  74. if len(terminated) >= size:
  75. break
  76. if not terminated:
  77. return {'ok': 'terminate: tasks unknown'}
  78. return {'ok': 'terminate: {0}'.format(', '.join(terminated))}
  79. idstr = ', '.join(task_ids)
  80. logger.info('Tasks flagged as revoked: %s', idstr)
  81. return {'ok': 'tasks {0} flagged as revoked'.format(idstr)}
  82. @Panel.register
  83. def report(state):
  84. return {'ok': state.app.bugreport()}
  85. @Panel.register
  86. def enable_events(state):
  87. dispatcher = state.consumer.event_dispatcher
  88. if 'task' not in dispatcher.groups:
  89. dispatcher.groups.add('task')
  90. logger.info('Events of group {task} enabled by remote.')
  91. return {'ok': 'task events enabled'}
  92. return {'ok': 'task events already enabled'}
  93. @Panel.register
  94. def disable_events(state):
  95. dispatcher = state.consumer.event_dispatcher
  96. if 'task' in dispatcher.groups:
  97. dispatcher.groups.discard('task')
  98. logger.info('Events of group {task} disabled by remote.')
  99. return {'ok': 'task events disabled'}
  100. return {'ok': 'task events already disabled'}
  101. @Panel.register
  102. def heartbeat(state):
  103. logger.debug('Heartbeat requested by remote.')
  104. dispatcher = state.consumer.event_dispatcher
  105. dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)
  106. @Panel.register
  107. def rate_limit(state, task_name, rate_limit, **kwargs):
  108. """Set new rate limit for a task type.
  109. See :attr:`celery.task.base.Task.rate_limit`.
  110. :param task_name: Type of task.
  111. :param rate_limit: New rate limit.
  112. """
  113. try:
  114. timeutils.rate(rate_limit)
  115. except ValueError as exc:
  116. return {'error': 'Invalid rate limit string: {0!r}'.format(exc)}
  117. try:
  118. state.app.tasks[task_name].rate_limit = rate_limit
  119. except KeyError:
  120. logger.error('Rate limit attempt for unknown task %s',
  121. task_name, exc_info=True)
  122. return {'error': 'unknown task'}
  123. state.consumer.reset_rate_limits()
  124. if not rate_limit:
  125. logger.info('Rate limits disabled for tasks of type %s', task_name)
  126. return {'ok': 'rate limit disabled successfully'}
  127. logger.info('New rate limit for tasks of type %s: %s.',
  128. task_name, rate_limit)
  129. return {'ok': 'new rate limit set successfully'}
  130. @Panel.register
  131. def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
  132. try:
  133. task = state.app.tasks[task_name]
  134. except KeyError:
  135. logger.error('Change time limit attempt for unknown task %s',
  136. task_name, exc_info=True)
  137. return {'error': 'unknown task'}
  138. task.soft_time_limit = soft
  139. task.time_limit = hard
  140. logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
  141. task_name, soft, hard)
  142. return {'ok': 'time limits set successfully'}
  143. @Panel.register
  144. def dump_schedule(state, safe=False, **kwargs):
  145. def prepare_entries():
  146. for waiting in state.consumer.timer.schedule.queue:
  147. try:
  148. arg0 = waiting.entry.args[0]
  149. except (IndexError, TypeError):
  150. continue
  151. else:
  152. if isinstance(arg0, Request):
  153. yield {'eta': arg0.eta.isoformat() if arg0.eta else None,
  154. 'priority': waiting.priority,
  155. 'request': arg0.info(safe=safe)}
  156. return list(prepare_entries())
  157. @Panel.register
  158. def dump_reserved(state, safe=False, **kwargs):
  159. reserved = worker_state.reserved_requests - worker_state.active_requests
  160. if not reserved:
  161. return []
  162. return [request.info(safe=safe) for request in reserved]
  163. @Panel.register
  164. def dump_active(state, safe=False, **kwargs):
  165. return [request.info(safe=safe)
  166. for request in worker_state.active_requests]
  167. @Panel.register
  168. def stats(state, **kwargs):
  169. return state.consumer.controller.stats()
  170. @Panel.register
  171. def objgraph(state, num=200, max_depth=10, type='Request'): # pragma: no cover
  172. try:
  173. import objgraph
  174. except ImportError:
  175. raise ImportError('Requires the objgraph library')
  176. print('Dumping graph for type %r' % (type, ))
  177. with tempfile.NamedTemporaryFile(prefix='cobjg',
  178. suffix='.png', delete=False) as fh:
  179. objects = objgraph.by_type(type)[:num]
  180. objgraph.show_backrefs(
  181. objects,
  182. max_depth=max_depth, highlight=lambda v: v in objects,
  183. filename=fh.name,
  184. )
  185. return {'filename': fh.name}
  186. @Panel.register
  187. def memsample(state, **kwargs): # pragma: no cover
  188. from celery.utils.debug import sample_mem
  189. return sample_mem()
  190. @Panel.register
  191. def memdump(state, samples=10, **kwargs): # pragma: no cover
  192. from celery.utils.debug import memdump
  193. out = io.StringIO()
  194. memdump(file=out)
  195. return out.getvalue()
  196. @Panel.register
  197. def clock(state, **kwargs):
  198. return {'clock': state.app.clock.value}
  199. @Panel.register
  200. def dump_revoked(state, **kwargs):
  201. return list(worker_state.revoked)
  202. @Panel.register
  203. def hello(state, from_node, revoked=None, **kwargs):
  204. if from_node != state.hostname:
  205. logger.info('sync with %s', from_node)
  206. if revoked:
  207. worker_state.revoked.update(revoked)
  208. return {'revoked': worker_state.revoked._data,
  209. 'clock': state.app.clock.forward()}
  210. @Panel.register
  211. def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
  212. reg = state.app.tasks
  213. taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
  214. tasks = reg if builtins else (
  215. task for task in reg if not task.startswith('celery.'))
  216. def _extract_info(task):
  217. fields = dict((field, str(getattr(task, field, None)))
  218. for field in taskinfoitems
  219. if getattr(task, field, None) is not None)
  220. if fields:
  221. info = ['='.join(f) for f in items(fields)]
  222. return '{0} [{1}]'.format(task.name, ' '.join(info))
  223. return task.name
  224. return [_extract_info(reg[task]) for task in sorted(tasks)]
  225. @Panel.register
  226. def ping(state, **kwargs):
  227. return {'ok': 'pong'}
  228. @Panel.register
  229. def pool_grow(state, n=1, **kwargs):
  230. if state.consumer.controller.autoscaler:
  231. state.consumer.controller.autoscaler.force_scale_up(n)
  232. else:
  233. state.consumer.pool.grow(n)
  234. state.consumer._update_prefetch_count(n)
  235. return {'ok': 'pool will grow'}
  236. @Panel.register
  237. def pool_shrink(state, n=1, **kwargs):
  238. if state.consumer.controller.autoscaler:
  239. state.consumer.controller.autoscaler.force_scale_down(n)
  240. else:
  241. state.consumer.pool.shrink(n)
  242. state.consumer._update_prefetch_count(-n)
  243. return {'ok': 'pool will shrink'}
  244. @Panel.register
  245. def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
  246. if state.app.conf.CELERYD_POOL_RESTARTS:
  247. state.consumer.controller.reload(modules, reload, reloader=reloader)
  248. return {'ok': 'reload started'}
  249. else:
  250. raise ValueError('Pool restarts not enabled')
  251. @Panel.register
  252. def autoscale(state, max=None, min=None):
  253. autoscaler = state.consumer.controller.autoscaler
  254. if autoscaler:
  255. max_, min_ = autoscaler.update(max, min)
  256. return {'ok': 'autoscale now min={0} max={1}'.format(max_, min_)}
  257. raise ValueError('Autoscale not enabled')
  258. @Panel.register
  259. def shutdown(state, msg='Got shutdown from remote', **kwargs):
  260. logger.warning(msg)
  261. raise WorkerShutdown(msg)
  262. @Panel.register
  263. def add_consumer(state, queue, exchange=None, exchange_type=None,
  264. routing_key=None, **options):
  265. state.consumer.add_task_queue(queue, exchange, exchange_type,
  266. routing_key, **options)
  267. return {'ok': 'add consumer {0}'.format(queue)}
  268. @Panel.register
  269. def cancel_consumer(state, queue=None, **_):
  270. state.consumer.cancel_task_queue(queue)
  271. return {'ok': 'no longer consuming from {0}'.format(queue)}
  272. @Panel.register
  273. def active_queues(state):
  274. """Return information about the queues a worker consumes from."""
  275. if state.consumer.task_consumer:
  276. return [dict(queue.as_dict(recurse=True))
  277. for queue in state.consumer.task_consumer.queues]
  278. return []
  279. def _wanted_config_key(key):
  280. return (isinstance(key, string_t) and
  281. key.isupper() and
  282. not key.startswith('__'))
  283. @Panel.register
  284. def dump_conf(state, with_defaults=False, **kwargs):
  285. return jsonify(state.app.conf.table(with_defaults=with_defaults),
  286. keyfilter=_wanted_config_key,
  287. unknown_type_filter=safe_repr)
  288. @Panel.register
  289. def election(state, id, topic, action=None, **kwargs):
  290. if state.consumer.gossip:
  291. state.consumer.gossip.election(id, topic, action)