control.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.control
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Remote control commands.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. from kombu.utils.encoding import safe_repr
  10. from celery.five import UserDict, items
  11. from celery.platforms import signals as _signals
  12. from celery.utils import timeutils
  13. from celery.utils.log import get_logger
  14. from celery.utils import jsonify
  15. from . import state
  16. from .state import revoked
  17. DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
  18. logger = get_logger(__name__)
  19. class Panel(UserDict):
  20. data = dict() # Global registry.
  21. @classmethod
  22. def register(cls, method, name=None):
  23. cls.data[name or method.__name__] = method
  24. return method
  25. @Panel.register
  26. def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
  27. """Revoke task by task id."""
  28. revoked.add(task_id)
  29. if terminate:
  30. signum = _signals.signum(signal or 'TERM')
  31. for request in state.reserved_requests:
  32. if request.id == task_id:
  33. logger.info('Terminating %s (%s)', task_id, signum)
  34. request.terminate(panel.consumer.pool, signal=signum)
  35. break
  36. else:
  37. return {'ok': 'terminate: task {0} not found'.format(task_id)}
  38. return {'ok': 'terminating {0} ({1})'.format(task_id, signal)}
  39. logger.info('Revoking task %s', task_id)
  40. return {'ok': 'revoking task {0}'.format(task_id)}
  41. @Panel.register
  42. def report(panel):
  43. return {'ok': panel.app.bugreport()}
  44. @Panel.register
  45. def enable_events(panel):
  46. dispatcher = panel.consumer.event_dispatcher
  47. if 'task' not in dispatcher.groups:
  48. dispatcher.groups.add('task')
  49. logger.info('Events of group {task} enabled by remote.')
  50. return {'ok': 'task events enabled'}
  51. return {'ok': 'task events already enabled'}
  52. @Panel.register
  53. def disable_events(panel):
  54. dispatcher = panel.consumer.event_dispatcher
  55. if 'task' in dispatcher.groups:
  56. dispatcher.groups.discard('task')
  57. logger.info('Events of group {task} disabled by remote.')
  58. return {'ok': 'task events disabled'}
  59. return {'ok': 'task events already disabled'}
  60. @Panel.register
  61. def heartbeat(panel):
  62. logger.debug('Heartbeat requested by remote.')
  63. dispatcher = panel.consumer.event_dispatcher
  64. dispatcher.send('worker-heartbeat', freq=5, **state.SOFTWARE_INFO)
  65. @Panel.register
  66. def rate_limit(panel, task_name, rate_limit, **kwargs):
  67. """Set new rate limit for a task type.
  68. See :attr:`celery.task.base.Task.rate_limit`.
  69. :param task_name: Type of task.
  70. :param rate_limit: New rate limit.
  71. """
  72. try:
  73. timeutils.rate(rate_limit)
  74. except ValueError as exc:
  75. return {'error': 'Invalid rate limit string: {0!r}'.format(exc)}
  76. try:
  77. panel.app.tasks[task_name].rate_limit = rate_limit
  78. except KeyError:
  79. logger.error('Rate limit attempt for unknown task %s',
  80. task_name, exc_info=True)
  81. return {'error': 'unknown task'}
  82. if not hasattr(panel.consumer.ready_queue, 'refresh'):
  83. logger.error('Rate limit attempt, but rate limits disabled.')
  84. return {'error': 'rate limits disabled'}
  85. panel.consumer.ready_queue.refresh()
  86. if not rate_limit:
  87. logger.info('Rate limits disabled for tasks of type %s', task_name)
  88. return {'ok': 'rate limit disabled successfully'}
  89. logger.info('New rate limit for tasks of type %s: %s.',
  90. task_name, rate_limit)
  91. return {'ok': 'new rate limit set successfully'}
  92. @Panel.register
  93. def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
  94. try:
  95. task = panel.app.tasks[task_name]
  96. except KeyError:
  97. logger.error('Change time limit attempt for unknown task %s',
  98. task_name, exc_info=True)
  99. return {'error': 'unknown task'}
  100. task.soft_time_limit = soft
  101. task.time_limit = hard
  102. logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
  103. task_name, soft, hard)
  104. return {'ok': 'time limits set successfully'}
  105. @Panel.register
  106. def dump_schedule(panel, safe=False, **kwargs):
  107. from celery.worker.job import Request
  108. schedule = panel.consumer.timer.schedule
  109. if not schedule.queue:
  110. return []
  111. def prepare_entries():
  112. for entry in schedule.info():
  113. item = entry['item']
  114. if item.args and isinstance(item.args[0], Request):
  115. yield {'eta': entry['eta'],
  116. 'priority': entry['priority'],
  117. 'request': item.args[0].info(safe=safe)}
  118. return list(prepare_entries())
  119. @Panel.register
  120. def dump_reserved(panel, safe=False, **kwargs):
  121. reserved = state.reserved_requests
  122. if not reserved:
  123. logger.debug('--Empty queue--')
  124. return []
  125. logger.debug('* Dump of currently reserved tasks:\n%s',
  126. '\n'.join(safe_repr(id) for id in reserved))
  127. return [request.info(safe=safe)
  128. for request in reserved]
  129. @Panel.register
  130. def dump_active(panel, safe=False, **kwargs):
  131. return [request.info(safe=safe)
  132. for request in state.active_requests]
  133. @Panel.register
  134. def stats(panel, **kwargs):
  135. return panel.consumer.controller.stats()
  136. @Panel.register
  137. def clock(panel, **kwargs):
  138. return {'clock': panel.app.clock.value}
  139. @Panel.register
  140. def dump_revoked(panel, **kwargs):
  141. return list(state.revoked)
  142. @Panel.register
  143. def hello(panel, **kwargs):
  144. return {'revoked': state.revoked._data, 'clock': panel.app.clock.forward()}
  145. @Panel.register
  146. def dump_tasks(panel, taskinfoitems=None, **kwargs):
  147. tasks = panel.app.tasks
  148. taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
  149. def _extract_info(task):
  150. fields = dict((field, str(getattr(task, field, None)))
  151. for field in taskinfoitems
  152. if getattr(task, field, None) is not None)
  153. if fields:
  154. info = ['='.join(f) for f in items(fields)]
  155. return '{0} [{1}]'.format(task.name, ' '.join(info))
  156. return task.name
  157. return [_extract_info(tasks[task]) for task in sorted(tasks)]
  158. @Panel.register
  159. def ping(panel, **kwargs):
  160. return {'ok': 'pong'}
  161. @Panel.register
  162. def pool_grow(panel, n=1, **kwargs):
  163. if panel.consumer.controller.autoscaler:
  164. panel.consumer.controller.autoscaler.force_scale_up(n)
  165. else:
  166. panel.consumer.pool.grow(n)
  167. return {'ok': 'spawned worker processes'}
  168. @Panel.register
  169. def pool_shrink(panel, n=1, **kwargs):
  170. if panel.consumer.controller.autoscaler:
  171. panel.consumer.controller.autoscaler.force_scale_down(n)
  172. else:
  173. panel.consumer.pool.shrink(n)
  174. return {'ok': 'terminated worker processes'}
  175. @Panel.register
  176. def pool_restart(panel, modules=None, reload=False, reloader=None, **kwargs):
  177. panel.consumer.controller.reload(modules, reload, reloader=reloader)
  178. return {'ok': 'reload started'}
  179. @Panel.register
  180. def autoscale(panel, max=None, min=None):
  181. autoscaler = panel.consumer.controller.autoscaler
  182. if autoscaler:
  183. max_, min_ = autoscaler.update(max, min)
  184. return {'ok': 'autoscale now min={0} max={1}'.format(max_, min_)}
  185. raise ValueError('Autoscale not enabled')
  186. @Panel.register
  187. def shutdown(panel, msg='Got shutdown from remote', **kwargs):
  188. logger.warning(msg)
  189. raise SystemExit(msg)
  190. @Panel.register
  191. def add_consumer(panel, queue, exchange=None, exchange_type=None,
  192. routing_key=None, **options):
  193. panel.consumer.add_task_queue(queue, exchange, exchange_type,
  194. routing_key, **options)
  195. return {'ok': 'add consumer {0}'.format(queue)}
  196. @Panel.register
  197. def cancel_consumer(panel, queue=None, **_):
  198. panel.consumer.cancel_task_queue(queue)
  199. return {'ok': 'no longer consuming from {0}'.format(queue)}
  200. @Panel.register
  201. def active_queues(panel):
  202. """Returns the queues associated with each worker."""
  203. return [dict(queue.as_dict(recurse=True))
  204. for queue in panel.consumer.task_consumer.queues]
  205. @Panel.register
  206. def dump_conf(panel, **kwargs):
  207. return jsonify(dict(panel.app.conf))
  208. @Panel.register
  209. def election(panel, id, topic, action=None, **kwargs):
  210. panel.consumer.gossip.election(id, topic, action)