control.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.control
  4. ~~~~~~~~~~~~~~~~~~~
  5. Client for worker remote control commands.
  6. Server implementation is in :mod:`celery.worker.control`.
  7. """
  8. from __future__ import absolute_import, unicode_literals
  9. import warnings
  10. from billiard.common import TERM_SIGNAME
  11. from kombu.pidbox import Mailbox
  12. from kombu.utils import cached_property
  13. from kombu.utils.functional import lazy
  14. from celery.exceptions import DuplicateNodenameWarning
  15. from celery.utils.text import pluralize
  16. __all__ = ['Inspect', 'Control', 'flatten_reply']
  # Message template for the warning issued when several replies carry the
  # same node name.  ``{0}`` receives the (pluralized) word "name" and ``{1}``
  # the comma-separated duplicate names.
  W_DUPNODE = (
      'Received multiple replies from node {0}: {1}.\n'
      'Please make sure you give each node a unique nodename using\n'
      'the celery worker `-n` option.'
  )
  def flatten_reply(reply):
      """Merge a sequence of per-node reply mappings into a single dict.

      Each item of ``reply`` is a mapping keyed by node name.  The items are
      folded into one dict in order, so when a node name occurs in more than
      one item the later item's value replaces the earlier one.  If any name
      was seen more than once, a :class:`DuplicateNodenameWarning` built from
      ``W_DUPNODE`` is issued through :func:`warnings.warn`, listing the
      duplicate names in sorted order.

      :param reply: Iterable of ``{nodename: value}`` mappings.  ``None``
          (or any other falsy value) is treated as an empty reply.

      :returns: dict mapping every node name to its value.

      """
      nodes, dupes = {}, set()
      for item in reply or ():
          # Record names already present from an earlier item *before*
          # merging; after the merge every name of ``item`` is in ``nodes``.
          dupes.update(name for name in item if name in nodes)
          nodes.update(item)
      if dupes:
          warnings.warn(DuplicateNodenameWarning(
              W_DUPNODE.format(
                  pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
              ),
          ))
      return nodes
  class Inspect(object):
      """Client-side helper for issuing inspect commands to workers.

      Every public method forwards a command name and its keyword arguments
      to ``self.app.control.broadcast`` with ``reply=True`` and returns the
      reply after it has been processed by :meth:`_prepare`.
      """

      # Class-level fallback used when no ``app`` is given to the constructor.
      app = None

      def __init__(self, destination=None, timeout=1, callback=None,
                   connection=None, app=None, limit=None):
          """Store the options that are forwarded with every request.

          :keyword destination: Node name, or list/tuple of node names.
          :keyword timeout: Forwarded to ``broadcast`` as ``timeout``.
          :keyword callback: Forwarded to ``broadcast`` as ``callback``.
          :keyword connection: Forwarded to ``broadcast`` as ``connection``.
          :keyword app: App to use; falls back to the class attribute.
          :keyword limit: Forwarded to ``broadcast`` as ``limit``.
          """
          self.app = app or self.app
          self.limit = limit
          self.connection = connection
          self.callback = callback
          self.timeout = timeout
          self.destination = destination

      def _prepare(self, reply):
          """Flatten ``reply`` into a by-node mapping.

          Returns ``None`` for a falsy reply.  When ``destination`` is set
          and is not a list or tuple, only that node's entry is returned
          (``None`` if it is missing); otherwise the whole mapping.
          """
          if not reply:
              return None
          replies_by_node = flatten_reply(reply)
          target = self.destination
          if target and not isinstance(target, (list, tuple)):
              return replies_by_node.get(target)
          return replies_by_node

      def _request(self, command, **kwargs):
          """Broadcast ``command`` with ``kwargs`` and return the reply."""
          raw_reply = self.app.control.broadcast(
              command,
              arguments=kwargs,
              destination=self.destination,
              callback=self.callback,
              connection=self.connection,
              limit=self.limit,
              timeout=self.timeout,
              reply=True,
          )
          return self._prepare(raw_reply)

      def report(self):
          """Request the ``report`` command."""
          return self._request('report')

      def clock(self):
          """Request the ``clock`` command."""
          return self._request('clock')

      def active(self, safe=False):
          """Request the ``dump_active`` command, passing ``safe``."""
          return self._request('dump_active', safe=safe)

      def scheduled(self, safe=False):
          """Request the ``dump_schedule`` command, passing ``safe``."""
          return self._request('dump_schedule', safe=safe)

      def reserved(self, safe=False):
          """Request the ``dump_reserved`` command, passing ``safe``."""
          return self._request('dump_reserved', safe=safe)

      def stats(self):
          """Request the ``stats`` command."""
          return self._request('stats')

      def revoked(self):
          """Request the ``dump_revoked`` command."""
          return self._request('dump_revoked')

      def registered(self, *taskinfoitems):
          """Request ``dump_tasks``, passing the positional arguments along
          as the ``taskinfoitems`` tuple."""
          return self._request('dump_tasks', taskinfoitems=taskinfoitems)

      # Alias for :meth:`registered`.
      registered_tasks = registered

      def ping(self):
          """Request the ``ping`` command."""
          return self._request('ping')

      def active_queues(self):
          """Request the ``active_queues`` command."""
          return self._request('active_queues')

      def query_task(self, ids):
          """Request the ``query_task`` command for ``ids``."""
          return self._request('query_task', ids=ids)

      def conf(self, with_defaults=False):
          """Request the ``dump_conf`` command, passing ``with_defaults``."""
          return self._request('dump_conf', with_defaults=with_defaults)

      def hello(self, from_node, revoked=None):
          """Request the ``hello`` command with ``from_node`` and ``revoked``."""
          return self._request('hello', from_node=from_node, revoked=revoked)

      def memsample(self):
          """Request the ``memsample`` command."""
          return self._request('memsample')

      def memdump(self, samples=10):
          """Request the ``memdump`` command, passing ``samples``."""
          return self._request('memdump', samples=samples)

      def objgraph(self, type='Request', n=200, max_depth=10):
          """Request the ``objgraph`` command.

          ``n`` is sent to the worker under the argument name ``num``.
          """
          return self._request(
              'objgraph', num=n, max_depth=max_depth, type=type,
          )
  class Control(object):
      """Client for sending remote control commands to workers.

      Commands are published through a mailbox created from :attr:`Mailbox`
      with the name ``'celery'``, ``type='fanout'`` and ``accept=['json']``.
      All command helpers are thin wrappers around :meth:`broadcast`.
      """

      # Mailbox implementation.  Kept as a class attribute, so a subclass can
      # substitute another implementation.
      Mailbox = Mailbox

      def __init__(self, app=None):
          """Bind to ``app`` and create the mailbox.

          :keyword app: App providing connections and the producer pool.
          """
          self.app = app
          self.mailbox = self.Mailbox(
              'celery',
              type='fanout',
              accept=['json'],
              # Deferred through ``lazy`` so ``app.amqp`` is not accessed
              # while the control object is being constructed.
              producer_pool=lazy(lambda: self.app.amqp.producer_pool),
          )

      @cached_property
      def inspect(self):
          """Result of ``app.subclass_with_self(Inspect, ...)``; cached after
          the first access."""
          return self.app.subclass_with_self(Inspect, reverse='control.inspect')

      def purge(self, connection=None):
          """Discard all waiting tasks.

          This will ignore all tasks waiting for execution, and they will
          be deleted from the messaging server.

          :keyword connection: Custom broker connection to use, if not set,
              one is acquired from the app.

          :returns: the number of tasks discarded.

          """
          with self.app.connection_or_acquire(connection) as conn:
              return self.app.amqp.TaskConsumer(conn).purge()
      # Alias for :meth:`purge`.
      discard_all = purge

      def election(self, id, topic, action=None, connection=None):
          """Broadcast an ``election`` command.

          The command carries ``id``, ``topic`` and ``action`` as its
          arguments.  The result of the broadcast is not returned.
          """
          self.broadcast('election', connection=connection, arguments={
              'id': id, 'topic': topic, 'action': action,
          })

      def revoke(self, task_id, destination=None, terminate=False,
                 signal=TERM_SIGNAME, **kwargs):
          """Tell all (or specific) workers to revoke a task by id.

          If a task is revoked, the workers will ignore the task and
          not execute it after all.

          :param task_id: Id of the task to revoke.
          :keyword terminate: Also terminate the process currently working
              on the task (if any).
          :keyword signal: Name of signal to send to process if terminate.
              Default is TERM.

          See :meth:`broadcast` for supported keyword arguments.

          """
          return self.broadcast('revoke', destination=destination,
                                arguments={'task_id': task_id,
                                           'terminate': terminate,
                                           'signal': signal}, **kwargs)

      def ping(self, destination=None, timeout=1, **kwargs):
          """Ping all (or specific) workers.

          Will return the list of answers.

          See :meth:`broadcast` for supported keyword arguments.

          """
          return self.broadcast('ping', reply=True, destination=destination,
                                timeout=timeout, **kwargs)

      def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
          """Tell all (or specific) workers to set a new rate limit
          for task by type.

          :param task_name: Name of task to change rate limit for.
          :param rate_limit: The rate limit as tasks per second, or a rate
              limit string (`'100/m'`, etc.); see
              :attr:`celery.task.base.Task.rate_limit` for more information.

          See :meth:`broadcast` for supported keyword arguments.

          """
          return self.broadcast('rate_limit', destination=destination,
                                arguments={'task_name': task_name,
                                           'rate_limit': rate_limit},
                                **kwargs)

      def add_consumer(self, queue, exchange=None, exchange_type='direct',
                       routing_key=None, options=None, **kwargs):
          """Tell all (or specific) workers to start consuming from a new queue.

          Only the queue name is required as if only the queue is specified
          then the exchange/routing key will be set to the same name (
          like automatic queues do).

          .. note::

              This command does not respect the default queue/exchange
              options in the configuration.

          :param queue: Name of queue to start consuming from.
          :keyword exchange: Optional name of exchange.
          :keyword exchange_type: Type of exchange (defaults to 'direct').
          :keyword routing_key: Optional routing key.
          :keyword options: Additional options as supported
              by :meth:`kombu.entity.Queue.from_dict`.  They are merged into
              the command arguments and override same-named keys.

          See :meth:`broadcast` for supported keyword arguments.

          """
          return self.broadcast(
              'add_consumer',
              arguments=dict({'queue': queue, 'exchange': exchange,
                              'exchange_type': exchange_type,
                              'routing_key': routing_key}, **options or {}),
              **kwargs
          )

      def cancel_consumer(self, queue, **kwargs):
          """Tell all (or specific) workers to stop consuming from ``queue``.

          Supports the same keyword arguments as :meth:`broadcast`.

          """
          return self.broadcast(
              'cancel_consumer', arguments={'queue': queue}, **kwargs
          )

      def time_limit(self, task_name, soft=None, hard=None, **kwargs):
          """Tell all (or specific) workers to set time limits for
          a task by type.

          :param task_name: Name of task to change time limits for.
          :keyword soft: New soft time limit (in seconds).
          :keyword hard: New hard time limit (in seconds).

          Any additional keyword arguments are passed on to :meth:`broadcast`.

          """
          return self.broadcast(
              'time_limit',
              arguments={'task_name': task_name,
                         'hard': hard, 'soft': soft}, **kwargs)

      def enable_events(self, destination=None, **kwargs):
          """Tell all (or specific) workers to enable events."""
          return self.broadcast(
              'enable_events', arguments={}, destination=destination,
              **kwargs)

      def disable_events(self, destination=None, **kwargs):
          """Tell all (or specific) workers to disable events."""
          return self.broadcast(
              'disable_events', arguments={}, destination=destination,
              **kwargs)

      def pool_grow(self, n=1, destination=None, **kwargs):
          """Tell all (or specific) workers to grow the pool by ``n``.

          Supports the same arguments as :meth:`broadcast`.

          """
          return self.broadcast(
              'pool_grow', arguments={'n': n}, destination=destination,
              **kwargs)

      def pool_shrink(self, n=1, destination=None, **kwargs):
          """Tell all (or specific) workers to shrink the pool by ``n``.

          Supports the same arguments as :meth:`broadcast`.

          """
          return self.broadcast(
              'pool_shrink', arguments={'n': n}, destination=destination,
              **kwargs)

      def autoscale(self, max, min, destination=None, **kwargs):
          """Change worker(s) autoscale setting.

          :param max: Sent to the workers as the ``max`` argument.
          :param min: Sent to the workers as the ``min`` argument.

          Supports the same arguments as :meth:`broadcast`.

          """
          return self.broadcast(
              'autoscale', arguments={'max': max, 'min': min},
              destination=destination, **kwargs)

      def broadcast(self, command, arguments=None, destination=None,
                    connection=None, reply=False, timeout=1, limit=None,
                    callback=None, channel=None, **extra_kwargs):
          """Broadcast a control command to the celery workers.

          :param command: Name of command to send.
          :param arguments: Keyword arguments for the command.
          :keyword destination: If set, a list of the hosts to send the
              command to, when empty broadcast to all workers.
          :keyword connection: Custom broker connection to use, if not set,
              a connection will be established automatically.
          :keyword reply: Wait for and return the reply.
          :keyword timeout: Timeout in seconds to wait for the reply.
          :keyword limit: Limit number of replies.
          :keyword callback: Callback called immediately for each reply
              received.
          :keyword channel: Passed on to the mailbox as ``channel``.

          Any other keyword arguments are merged into ``arguments`` and
          override same-named keys.

          """
          with self.app.connection_or_acquire(connection) as conn:
              # Copy so the caller's ``arguments`` dict is never mutated.
              arguments = dict(arguments or {}, **extra_kwargs)
              return self.mailbox(conn)._broadcast(
                  command, arguments, destination, reply, timeout,
                  limit, callback, channel=channel,
              )