1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
17 TestResult = namedtuple('TestResult', ['status','suites','log'])
19 class TestSuite(object):
20 def __init__(self) -> None:
21 self.status = TestStatus.SUCCESS
23 self.cases = [] # type: List[TestCase]
25 def __str__(self) -> str:
26 return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
28 def __repr__(self) -> str:
31 class TestCase(object):
32 def __init__(self) -> None:
33 self.status = TestStatus.SUCCESS
35 self.log = [] # type: List[str]
37 def __str__(self) -> str:
38 return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
40 def __repr__(self) -> str:
43 class TestStatus(Enum):
48 FAILURE_TO_PARSE_TESTS = auto()
50 kunit_start_re = re.compile(r'TAP version [0-9]+$')
51 kunit_end_re = re.compile('(List of all partitions:|'
52 'Kernel panic - not syncing: VFS:)')
54 def isolate_kunit_output(kernel_output) -> Iterator[str]:
56 for line in kernel_output:
57 line = line.rstrip() # line always has a trailing \n
58 if kunit_start_re.search(line):
59 prefix_len = len(line.split('TAP version')[0])
61 yield line[prefix_len:] if prefix_len > 0 else line
62 elif kunit_end_re.search(line):
65 yield line[prefix_len:] if prefix_len > 0 else line
67 def raw_output(kernel_output) -> None:
68 for line in kernel_output:
76 return '\033[1;31m' + text + RESET
78 def yellow(text) -> str:
79 return '\033[1;33m' + text + RESET
81 def green(text) -> str:
82 return '\033[1;32m' + text + RESET
84 def print_with_timestamp(message) -> None:
85 print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
87 def format_suite_divider(message) -> str:
88 return '======== ' + message + ' ========'
90 def print_suite_divider(message) -> None:
91 print_with_timestamp(DIVIDER)
92 print_with_timestamp(format_suite_divider(message))
94 def print_log(log) -> None:
96 print_with_timestamp(m)
98 TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
100 def consume_non_diagnostic(lines: List[str]) -> None:
101 while lines and not TAP_ENTRIES.match(lines[0]):
104 def save_non_diagnostic(lines: List[str], test_case: TestCase) -> None:
105 while lines and not TAP_ENTRIES.match(lines[0]):
106 test_case.log.append(lines[0])
109 OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
111 OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
113 OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
115 def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
116 save_non_diagnostic(lines, test_case)
118 test_case.status = TestStatus.TEST_CRASHED
121 match = OK_NOT_OK_SUBTEST.match(line)
122 while not match and lines:
124 match = OK_NOT_OK_SUBTEST.match(line)
126 test_case.log.append(lines.pop(0))
127 test_case.name = match.group(2)
128 if test_case.status == TestStatus.TEST_CRASHED:
130 if match.group(1) == 'ok':
131 test_case.status = TestStatus.SUCCESS
133 test_case.status = TestStatus.FAILURE
138 SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
139 DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
141 def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
142 save_non_diagnostic(lines, test_case)
146 match = SUBTEST_DIAGNOSTIC.match(line)
148 test_case.log.append(lines.pop(0))
149 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
151 test_case.status = TestStatus.TEST_CRASHED
156 def parse_test_case(lines: List[str]) -> Optional[TestCase]:
157 test_case = TestCase()
158 save_non_diagnostic(lines, test_case)
159 while parse_diagnostic(lines, test_case):
161 if parse_ok_not_ok_test_case(lines, test_case):
166 SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
168 def parse_subtest_header(lines: List[str]) -> Optional[str]:
169 consume_non_diagnostic(lines)
172 match = SUBTEST_HEADER.match(lines[0])
175 return match.group(1)
179 SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
181 def parse_subtest_plan(lines: List[str]) -> Optional[int]:
182 consume_non_diagnostic(lines)
183 match = SUBTEST_PLAN.match(lines[0])
186 return int(match.group(1))
190 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
191 if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
192 return TestStatus.TEST_CRASHED
193 elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
194 return TestStatus.FAILURE
195 elif left != TestStatus.SUCCESS:
197 elif right != TestStatus.SUCCESS:
200 return TestStatus.SUCCESS
202 def parse_ok_not_ok_test_suite(lines: List[str],
203 test_suite: TestSuite,
204 expected_suite_index: int) -> bool:
205 consume_non_diagnostic(lines)
207 test_suite.status = TestStatus.TEST_CRASHED
210 match = OK_NOT_OK_MODULE.match(line)
213 if match.group(1) == 'ok':
214 test_suite.status = TestStatus.SUCCESS
216 test_suite.status = TestStatus.FAILURE
217 suite_index = int(match.group(2))
218 if suite_index != expected_suite_index:
219 print_with_timestamp(
220 red('[ERROR] ') + 'expected_suite_index ' +
221 str(expected_suite_index) + ', but got ' +
227 def bubble_up_errors(statuses: Iterable[TestStatus]) -> TestStatus:
228 return reduce(max_status, statuses, TestStatus.SUCCESS)
230 def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
231 max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
232 return max_status(max_test_case_status, test_suite.status)
234 def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
237 consume_non_diagnostic(lines)
238 test_suite = TestSuite()
239 test_suite.status = TestStatus.SUCCESS
240 name = parse_subtest_header(lines)
243 test_suite.name = name
244 expected_test_case_num = parse_subtest_plan(lines)
245 if expected_test_case_num is None:
247 while expected_test_case_num > 0:
248 test_case = parse_test_case(lines)
251 test_suite.cases.append(test_case)
252 expected_test_case_num -= 1
253 if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
254 test_suite.status = bubble_up_test_case_errors(test_suite)
257 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
260 print('failed to parse end of suite' + lines[0])
263 TAP_HEADER = re.compile(r'^TAP version 14$')
265 def parse_tap_header(lines: List[str]) -> bool:
266 consume_non_diagnostic(lines)
267 if TAP_HEADER.match(lines[0]):
273 TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
275 def parse_test_plan(lines: List[str]) -> Optional[int]:
276 consume_non_diagnostic(lines)
277 match = TEST_PLAN.match(lines[0])
280 return int(match.group(1))
284 def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
285 return bubble_up_errors(x.status for x in test_suites)
287 def parse_test_result(lines: List[str]) -> TestResult:
288 consume_non_diagnostic(lines)
289 if not lines or not parse_tap_header(lines):
290 return TestResult(TestStatus.NO_TESTS, [], lines)
291 expected_test_suite_num = parse_test_plan(lines)
292 if not expected_test_suite_num:
293 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
295 for i in range(1, expected_test_suite_num + 1):
296 test_suite = parse_test_suite(lines, i)
298 test_suites.append(test_suite)
300 print_with_timestamp(
301 red('[ERROR] ') + ' expected ' +
302 str(expected_test_suite_num) +
303 ' test suites, but got ' + str(i - 2))
305 test_suite = parse_test_suite(lines, -1)
307 print_with_timestamp(red('[ERROR] ') +
308 'got unexpected test suite: ' + test_suite.name)
310 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
312 return TestResult(TestStatus.NO_TESTS, [], lines)
314 def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
318 for test_suite in test_result.suites:
319 if test_suite.status == TestStatus.SUCCESS:
320 print_suite_divider(green('[PASSED] ') + test_suite.name)
321 elif test_suite.status == TestStatus.TEST_CRASHED:
322 print_suite_divider(red('[CRASHED] ' + test_suite.name))
324 print_suite_divider(red('[FAILED] ') + test_suite.name)
325 for test_case in test_suite.cases:
327 if test_case.status == TestStatus.SUCCESS:
328 print_with_timestamp(green('[PASSED] ') + test_case.name)
329 elif test_case.status == TestStatus.TEST_CRASHED:
331 print_with_timestamp(red('[CRASHED] ' + test_case.name))
332 print_log(map(yellow, test_case.log))
333 print_with_timestamp('')
336 print_with_timestamp(red('[FAILED] ') + test_case.name)
337 print_log(map(yellow, test_case.log))
338 print_with_timestamp('')
339 return total_tests, failed_tests, crashed_tests
341 def parse_run_tests(kernel_output) -> TestResult:
345 test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
346 if test_result.status == TestStatus.NO_TESTS:
347 print(red('[ERROR] ') + yellow('no tests run!'))
348 elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
349 print(red('[ERROR] ') + yellow('could not parse test results!'))
353 crashed_tests) = print_and_count_results(test_result)
354 print_with_timestamp(DIVIDER)
355 fmt = green if test_result.status == TestStatus.SUCCESS else red
356 print_with_timestamp(
357 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
358 (total_tests, failed_tests, crashed_tests)))