control.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.control
  4. ~~~~~~~~~~~~~~~~~~~
  5. Client for worker remote control commands.
  6. Server implementation is in :mod:`celery.worker.control`.
  7. """
  8. from __future__ import absolute_import
  9. from __future__ import with_statement
  10. from kombu.pidbox import Mailbox
  11. from kombu.utils import cached_property
  12. from . import app_or_default
  13. def flatten_reply(reply):
  14. nodes = {}
  15. for item in reply:
  16. nodes.update(item)
  17. return nodes
  18. class Inspect(object):
  19. app = None
  20. def __init__(self, destination=None, timeout=1, callback=None,
  21. connection=None, app=None, limit=None):
  22. self.app = app or self.app
  23. self.destination = destination
  24. self.timeout = timeout
  25. self.callback = callback
  26. self.connection = connection
  27. self.limit = limit
  28. def _prepare(self, reply):
  29. if not reply:
  30. return
  31. by_node = flatten_reply(reply)
  32. if self.destination and \
  33. not isinstance(self.destination, (list, tuple)):
  34. return by_node.get(self.destination)
  35. return by_node
  36. def _request(self, command, **kwargs):
  37. return self._prepare(self.app.control.broadcast(command,
  38. arguments=kwargs,
  39. destination=self.destination,
  40. callback=self.callback,
  41. connection=self.connection,
  42. limit=self.limit,
  43. timeout=self.timeout, reply=True))
  44. def report(self):
  45. return self._request('report')
  46. def active(self, safe=False):
  47. return self._request('dump_active', safe=safe)
  48. def scheduled(self, safe=False):
  49. return self._request('dump_schedule', safe=safe)
  50. def reserved(self, safe=False):
  51. return self._request('dump_reserved', safe=safe)
  52. def stats(self):
  53. return self._request('stats')
  54. def revoked(self):
  55. return self._request('dump_revoked')
  56. def registered(self, *taskinfoitems):
  57. return self._request('dump_tasks', taskinfoitems=taskinfoitems)
  58. registered_tasks = registered
  59. def ping(self):
  60. return self._request('ping')
  61. def active_queues(self):
  62. return self._request('active_queues')
  63. def conf(self):
  64. return self._request('dump_conf')
  65. class Control(object):
  66. Mailbox = Mailbox
  67. def __init__(self, app=None):
  68. self.app = app_or_default(app)
  69. self.mailbox = self.Mailbox('celery', type='fanout')
  70. @cached_property
  71. def inspect(self):
  72. return self.app.subclass_with_self(Inspect, reverse='control.inspect')
  73. def purge(self, connection=None):
  74. """Discard all waiting tasks.
  75. This will ignore all tasks waiting for execution, and they will
  76. be deleted from the messaging server.
  77. :returns: the number of tasks discarded.
  78. """
  79. with self.app.default_connection(connection) as conn:
  80. return self.app.amqp.TaskConsumer(conn).purge()
  81. discard_all = purge
  82. def revoke(self, task_id, destination=None, terminate=False,
  83. signal='SIGTERM', **kwargs):
  84. """Tell all (or specific) workers to revoke a task by id.
  85. If a task is revoked, the workers will ignore the task and
  86. not execute it after all.
  87. :param task_id: Id of the task to revoke.
  88. :keyword terminate: Also terminate the process currently working
  89. on the task (if any).
  90. :keyword signal: Name of signal to send to process if terminate.
  91. Default is TERM.
  92. See :meth:`broadcast` for supported keyword arguments.
  93. """
  94. return self.broadcast('revoke', destination=destination,
  95. arguments={'task_id': task_id,
  96. 'terminate': terminate,
  97. 'signal': signal}, **kwargs)
  98. def ping(self, destination=None, timeout=1, **kwargs):
  99. """Ping all (or specific) workers.
  100. Returns answer from alive workers.
  101. See :meth:`broadcast` for supported keyword arguments.
  102. """
  103. return self.broadcast('ping', reply=True, destination=destination,
  104. timeout=timeout, **kwargs)
  105. def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
  106. """Tell all (or specific) workers to set a new rate limit
  107. for task by type.
  108. :param task_name: Name of task to change rate limit for.
  109. :param rate_limit: The rate limit as tasks per second, or a rate limit
  110. string (`'100/m'`, etc.
  111. see :attr:`celery.task.base.Task.rate_limit` for
  112. more information).
  113. See :meth:`broadcast` for supported keyword arguments.
  114. """
  115. return self.broadcast('rate_limit', destination=destination,
  116. arguments={'task_name': task_name,
  117. 'rate_limit': rate_limit},
  118. **kwargs)
  119. def add_consumer(self, queue, exchange=None, exchange_type='direct',
  120. routing_key=None, options=None, **kwargs):
  121. """Tell all (or specific) workers to start consuming from a new queue.
  122. Only the queue name is required as if only the queue is specified
  123. then the exchange/routing key will be set to the same name (
  124. like automatic queues do).
  125. .. note::
  126. This command does not respect the default queue/exchange
  127. options in the configuration.
  128. :param queue: Name of queue to start consuming from.
  129. :keyword exchange: Optional name of exchange.
  130. :keyword exchange_type: Type of exchange (defaults to 'direct')
  131. command to, when empty broadcast to all workers.
  132. :keyword routing_key: Optional routing key.
  133. See :meth:`broadcast` for supported keyword arguments.
  134. """
  135. return self.broadcast('add_consumer',
  136. arguments=dict({'queue': queue, 'exchange': exchange,
  137. 'exchange_type': exchange_type,
  138. 'routing_key': routing_key}, **options or {}),
  139. **kwargs)
  140. def cancel_consumer(self, queue, **kwargs):
  141. """Tell all (or specific) workers to stop consuming from ``queue``.
  142. Supports the same keyword arguments as :meth:`broadcast`.
  143. """
  144. return self.broadcast('cancel_consumer',
  145. arguments={'queue': queue}, **kwargs)
  146. def time_limit(self, task_name, soft=None, hard=None, **kwargs):
  147. """Tell all (or specific) workers to set time limits for
  148. a task by type.
  149. :param task_name: Name of task to change time limits for.
  150. :keyword soft: New soft time limit (in seconds).
  151. :keyword hard: New hard time limit (in seconds).
  152. Any additional keyword arguments are passed on to :meth:`broadcast`.
  153. """
  154. return self.broadcast('time_limit',
  155. arguments={'task_name': task_name,
  156. 'hard': hard, 'soft': soft}, **kwargs)
  157. def enable_events(self, destination=None, **kwargs):
  158. """Tell all (or specific) workers to enable events."""
  159. return self.broadcast('enable_events', {}, destination, **kwargs)
  160. def disable_events(self, destination=None, **kwargs):
  161. """Tell all (or specific) workers to enable events."""
  162. return self.broadcast('disable_events', {}, destination, **kwargs)
  163. def pool_grow(self, n=1, destination=None, **kwargs):
  164. """Tell all (or specific) workers to grow the pool by ``n``.
  165. Supports the same arguments as :meth:`broadcast`.
  166. """
  167. return self.broadcast('pool_grow', {}, destination, **kwargs)
  168. def pool_shrink(self, n=1, destination=None, **kwargs):
  169. """Tell all (or specific) workers to shrink the pool by ``n``.
  170. Supports the same arguments as :meth:`broadcast`.
  171. """
  172. return self.broadcast('pool_shrink', {}, destination, **kwargs)
  173. def broadcast(self, command, arguments=None, destination=None,
  174. connection=None, reply=False, timeout=1, limit=None,
  175. callback=None, channel=None, **extra_kwargs):
  176. """Broadcast a control command to the celery workers.
  177. :param command: Name of command to send.
  178. :param arguments: Keyword arguments for the command.
  179. :keyword destination: If set, a list of the hosts to send the
  180. command to, when empty broadcast to all workers.
  181. :keyword connection: Custom broker connection to use, if not set,
  182. a connection will be established automatically.
  183. :keyword reply: Wait for and return the reply.
  184. :keyword timeout: Timeout in seconds to wait for the reply.
  185. :keyword limit: Limit number of replies.
  186. :keyword callback: Callback called immediately for each reply
  187. received.
  188. """
  189. with self.app.default_connection(connection) as conn:
  190. arguments = dict(arguments or {}, **extra_kwargs)
  191. return self.mailbox(conn)._broadcast(command, arguments,
  192. destination, reply, timeout,
  193. limit, callback,
  194. channel=channel)