control.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. from celery import conf
  2. from celery.utils import gen_unique_id
  3. from celery.messaging import BroadcastPublisher, ControlReplyConsumer
  4. from celery.messaging import with_connection, get_consumer_set
  @with_connection
  def discard_all(connection=None,
                  connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
      """Discard all waiting tasks.

      Tasks still waiting for execution are dropped: they are deleted
      from the messaging server and will never run.

      :keyword connection: Broker connection used to build the consumer
          set.
      :keyword connect_timeout: Not read by the function body itself;
          presumably consumed by the ``with_connection`` decorator when it
          has to establish a connection (confirm against the decorator).

      :returns: the number of tasks discarded.

      """
      consumer_set = get_consumer_set(connection=connection)
      try:
          discarded = consumer_set.discard_all()
      finally:
          # Always release the consumers, even if discarding failed.
          consumer_set.close()
      return discarded
  def revoke(task_id, destination=None, **kwargs):
      """Revoke a task by id.

      Sends the ``revoke`` control command; workers that receive it will
      ignore the task and not execute it after all.

      :param task_id: Id of the task to revoke.
      :keyword destination: If set, a list of the hosts to send the command
          to, when empty broadcast to all workers.

      Any remaining keyword arguments are handed to :func:`broadcast`
      unchanged:

      :keyword connection: Custom broker connection to use, if not set,
          a connection will be established automatically.
      :keyword connect_timeout: Timeout for new connection if a custom
          connection is not provided.
      :keyword reply: Wait for and return the reply.
      :keyword timeout: Timeout in seconds to wait for the reply.
      :keyword limit: Limit number of replies.

      :returns: the return value of :func:`broadcast`.

      """
      command_arguments = {"task_id": task_id}
      return broadcast("revoke", arguments=command_arguments,
                       destination=destination, **kwargs)
  def ping(destination=None, timeout=1, **kwargs):
      """Ping workers.

      Sends the ``ping`` control command and returns the answers collected
      from the workers that are alive.

      :keyword destination: If set, a list of the hosts to send the command
          to, when empty broadcast to all workers.
      :keyword timeout: Timeout in seconds to wait for the reply.

      A reply is always requested (``reply=True`` is passed explicitly), so
      ``reply`` must not be supplied again through ``kwargs``. Any other
      keyword arguments are handed to :func:`broadcast` unchanged:

      :keyword connection: Custom broker connection to use, if not set,
          a connection will be established automatically.
      :keyword connect_timeout: Timeout for new connection if a custom
          connection is not provided.
      :keyword limit: Limit number of replies.

      :returns: the replies collected by :func:`broadcast`.

      """
      return broadcast("ping", destination=destination, timeout=timeout,
                       reply=True, **kwargs)
  def rate_limit(task_name, rate_limit, destination=None, **kwargs):
      """Set rate limit for task by type.

      Sends the ``rate_limit`` control command to the workers.

      :param task_name: Type of task to change rate limit for.
      :param rate_limit: The rate limit as tasks per second, or a rate limit
          string (``"100/m"``, etc. see
          :attr:`celery.task.base.Task.rate_limit` for more information).
      :keyword destination: If set, a list of the hosts to send the command
          to, when empty broadcast to all workers.

      Any remaining keyword arguments are handed to :func:`broadcast`
      unchanged:

      :keyword connection: Custom broker connection to use, if not set,
          a connection will be established automatically.
      :keyword connect_timeout: Timeout for new connection if a custom
          connection is not provided.
      :keyword reply: Wait for and return the reply.
      :keyword timeout: Timeout in seconds to wait for the reply.
      :keyword limit: Limit number of replies.

      :returns: the return value of :func:`broadcast`.

      """
      command_arguments = {
          "task_name": task_name,
          "rate_limit": rate_limit,
      }
      return broadcast("rate_limit", arguments=command_arguments,
                       destination=destination, **kwargs)
  def flatten_reply(reply):
      """Merge a sequence of per-node replies into a single dict.

      :param reply: Iterable whose items are each accepted by
          :meth:`dict.update` (e.g. one ``{node: answer}`` dict per node).

      :returns: a new dict holding the entries of every item; when the
          same key occurs more than once the last item wins.

      """
      merged = {}
      for node_reply in reply:
          merged.update(node_reply)
      return merged
  class inspect(object):
      """Send inspection commands to workers and collect their replies.

      Every public method broadcasts one control command with
      ``reply=True`` and returns the replies merged into a single dict
      (see :meth:`_prepare`).

      :keyword destination: Node(s) to query. ``None`` (or any empty
          value) queries all workers; a list/tuple queries those nodes; any
          other value is treated as a single node name, in which case only
          that node's answer is returned instead of the whole dict.
      :keyword timeout: Timeout in seconds to wait for the replies.

      """

      def __init__(self, destination=None, timeout=1):
          self.destination = destination
          self.timeout = timeout

      def _destination_list(self):
          """Return :attr:`destination` in the form ``broadcast`` accepts.

          ``broadcast`` raises :exc:`ValueError` for anything that is not
          ``None`` or a list/tuple, so a single node name is wrapped in a
          list. Empty values become ``None`` (meaning: all workers).

          """
          destination = self.destination
          if not destination:
              return None
          if isinstance(destination, (list, tuple)):
              return destination
          return [destination]

      def _prepare(self, reply):
          """Turn the raw reply list into the value handed to the caller.

          :returns: ``None`` when there was no reply; the answer of the
              single node when :attr:`destination` names exactly one node
              (``None`` if that node did not answer); otherwise a dict
              merging all replies.

          """
          if not reply:
              return None
          by_node = flatten_reply(reply)
          single_node = (self.destination and
                         not isinstance(self.destination, (list, tuple)))
          if single_node:
              return by_node.get(self.destination)
          return by_node

      def _request(self, command, **kwargs):
          """Broadcast ``command`` with ``kwargs`` as its arguments.

          The configured destination is forwarded so that only the
          requested nodes receive the command, and so that ``broadcast``
          can limit the number of replies it waits for.

          """
          replies = broadcast(command, arguments=kwargs,
                              destination=self._destination_list(),
                              timeout=self.timeout, reply=True)
          return self._prepare(replies)

      def active(self, safe=False):
          """Request ``dump_active`` from the workers."""
          return self._request("dump_active", safe=safe)

      def scheduled(self, safe=False):
          """Request ``dump_schedule`` from the workers."""
          return self._request("dump_schedule", safe=safe)

      def reserved(self, safe=False):
          """Request ``dump_reserved`` from the workers."""
          return self._request("dump_reserved", safe=safe)

      def stats(self):
          """Request ``stats`` from the workers."""
          return self._request("stats")

      def revoked(self):
          """Request ``dump_revoked`` from the workers."""
          return self._request("dump_revoked")

      def registered_tasks(self):
          """Request ``dump_tasks`` from the workers."""
          return self._request("dump_tasks")

      def enable_events(self):
          """Send the ``enable_events`` command to the workers."""
          return self._request("enable_events")

      def disable_events(self):
          """Send the ``disable_events`` command to the workers."""
          return self._request("disable_events")
  @with_connection
  def broadcast(command, arguments=None, destination=None, connection=None,
                connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,
                timeout=1, limit=None):
      """Broadcast a control command to the celery workers.

      :param command: Name of command to send.
      :param arguments: Keyword arguments for the command.
      :keyword destination: If set, a list of the hosts to send the command
          to, when empty broadcast to all workers.
      :keyword connection: Custom broker connection to use, if not set,
          a connection will be established automatically.
      :keyword connect_timeout: Timeout for new connection if a custom
          connection is not provided.
      :keyword reply: Wait for and return the reply.
      :keyword timeout: Timeout in seconds to wait for the reply.
      :keyword limit: Limit number of replies. Defaults to the number of
          destinations when ``destination`` is given.

      :returns: the collected replies when ``reply`` is true, otherwise
          ``None``.

      :raises ValueError: if ``destination`` is neither ``None`` nor a
          list/tuple.

      """
      if not (destination is None or isinstance(destination, (list, tuple))):
          raise ValueError("destination must be a list/tuple not %s" % (
                              type(destination)))

      if not arguments:
          arguments = {}

      # A ticket is only generated when the caller wants replies; it is the
      # key the reply consumer is created with below.
      reply_ticket = None
      if reply:
          reply_ticket = gen_unique_id()

      # Set reply limit to number of destinations (if specified).
      if limit is None and destination:
          limit = len(destination)

      publisher = BroadcastPublisher(connection)
      try:
          publisher.send(command, arguments, destination=destination,
                         reply_ticket=reply_ticket)
      finally:
          publisher.close()

      if not reply_ticket:
          return None

      reply_consumer = ControlReplyConsumer(connection, reply_ticket)
      try:
          return reply_consumer.collect(limit=limit, timeout=timeout)
      finally:
          reply_consumer.close()