control.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.control
  4. ~~~~~~~~~~~~~~~~~~~
  5. Client for worker remote control commands.
  6. Server implementation is in :mod:`celery.worker.control`.
  7. :copyright: (c) 2009 - 2012 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. from __future__ import with_statement
  12. from kombu.pidbox import Mailbox
  13. from kombu.utils import cached_property
  14. from . import app_or_default
  15. def flatten_reply(reply):
  16. nodes = {}
  17. for item in reply:
  18. nodes.update(item)
  19. return nodes
  20. class Inspect(object):
  21. app = None
  22. def __init__(self, destination=None, timeout=1, callback=None,
  23. connection=None, app=None):
  24. self.app = app or self.app
  25. self.destination = destination
  26. self.timeout = timeout
  27. self.callback = callback
  28. self.connection = connection
  29. def _prepare(self, reply):
  30. if not reply:
  31. return
  32. by_node = flatten_reply(reply)
  33. if self.destination and \
  34. not isinstance(self.destination, (list, tuple)):
  35. return by_node.get(self.destination)
  36. return by_node
  37. def _request(self, command, **kwargs):
  38. return self._prepare(self.app.control.broadcast(command,
  39. arguments=kwargs,
  40. destination=self.destination,
  41. callback=self.callback,
  42. connection=self.connection,
  43. timeout=self.timeout, reply=True))
  44. def report(self):
  45. return self._request("report")
  46. def active(self, safe=False):
  47. return self._request("dump_active", safe=safe)
  48. def scheduled(self, safe=False):
  49. return self._request("dump_schedule", safe=safe)
  50. def reserved(self, safe=False):
  51. return self._request("dump_reserved", safe=safe)
  52. def stats(self):
  53. return self._request("stats")
  54. def revoked(self):
  55. return self._request("dump_revoked")
  56. def registered(self):
  57. return self._request("dump_tasks")
  58. registered_tasks = registered
  59. def enable_events(self):
  60. return self._request("enable_events")
  61. def disable_events(self):
  62. return self._request("disable_events")
  63. def ping(self):
  64. return self._request("ping")
  65. def add_consumer(self, queue, exchange=None, exchange_type="direct",
  66. routing_key=None, **options):
  67. return self._request("add_consumer", queue=queue, exchange=exchange,
  68. exchange_type=exchange_type,
  69. routing_key=routing_key, **options)
  70. def cancel_consumer(self, queue, **kwargs):
  71. return self._request("cancel_consumer", queue=queue, **kwargs)
  72. def active_queues(self):
  73. return self._request("active_queues")
  74. class Control(object):
  75. Mailbox = Mailbox
  76. def __init__(self, app=None):
  77. self.app = app_or_default(app)
  78. self.mailbox = self.Mailbox("celeryd", type="fanout")
  79. @cached_property
  80. def inspect(self):
  81. return self.app.subclass_with_self(Inspect, reverse="control.inspect")
  82. def purge(self, connection=None):
  83. """Discard all waiting tasks.
  84. This will ignore all tasks waiting for execution, and they will
  85. be deleted from the messaging server.
  86. :returns: the number of tasks discarded.
  87. """
  88. with self.app.default_connection(connection) as conn:
  89. return self.app.amqp.TaskConsumer(conn).purge()
  90. discard_all = purge
  91. def revoke(self, task_id, destination=None, terminate=False,
  92. signal="SIGTERM", **kwargs):
  93. """Revoke a task by id.
  94. If a task is revoked, the workers will ignore the task and
  95. not execute it after all.
  96. :param task_id: Id of the task to revoke.
  97. :keyword terminate: Also terminate the process currently working
  98. on the task (if any).
  99. :keyword signal: Name of signal to send to process if terminate.
  100. Default is TERM.
  101. :keyword destination: If set, a list of the hosts to send the
  102. command to, when empty broadcast to all workers.
  103. :keyword connection: Custom broker connection to use, if not set,
  104. a connection will be established automatically.
  105. :keyword reply: Wait for and return the reply.
  106. :keyword timeout: Timeout in seconds to wait for the reply.
  107. :keyword limit: Limit number of replies.
  108. """
  109. return self.broadcast("revoke", destination=destination,
  110. arguments={"task_id": task_id,
  111. "terminate": terminate,
  112. "signal": signal}, **kwargs)
  113. def ping(self, destination=None, timeout=1, **kwargs):
  114. """Ping workers.
  115. Returns answer from alive workers.
  116. :keyword destination: If set, a list of the hosts to send the
  117. command to, when empty broadcast to all workers.
  118. :keyword connection: Custom broker connection to use, if not set,
  119. a connection will be established automatically.
  120. :keyword reply: Wait for and return the reply.
  121. :keyword timeout: Timeout in seconds to wait for the reply.
  122. :keyword limit: Limit number of replies.
  123. """
  124. return self.broadcast("ping", reply=True, destination=destination,
  125. timeout=timeout, **kwargs)
  126. def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
  127. """Set rate limit for task by type.
  128. :param task_name: Name of task to change rate limit for.
  129. :param rate_limit: The rate limit as tasks per second, or a rate limit
  130. string (`"100/m"`, etc.
  131. see :attr:`celery.task.base.Task.rate_limit` for
  132. more information).
  133. :keyword destination: If set, a list of the hosts to send the
  134. command to, when empty broadcast to all workers.
  135. :keyword connection: Custom broker connection to use, if not set,
  136. a connection will be established automatically.
  137. :keyword reply: Wait for and return the reply.
  138. :keyword timeout: Timeout in seconds to wait for the reply.
  139. :keyword limit: Limit number of replies.
  140. """
  141. return self.broadcast("rate_limit", destination=destination,
  142. arguments={"task_name": task_name,
  143. "rate_limit": rate_limit},
  144. **kwargs)
  145. def time_limit(self, task_name, soft=None, hard=None, **kwargs):
  146. """Set time limits for task by type.
  147. :param task_name: Name of task to change time limits for.
  148. :keyword soft: New soft time limit (in seconds).
  149. :keyword hard: New hard time limit (in seconds).
  150. Any additional keyword arguments are passed on to :meth:`broadcast`.
  151. """
  152. return self.broadcast("time_limit",
  153. arguments={"task_name": task_name,
  154. "hard": hard, "soft": soft},
  155. **kwargs)
  156. def broadcast(self, command, arguments=None, destination=None,
  157. connection=None, reply=False, timeout=1, limit=None,
  158. callback=None, channel=None):
  159. """Broadcast a control command to the celery workers.
  160. :param command: Name of command to send.
  161. :param arguments: Keyword arguments for the command.
  162. :keyword destination: If set, a list of the hosts to send the
  163. command to, when empty broadcast to all workers.
  164. :keyword connection: Custom broker connection to use, if not set,
  165. a connection will be established automatically.
  166. :keyword reply: Wait for and return the reply.
  167. :keyword timeout: Timeout in seconds to wait for the reply.
  168. :keyword limit: Limit number of replies.
  169. :keyword callback: Callback called immediately for each reply
  170. received.
  171. """
  172. with self.app.default_connection(connection) as conn:
  173. if channel is None:
  174. channel = conn.default_channel
  175. return self.mailbox(conn)._broadcast(command, arguments,
  176. destination, reply, timeout,
  177. limit, callback,
  178. channel=channel)