control.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. from celery.app import default_app
  2. from celery.pidbox import BroadcastPublisher, ControlReplyConsumer
  3. from celery.utils import gen_unique_id
  4. def flatten_reply(reply):
  5. nodes = {}
  6. for item in reply:
  7. nodes.update(item)
  8. return nodes
  9. class Inspect(object):
  10. def __init__(self, control, destination=None, timeout=1, callback=None,):
  11. self.destination = destination
  12. self.timeout = timeout
  13. self.callback = callback
  14. self.control = control
  15. def _prepare(self, reply):
  16. if not reply:
  17. return
  18. by_node = flatten_reply(reply)
  19. if self.destination and \
  20. not isinstance(self.destination, (list, tuple)):
  21. return by_node.get(self.destination)
  22. return by_node
  23. def _request(self, command, **kwargs):
  24. return self._prepare(self.control.broadcast(command,
  25. arguments=kwargs,
  26. destination=self.destination,
  27. callback=self.callback,
  28. timeout=self.timeout, reply=True))
  29. def active(self, safe=False):
  30. return self._request("dump_active", safe=safe)
  31. def scheduled(self, safe=False):
  32. return self._request("dump_schedule", safe=safe)
  33. def reserved(self, safe=False):
  34. return self._request("dump_reserved", safe=safe)
  35. def stats(self):
  36. return self._request("stats")
  37. def revoked(self):
  38. return self._request("dump_revoked")
  39. def registered_tasks(self):
  40. return self._request("dump_tasks")
  41. def enable_events(self):
  42. return self._request("enable_events")
  43. def disable_events(self):
  44. return self._request("disable_events")
  45. def diagnose(self):
  46. diagnose_timeout = self.timeout * 0.85 # 15% of timeout
  47. return self._request("diagnose", timeout=diagnose_timeout)
  48. def ping(self):
  49. return self._request("ping")
  50. class Control(object):
  51. def __init__(self, app):
  52. self.app = app
  53. def inspect(self, destination=None, timeout=1, callback=None):
  54. return Inspect(self, destination=destination, timeout=timeout,
  55. callback=callback)
  56. def discard_all(self, connection=None, connect_timeout=None):
  57. """Discard all waiting tasks.
  58. This will ignore all tasks waiting for execution, and they will
  59. be deleted from the messaging server.
  60. :returns: the number of tasks discarded.
  61. """
  62. def _do_discard(connection=None, connect_timeout=None):
  63. consumers = self.app.amqp.get_consumer_set(connection=connection)
  64. try:
  65. return consumers.discard_all()
  66. finally:
  67. consumers.close()
  68. return self.app.with_default_connection(_do_discard)(
  69. connection=connection, connect_timeout=connect_timeout)
  70. def revoke(self, task_id, destination=None, **kwargs):
  71. """Revoke a task by id.
  72. If a task is revoked, the workers will ignore the task and
  73. not execute it after all.
  74. :param task_id: Id of the task to revoke.
  75. :keyword destination: If set, a list of the hosts to send the
  76. command to, when empty broadcast to all workers.
  77. :keyword connection: Custom broker connection to use, if not set,
  78. a connection will be established automatically.
  79. :keyword connect_timeout: Timeout for new connection if a custom
  80. connection is not provided.
  81. :keyword reply: Wait for and return the reply.
  82. :keyword timeout: Timeout in seconds to wait for the reply.
  83. :keyword limit: Limit number of replies.
  84. """
  85. return self.broadcast("revoke", destination=destination,
  86. arguments={"task_id": task_id}, **kwargs)
  87. def ping(self, destination=None, timeout=1, **kwargs):
  88. """Ping workers.
  89. Returns answer from alive workers.
  90. :keyword destination: If set, a list of the hosts to send the
  91. command to, when empty broadcast to all workers.
  92. :keyword connection: Custom broker connection to use, if not set,
  93. a connection will be established automatically.
  94. :keyword connect_timeout: Timeout for new connection if a custom
  95. connection is not provided.
  96. :keyword reply: Wait for and return the reply.
  97. :keyword timeout: Timeout in seconds to wait for the reply.
  98. :keyword limit: Limit number of replies.
  99. """
  100. return self.broadcast("ping", reply=True, destination=destination,
  101. timeout=timeout, **kwargs)
  102. def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
  103. """Set rate limit for task by type.
  104. :param task_name: Type of task to change rate limit for.
  105. :param rate_limit: The rate limit as tasks per second, or a rate limit
  106. string (``"100/m"``, etc.
  107. see :attr:`celery.task.base.Task.rate_limit` for
  108. more information).
  109. :keyword destination: If set, a list of the hosts to send the
  110. command to, when empty broadcast to all workers.
  111. :keyword connection: Custom broker connection to use, if not set,
  112. a connection will be established automatically.
  113. :keyword connect_timeout: Timeout for new connection if a custom
  114. connection is not provided.
  115. :keyword reply: Wait for and return the reply.
  116. :keyword timeout: Timeout in seconds to wait for the reply.
  117. :keyword limit: Limit number of replies.
  118. """
  119. return self.broadcast("rate_limit", destination=destination,
  120. arguments={"task_name": task_name,
  121. "rate_limit": rate_limit},
  122. **kwargs)
  123. def broadcast(self, command, arguments=None, destination=None,
  124. connection=None, connect_timeout=None, reply=False, timeout=1,
  125. limit=None, callback=None):
  126. """Broadcast a control command to the celery workers.
  127. :param command: Name of command to send.
  128. :param arguments: Keyword arguments for the command.
  129. :keyword destination: If set, a list of the hosts to send the
  130. command to, when empty broadcast to all workers.
  131. :keyword connection: Custom broker connection to use, if not set,
  132. a connection will be established automatically.
  133. :keyword connect_timeout: Timeout for new connection if a custom
  134. connection is not provided.
  135. :keyword reply: Wait for and return the reply.
  136. :keyword timeout: Timeout in seconds to wait for the reply.
  137. :keyword limit: Limit number of replies.
  138. :keyword callback: Callback called immediately for each reply
  139. received.
  140. """
  141. arguments = arguments or {}
  142. reply_ticket = reply and gen_unique_id() or None
  143. if destination is not None and \
  144. not isinstance(destination, (list, tuple)):
  145. raise ValueError("destination must be a list/tuple not %s" % (
  146. type(destination)))
  147. # Set reply limit to number of destinations (if specificed)
  148. if limit is None and destination:
  149. limit = destination and len(destination) or None
  150. def _do_broadcast(connection=None, connect_timeout=None):
  151. broadcaster = BroadcastPublisher(connection, app=self.app)
  152. try:
  153. broadcaster.send(command, arguments, destination=destination,
  154. reply_ticket=reply_ticket)
  155. finally:
  156. broadcaster.close()
  157. if reply_ticket:
  158. crq = ControlReplyConsumer(connection, reply_ticket)
  159. try:
  160. return crq.collect(limit=limit, timeout=timeout,
  161. callback=callback)
  162. finally:
  163. crq.close()
  164. return self.app.with_default_connection(_do_broadcast)(
  165. connection=connection, connect_timeout=connect_timeout)
  166. _default_control = Control(default_app)
  167. broadcast = _default_control.broadcast
  168. rate_limit = _default_control.rate_limit
  169. ping = _default_control.ping
  170. revoke = _default_control.revoke
  171. discard_all = _default_control.discard_all
  172. inspect = _default_control.inspect