control.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import socket
  2. from celery import log
  3. from celery.registry import tasks
  4. from celery.worker.revoke import revoked
  5. TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
  6. def expose(fun):
  7. """Expose method as a celery worker control command, allowed to be called
  8. from a message."""
  9. fun.exposed = True
  10. return fun
  11. class Control(object):
  12. """The worker control panel.
  13. :param logger: The current logger to use.
  14. """
  15. def __init__(self, logger, hostname=None):
  16. self.logger = logger
  17. self.hostname = hostname or socket.gethostname()
  18. @expose
  19. def revoke(self, task_id, **kwargs):
  20. """Revoke task by task id."""
  21. revoked.add(task_id)
  22. self.logger.warn("Task %s revoked." % task_id)
  23. @expose
  24. def rate_limit(self, task_name, rate_limit, **kwargs):
  25. """Set new rate limit for a task type.
  26. See :attr:`celery.task.base.Task.rate_limit`.
  27. :param task_name: Type of task.
  28. :param rate_limit: New rate limit.
  29. """
  30. try:
  31. tasks[task_name].rate_limit = rate_limit
  32. except KeyError:
  33. return
  34. if not rate_limit:
  35. self.logger.warn("Disabled rate limits for tasks of type %s" % (
  36. task_name))
  37. else:
  38. self.logger.warn("New rate limit for tasks of type %s: %s." % (
  39. task_name, rate_limit))
  40. @expose
  41. def shutdown(self, **kwargs):
  42. self.logger.critical("Got shutdown from remote.")
  43. raise SystemExit
  44. @expose
  45. def dump_tasks(self, **kwargs):
  46. from celery import registry
  47. def _extract_info(task):
  48. fields = dict((field, str(getattr(task, field, None)))
  49. for field in TASK_INFO_FIELDS
  50. if getattr(task, field, None) is not None)
  51. info = map("=".join, fields.items())
  52. if not info:
  53. return "\t%s" % task.name
  54. return "\t%s [%s]" % (task.name, " ".join(info))
  55. tasks = sorted(registry.tasks.keys())
  56. tasks = [registry.tasks[task] for task in tasks]
  57. self.logger.warn("* Dump of currently registered tasks:\n%s" % (
  58. "\n".join(map(_extract_info, tasks))))
  59. class ControlDispatch(object):
  60. """Execute worker control panel commands."""
  61. panel_cls = Control
  62. def __init__(self, logger=None, hostname=None):
  63. self.logger = logger or log.get_default_logger()
  64. self.hostname = hostname
  65. self.panel = self.panel_cls(self.logger, hostname=self.hostname)
  66. def dispatch_from_message(self, message):
  67. """Dispatch by using message data received by the broker.
  68. Example:
  69. >>> def receive_message(message_data, message):
  70. ... control = message_data.get("control")
  71. ... if control:
  72. ... ControlDispatch().dispatch_from_message(control)
  73. """
  74. message = dict(message) # don't modify callers message.
  75. command = message.pop("command")
  76. destination = message.pop("destination", None)
  77. if not destination or self.hostname in destination:
  78. return self.execute(command, message)
  79. def execute(self, command, kwargs=None):
  80. """Execute control command by name and keyword arguments.
  81. :param command: Name of the command to execute.
  82. :param kwargs: Keyword arguments.
  83. """
  84. kwargs = kwargs or {}
  85. control = None
  86. try:
  87. control = getattr(self.panel, command)
  88. except AttributeError:
  89. pass
  90. if control is None or not control.exposed:
  91. self.logger.error("No such control command: %s" % command)
  92. else:
  93. # need to make sure keyword arguments are not in unicode
  94. # this should be fixed in newer Python's
  95. # (see: http://bugs.python.org/issue4978)
  96. kwargs = dict((k.encode('utf8'), v)
  97. for (k, v) in kwargs.iteritems())
  98. return control(**kwargs)