control.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from celery import conf
  2. from celery.messaging import TaskConsumer, BroadcastPublisher, with_connection
  3. @with_connection
  4. def discard_all(connection=None,
  5. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  6. """Discard all waiting tasks.
  7. This will ignore all tasks waiting for execution, and they will
  8. be deleted from the messaging server.
  9. :returns: the number of tasks discarded.
  10. """
  11. consumer = TaskConsumer(connection=connection)
  12. try:
  13. return consumer.discard_all()
  14. finally:
  15. consumer.close()
  16. def revoke(task_id, destination=None, connection=None,
  17. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  18. """Revoke a task by id.
  19. If a task is revoked, the workers will ignore the task and not execute
  20. it after all.
  21. :param task_id: Id of the task to revoke.
  22. :keyword destination: If set, a list of the hosts to send the command to,
  23. when empty broadcast to all workers.
  24. :keyword connection: Custom broker connection to use, if not set,
  25. a connection will be established automatically.
  26. :keyword connect_timeout: Timeout for new connection if a custom
  27. connection is not provided.
  28. """
  29. return broadcast("revoke", destination=destination,
  30. arguments={"task_id": task_id})
  31. def rate_limit(task_name, rate_limit, destination=None, connection=None,
  32. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  33. """Set rate limit for task by type.
  34. :param task_name: Type of task to change rate limit for.
  35. :param rate_limit: The rate limit as tasks per second, or a rate limit
  36. string (``"100/m"``, etc. see :attr:`celery.task.base.Task.rate_limit`
  37. for more information).
  38. :keyword destination: If set, a list of the hosts to send the command to,
  39. when empty broadcast to all workers.
  40. :keyword connection: Custom broker connection to use, if not set,
  41. a connection will be established automatically.
  42. :keyword connect_timeout: Timeout for new connection if a custom
  43. connection is not provided.
  44. """
  45. return broadcast("rate_limit", destination=destination,
  46. arguments={"task_name": task_name,
  47. "rate_limit": rate_limit})
  48. @with_connection
  49. def broadcast(command, arguments=None, destination=None, connection=None,
  50. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  51. """Broadcast a control command to the celery workers.
  52. :param command: Name of command to send.
  53. :param arguments: Keyword arguments for the command.
  54. :keyword destination: If set, a list of the hosts to send the command to,
  55. when empty broadcast to all workers.
  56. :keyword connection: Custom broker connection to use, if not set,
  57. a connection will be established automatically.
  58. :keyword connect_timeout: Timeout for new connection if a custom
  59. connection is not provided.
  60. """
  61. arguments = arguments or {}
  62. broadcast = BroadcastPublisher(connection)
  63. try:
  64. broadcast.send(command, arguments, destination=destination)
  65. finally:
  66. broadcast.close()