control.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. from celery import conf
  2. from celery.messaging import BroadcastPublisher
  3. from celery.messaging import with_connection, get_consumer_set
  4. @with_connection
  5. def discard_all(connection=None,
  6. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  7. """Discard all waiting tasks.
  8. This will ignore all tasks waiting for execution, and they will
  9. be deleted from the messaging server.
  10. :returns: the number of tasks discarded.
  11. """
  12. consumers = get_consumer_set(connection=connection)
  13. try:
  14. return consumers.discard_all()
  15. finally:
  16. consumers.close()
  17. def revoke(task_id, destination=None, connection=None,
  18. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  19. """Revoke a task by id.
  20. If a task is revoked, the workers will ignore the task and not execute
  21. it after all.
  22. :param task_id: Id of the task to revoke.
  23. :keyword destination: If set, a list of the hosts to send the command to,
  24. when empty broadcast to all workers.
  25. :keyword connection: Custom broker connection to use, if not set,
  26. a connection will be established automatically.
  27. :keyword connect_timeout: Timeout for new connection if a custom
  28. connection is not provided.
  29. """
  30. return broadcast("revoke", destination=destination,
  31. arguments={"task_id": task_id})
  32. def rate_limit(task_name, rate_limit, destination=None, connection=None,
  33. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  34. """Set rate limit for task by type.
  35. :param task_name: Type of task to change rate limit for.
  36. :param rate_limit: The rate limit as tasks per second, or a rate limit
  37. string (``"100/m"``, etc. see :attr:`celery.task.base.Task.rate_limit`
  38. for more information).
  39. :keyword destination: If set, a list of the hosts to send the command to,
  40. when empty broadcast to all workers.
  41. :keyword connection: Custom broker connection to use, if not set,
  42. a connection will be established automatically.
  43. :keyword connect_timeout: Timeout for new connection if a custom
  44. connection is not provided.
  45. """
  46. return broadcast("rate_limit", destination=destination,
  47. arguments={"task_name": task_name,
  48. "rate_limit": rate_limit})
  49. @with_connection
  50. def broadcast(command, arguments=None, destination=None, connection=None,
  51. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  52. """Broadcast a control command to the celery workers.
  53. :param command: Name of command to send.
  54. :param arguments: Keyword arguments for the command.
  55. :keyword destination: If set, a list of the hosts to send the command to,
  56. when empty broadcast to all workers.
  57. :keyword connection: Custom broker connection to use, if not set,
  58. a connection will be established automatically.
  59. :keyword connect_timeout: Timeout for new connection if a custom
  60. connection is not provided.
  61. """
  62. arguments = arguments or {}
  63. broadcast = BroadcastPublisher(connection)
  64. try:
  65. broadcast.send(command, arguments, destination=destination)
  66. finally:
  67. broadcast.close()