# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results in a reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>
from __future__ import annotations

import datetime
import re
import sys
from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple
class Test:
    """
    A class to represent a test parsed from KTAP results. All KTAP
    results within a test log are stored in a main Test object as
    subtests.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : int - expected number of subtests (0 if single
        test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """

    def __init__(self) -> None:
        """Creates Test object with default attributes."""
        # A test counts as crashed until a result line sets another status.
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count = 0  # type: Optional[int]
        self.subtests = []  # type: List[Test]
        self.log = []  # type: List[str]
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns string representation of a Test class object."""
        return (f'Test({self.status}, {self.name}, {self.expected_count}, '
                f'{self.subtests}, {self.log}, {self.counts})')

    def __repr__(self) -> str:
        """Returns string representation of a Test class object."""
        return str(self)

    def add_error(self, error_message: str) -> None:
        """Records an error that occurred while parsing this test.

        Increments the error count of this test and prints the message
        with a timestamp and an '[ERROR]' tag.
        """
        self.counts.errors += 1
        print_with_timestamp(red('[ERROR]') + f' Test: {self.name}: {error_message}')
class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()
class TestCounts:
    """
    Tracks the counts of statuses of all test cases and any errors within
    a Test.

    Attributes:
    passed : int - the number of tests that have passed
    failed : int - the number of tests that have failed
    crashed : int - the number of tests that have crashed
    skipped : int - the number of tests that have skipped
    errors : int - the number of errors in the test and subtests
    """

    def __init__(self) -> None:
        """Creates TestCounts object with counts of all test
        statuses and test errors set to 0.
        """
        self.passed = 0
        self.failed = 0
        self.crashed = 0
        self.skipped = 0
        self.errors = 0

    def __str__(self) -> str:
        """Returns the string representation of a TestCounts object.

        Only non-zero counts are listed after the total.
        """
        statuses = [('passed', self.passed), ('failed', self.failed),
                    ('crashed', self.crashed), ('skipped', self.skipped),
                    ('errors', self.errors)]
        return f'Ran {self.total()} tests: ' + \
            ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

    def total(self) -> int:
        """Returns the total number of test cases within a test
        object, where a test case is a test with no subtests.

        Errors are not test cases and are not included.
        """
        return (self.passed + self.failed + self.crashed +
                self.skipped)

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Adds the counts of another TestCounts object to the current
        TestCounts object. Used to add the counts of a subtest to the
        parent test.

        Parameters:
        counts - a different TestCounts object whose counts
            will be added to the counts of the TestCounts object
        """
        self.passed += counts.passed
        self.failed += counts.failed
        self.crashed += counts.crashed
        self.skipped += counts.skipped
        self.errors += counts.errors

    def get_status(self) -> TestStatus:
        """Returns the aggregated status of a Test using test
        counts.
        """
        if self.total() == 0:
            return TestStatus.NO_TESTS
        if self.crashed:
            # Crashes should take priority.
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            # No failures or crashes, looks good!
            return TestStatus.SUCCESS
        # We have only skipped tests.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """Increments the count for `status`.

        NO_TESTS is not counted; any status other than SUCCESS, FAILURE,
        SKIPPED and NO_TESTS is counted as a crash.
        """
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        elif status != TestStatus.NO_TESTS:
            self.crashed += 1
class LineStream:
    """
    A class to represent the lines of kernel output.
    Provides a lazy peek()/pop() interface over an iterator of
    (line#, text).
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a new LineStream that wraps the given iterator."""
        self._lines = lines
        self._done = False
        self._need_next = True
        # Placeholder returned by peek()/line_number() on an empty stream.
        self._next = (0, '')

    def _get_next(self) -> None:
        """Advances the LineStream to the next line, if necessary."""
        if not self._need_next:
            return
        try:
            self._next = next(self._lines)
        except StopIteration:
            # Keep the last line in _next so pop() can report it.
            self._done = True
        finally:
            self._need_next = False

    def peek(self) -> str:
        """Returns the current line, without advancing the LineStream.
        """
        self._get_next()
        return self._next[1]

    def pop(self) -> str:
        """Returns the current line and advances the LineStream to
        the next line.

        Raises ValueError if the stream is already exhausted.
        """
        s = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {s}')
        self._need_next = True
        return s

    def __bool__(self) -> bool:
        """Returns True if stream has more lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Empties all lines stored in LineStream object into
        Iterator object and returns the Iterator object.
        """
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        return self._next[0]
# Parsing helper methods:
# Version lines that mark the start of (K)TAP output; the version number
# must be the last thing on the line.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Kernel messages after which no further KTAP lines are extracted.
KTAP_END = re.compile('(List of all partitions:|'
                      'Kernel panic - not syncing: VFS:|reboot: System halted)')
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """Extracts KTAP lines from the kernel output.

    Parameters:
    kernel_output - iterable of raw kernel output lines

    Return:
    LineStream of (line number, text) pairs, starting at the KTAP/TAP
    version line and stopping at the first line matching KTAP_END
    """
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        started = False
        # Width of whatever precedes the version line (e.g. a timestamp);
        # the same number of characters is cut from every later line.
        prefix_len = 0
        for line_num, line in enumerate(kernel_output, start=1):
            line = line.rstrip()  # remove trailing \n
            if not started and KTAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('KTAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif not started and TAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('TAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif started and KTAP_END.search(line):
                # stop extracting KTAP lines
                break
            elif started:
                # remove prefix and any indentation and yield
                # line with line number
                line = line[prefix_len:].lstrip()
                yield line_num, line
    return LineStream(lines=isolate_ktap_output(kernel_output))
# Accepted protocol versions, used by check_version().
# NOTE(review): the KTAP_VERSIONS definition is absent from this copy of the
# file although parse_ktap_header() uses it; [1] is presumed - confirm.
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]
def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    """
    # Only the range is checked: a version between min and max is
    # accepted even if it is not listed explicitly.
    if version_num < min(accepted_versions):
        test.add_error(f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(f'{version_type} version higher than expected!')
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if fails to parse KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    header = lines.peek()
    ktap_match = KTAP_START.match(header)
    tap_match = TAP_START.match(header)
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test)
    else:
        return False
    # Consume the header line only once it has been recognised.
    test.log.append(lines.pop())
    return True
TEST_HEADER = re.compile(r'^# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if fails to parse test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    if not match:
        # Leave the line in the stream for another parser.
        return False
    test.log.append(lines.pop())
    test.name = match.group(1)
    return True
TEST_PLAN = re.compile(r'1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object.
    Returns False and sets expected_count to None if there is no valid test
    plan.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        # None marks the number of subtests as unknown.
        test.expected_count = None
        return False
    test.log.append(lines.pop())
    test.expected_count = int(match.group(1))
    return True
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if fails to match format or name.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if matched a test result line and the name matching the
        expected test name
    """
    # The line is only inspected here, never consumed.
    match = TEST_RESULT.match(lines.peek())
    if not match:
        return False
    return match.group(4) == test.name
def parse_test_result(lines: LineStream, test: Test,
                      expected_num: int) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if fails to parse test result line.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    test.log.append(lines.pop())

    # Set name of test object
    if skip_match:
        test.name = skip_match.group(4)
    else:
        test.name = match.group(4)

    # Check test num
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(f'Expected test number {expected_num} but found {num}')

    # Set status of test object; SKIP wins over ok / not ok
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True
def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parse lines that do not match the format of a test result line or
    test header line and returns them in list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log = []  # type: List[str]
    # Consume lines until a result line, a subtest header, or end of stream.
    while lines and not TEST_RESULT.match(lines.peek()) and not \
            TEST_HEADER.match(lines.peek()):
        log.append(lines.pop())
    return log
# Printing helper methods:
# NOTE(review): the RESET definition is absent from this copy of the file
# although red()/yellow()/green() use it; the standard ANSI reset sequence
# is presumed - confirm.
RESET = '\033[0;0m'

def _colorize(code: str, text: str) -> str:
    """Wraps text in the ANSI color code, unless stdout is not a terminal."""
    if not sys.stdout.isatty():
        return text
    return code + text + RESET

def red(text: str) -> str:
    """Returns inputted string with red color code."""
    return _colorize('\033[1;31m', text)

def yellow(text: str) -> str:
    """Returns inputted string with yellow color code."""
    return _colorize('\033[1;33m', text)

def green(text: str) -> str:
    """Returns inputted string with green color code."""
    return _colorize('\033[1;32m', text)

# Number of characters the color codes add to a string (0 when stdout is
# not a terminal); measured once at import time.
ANSI_LEN = len(red(''))
def print_with_timestamp(message: str) -> None:
    """Prints message with timestamp at beginning.

    The timestamp is the current local time formatted as '[HH:MM:SS]'.
    """
    timestamp = datetime.datetime.now().strftime('%H:%M:%S')
    print(f'[{timestamp}] {message}')
# Full-width divider line; its length is the target width of all dividers.
# NOTE(review): the DIVIDER definition is absent from this copy of the file;
# 60 characters matches the examples in the docstrings - confirm.
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of '=' on each side
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    if difference > 0:
        # calculate number of '=' for each side of the divider;
        # the right side gets the extra one when the split is uneven
        len_1 = difference // 2
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)
def print_test_header(test: Test) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = test.name
    # expected_count of 0 or None adds no subtest count to the header.
    if test.expected_count:
        if test.expected_count == 1:
            message += ' (1 subtest)'
        else:
            message += f' ({test.expected_count} subtests)'
    print_with_timestamp(format_test_divider(message, len(message)))
def print_log(log: Iterable[str]) -> None:
    """Prints all strings in saved log for test in yellow."""
    for m in log:
        print_with_timestamp(yellow(m))
def format_test_result(test: Test) -> str:
    """
    Returns string with formatted test result with colored status and test
    name.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return yellow('[NO TESTS RUN] ') + test.name
    # NOTE(review): the statement before each of the two returns below is
    # missing from this copy; printing the saved log is presumed - confirm.
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log)
        return red('[CRASHED] ') + test.name
    print_log(test.log)
    return red('[FAILED] ') + test.name
def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    """
    print_with_timestamp(format_test_result(test))
def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = format_test_result(test)
    # Color codes take no width on screen, so leave them out of the
    # length used for centering.
    visible_len = len(message) - ANSI_LEN
    print_with_timestamp(format_test_divider(message, visible_len))
def print_summary_line(test: Test) -> None:
    """
    Prints summary line of test object. Color of line is dependent on
    status of test. Color is green if test passes, yellow if test is
    skipped or no tests were run, and red otherwise. Summary line contains
    counts of the statuses of the tests subtests or the test itself if it
    has no subtests.

    Example:
    "Testing complete. Ran 2 tests: passed: 2"

    Parameters:
    test - Test object representing current test being printed
    """
    if test.status == TestStatus.SUCCESS:
        color = green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = yellow
    else:
        color = red
    print_with_timestamp(color(f'Testing complete. {test.counts}'))
def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, add the test counts of the subtests to the
    test and check if any of the tests crashed and if so set the test
    status to crashed. Otherwise if the test has no subtests add the
    status of the test to the test counts.

    Parameters:
    test - Test object for current test being parsed
    """
    counts = test.counts
    for subtest in test.subtests:
        counts.add_subtest_counts(subtest.counts)
    if counts.total() == 0:
        # Nothing counted yet: count this test's own status.
        counts.add_status(test.status)
    elif counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:
    - Main KTAP/TAP header: a version line, an optional test plan, then
      subtests.
    - Subtest header line: '# Subtest: [name]', an optional test plan,
      subtests, then the result line of the test itself.
    - Test result line: a single test case with no subtests.

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)
    main = parse_ktap_header(lines, test)
    if main:
        # If KTAP/TAP header is found, attempt to parse
        # test plan
        # NOTE(review): the name assignment is missing from this copy of
        # the file; 'main' is presumed - confirm.
        test.name = "main"
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If KTAP/TAP header is not found, test must be subtest
        # header or test result line so attempt to parse
        # subtest header
        parent_test = parse_test_header(lines, test)
        if parent_test:
            # If subtest header is found, attempt to parse
            # test plan and print header
            parse_test_plan(lines, test)
            print_test_header(test)
    expected_count = test.expected_count
    subtests = []  # type: List[Test]
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        if not lines or (peek_test_name_match(lines, test) and
                         not main):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error('missing expected subtest!')
                sub_test = Test()
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if not main:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if not parent_test or peek_test_name_match(lines, test):
            parse_test_result(lines, test, expected_num)
        else:
            test.add_error('missing subtest result line!')

    # Check for there being no tests
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and not main:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    elif not main:
        print_test_result(test)
    return test
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    if not lines:
        # No KTAP/TAP version line was found in the output.
        test = Test()
        test.name = '<missing>'
        test.add_error('could not find any KTAP output!')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [])
        # Keep NO_TESTS as set by parse_test(); otherwise derive the
        # overall status from the aggregated counts.
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test