__init__.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from celery import log
  2. from celery.messaging import ControlReplyPublisher, with_connection
  3. from celery.utils import kwdict
  4. from celery.worker.control.registry import Panel
  5. from celery.worker.control import builtins
  6. class ControlDispatch(object):
  7. """Execute worker control panel commands."""
  8. Panel = Panel
  9. ReplyPublisher = ControlReplyPublisher
  10. def __init__(self, logger=None, hostname=None, listener=None):
  11. self.logger = logger or log.get_default_logger()
  12. self.hostname = hostname
  13. self.listener = listener
  14. self.panel = self.Panel(self.logger, self.listener, self.hostname)
  15. @with_connection
  16. def reply(self, data, exchange, routing_key, connection=None,
  17. connect_timeout=None):
  18. crq = self.ReplyPublisher(connection, exchange=exchange)
  19. try:
  20. crq.send(data, routing_key=routing_key)
  21. finally:
  22. crq.close()
  23. def dispatch_from_message(self, message):
  24. """Dispatch by using message data received by the broker.
  25. Example:
  26. >>> def receive_message(message_data, message):
  27. ... control = message_data.get("control")
  28. ... if control:
  29. ... ControlDispatch().dispatch_from_message(control)
  30. """
  31. message = dict(message) # don't modify callers message.
  32. command = message.pop("command")
  33. destination = message.pop("destination", None)
  34. reply_to = message.pop("reply_to", None)
  35. if not destination or self.hostname in destination:
  36. return self.execute(command, message, reply_to=reply_to)
  37. def execute(self, command, kwargs=None, reply_to=None):
  38. """Execute control command by name and keyword arguments.
  39. :param command: Name of the command to execute.
  40. :param kwargs: Keyword arguments.
  41. """
  42. kwargs = kwargs or {}
  43. control = None
  44. try:
  45. control = self.panel[command]
  46. except KeyError:
  47. self.logger.error("No such control command: %s" % command)
  48. else:
  49. try:
  50. reply = control(self.panel, **kwdict(kwargs))
  51. except Exception, exc:
  52. self.logger.error(
  53. "Error running control command %s kwargs=%s: %s" % (
  54. command, kwargs, exc))
  55. reply = {"error": str(exc)}
  56. if reply_to:
  57. self.reply({self.hostname: reply},
  58. exchange=reply_to["exchange"],
  59. routing_key=reply_to["routing_key"])
  60. return reply