logtool.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. # -*- coding: utf-8 -*-
  2. """The :program:`celery logtool` command.
  3. .. program:: celery logtool
  4. """
  5. from __future__ import absolute_import, unicode_literals
  6. import re
  7. from collections import Counter
  8. from fileinput import FileInput
  9. from .base import Command
  10. __all__ = ['logtool']
# Start of a log record: the line opens with ``[YYYY-MM-DD `` (a bracketed
# date followed by a space).  Lines that do not match are treated by
# ``Audit.feed`` as continuation lines and passed to ``on_trace``.
RE_LOG_START = re.compile(r'^\[\d\d\d\d\-\d\d-\d\d ')
# Record whose text contains ``] Received`` (checked first by ``Audit.feed``).
RE_TASK_RECEIVED = re.compile(r'.+?\] Received')
# Record whose text contains ``] Task`` (checked when the one above fails).
RE_TASK_READY = re.compile(r'.+?\] Task')
# Captures two groups: a run of word/dot characters immediately followed by
# ``[``, and the text inside that bracket pair.  ``task_info`` returns them
# as ``(task_name, task_id)``.  At least one character must follow ``]``.
RE_TASK_INFO = re.compile(r'.+?([\w\.]+)\[(.+?)\].+')
# Captures everything after ``name[id] `` (note the required space); used by
# ``Audit.feed`` as the result text of a ready task.
RE_TASK_RESULT = re.compile(r'.+?[\w\.]+\[.+?\] (.+)')
# Text template for the ``stats`` action.  ``logtool.stats`` renders it with
# ``REPORT_FORMAT.format(**Audit().run(files).report())``, so every field is
# looked up in the ``'task'`` dict built by ``Audit.report``.
# ``{task[types].format}`` reads the ``format`` property of ``_task_counts``.
REPORT_FORMAT = """
Report
======
Task total: {task[total]}
Task errors: {task[errors]}
Task success: {task[succeeded]}
Task completed: {task[completed]}
Tasks
=====
{task[types].format}
"""
  27. class _task_counts(list):
  28. @property
  29. def format(self):
  30. return '\n'.join('{0}: {1}'.format(*i) for i in self)
  31. def task_info(line):
  32. m = RE_TASK_INFO.match(line)
  33. return m.groups()
  34. class Audit(object):
  35. def __init__(self, on_task_error=None, on_trace=None, on_debug=None):
  36. self.ids = set()
  37. self.names = {}
  38. self.results = {}
  39. self.ready = set()
  40. self.task_types = Counter()
  41. self.task_errors = 0
  42. self.on_task_error = on_task_error
  43. self.on_trace = on_trace
  44. self.on_debug = on_debug
  45. self.prev_line = None
  46. def run(self, files):
  47. for line in FileInput(files):
  48. self.feed(line)
  49. return self
  50. def task_received(self, line, task_name, task_id):
  51. self.names[task_id] = task_name
  52. self.ids.add(task_id)
  53. self.task_types[task_name] += 1
  54. def task_ready(self, line, task_name, task_id, result):
  55. self.ready.add(task_id)
  56. self.results[task_id] = result
  57. if 'succeeded' not in result:
  58. self.task_error(line, task_name, task_id, result)
  59. def task_error(self, line, task_name, task_id, result):
  60. self.task_errors += 1
  61. if self.on_task_error:
  62. self.on_task_error(line, task_name, task_id, result)
  63. def feed(self, line):
  64. if RE_LOG_START.match(line):
  65. if RE_TASK_RECEIVED.match(line):
  66. task_name, task_id = task_info(line)
  67. self.task_received(line, task_name, task_id)
  68. elif RE_TASK_READY.match(line):
  69. task_name, task_id = task_info(line)
  70. result = RE_TASK_RESULT.match(line)
  71. if result:
  72. result, = result.groups()
  73. self.task_ready(line, task_name, task_id, result)
  74. else:
  75. if self.on_debug:
  76. self.on_debug(line)
  77. self.prev_line = line
  78. else:
  79. if self.on_trace:
  80. self.on_trace('\n'.join(filter(None, [self.prev_line, line])))
  81. self.prev_line = None
  82. def incomplete_tasks(self):
  83. return self.ids ^ self.ready
  84. def report(self):
  85. return {
  86. 'task': {
  87. 'types': _task_counts(self.task_types.most_common()),
  88. 'total': len(self.ids),
  89. 'errors': self.task_errors,
  90. 'completed': len(self.ready),
  91. 'succeeded': len(self.ready) - self.task_errors,
  92. }
  93. }
  94. class logtool(Command):
  95. """The ``celery logtool`` command."""
  96. args = """<action> [arguments]
  97. ..... stats [file1|- [file2 [...]]]
  98. ..... traces [file1|- [file2 [...]]]
  99. ..... errors [file1|- [file2 [...]]]
  100. ..... incomplete [file1|- [file2 [...]]]
  101. ..... debug [file1|- [file2 [...]]]
  102. """
  103. def run(self, what=None, *files, **kwargs):
  104. map = {
  105. 'stats': self.stats,
  106. 'traces': self.traces,
  107. 'errors': self.errors,
  108. 'incomplete': self.incomplete,
  109. 'debug': self.debug,
  110. }
  111. if not what:
  112. raise self.UsageError('missing action')
  113. elif what not in map:
  114. raise self.Error(
  115. 'action {0} not in {1}'.format(what, '|'.join(map)),
  116. )
  117. return map[what](files)
  118. def stats(self, files):
  119. self.out(REPORT_FORMAT.format(
  120. **Audit().run(files).report()
  121. ))
  122. def traces(self, files):
  123. Audit(on_trace=self.out).run(files)
  124. def errors(self, files):
  125. Audit(on_task_error=self.say1).run(files)
  126. def incomplete(self, files):
  127. audit = Audit()
  128. audit.run(files)
  129. for task_id in audit.incomplete_tasks():
  130. self.error('Did not complete: %r' % (task_id,))
  131. def debug(self, files):
  132. Audit(on_debug=self.out).run(files)
  133. def say1(self, line, *_):
  134. self.out(line)