purge.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. """The ``celery purge`` program, used to delete messages from queues."""
  2. from __future__ import absolute_import, unicode_literals
  3. from celery.bin.base import Command
  4. from celery.five import keys
  5. from celery.utils import text
  6. class purge(Command):
  7. """Erase all messages from all known task queues.
  8. Warning:
  9. There's no undo operation for this command.
  10. """
  11. warn_prelude = (
  12. '{warning}: This will remove all tasks from {queues}: {names}.\n'
  13. ' There is no undo for this operation!\n\n'
  14. '(to skip this prompt use the -f option)\n'
  15. )
  16. warn_prompt = 'Are you sure you want to delete all tasks'
  17. fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
  18. fmt_empty = 'No messages purged from {qnum} {queues}'
  19. def add_arguments(self, parser):
  20. group = parser.add_argument_group('Purging Options')
  21. group.add_argument(
  22. '--force', '-f', action='store_true', default=False,
  23. help="Don't prompt for verification",
  24. )
  25. group.add_argument(
  26. '--queues', '-Q', default=[],
  27. help='Comma separated list of queue names to purge.',
  28. )
  29. group.add_argument(
  30. '--exclude-queues', '-X', default=[],
  31. help='Comma separated list of queues names not to purge.',
  32. )
  33. def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
  34. queues = set(text.str_to_list(queues or []))
  35. exclude = set(text.str_to_list(exclude_queues or []))
  36. names = (queues or set(keys(self.app.amqp.queues))) - exclude
  37. qnum = len(names)
  38. messages = None
  39. if names:
  40. if not force:
  41. self.out(self.warn_prelude.format(
  42. warning=self.colored.red('WARNING'),
  43. queues=text.pluralize(qnum, 'queue'),
  44. names=', '.join(sorted(names)),
  45. ))
  46. if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
  47. return
  48. with self.app.connection_for_write() as conn:
  49. messages = sum(self._purge(conn, queue) for queue in names)
  50. fmt = self.fmt_purged if messages else self.fmt_empty
  51. self.out(fmt.format(
  52. mnum=messages, qnum=qnum,
  53. messages=text.pluralize(messages, 'message'),
  54. queues=text.pluralize(qnum, 'queue')))
  55. def _purge(self, conn, queue):
  56. try:
  57. return conn.default_channel.queue_purge(queue) or 0
  58. except conn.channel_errors:
  59. return 0