# control.py
  from kombu.pidbox import Mailbox

  from celery.app import app_or_default
  def flatten_reply(reply):
      """Merge a sequence of reply mappings into a single dict.

      :param reply: Iterable of mappings (one per reply).  ``None`` or an
          empty iterable yields an empty dict.

      :returns: A new dict containing the items of every mapping in
          ``reply``.  If a key occurs in more than one mapping, the value
          from the last mapping wins.

      """
      nodes = {}
      # ``reply or ()`` lets callers pass ``None`` when nothing was received.
      for item in reply or ():
          nodes.update(item)
      return nodes
  class Inspect(object):
      """Send inspection commands through a control object and tidy replies.

      Every public method broadcasts one named command with ``reply=True``
      and returns the reply after it has been passed through
      :meth:`_prepare`.

      :param control: Object providing the ``broadcast`` method used to
          send commands.
      :keyword destination: Forwarded to ``broadcast``.  When it is a
          single value (not a list or tuple), only the reply stored under
          that key is returned.
      :keyword timeout: Forwarded to ``broadcast``.  Default is 1.
      :keyword callback: Forwarded to ``broadcast``.

      """

      def __init__(self, control, destination=None, timeout=1,
                   callback=None):
          self.destination = destination
          self.timeout = timeout
          self.callback = callback
          self.control = control

      def _prepare(self, reply):
          """Convert a raw broadcast reply into the value handed to callers.

          :returns: ``None`` when ``reply`` is empty or ``None``; the entry
              for :attr:`destination` (``None`` if absent) when a single
              destination was given; otherwise a dict merging all replies.

          """
          if not reply:
              return
          by_node = flatten_reply(reply)
          # A single (non list/tuple) destination: return just its entry.
          if self.destination and \
                  not isinstance(self.destination, (list, tuple)):
              return by_node.get(self.destination)
          return by_node

      def _request(self, command, **kwargs):
          """Broadcast ``command`` with ``kwargs`` as its arguments.

          The instance's destination, callback and timeout are passed
          along, and the reply is returned after :meth:`_prepare`.

          """
          return self._prepare(self.control.broadcast(command,
                                        arguments=kwargs,
                                        destination=self.destination,
                                        callback=self.callback,
                                        timeout=self.timeout, reply=True))

      def active(self, safe=False):
          """Send the ``dump_active`` command."""
          return self._request("dump_active", safe=safe)

      def scheduled(self, safe=False):
          """Send the ``dump_schedule`` command."""
          return self._request("dump_schedule", safe=safe)

      def reserved(self, safe=False):
          """Send the ``dump_reserved`` command."""
          return self._request("dump_reserved", safe=safe)

      def stats(self):
          """Send the ``stats`` command."""
          return self._request("stats")

      def revoked(self):
          """Send the ``dump_revoked`` command."""
          return self._request("dump_revoked")

      def registered_tasks(self):
          """Send the ``dump_tasks`` command."""
          return self._request("dump_tasks")

      def enable_events(self):
          """Send the ``enable_events`` command."""
          return self._request("enable_events")

      def disable_events(self):
          """Send the ``disable_events`` command."""
          return self._request("disable_events")

      def ping(self):
          """Send the ``ping`` command."""
          return self._request("ping")

      def add_consumer(self, queue, exchange=None, exchange_type="direct",
                       routing_key=None, **options):
          """Send the ``add_consumer`` command for ``queue``.

          ``exchange``, ``exchange_type``, ``routing_key`` and any extra
          ``options`` are sent along as command arguments.

          """
          return self._request("add_consumer", queue=queue, exchange=exchange,
                               exchange_type=exchange_type,
                               routing_key=routing_key, **options)

      def cancel_consumer(self, queue, **kwargs):
          """Send the ``cancel_consumer`` command for ``queue``."""
          return self._request("cancel_consumer", queue=queue, **kwargs)

      def active_queues(self):
          """Send the ``active_queues`` command."""
          return self._request("active_queues")
  class Control(object):
      """Broadcast control commands to workers on behalf of an app.

      :param app: Application object.  Its ``amqp.get_task_consumer`` and
          ``with_default_connection`` attributes are used by the methods
          below.

      """

      # Referenced through ``self.Mailbox`` in ``__init__``, so a subclass
      # can substitute another mailbox class by overriding this attribute.
      Mailbox = Mailbox

      def __init__(self, app):
          self.app = app
          self.mailbox = self.Mailbox("celeryd", type="fanout")

      def inspect(self, destination=None, timeout=1, callback=None):
          """Return an :class:`Inspect` instance bound to this control."""
          return Inspect(self, destination=destination, timeout=timeout,
                         callback=callback)

      def discard_all(self, connection=None, connect_timeout=None):
          """Discard all waiting tasks.

          This will ignore all tasks waiting for execution, and they will
          be deleted from the messaging server.

          :returns: the number of tasks discarded.

          """

          def _do_discard(connection=None, connect_timeout=None):
              consumer = self.app.amqp.get_task_consumer(connection=connection)
              try:
                  return consumer.discard_all()
              finally:
                  # Close the consumer even if discarding raised.
                  consumer.close()

          return self.app.with_default_connection(_do_discard)(
                  connection=connection, connect_timeout=connect_timeout)

      def revoke(self, task_id, destination=None, terminate=False,
                 signal="SIGTERM", **kwargs):
          """Revoke a task by id.

          If a task is revoked, the workers will ignore the task and
          not execute it after all.

          :param task_id: Id of the task to revoke.
          :keyword terminate: Also terminate the process currently working
              on the task (if any).
          :keyword signal: Name of signal to send to process if terminate.
              Default is TERM.
          :keyword destination: If set, a list of the hosts to send the
              command to, when empty broadcast to all workers.
          :keyword connection: Custom broker connection to use, if not set,
              a connection will be established automatically.
          :keyword connect_timeout: Timeout for new connection if a custom
              connection is not provided.
          :keyword reply: Wait for and return the reply.
          :keyword timeout: Timeout in seconds to wait for the reply.
          :keyword limit: Limit number of replies.

          """
          return self.broadcast("revoke", destination=destination,
                                arguments={"task_id": task_id,
                                           "terminate": terminate,
                                           "signal": signal}, **kwargs)

      def ping(self, destination=None, timeout=1, **kwargs):
          """Ping workers.

          Returns answer from alive workers.

          :keyword destination: If set, a list of the hosts to send the
              command to, when empty broadcast to all workers.
          :keyword connection: Custom broker connection to use, if not set,
              a connection will be established automatically.
          :keyword connect_timeout: Timeout for new connection if a custom
              connection is not provided.
          :keyword reply: Wait for and return the reply.
          :keyword timeout: Timeout in seconds to wait for the reply.
          :keyword limit: Limit number of replies.

          """
          return self.broadcast("ping", reply=True, destination=destination,
                                timeout=timeout, **kwargs)

      def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
          """Set rate limit for task by type.

          :param task_name: Type of task to change rate limit for.
          :param rate_limit: The rate limit as tasks per second, or a rate
              limit string (`"100/m"`, etc.
              see :attr:`celery.task.base.Task.rate_limit` for
              more information).
          :keyword destination: If set, a list of the hosts to send the
              command to, when empty broadcast to all workers.
          :keyword connection: Custom broker connection to use, if not set,
              a connection will be established automatically.
          :keyword connect_timeout: Timeout for new connection if a custom
              connection is not provided.
          :keyword reply: Wait for and return the reply.
          :keyword timeout: Timeout in seconds to wait for the reply.
          :keyword limit: Limit number of replies.

          """
          return self.broadcast("rate_limit", destination=destination,
                                arguments={"task_name": task_name,
                                           "rate_limit": rate_limit},
                                **kwargs)

      def broadcast(self, command, arguments=None, destination=None,
                    connection=None, connect_timeout=None, reply=False,
                    timeout=1, limit=None, callback=None, channel=None):
          """Broadcast a control command to the celery workers.

          :param command: Name of command to send.
          :param arguments: Keyword arguments for the command.
          :keyword destination: If set, a list of the hosts to send the
              command to, when empty broadcast to all workers.
          :keyword connection: Custom broker connection to use, if not set,
              a connection will be established automatically.
          :keyword connect_timeout: Timeout for new connection if a custom
              connection is not provided.
          :keyword reply: Wait for and return the reply.
          :keyword timeout: Timeout in seconds to wait for the reply.
          :keyword limit: Limit number of replies.
          :keyword callback: Callback called immediately for each reply
              received.
          :keyword channel: If given, the command is sent with this channel
              directly, without going through the app's
              ``with_default_connection`` wrapper.

          """

          def _do_broadcast(connection=None, connect_timeout=None,
                            channel=None):
              return self.mailbox(connection)._broadcast(command, arguments,
                                                         destination, reply,
                                                         timeout, limit,
                                                         callback,
                                                         channel=channel)

          if channel:
              return _do_broadcast(connection, connect_timeout, channel)
          else:
              return self.app.with_default_connection(_do_broadcast)(
                      connection=connection, connect_timeout=connect_timeout)
  # Module-level shortcuts: a Control bound to whatever ``app_or_default()``
  # returns, with its bound methods re-exported as plain functions.
  _default_control = Control(app_or_default())
  broadcast = _default_control.broadcast
  rate_limit = _default_control.rate_limit
  ping = _default_control.ping
  revoke = _default_control.revoke
  discard_all = _default_control.discard_all
  inspect = _default_control.inspect