control.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. # -*- coding: utf-8 -*-
  2. """Worker Remote Control Client.
  3. Client for worker remote control commands.
  4. Server implementation is in :mod:`celery.worker.control`.
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. import warnings
  8. from billiard.common import TERM_SIGNAME
  9. from kombu.pidbox import Mailbox
  10. from kombu.utils.functional import lazy
  11. from kombu.utils.objects import cached_property
  12. from celery.exceptions import DuplicateNodenameWarning
  13. from celery.utils.text import pluralize
__all__ = ['Inspect', 'Control', 'flatten_reply']

# Template for the warning emitted by flatten_reply() when the same node
# name shows up in more than one reply: ``{0}`` is filled with the result of
# ``pluralize(len(dupes), 'name')`` and ``{1}`` with the sorted duplicate
# node names joined by ``', '``.  The trailing backslashes keep the
# triple-quoted literal free of a leading and a trailing newline.
W_DUPNODE = """\
Received multiple replies from node {0}: {1}.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.\
"""
  20. def flatten_reply(reply):
  21. """Flatten node replies.
  22. Convert from a list of replies in this format::
  23. [{'a@example.com': reply},
  24. {'b@example.com': reply}]
  25. into this format::
  26. {'a@example.com': reply,
  27. 'b@example.com': reply}
  28. """
  29. nodes, dupes = {}, set()
  30. for item in reply:
  31. [dupes.add(name) for name in item if name in nodes]
  32. nodes.update(item)
  33. if dupes:
  34. warnings.warn(DuplicateNodenameWarning(
  35. W_DUPNODE.format(
  36. pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
  37. ),
  38. ))
  39. return nodes
  40. class Inspect(object):
  41. """API for app.control.inspect."""
  42. app = None
  43. def __init__(self, destination=None, timeout=1, callback=None,
  44. connection=None, app=None, limit=None):
  45. self.app = app or self.app
  46. self.destination = destination
  47. self.timeout = timeout
  48. self.callback = callback
  49. self.connection = connection
  50. self.limit = limit
  51. def _prepare(self, reply):
  52. if reply:
  53. by_node = flatten_reply(reply)
  54. if (self.destination and
  55. not isinstance(self.destination, (list, tuple))):
  56. return by_node.get(self.destination)
  57. return by_node
  58. def _request(self, command, **kwargs):
  59. return self._prepare(self.app.control.broadcast(
  60. command,
  61. arguments=kwargs,
  62. destination=self.destination,
  63. callback=self.callback,
  64. connection=self.connection,
  65. limit=self.limit,
  66. timeout=self.timeout, reply=True,
  67. ))
  68. def report(self):
  69. return self._request('report')
  70. def clock(self):
  71. return self._request('clock')
  72. def active(self, safe=None):
  73. # safe is ignored since 4.0
  74. # as no objects will need serialization now that we
  75. # have argsrepr/kwargsrepr.
  76. return self._request('active')
  77. def scheduled(self, safe=None):
  78. return self._request('scheduled')
  79. def reserved(self, safe=None):
  80. return self._request('reserved')
  81. def stats(self):
  82. return self._request('stats')
  83. def revoked(self):
  84. return self._request('revoked')
  85. def registered(self, *taskinfoitems):
  86. return self._request('registered', taskinfoitems=taskinfoitems)
  87. registered_tasks = registered
  88. def ping(self):
  89. return self._request('ping')
  90. def active_queues(self):
  91. return self._request('active_queues')
  92. def query_task(self, ids):
  93. return self._request('query_task', ids=ids)
  94. def conf(self, with_defaults=False):
  95. return self._request('conf', with_defaults=with_defaults)
  96. def hello(self, from_node, revoked=None):
  97. return self._request('hello', from_node=from_node, revoked=revoked)
  98. def memsample(self):
  99. return self._request('memsample')
  100. def memdump(self, samples=10):
  101. return self._request('memdump', samples=samples)
  102. def objgraph(self, type='Request', n=200, max_depth=10):
  103. return self._request('objgraph', num=n, max_depth=max_depth, type=type)
  104. class Control(object):
  105. """Worker remote control client."""
  106. Mailbox = Mailbox
  107. def __init__(self, app=None):
  108. self.app = app
  109. self.mailbox = self.Mailbox(
  110. 'celery',
  111. type='fanout',
  112. accept=['json'],
  113. producer_pool=lazy(lambda: self.app.amqp.producer_pool),
  114. )
  115. @cached_property
  116. def inspect(self):
  117. return self.app.subclass_with_self(Inspect, reverse='control.inspect')
  118. def purge(self, connection=None):
  119. """Discard all waiting tasks.
  120. This will ignore all tasks waiting for execution, and they will
  121. be deleted from the messaging server.
  122. Arguments:
  123. connection (kombu.Connection): Optional specific connection
  124. instance to use. If not provided a connection will
  125. be acquired from the connection pool.
  126. Returns:
  127. int: the number of tasks discarded.
  128. """
  129. with self.app.connection_or_acquire(connection) as conn:
  130. return self.app.amqp.TaskConsumer(conn).purge()
  131. discard_all = purge
  132. def election(self, id, topic, action=None, connection=None):
  133. self.broadcast('election', connection=connection, arguments={
  134. 'id': id, 'topic': topic, 'action': action,
  135. })
  136. def revoke(self, task_id, destination=None, terminate=False,
  137. signal=TERM_SIGNAME, **kwargs):
  138. """Tell all (or specific) workers to revoke a task by id.
  139. If a task is revoked, the workers will ignore the task and
  140. not execute it after all.
  141. Arguments:
  142. task_id (str): Id of the task to revoke.
  143. terminate (bool): Also terminate the process currently working
  144. on the task (if any).
  145. signal (str): Name of signal to send to process if terminate.
  146. Default is TERM.
  147. See Also:
  148. :meth:`broadcast` for supported keyword arguments.
  149. """
  150. return self.broadcast('revoke', destination=destination,
  151. arguments={'task_id': task_id,
  152. 'terminate': terminate,
  153. 'signal': signal}, **kwargs)
  154. def ping(self, destination=None, timeout=1, **kwargs):
  155. """Ping all (or specific) workers.
  156. Returns:
  157. List[Dict]: List of ``{'hostname': reply}`` dictionaries.
  158. See Also:
  159. :meth:`broadcast` for supported keyword arguments.
  160. """
  161. return self.broadcast('ping', reply=True, destination=destination,
  162. timeout=timeout, **kwargs)
  163. def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
  164. """Tell workers to set a new rate limit for task by type.
  165. Arguments:
  166. task_name (str): Name of task to change rate limit for.
  167. rate_limit (int, str): The rate limit as tasks per second,
  168. or a rate limit string (`'100/m'`, etc.
  169. see :attr:`celery.task.base.Task.rate_limit` for
  170. more information).
  171. See Also:
  172. :meth:`broadcast` for supported keyword arguments.
  173. """
  174. return self.broadcast('rate_limit', destination=destination,
  175. arguments={'task_name': task_name,
  176. 'rate_limit': rate_limit},
  177. **kwargs)
  178. def add_consumer(self, queue, exchange=None, exchange_type='direct',
  179. routing_key=None, options=None, **kwargs):
  180. """Tell all (or specific) workers to start consuming from a new queue.
  181. Only the queue name is required as if only the queue is specified
  182. then the exchange/routing key will be set to the same name (
  183. like automatic queues do).
  184. Note:
  185. This command does not respect the default queue/exchange
  186. options in the configuration.
  187. Arguments:
  188. queue (str): Name of queue to start consuming from.
  189. exchange (str): Optional name of exchange.
  190. exchange_type (str): Type of exchange (defaults to 'direct')
  191. command to, when empty broadcast to all workers.
  192. routing_key (str): Optional routing key.
  193. options (Dict): Additional options as supported
  194. by :meth:`kombu.entitiy.Queue.from_dict`.
  195. See Also:
  196. :meth:`broadcast` for supported keyword arguments.
  197. """
  198. return self.broadcast(
  199. 'add_consumer',
  200. arguments=dict({'queue': queue, 'exchange': exchange,
  201. 'exchange_type': exchange_type,
  202. 'routing_key': routing_key}, **options or {}),
  203. **kwargs
  204. )
  205. def cancel_consumer(self, queue, **kwargs):
  206. """Tell all (or specific) workers to stop consuming from ``queue``.
  207. See Also:
  208. Supports the same arguments as :meth:`broadcast`.
  209. """
  210. return self.broadcast(
  211. 'cancel_consumer', arguments={'queue': queue}, **kwargs
  212. )
  213. def time_limit(self, task_name, soft=None, hard=None, **kwargs):
  214. """Tell workers to set time limits for a task by type.
  215. Arguments:
  216. task_name (str): Name of task to change time limits for.
  217. soft (float): New soft time limit (in seconds).
  218. hard (float): New hard time limit (in seconds).
  219. **kwargs (Any): arguments passed on to :meth:`broadcast`.
  220. """
  221. return self.broadcast(
  222. 'time_limit',
  223. arguments={'task_name': task_name,
  224. 'hard': hard, 'soft': soft}, **kwargs)
  225. def enable_events(self, destination=None, **kwargs):
  226. """Tell all (or specific) workers to enable events.
  227. See Also:
  228. Supports the same arguments as :meth:`broadcast`.
  229. """
  230. return self.broadcast('enable_events', {}, destination, **kwargs)
  231. def disable_events(self, destination=None, **kwargs):
  232. """Tell all (or specific) workers to disable events.
  233. See Also:
  234. Supports the same arguments as :meth:`broadcast`.
  235. """
  236. return self.broadcast('disable_events', {}, destination, **kwargs)
  237. def pool_grow(self, n=1, destination=None, **kwargs):
  238. """Tell all (or specific) workers to grow the pool by ``n``.
  239. See Also:
  240. Supports the same arguments as :meth:`broadcast`.
  241. """
  242. return self.broadcast('pool_grow', {'n': n}, destination, **kwargs)
  243. def pool_shrink(self, n=1, destination=None, **kwargs):
  244. """Tell all (or specific) workers to shrink the pool by ``n``.
  245. See Also:
  246. Supports the same arguments as :meth:`broadcast`.
  247. """
  248. return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)
  249. def broadcast(self, command, arguments=None, destination=None,
  250. connection=None, reply=False, timeout=1, limit=None,
  251. callback=None, channel=None, **extra_kwargs):
  252. """Broadcast a control command to the celery workers.
  253. Arguments:
  254. command (str): Name of command to send.
  255. arguments (Dict): Keyword arguments for the command.
  256. destination (List): If set, a list of the hosts to send the
  257. command to, when empty broadcast to all workers.
  258. connection (kombu.Connection): Custom broker connection to use,
  259. if not set, a connection will be acquired from the pool.
  260. reply (bool): Wait for and return the reply.
  261. timeout (float): Timeout in seconds to wait for the reply.
  262. limit (int): Limit number of replies.
  263. callback (Callable): Callback called immediately for
  264. each reply received.
  265. """
  266. with self.app.connection_or_acquire(connection) as conn:
  267. arguments = dict(arguments or {}, **extra_kwargs)
  268. return self.mailbox(conn)._broadcast(
  269. command, arguments, destination, reply, timeout,
  270. limit, callback, channel=channel,
  271. )