1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
# Immutable record returned by the parser: an overall status, the list of
# parsed suites, and the log object handed back alongside them.
TestResult = namedtuple('TestResult', ('status', 'suites', 'log'))
19 class TestSuite(object):
20 def __init__(self) -> None:
21 self.status = TestStatus.SUCCESS
23 self.cases = [] # type: List[TestCase]
25 def __str__(self) -> str:
26 return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
28 def __repr__(self) -> str:
31 class TestCase(object):
32 def __init__(self) -> None:
33 self.status = TestStatus.SUCCESS
35 self.log = [] # type: List[str]
37 def __str__(self) -> str:
38 return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
40 def __repr__(self) -> str:
43 class TestStatus(Enum):
49 FAILURE_TO_PARSE_TESTS = auto()
52 """Provides a peek()/pop() interface over an iterator of (line#, text)."""
53 _lines: Iterator[Tuple[int, str]]
54 _next: Tuple[int, str]
57 def __init__(self, lines: Iterator[Tuple[int, str]]):
63 def _get_next(self) -> None:
65 self._next = next(self._lines)
69 def peek(self) -> str:
77 def __bool__(self) -> bool:
80 # Only used by kunit_tool_test.py.
81 def __iter__(self) -> Iterator[str]:
85 def line_number(self) -> int:
# Matches a line that ends with "TAP version <digits>". It is applied with
# search(), so arbitrary text may precede the header on the same line.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Matches any of three kernel messages anywhere in a line. It is searched for
# on every line that is not a TAP header.
# NOTE(review): presumably these mark the end of KUnit output -- confirm
# against the branch that handles a match.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:|reboot: System halted)')
92 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
93 def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
96 for line in kernel_output:
98 line = line.rstrip() # line always has a trailing \n
99 if kunit_start_re.search(line):
100 prefix_len = len(line.split('TAP version')[0])
102 yield line_num, line[prefix_len:]
103 elif kunit_end_re.search(line):
106 yield line_num, line[prefix_len:]
107 return LineStream(lines=isolate_kunit_output(kernel_output))
109 def raw_output(kernel_output) -> None:
110 for line in kernel_output:
def red(text) -> str:
    """Return *text* wrapped in the ANSI ``1;31`` (bold red) escape and RESET.

    *text* is joined with ``+``, so it has to be a ``str``.
    """
    start_red = '\033[1;31m'
    return start_red + text + RESET
def yellow(text) -> str:
    """Return *text* wrapped in the ANSI ``1;33`` (bold yellow) escape and RESET.

    *text* is joined with ``+``, so it has to be a ``str``.
    """
    start_yellow = '\033[1;33m'
    return start_yellow + text + RESET
def green(text) -> str:
    """Return *text* wrapped in the ANSI ``1;32`` (bold green) escape and RESET.

    *text* is joined with ``+``, so it has to be a ``str``.
    """
    start_green = '\033[1;32m'
    return start_green + text + RESET
def print_with_timestamp(message) -> None:
    """Print *message* to stdout prefixed with the current time as ``[HH:MM:SS]``."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print('[%s] %s' % (stamp, message))
def format_suite_divider(message) -> str:
    """Return *message* framed by eight ``=`` characters and one space per side."""
    prefix, suffix = '======== ', ' ========'
    return prefix + message + suffix
def print_suite_divider(message) -> None:
    """Print the DIVIDER line, then *message* as a suite banner, both timestamped."""
    print_with_timestamp(DIVIDER)
    # Build the banner only after the divider has been printed, so the
    # order of side effects is the same as a direct nested call.
    banner = format_suite_divider(message)
    print_with_timestamp(banner)
136 def print_log(log) -> None:
138 print_with_timestamp(m)
# Matches a whole line that looks like a TAP entry: a line starting with
# "TAP", or (after optional leading whitespace) an "ok"/"not ok" result, an
# "N..M" plan, or a "#" diagnostic. consume_non_diagnostic() and
# save_non_diagnostic() keep looping while the next line does NOT match this.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
142 def consume_non_diagnostic(lines: LineStream) -> None:
143 while lines and not TAP_ENTRIES.match(lines.peek()):
146 def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
147 while lines and not TAP_ENTRIES.match(lines.peek()):
148 test_case.log.append(lines.peek())
# Immutable record with three fields: is_ok, description and text.
OkNotOkResult = namedtuple('OkNotOkResult', ('is_ok', 'description', 'text'))
# Result line carrying a "# SKIP" directive, with optional leading whitespace
# so it can match both indented and unindented result lines.
# Groups: 1 = "ok"/"not ok", 2 = text before " # SKIP", 3 = text after it.
# Used by both parse_ok_not_ok_test_case() and parse_ok_not_ok_test_suite().
OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')
# Result line that MUST be indented by at least one whitespace character.
# Groups: 1 = "ok"/"not ok", 2 = text after " - ", which
# parse_ok_not_ok_test_case() stores as the test case name.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
# Result line with NO leading whitespace.
# Groups: 1 = "ok"/"not ok", 2 = the numeric index, which
# parse_ok_not_ok_test_suite() compares with expected_suite_index,
# 3 = text after " - ".
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
159 def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
160 save_non_diagnostic(lines, test_case)
162 test_case.status = TestStatus.TEST_CRASHED
165 match = OK_NOT_OK_SUBTEST.match(line)
166 while not match and lines:
168 match = OK_NOT_OK_SUBTEST.match(line)
170 test_case.log.append(lines.pop())
171 test_case.name = match.group(2)
172 skip_match = OK_NOT_OK_SKIP.match(line)
174 test_case.status = TestStatus.SKIPPED
176 if test_case.status == TestStatus.TEST_CRASHED:
178 if match.group(1) == 'ok':
179 test_case.status = TestStatus.SUCCESS
181 test_case.status = TestStatus.FAILURE
# Indented diagnostic line of the form "<whitespace># <text>"; group 1 is the
# text after "# ". Matched in parse_diagnostic().
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
# Indented diagnostic line that ends with ": kunit test case crashed!".
# Checked in parse_diagnostic(), which can set the case to TEST_CRASHED.
# NOTE(review): presumably the status is set only when this matches -- confirm.
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
189 def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
190 save_non_diagnostic(lines, test_case)
194 match = SUBTEST_DIAGNOSTIC.match(line)
196 test_case.log.append(lines.pop())
197 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
199 test_case.status = TestStatus.TEST_CRASHED
204 def parse_test_case(lines: LineStream) -> Optional[TestCase]:
205 test_case = TestCase()
206 save_non_diagnostic(lines, test_case)
207 while parse_diagnostic(lines, test_case):
209 if parse_ok_not_ok_test_case(lines, test_case):
# Indented "# Subtest: <name>" line; group 1 is the name that
# parse_subtest_header() returns.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
216 def parse_subtest_header(lines: LineStream) -> Optional[str]:
217 consume_non_diagnostic(lines)
220 match = SUBTEST_HEADER.match(lines.peek())
223 return match.group(1)
# Indented plan line "<whitespace>N..M"; group 1 is M, which
# parse_subtest_plan() returns as an int. The pattern has no "^"/"$" and is
# applied with match(), so it is anchored at the start of the line only and
# trailing text is allowed.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
229 def parse_subtest_plan(lines: LineStream) -> Optional[int]:
230 consume_non_diagnostic(lines)
231 match = SUBTEST_PLAN.match(lines.peek())
234 return int(match.group(1))
238 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
241 elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
242 return TestStatus.TEST_CRASHED
243 elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
244 return TestStatus.FAILURE
245 elif left == TestStatus.SKIPPED:
250 def parse_ok_not_ok_test_suite(lines: LineStream,
251 test_suite: TestSuite,
252 expected_suite_index: int) -> bool:
253 consume_non_diagnostic(lines)
255 test_suite.status = TestStatus.TEST_CRASHED
258 match = OK_NOT_OK_MODULE.match(line)
261 if match.group(1) == 'ok':
262 test_suite.status = TestStatus.SUCCESS
264 test_suite.status = TestStatus.FAILURE
265 skip_match = OK_NOT_OK_SKIP.match(line)
267 test_suite.status = TestStatus.SKIPPED
268 suite_index = int(match.group(2))
269 if suite_index != expected_suite_index:
270 print_with_timestamp(
271 red('[ERROR] ') + 'expected_suite_index ' +
272 str(expected_suite_index) + ', but got ' +
def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
    """Fold *status_list* into a single status using max_status().

    The fold starts from TestStatus.SKIPPED, so an empty iterable yields
    TestStatus.SKIPPED.
    """
    combined = TestStatus.SKIPPED
    for status in status_list:
        combined = max_status(combined, status)
    return combined
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Combine the statuses of the suite's cases with the suite's own status.

    The case statuses are first reduced with bubble_up_errors(); the result
    is then merged with ``test_suite.status`` through max_status().
    """
    case_statuses = (case.status for case in test_suite.cases)
    combined_case_status = bubble_up_errors(case_statuses)
    return max_status(combined_case_status, test_suite.status)
285 def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
288 consume_non_diagnostic(lines)
289 test_suite = TestSuite()
290 test_suite.status = TestStatus.SUCCESS
291 name = parse_subtest_header(lines)
294 test_suite.name = name
295 expected_test_case_num = parse_subtest_plan(lines)
296 if expected_test_case_num is None:
298 while expected_test_case_num > 0:
299 test_case = parse_test_case(lines)
302 test_suite.cases.append(test_case)
303 expected_test_case_num -= 1
304 if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
305 test_suite.status = bubble_up_test_case_errors(test_suite)
308 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
311 print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
# Matches a line that is exactly "TAP version 14" (no other version number is
# accepted); checked by parse_tap_header().
TAP_HEADER = re.compile(r'^TAP version 14$')
316 def parse_tap_header(lines: LineStream) -> bool:
317 consume_non_diagnostic(lines)
318 if TAP_HEADER.match(lines.peek()):
# Plan line "N..M" with no leading whitespace; group 1 is M, which
# parse_test_plan() returns as an int. The pattern has no "^"/"$" and is
# applied with match(), so it is anchored at the start of the line only.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
326 def parse_test_plan(lines: LineStream) -> Optional[int]:
327 consume_non_diagnostic(lines)
328 match = TEST_PLAN.match(lines.peek())
331 return int(match.group(1))
def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
    """Reduce the ``status`` of every suite in *test_suites* to one status."""
    suite_statuses = (suite.status for suite in test_suites)
    return bubble_up_errors(suite_statuses)
338 def parse_test_result(lines: LineStream) -> TestResult:
339 consume_non_diagnostic(lines)
340 if not lines or not parse_tap_header(lines):
341 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
342 expected_test_suite_num = parse_test_plan(lines)
343 if expected_test_suite_num == 0:
344 return TestResult(TestStatus.NO_TESTS, [], lines)
345 elif expected_test_suite_num is None:
346 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
348 for i in range(1, expected_test_suite_num + 1):
349 test_suite = parse_test_suite(lines, i)
351 test_suites.append(test_suite)
353 print_with_timestamp(
354 red('[ERROR] ') + ' expected ' +
355 str(expected_test_suite_num) +
356 ' test suites, but got ' + str(i - 2))
358 test_suite = parse_test_suite(lines, -1)
360 print_with_timestamp(red('[ERROR] ') +
361 'got unexpected test suite: ' + test_suite.name)
363 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
365 return TestResult(TestStatus.NO_TESTS, [], lines)
def total(self) -> int:
    """Return the sum of the passed, failed, crashed and skipped counters."""
    passed_and_failed = self.passed + self.failed
    return passed_and_failed + self.crashed + self.skipped
382 def print_and_count_results(test_result: TestResult) -> TestCounts:
383 counts = TestCounts()
384 for test_suite in test_result.suites:
385 if test_suite.status == TestStatus.SUCCESS:
386 print_suite_divider(green('[PASSED] ') + test_suite.name)
387 elif test_suite.status == TestStatus.SKIPPED:
388 print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
389 elif test_suite.status == TestStatus.TEST_CRASHED:
390 print_suite_divider(red('[CRASHED] ' + test_suite.name))
392 print_suite_divider(red('[FAILED] ') + test_suite.name)
393 for test_case in test_suite.cases:
394 if test_case.status == TestStatus.SUCCESS:
396 print_with_timestamp(green('[PASSED] ') + test_case.name)
397 elif test_case.status == TestStatus.SKIPPED:
399 print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
400 elif test_case.status == TestStatus.TEST_CRASHED:
402 print_with_timestamp(red('[CRASHED] ' + test_case.name))
403 print_log(map(yellow, test_case.log))
404 print_with_timestamp('')
407 print_with_timestamp(red('[FAILED] ') + test_case.name)
408 print_log(map(yellow, test_case.log))
409 print_with_timestamp('')
412 def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
413 counts = TestCounts()
414 lines = extract_tap_lines(kernel_output)
415 test_result = parse_test_result(lines)
416 if test_result.status == TestStatus.NO_TESTS:
417 print(red('[ERROR] ') + yellow('no tests run!'))
418 elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
419 print(red('[ERROR] ') + yellow('could not parse test results!'))
421 counts = print_and_count_results(test_result)
422 print_with_timestamp(DIVIDER)
423 if test_result.status == TestStatus.SUCCESS:
425 elif test_result.status == TestStatus.SKIPPED:
429 print_with_timestamp(
430 fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
431 (counts.total(), counts.failed, counts.crashed, counts.skipped)))