logtool.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery logtool` command.
  4. .. program:: celery logtool
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. import re
  8. from collections import Counter
  9. from fileinput import FileInput
  10. from .base import Command
  11. __all__ = ['logtool']
  12. RE_LOG_START = re.compile('^\[\d\d\d\d\-\d\d-\d\d ')
  13. RE_TASK_RECEIVED = re.compile('.+?\] Received')
  14. RE_TASK_READY = re.compile('.+?\] Task')
  15. RE_TASK_INFO = re.compile('.+?([\w\.]+)\[(.+?)\].+')
  16. RE_TASK_RESULT = re.compile('.+?[\w\.]+\[.+?\] (.+)')
  17. REPORT_FORMAT = """
  18. Report
  19. ======
  20. Task total: {task[total]}
  21. Task errors: {task[errors]}
  22. Task success: {task[succeeded]}
  23. Task completed: {task[completed]}
  24. Tasks
  25. =====
  26. {task[types].format}
  27. """
  28. class _task_counts(list):
  29. @property
  30. def format(self):
  31. return '\n'.join('{0}: {1}'.format(*i) for i in self)
  32. def task_info(line):
  33. m = RE_TASK_INFO.match(line)
  34. return m.groups()
  35. class Audit(object):
  36. def __init__(self, on_task_error=None, on_trace=None, on_debug=None):
  37. self.ids = set()
  38. self.names = {}
  39. self.results = {}
  40. self.ready = set()
  41. self.task_types = Counter()
  42. self.task_errors = 0
  43. self.on_task_error = on_task_error
  44. self.on_trace = on_trace
  45. self.on_debug = on_debug
  46. self.prev_line = None
  47. def run(self, files):
  48. for line in FileInput(files):
  49. self.feed(line)
  50. return self
  51. def task_received(self, line, task_name, task_id):
  52. self.names[task_id] = task_name
  53. self.ids.add(task_id)
  54. self.task_types[task_name] += 1
  55. def task_ready(self, line, task_name, task_id, result):
  56. self.ready.add(task_id)
  57. self.results[task_id] = result
  58. if 'succeeded' not in result:
  59. self.task_error(line, task_name, task_id, result)
  60. def task_error(self, line, task_name, task_id, result):
  61. self.task_errors += 1
  62. if self.on_task_error:
  63. self.on_task_error(line, task_name, task_id, result)
  64. def feed(self, line):
  65. if RE_LOG_START.match(line):
  66. if RE_TASK_RECEIVED.match(line):
  67. task_name, task_id = task_info(line)
  68. self.task_received(line, task_name, task_id)
  69. elif RE_TASK_READY.match(line):
  70. task_name, task_id = task_info(line)
  71. result = RE_TASK_RESULT.match(line)
  72. if result:
  73. result, = result.groups()
  74. self.task_ready(line, task_name, task_id, result)
  75. else:
  76. if self.on_debug:
  77. self.on_debug(line)
  78. self.prev_line = line
  79. else:
  80. if self.on_trace:
  81. self.on_trace('\n'.join(filter(None, [self.prev_line, line])))
  82. self.prev_line = None
  83. def incomplete_tasks(self):
  84. return self.ids ^ self.ready
  85. def report(self):
  86. return {
  87. 'task': {
  88. 'types': _task_counts(self.task_types.most_common()),
  89. 'total': len(self.ids),
  90. 'errors': self.task_errors,
  91. 'completed': len(self.ready),
  92. 'succeeded': len(self.ready) - self.task_errors,
  93. }
  94. }
  95. class logtool(Command):
  96. args = """<action> [arguments]
  97. ..... stats [file1|- [file2 [...]]]
  98. ..... traces [file1|- [file2 [...]]]
  99. ..... errors [file1|- [file2 [...]]]
  100. ..... incomplete [file1|- [file2 [...]]]
  101. ..... debug [file1|- [file2 [...]]]
  102. """
  103. def run(self, what=None, *files, **kwargs):
  104. map = {
  105. 'stats': self.stats,
  106. 'traces': self.traces,
  107. 'errors': self.errors,
  108. 'incomplete': self.incomplete,
  109. 'debug': self.debug,
  110. }
  111. if not what:
  112. raise self.UsageError('missing action')
  113. elif what not in map:
  114. raise self.Error(
  115. 'action {0} not in {1}'.format(what, '|'.join(map)),
  116. )
  117. return map[what](files)
  118. def stats(self, files):
  119. self.out(REPORT_FORMAT.format(
  120. **Audit().run(files).report()
  121. ))
  122. def traces(self, files):
  123. Audit(on_trace=self.out).run(files)
  124. def errors(self, files):
  125. Audit(on_task_error=self.say1).run(files)
  126. def incomplete(self, files):
  127. audit = Audit()
  128. audit.run(files)
  129. for task_id in audit.incomplete_tasks():
  130. self.error('Did not complete: %r' % (task_id,))
  131. def debug(self, files):
  132. Audit(on_debug=self.out).run(files)
  133. def say1(self, line, *_):
  134. self.out(line)