# control.py
  1. import socket
  2. from celery import log
  3. from celery.registry import tasks
  4. from celery.worker.revoke import revoked
  5. TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
  6. def expose(fun):
  7. """Expose method as a celery worker control command, allowed to be called
  8. from a message."""
  9. fun.exposed = True
  10. return fun
  11. class Control(object):
  12. """The worker control panel.
  13. :param logger: The current logger to use.
  14. """
  15. def __init__(self, logger, hostname=None, listener=None):
  16. assert listener is not None
  17. self.logger = logger
  18. self.hostname = hostname or socket.gethostname()
  19. self.listener = listener
  20. @expose
  21. def revoke(self, task_id, **kwargs):
  22. """Revoke task by task id."""
  23. revoked.add(task_id)
  24. self.logger.warn("Task %s revoked." % task_id)
  25. @expose
  26. def rate_limit(self, task_name, rate_limit, **kwargs):
  27. """Set new rate limit for a task type.
  28. See :attr:`celery.task.base.Task.rate_limit`.
  29. :param task_name: Type of task.
  30. :param rate_limit: New rate limit.
  31. """
  32. try:
  33. tasks[task_name].rate_limit = rate_limit
  34. except KeyError:
  35. return
  36. self.listener.ready_queue.refresh()
  37. if not rate_limit:
  38. self.logger.warn("Disabled rate limits for tasks of type %s" % (
  39. task_name))
  40. else:
  41. self.logger.warn("New rate limit for tasks of type %s: %s." % (
  42. task_name, rate_limit))
  43. @expose
  44. def shutdown(self, **kwargs):
  45. self.logger.critical("Got shutdown from remote.")
  46. raise SystemExit
  47. @expose
  48. def dump_tasks(self, **kwargs):
  49. from celery import registry
  50. def _extract_info(task):
  51. fields = dict((field, str(getattr(task, field, None)))
  52. for field in TASK_INFO_FIELDS
  53. if getattr(task, field, None) is not None)
  54. info = map("=".join, fields.items())
  55. if not info:
  56. return "\t%s" % task.name
  57. return "\t%s [%s]" % (task.name, " ".join(info))
  58. tasks = sorted(registry.tasks.keys())
  59. tasks = [registry.tasks[task] for task in tasks]
  60. self.logger.warn("* Dump of currently registered tasks:\n%s" % (
  61. "\n".join(map(_extract_info, tasks))))
  62. class ControlDispatch(object):
  63. """Execute worker control panel commands."""
  64. panel_cls = Control
  65. def __init__(self, logger=None, hostname=None, listener=None):
  66. self.logger = logger or log.get_default_logger()
  67. self.hostname = hostname
  68. self.listener = listener
  69. self.panel = self.panel_cls(self.logger,
  70. hostname=self.hostname,
  71. listener=self.listener)
  72. def dispatch_from_message(self, message):
  73. """Dispatch by using message data received by the broker.
  74. Example:
  75. >>> def receive_message(message_data, message):
  76. ... control = message_data.get("control")
  77. ... if control:
  78. ... ControlDispatch().dispatch_from_message(control)
  79. """
  80. message = dict(message) # don't modify callers message.
  81. command = message.pop("command")
  82. destination = message.pop("destination", None)
  83. if not destination or self.hostname in destination:
  84. return self.execute(command, message)
  85. def execute(self, command, kwargs=None):
  86. """Execute control command by name and keyword arguments.
  87. :param command: Name of the command to execute.
  88. :param kwargs: Keyword arguments.
  89. """
  90. kwargs = kwargs or {}
  91. control = None
  92. try:
  93. control = getattr(self.panel, command)
  94. except AttributeError:
  95. pass
  96. if control is None or not control.exposed:
  97. self.logger.error("No such control command: %s" % command)
  98. else:
  99. # need to make sure keyword arguments are not in unicode
  100. # this should be fixed in newer Python's
  101. # (see: http://bugs.python.org/issue4978)
  102. kwargs = dict((k.encode('utf8'), v)
  103. for (k, v) in kwargs.iteritems())
  104. return control(**kwargs)