control.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.control
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Remote control commands.
  6. """
  7. from __future__ import absolute_import
  8. import logging
  9. import os
  10. from datetime import datetime
  11. from kombu.utils.encoding import safe_repr
  12. from celery.platforms import signals as _signals
  13. from celery.utils import timeutils
  14. from celery.utils.compat import UserDict
  15. from celery.utils.log import get_logger
  16. from celery.utils import jsonify
  17. from . import state
  18. from .state import revoked
  19. DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
  20. logger = get_logger(__name__)
  21. class Panel(UserDict):
  22. data = dict() # Global registry.
  23. @classmethod
  24. def register(cls, method, name=None):
  25. cls.data[name or method.__name__] = method
  26. return method
  27. @Panel.register
  28. def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
  29. """Revoke task by task id."""
  30. revoked.add(task_id)
  31. if terminate:
  32. signum = _signals.signum(signal or 'TERM')
  33. for request in state.reserved_requests:
  34. if request.id == task_id:
  35. logger.info('Terminating %s (%s)', task_id, signum)
  36. request.terminate(panel.consumer.pool, signal=signum)
  37. break
  38. else:
  39. return {'ok': 'terminate: task %s not found' % (task_id, )}
  40. return {'ok': 'terminating %s (%s)' % (task_id, signal)}
  41. logger.info('Revoking task %s', task_id)
  42. return {'ok': 'revoking task %s' % (task_id, )}
  43. @Panel.register
  44. def report(panel):
  45. return {'ok': panel.app.bugreport()}
  46. @Panel.register
  47. def enable_events(panel):
  48. dispatcher = panel.consumer.event_dispatcher
  49. if not dispatcher.enabled:
  50. dispatcher.enable()
  51. dispatcher.send('worker-online')
  52. logger.info('Events enabled by remote.')
  53. return {'ok': 'events enabled'}
  54. return {'ok': 'events already enabled'}
  55. @Panel.register
  56. def disable_events(panel):
  57. dispatcher = panel.consumer.event_dispatcher
  58. if dispatcher.enabled:
  59. dispatcher.send('worker-offline')
  60. dispatcher.disable()
  61. logger.info('Events disabled by remote.')
  62. return {'ok': 'events disabled'}
  63. return {'ok': 'events already disabled'}
  64. @Panel.register
  65. def heartbeat(panel):
  66. logger.debug('Heartbeat requested by remote.')
  67. dispatcher = panel.consumer.event_dispatcher
  68. dispatcher.send('worker-heartbeat', freq=5, **state.SOFTWARE_INFO)
  69. @Panel.register
  70. def rate_limit(panel, task_name, rate_limit, **kwargs):
  71. """Set new rate limit for a task type.
  72. See :attr:`celery.task.base.Task.rate_limit`.
  73. :param task_name: Type of task.
  74. :param rate_limit: New rate limit.
  75. """
  76. try:
  77. timeutils.rate(rate_limit)
  78. except ValueError, exc:
  79. return {'error': 'Invalid rate limit string: %s' % exc}
  80. try:
  81. panel.app.tasks[task_name].rate_limit = rate_limit
  82. except KeyError:
  83. logger.error('Rate limit attempt for unknown task %s',
  84. task_name, exc_info=True)
  85. return {'error': 'unknown task'}
  86. if not hasattr(panel.consumer.ready_queue, 'refresh'):
  87. logger.error('Rate limit attempt, but rate limits disabled.')
  88. return {'error': 'rate limits disabled'}
  89. panel.consumer.ready_queue.refresh()
  90. if not rate_limit:
  91. logger.info('Rate limits disabled for tasks of type %s', task_name)
  92. return {'ok': 'rate limit disabled successfully'}
  93. logger.info('New rate limit for tasks of type %s: %s.',
  94. task_name, rate_limit)
  95. return {'ok': 'new rate limit set successfully'}
  96. @Panel.register
  97. def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
  98. try:
  99. task = panel.app.tasks[task_name]
  100. except KeyError:
  101. logger.error('Change time limit attempt for unknown task %s',
  102. task_name, exc_info=True)
  103. return {'error': 'unknown task'}
  104. task.soft_time_limit = soft
  105. task.time_limit = hard
  106. logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
  107. task_name, soft, hard)
  108. return {'ok': 'time limits set successfully'}
  109. @Panel.register
  110. def dump_schedule(panel, safe=False, **kwargs):
  111. from celery.worker.job import Request
  112. schedule = panel.consumer.timer.schedule
  113. if not schedule.queue:
  114. logger.debug('--Empty schedule--')
  115. return []
  116. formatitem = lambda i, item: '%s. %s pri%s %r' % (
  117. i, datetime.utcfromtimestamp(item['eta']),
  118. item['priority'], item['item'],
  119. )
  120. if logger.isEnabledFor(logging.DEBUG):
  121. logger.debug('* Dump of current schedule:\n%s', '\n'.join(
  122. formatitem(i, item) for i, item in enumerate(schedule.info())
  123. ))
  124. scheduled_tasks = []
  125. for info in schedule.info():
  126. item = info['item']
  127. if item.args and isinstance(item.args[0], Request):
  128. scheduled_tasks.append({
  129. 'eta': info['eta'],
  130. 'priority': info['priority'],
  131. 'request': item.args[0].info(safe=safe),
  132. })
  133. return scheduled_tasks
  134. @Panel.register
  135. def dump_reserved(panel, safe=False, **kwargs):
  136. reserved = state.reserved_requests
  137. if not reserved:
  138. logger.debug('--Empty queue--')
  139. return []
  140. if logger.isEnabledFor(logging.DEBUG):
  141. logger.debug('* Dump of currently reserved tasks:\n%s',
  142. '\n'.join(safe_repr(r) for r in reserved))
  143. return [request.info(safe=safe) for request in reserved]
  144. @Panel.register
  145. def dump_active(panel, safe=False, **kwargs):
  146. return [request.info(safe=safe) for request in state.active_requests]
  147. @Panel.register
  148. def stats(panel, **kwargs):
  149. asinfo = {}
  150. if panel.consumer.controller.autoscaler:
  151. asinfo = panel.consumer.controller.autoscaler.info()
  152. return {'total': state.total_count,
  153. 'consumer': panel.consumer.info,
  154. 'pool': panel.consumer.pool.info,
  155. 'autoscaler': asinfo,
  156. 'pid': os.getpid()}
  157. @Panel.register
  158. def dump_revoked(panel, **kwargs):
  159. return list(state.revoked)
  160. @Panel.register
  161. def dump_tasks(panel, taskinfoitems=None, **kwargs):
  162. tasks = panel.app.tasks
  163. taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
  164. def _extract_info(task):
  165. fields = dict(
  166. (field, str(getattr(task, field, None)))
  167. for field in taskinfoitems
  168. if getattr(task, field, None) is not None)
  169. info = ['='.join(f) for f in fields.items()]
  170. if not info:
  171. return task.name
  172. return '%s [%s]' % (task.name, ' '.join(info))
  173. info = [_extract_info(tasks[task]) for task in sorted(tasks)]
  174. if logger.isEnabledFor(logging.DEBUG):
  175. logger.debug('* Dump of currently registered tasks:\n%s',
  176. '\n'.join(info))
  177. return info
  178. @Panel.register
  179. def ping(panel, **kwargs):
  180. return 'pong'
  181. @Panel.register
  182. def pool_grow(panel, n=1, **kwargs):
  183. if panel.consumer.controller.autoscaler:
  184. panel.consumer.controller.autoscaler.force_scale_up(n)
  185. else:
  186. panel.consumer.pool.grow(n)
  187. return {'ok': 'spawned worker processes'}
  188. @Panel.register
  189. def pool_shrink(panel, n=1, **kwargs):
  190. if panel.consumer.controller.autoscaler:
  191. panel.consumer.controller.autoscaler.force_scale_down(n)
  192. else:
  193. panel.consumer.pool.shrink(n)
  194. return {'ok': 'terminated worker processes'}
  195. @Panel.register
  196. def pool_restart(panel, modules=None, reload=False, reloader=None, **kwargs):
  197. panel.consumer.controller.reload(modules, reload, reloader=reloader)
  198. return {'ok': 'reload started'}
  199. @Panel.register
  200. def autoscale(panel, max=None, min=None):
  201. autoscaler = panel.consumer.controller.autoscaler
  202. if autoscaler:
  203. max_, min_ = autoscaler.update(max, min)
  204. return {'ok': 'autoscale now min=%r max=%r' % (max_, min_)}
  205. raise ValueError('Autoscale not enabled')
  206. @Panel.register
  207. def shutdown(panel, msg='Got shutdown from remote', **kwargs):
  208. logger.warning(msg)
  209. raise SystemExit(msg)
  210. @Panel.register
  211. def add_consumer(panel, queue, exchange=None, exchange_type=None,
  212. routing_key=None, **options):
  213. panel.consumer.add_task_queue(queue, exchange, exchange_type,
  214. routing_key, **options)
  215. return {'ok': 'add consumer %r' % (queue, )}
  216. @Panel.register
  217. def cancel_consumer(panel, queue=None, **_):
  218. panel.consumer.cancel_task_queue(queue)
  219. return {'ok': 'no longer consuming from %s' % (queue, )}
  220. @Panel.register
  221. def active_queues(panel):
  222. """Returns the queues associated with each worker."""
  223. return [dict(queue.as_dict(recurse=True))
  224. for queue in panel.consumer.task_consumer.queues]
  225. @Panel.register
  226. def dump_conf(panel, **kwargs):
  227. return jsonify(dict(panel.app.conf))