control.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from kombu.pidbox import Mailbox
  2. from celery.app import app_or_default
  3. def flatten_reply(reply):
  4. nodes = {}
  5. for item in reply:
  6. nodes.update(item)
  7. return nodes
  8. class Inspect(object):
  9. def __init__(self, control, destination=None, timeout=1, callback=None,):
  10. self.destination = destination
  11. self.timeout = timeout
  12. self.callback = callback
  13. self.control = control
  14. def _prepare(self, reply):
  15. if not reply:
  16. return
  17. by_node = flatten_reply(reply)
  18. if self.destination and \
  19. not isinstance(self.destination, (list, tuple)):
  20. return by_node.get(self.destination)
  21. return by_node
  22. def _request(self, command, **kwargs):
  23. return self._prepare(self.control.broadcast(command,
  24. arguments=kwargs,
  25. destination=self.destination,
  26. callback=self.callback,
  27. timeout=self.timeout, reply=True))
  28. def active(self, safe=False):
  29. return self._request("dump_active", safe=safe)
  30. def scheduled(self, safe=False):
  31. return self._request("dump_schedule", safe=safe)
  32. def reserved(self, safe=False):
  33. return self._request("dump_reserved", safe=safe)
  34. def stats(self):
  35. return self._request("stats")
  36. def revoked(self):
  37. return self._request("dump_revoked")
  38. def registered_tasks(self):
  39. return self._request("dump_tasks")
  40. def enable_events(self):
  41. return self._request("enable_events")
  42. def disable_events(self):
  43. return self._request("disable_events")
  44. def ping(self):
  45. return self._request("ping")
  46. def add_consumer(self, queue, exchange=None, exchange_type="direct",
  47. routing_key=None, **options):
  48. return self._request("add_consumer", queue=queue, exchange=exchange,
  49. exchange_type=exchange_type,
  50. routing_key=routing_key, **options)
  51. def cancel_consumer(self, queue, **kwargs):
  52. return self._request("cancel_consumer", queue=queue, **kwargs)
  53. class Control(object):
  54. Mailbox = Mailbox
  55. def __init__(self, app):
  56. self.app = app
  57. self.mailbox = self.Mailbox("celeryd", type="fanout")
  58. def inspect(self, destination=None, timeout=1, callback=None):
  59. return Inspect(self, destination=destination, timeout=timeout,
  60. callback=callback)
  61. def discard_all(self, connection=None, connect_timeout=None):
  62. """Discard all waiting tasks.
  63. This will ignore all tasks waiting for execution, and they will
  64. be deleted from the messaging server.
  65. :returns: the number of tasks discarded.
  66. """
  67. def _do_discard(connection=None, connect_timeout=None):
  68. consumer = self.app.amqp.get_task_consumer(connection=connection)
  69. try:
  70. return consumer.discard_all()
  71. finally:
  72. consumer.close()
  73. return self.app.with_default_connection(_do_discard)(
  74. connection=connection, connect_timeout=connect_timeout)
  75. def revoke(self, task_id, destination=None, **kwargs):
  76. """Revoke a task by id.
  77. If a task is revoked, the workers will ignore the task and
  78. not execute it after all.
  79. :param task_id: Id of the task to revoke.
  80. :keyword destination: If set, a list of the hosts to send the
  81. command to, when empty broadcast to all workers.
  82. :keyword connection: Custom broker connection to use, if not set,
  83. a connection will be established automatically.
  84. :keyword connect_timeout: Timeout for new connection if a custom
  85. connection is not provided.
  86. :keyword reply: Wait for and return the reply.
  87. :keyword timeout: Timeout in seconds to wait for the reply.
  88. :keyword limit: Limit number of replies.
  89. """
  90. return self.broadcast("revoke", destination=destination,
  91. arguments={"task_id": task_id}, **kwargs)
  92. def ping(self, destination=None, timeout=1, **kwargs):
  93. """Ping workers.
  94. Returns answer from alive workers.
  95. :keyword destination: If set, a list of the hosts to send the
  96. command to, when empty broadcast to all workers.
  97. :keyword connection: Custom broker connection to use, if not set,
  98. a connection will be established automatically.
  99. :keyword connect_timeout: Timeout for new connection if a custom
  100. connection is not provided.
  101. :keyword reply: Wait for and return the reply.
  102. :keyword timeout: Timeout in seconds to wait for the reply.
  103. :keyword limit: Limit number of replies.
  104. """
  105. return self.broadcast("ping", reply=True, destination=destination,
  106. timeout=timeout, **kwargs)
  107. def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
  108. """Set rate limit for task by type.
  109. :param task_name: Type of task to change rate limit for.
  110. :param rate_limit: The rate limit as tasks per second, or a rate limit
  111. string (`"100/m"`, etc.
  112. see :attr:`celery.task.base.Task.rate_limit` for
  113. more information).
  114. :keyword destination: If set, a list of the hosts to send the
  115. command to, when empty broadcast to all workers.
  116. :keyword connection: Custom broker connection to use, if not set,
  117. a connection will be established automatically.
  118. :keyword connect_timeout: Timeout for new connection if a custom
  119. connection is not provided.
  120. :keyword reply: Wait for and return the reply.
  121. :keyword timeout: Timeout in seconds to wait for the reply.
  122. :keyword limit: Limit number of replies.
  123. """
  124. return self.broadcast("rate_limit", destination=destination,
  125. arguments={"task_name": task_name,
  126. "rate_limit": rate_limit},
  127. **kwargs)
  128. def broadcast(self, command, arguments=None, destination=None,
  129. connection=None, connect_timeout=None, reply=False, timeout=1,
  130. limit=None, callback=None):
  131. """Broadcast a control command to the celery workers.
  132. :param command: Name of command to send.
  133. :param arguments: Keyword arguments for the command.
  134. :keyword destination: If set, a list of the hosts to send the
  135. command to, when empty broadcast to all workers.
  136. :keyword connection: Custom broker connection to use, if not set,
  137. a connection will be established automatically.
  138. :keyword connect_timeout: Timeout for new connection if a custom
  139. connection is not provided.
  140. :keyword reply: Wait for and return the reply.
  141. :keyword timeout: Timeout in seconds to wait for the reply.
  142. :keyword limit: Limit number of replies.
  143. :keyword callback: Callback called immediately for each reply
  144. received.
  145. """
  146. def _do_broadcast(connection=None, connect_timeout=None):
  147. return self.mailbox(connection)._broadcast(command, arguments,
  148. destination, reply,
  149. timeout, limit,
  150. callback)
  151. return self.app.with_default_connection(_do_broadcast)(
  152. connection=connection, connect_timeout=connect_timeout)
  153. _default_control = Control(app_or_default())
  154. broadcast = _default_control.broadcast
  155. rate_limit = _default_control.rate_limit
  156. ping = _default_control.ping
  157. revoke = _default_control.revoke
  158. discard_all = _default_control.discard_all
  159. inspect = _default_control.inspect