control.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.control
  4. ~~~~~~~~~~~~~~~~~~~
  5. Client for worker remote control commands.
  6. Server implementation is in :mod:`celery.worker.control`.
  7. :copyright: (c) 2009 - 2012 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. from __future__ import with_statement
  12. from kombu.pidbox import Mailbox
  13. from . import app_or_default
  14. def flatten_reply(reply):
  15. nodes = {}
  16. for item in reply:
  17. nodes.update(item)
  18. return nodes
  19. class Inspect(object):
  20. def __init__(self, control, destination=None, timeout=1, callback=None,):
  21. self.destination = destination
  22. self.timeout = timeout
  23. self.callback = callback
  24. self.control = control
  25. def _prepare(self, reply):
  26. if not reply:
  27. return
  28. by_node = flatten_reply(reply)
  29. if self.destination and \
  30. not isinstance(self.destination, (list, tuple)):
  31. return by_node.get(self.destination)
  32. return by_node
  33. def _request(self, command, **kwargs):
  34. return self._prepare(self.control.broadcast(command,
  35. arguments=kwargs,
  36. destination=self.destination,
  37. callback=self.callback,
  38. timeout=self.timeout, reply=True))
  39. def report(self):
  40. return self._request("report")
  41. def active(self, safe=False):
  42. return self._request("dump_active", safe=safe)
  43. def scheduled(self, safe=False):
  44. return self._request("dump_schedule", safe=safe)
  45. def reserved(self, safe=False):
  46. return self._request("dump_reserved", safe=safe)
  47. def stats(self):
  48. return self._request("stats")
  49. def revoked(self):
  50. return self._request("dump_revoked")
  51. def registered(self):
  52. return self._request("dump_tasks")
  53. registered_tasks = registered
  54. def enable_events(self):
  55. return self._request("enable_events")
  56. def disable_events(self):
  57. return self._request("disable_events")
  58. def ping(self):
  59. return self._request("ping")
  60. def add_consumer(self, queue, exchange=None, exchange_type="direct",
  61. routing_key=None, **options):
  62. return self._request("add_consumer", queue=queue, exchange=exchange,
  63. exchange_type=exchange_type,
  64. routing_key=routing_key, **options)
  65. def cancel_consumer(self, queue, **kwargs):
  66. return self._request("cancel_consumer", queue=queue, **kwargs)
  67. def active_queues(self):
  68. return self._request("active_queues")
  69. class Control(object):
  70. Mailbox = Mailbox
  71. def __init__(self, app=None):
  72. self.app = app_or_default(app)
  73. self.mailbox = self.Mailbox("celeryd", type="fanout")
  74. def inspect(self, destination=None, timeout=1, callback=None):
  75. return Inspect(self, destination=destination, timeout=timeout,
  76. callback=callback)
  77. def discard_all(self, connection=None):
  78. """Discard all waiting tasks.
  79. This will ignore all tasks waiting for execution, and they will
  80. be deleted from the messaging server.
  81. :returns: the number of tasks discarded.
  82. """
  83. with self.app.default_connection(connection) as conn:
  84. return self.app.amqp.get_task_consumer(connection=conn)\
  85. .discard_all()
  86. def revoke(self, task_id, destination=None, terminate=False,
  87. signal="SIGTERM", **kwargs):
  88. """Revoke a task by id.
  89. If a task is revoked, the workers will ignore the task and
  90. not execute it after all.
  91. :param task_id: Id of the task to revoke.
  92. :keyword terminate: Also terminate the process currently working
  93. on the task (if any).
  94. :keyword signal: Name of signal to send to process if terminate.
  95. Default is TERM.
  96. :keyword destination: If set, a list of the hosts to send the
  97. command to, when empty broadcast to all workers.
  98. :keyword connection: Custom broker connection to use, if not set,
  99. a connection will be established automatically.
  100. :keyword reply: Wait for and return the reply.
  101. :keyword timeout: Timeout in seconds to wait for the reply.
  102. :keyword limit: Limit number of replies.
  103. """
  104. return self.broadcast("revoke", destination=destination,
  105. arguments={"task_id": task_id,
  106. "terminate": terminate,
  107. "signal": signal}, **kwargs)
  108. def ping(self, destination=None, timeout=1, **kwargs):
  109. """Ping workers.
  110. Returns answer from alive workers.
  111. :keyword destination: If set, a list of the hosts to send the
  112. command to, when empty broadcast to all workers.
  113. :keyword connection: Custom broker connection to use, if not set,
  114. a connection will be established automatically.
  115. :keyword reply: Wait for and return the reply.
  116. :keyword timeout: Timeout in seconds to wait for the reply.
  117. :keyword limit: Limit number of replies.
  118. """
  119. return self.broadcast("ping", reply=True, destination=destination,
  120. timeout=timeout, **kwargs)
  121. def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
  122. """Set rate limit for task by type.
  123. :param task_name: Name of task to change rate limit for.
  124. :param rate_limit: The rate limit as tasks per second, or a rate limit
  125. string (`"100/m"`, etc.
  126. see :attr:`celery.task.base.Task.rate_limit` for
  127. more information).
  128. :keyword destination: If set, a list of the hosts to send the
  129. command to, when empty broadcast to all workers.
  130. :keyword connection: Custom broker connection to use, if not set,
  131. a connection will be established automatically.
  132. :keyword reply: Wait for and return the reply.
  133. :keyword timeout: Timeout in seconds to wait for the reply.
  134. :keyword limit: Limit number of replies.
  135. """
  136. return self.broadcast("rate_limit", destination=destination,
  137. arguments={"task_name": task_name,
  138. "rate_limit": rate_limit},
  139. **kwargs)
  140. def time_limit(self, task_name, soft=None, hard=None, **kwargs):
  141. """Set time limits for task by type.
  142. :param task_name: Name of task to change time limits for.
  143. :keyword soft: New soft time limit (in seconds).
  144. :keyword hard: New hard time limit (in seconds).
  145. Any additional keyword arguments are passed on to :meth:`broadcast`.
  146. """
  147. return self.broadcast("time_limit",
  148. arguments={"task_name": task_name,
  149. "hard": hard, "soft": soft},
  150. **kwargs)
  151. def broadcast(self, command, arguments=None, destination=None,
  152. connection=None, reply=False, timeout=1, limit=None,
  153. callback=None, channel=None):
  154. """Broadcast a control command to the celery workers.
  155. :param command: Name of command to send.
  156. :param arguments: Keyword arguments for the command.
  157. :keyword destination: If set, a list of the hosts to send the
  158. command to, when empty broadcast to all workers.
  159. :keyword connection: Custom broker connection to use, if not set,
  160. a connection will be established automatically.
  161. :keyword reply: Wait for and return the reply.
  162. :keyword timeout: Timeout in seconds to wait for the reply.
  163. :keyword limit: Limit number of replies.
  164. :keyword callback: Callback called immediately for each reply
  165. received.
  166. """
  167. with self.app.default_connection(connection) as conn:
  168. if channel is None:
  169. channel = conn.default_channel
  170. return self.mailbox(conn)._broadcast(command, arguments,
  171. destination, reply, timeout,
  172. limit, callback,
  173. channel=channel)