list.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. """The ``celery list bindings`` command, used to inspect queue bindings."""
  2. from __future__ import absolute_import, unicode_literals
  3. from celery.bin.base import Command
  4. class list_(Command):
  5. """Get info from broker.
  6. Note:
  7. For RabbitMQ the management plugin is required.
  8. Example:
  9. .. code-block:: console
  10. $ celery list bindings
  11. """
  12. args = '[bindings]'
  13. def list_bindings(self, management):
  14. try:
  15. bindings = management.get_bindings()
  16. except NotImplementedError:
  17. raise self.Error('Your transport cannot list bindings.')
  18. def fmt(q, e, r):
  19. return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
  20. fmt('Queue', 'Exchange', 'Routing Key')
  21. fmt('-' * 16, '-' * 16, '-' * 16)
  22. for b in bindings:
  23. fmt(b['destination'], b['source'], b['routing_key'])
  24. def run(self, what=None, *_, **kw):
  25. topics = {'bindings': self.list_bindings}
  26. available = ', '.join(topics)
  27. if not what:
  28. raise self.UsageError(
  29. 'Missing argument, specify one of: {0}'.format(available))
  30. if what not in topics:
  31. raise self.UsageError(
  32. 'unknown topic {0!r} (choose one of: {1})'.format(
  33. what, available))
  34. with self.app.connection() as conn:
  35. self.app.amqp.TaskConsumer(conn).declare()
  36. topics[what](conn.manager)