1 # SPDX-License-Identifier: GPL-2.0
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# the results in a reader-friendly format. Stores and returns the test
# results in a Test object.
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
12 from __future__ import annotations
16 from enum import Enum, auto
17 from functools import reduce
18 from typing import Iterable, Iterator, List, Optional, Tuple
22 A class to represent a test parsed from KTAP results. All KTAP
23 results within a test log are stored in a main Test object as
27 status : TestStatus - status of the test
28 name : str - name of the test
29 expected_count : int - expected number of subtests (0 if single
30 test case and None if unknown expected number of subtests)
31 subtests : List[Test] - list of subtests
32 log : List[str] - log of KTAP lines that correspond to the test
33 counts : TestCounts - counts of the test statuses and errors of
34 subtests or of the test itself if the test is a single
37 def __init__(self) -> None:
38 """Creates Test object with default attributes."""
39 self.status = TestStatus.TEST_CRASHED
41 self.expected_count = 0 # type: Optional[int]
42 self.subtests = [] # type: List[Test]
43 self.log = [] # type: List[str]
44 self.counts = TestCounts()
46 def __str__(self) -> str:
47 """Returns string representation of a Test class object."""
48 return ('Test(' + str(self.status) + ', ' + self.name +
49 ', ' + str(self.expected_count) + ', ' +
50 str(self.subtests) + ', ' + str(self.log) + ', ' +
51 str(self.counts) + ')')
53 def __repr__(self) -> str:
54 """Returns string representation of a Test class object."""
def add_error(self, error_message: str) -> None:
    """Records an error that occurred while parsing this test.

    Increments the error count held in this test's counts and then
    reports the message, prefixed with the test name, via print_error().

    Parameters:
    error_message - message describing the error
    """
    self.counts.errors += 1
    prefix = 'Test ' + self.name + ': '
    print_error(prefix + error_message)
62 class TestStatus(Enum):
63 """An enumeration class to represent the status of a test."""
69 FAILURE_TO_PARSE_TESTS = auto()
73 Tracks the counts of statuses of all test cases and any errors within
77 passed : int - the number of tests that have passed
78 failed : int - the number of tests that have failed
79 crashed : int - the number of tests that have crashed
80 skipped : int - the number of tests that have skipped
81 errors : int - the number of errors in the test and subtests
84 """Creates TestCounts object with counts of all test
85 statuses and test errors set to 0.
93 def __str__(self) -> str:
94 """Returns the string representation of a TestCounts object.
96 return ('Passed: ' + str(self.passed) +
97 ', Failed: ' + str(self.failed) +
98 ', Crashed: ' + str(self.crashed) +
99 ', Skipped: ' + str(self.skipped) +
100 ', Errors: ' + str(self.errors))
102 def total(self) -> int:
103 """Returns the total number of test cases within a test
104 object, where a test case is a test with no subtests.
106 return (self.passed + self.failed + self.crashed +
109 def add_subtest_counts(self, counts: TestCounts) -> None:
111 Adds the counts of another TestCounts object to the current
112 TestCounts object. Used to add the counts of a subtest to the
116 counts - a different TestCounts object whose counts
117 will be added to the counts of the TestCounts object
119 self.passed += counts.passed
120 self.failed += counts.failed
121 self.crashed += counts.crashed
122 self.skipped += counts.skipped
123 self.errors += counts.errors
125 def get_status(self) -> TestStatus:
126 """Returns the aggregated status of a Test using test
129 if self.total() == 0:
130 return TestStatus.NO_TESTS
132 # If one of the subtests crash, the expected status
133 # of the Test is crashed.
134 return TestStatus.TEST_CRASHED
136 # Otherwise if one of the subtests fail, the
137 # expected status of the Test is failed.
138 return TestStatus.FAILURE
140 # Otherwise if one of the subtests pass, the
141 # expected status of the Test is passed.
142 return TestStatus.SUCCESS
144 # Finally, if none of the subtests have failed,
145 # crashed, or passed, the expected status of the
147 return TestStatus.SKIPPED
149 def add_status(self, status: TestStatus) -> None:
151 Increments count of inputted status.
154 status - status to be added to the TestCounts object
156 if status == TestStatus.SUCCESS:
158 elif status == TestStatus.FAILURE:
160 elif status == TestStatus.SKIPPED:
162 elif status != TestStatus.NO_TESTS:
167 A class to represent the lines of kernel output.
168 Provides a lazy peek()/pop() interface over an iterator of
171 _lines: Iterator[Tuple[int, str]]
172 _next: Tuple[int, str]
176 def __init__(self, lines: Iterator[Tuple[int, str]]):
177 """Creates a new LineStream that wraps the given iterator."""
180 self._need_next = True
183 def _get_next(self) -> None:
184 """Advances the LineSteam to the next line, if necessary."""
185 if not self._need_next:
188 self._next = next(self._lines)
189 except StopIteration:
192 self._need_next = False
194 def peek(self) -> str:
195 """Returns the current line, without advancing the LineStream.
200 def pop(self) -> str:
201 """Returns the current line and advances the LineStream to
206 raise ValueError(f'LineStream: going past EOF, last line was {s}')
207 self._need_next = True
210 def __bool__(self) -> bool:
211 """Returns True if stream has more lines."""
213 return not self._done
215 # Only used by kunit_tool_test.py.
216 def __iter__(self) -> Iterator[str]:
217 """Empties all lines stored in LineStream object into
218 Iterator object and returns the Iterator object.
223 def line_number(self) -> int:
224 """Returns the line number of the current line."""
228 # Parsing helper methods:
# Matches a 'KTAP version N' header that ends the line; group 1 is the
# version number.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
# Matches a 'TAP version N' header that ends the line; group 1 is the
# version number. The pattern is not anchored at the start, so with search()
# it also matches inside 'KTAP version N'; extract_tap_lines() therefore
# tests KTAP_START first.
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Matches kernel messages at which extract_tap_lines() stops extracting
# KTAP lines.
KTAP_END = re.compile('(List of all partitions:|'
    'Kernel panic - not syncing: VFS:|reboot: System halted)')
235 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
236 """Extracts KTAP lines from the kernel output."""
237 def isolate_ktap_output(kernel_output: Iterable[str]) \
238 -> Iterator[Tuple[int, str]]:
241 for line in kernel_output:
243 line = line.rstrip() # remove trailing \n
244 if not started and KTAP_START.search(line):
245 # start extracting KTAP lines and set prefix
246 # to number of characters before version line
248 line.split('KTAP version')[0])
250 yield line_num, line[prefix_len:]
251 elif not started and TAP_START.search(line):
252 # start extracting KTAP lines and set prefix
253 # to number of characters before version line
254 prefix_len = len(line.split('TAP version')[0])
256 yield line_num, line[prefix_len:]
257 elif started and KTAP_END.search(line):
258 # stop extracting KTAP lines
261 # remove prefix and any indention and yield
262 # line with line number
263 line = line[prefix_len:].lstrip()
265 return LineStream(lines=isolate_ktap_output(kernel_output))
# TAP version numbers accepted by the parser. parse_ktap_header() passes this
# list to check_version(), which records an error for a version below the
# smallest or above the largest entry.
TAP_VERSIONS = [13, 14]
270 def check_version(version_num: int, accepted_versions: List[int],
271 version_type: str, test: Test) -> None:
273 Adds error to test object if version number is too high or too
277 version_num - The inputted version number from the parsed KTAP or TAP
279 accepted_version - List of accepted KTAP or TAP versions
280 version_type - 'KTAP' or 'TAP' depending on the type of
282 test - Test object for current test being parsed
284 if version_num < min(accepted_versions):
285 test.add_error(version_type +
286 ' version lower than expected!')
287 elif version_num > max(accepted_versions):
289 version_type + ' version higher than expected!')
291 def parse_ktap_header(lines: LineStream, test: Test) -> bool:
293 Parses KTAP/TAP header line and checks version number.
294 Returns False if fails to parse KTAP/TAP header line.
297 - 'KTAP version [version number]'
298 - 'TAP version [version number]'
301 lines - LineStream of KTAP output to parse
302 test - Test object for current test being parsed
305 True if successfully parsed KTAP/TAP header line
307 ktap_match = KTAP_START.match(lines.peek())
308 tap_match = TAP_START.match(lines.peek())
310 version_num = int(ktap_match.group(1))
311 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
313 version_num = int(tap_match.group(1))
314 check_version(version_num, TAP_VERSIONS, 'TAP', test)
317 test.log.append(lines.pop())
# Matches a whole subtest header line '# Subtest: [test name]'; group 1 is the
# test name.
TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
322 def parse_test_header(lines: LineStream, test: Test) -> bool:
324 Parses test header and stores test name in test object.
325 Returns False if fails to parse test header line.
328 - '# Subtest: [test name]'
331 lines - LineStream of KTAP output to parse
332 test - Test object for current test being parsed
335 True if successfully parsed test header line
337 match = TEST_HEADER.match(lines.peek())
340 test.log.append(lines.pop())
341 test.name = match.group(1)
# Matches a test plan '1..[number of subtests]'; group 1 is the expected
# number of subtests. There is no end anchor, so text after the number is
# tolerated.
TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
346 def parse_test_plan(lines: LineStream, test: Test) -> bool:
348 Parses test plan line and stores the expected number of subtests in
349 test object. Reports an error if expected count is 0.
350 Returns False and sets expected_count to None if there is no valid test
354 - '1..[number of subtests]'
357 lines - LineStream of KTAP output to parse
358 test - Test object for current test being parsed
361 True if successfully parsed test plan line
363 match = TEST_PLAN.match(lines.peek())
365 test.expected_count = None
367 test.log.append(lines.pop())
368 expected_count = int(match.group(1))
369 test.expected_count = expected_count
# Matches a test result line '[ok|not ok] [test number] [-] [test name]'.
# Groups: 1 = 'ok' or 'not ok', 2 = test number, 3 = optional '- ',
# 4 = test name (may not contain '#'), 5 = optional trailing ' # ...' text.
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# Same layout as TEST_RESULT but requires a ' # SKIP' directive after the
# test name. Groups 1-4 have the same meaning; group 5 is whatever follows
# '# SKIP' on the line.
TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
376 def peek_test_name_match(lines: LineStream, test: Test) -> bool:
378 Matches current line with the format of a test result line and checks
379 if the name matches the name of the current test.
380 Returns False if fails to match format or name.
383 - '[ok|not ok] [test number] [-] [test name] [optional skip
387 lines - LineStream of KTAP output to parse
388 test - Test object for current test being parsed
391 True if matched a test result line and the name matching the
395 match = TEST_RESULT.match(line)
398 name = match.group(4)
399 return (name == test.name)
401 def parse_test_result(lines: LineStream, test: Test,
402 expected_num: int) -> bool:
404 Parses test result line and stores the status and name in the test
405 object. Reports an error if the test number does not match expected
407 Returns False if fails to parse test result line.
409 Note that the SKIP directive is the only direction that causes a
413 - '[ok|not ok] [test number] [-] [test name] [optional skip
417 lines - LineStream of KTAP output to parse
418 test - Test object for current test being parsed
419 expected_num - expected test number for current test
422 True if successfully parsed a test result line.
425 match = TEST_RESULT.match(line)
426 skip_match = TEST_RESULT_SKIP.match(line)
428 # Check if line matches test result line format
431 test.log.append(lines.pop())
433 # Set name of test object
435 test.name = skip_match.group(4)
437 test.name = match.group(4)
440 num = int(match.group(2))
441 if num != expected_num:
442 test.add_error('Expected test number ' +
443 str(expected_num) + ' but found ' + str(num))
445 # Set status of test object
446 status = match.group(1)
448 test.status = TestStatus.SKIPPED
450 test.status = TestStatus.SUCCESS
452 test.status = TestStatus.FAILURE
455 def parse_diagnostic(lines: LineStream) -> List[str]:
457 Parse lines that do not match the format of a test result line or
458 test header line and returns them in list.
460 Line formats that are not parsed:
461 - '# Subtest: [test name]'
462 - '[ok|not ok] [test number] [-] [test name] [optional skip
466 lines - LineStream of KTAP output to parse
469 Log of diagnostic lines
471 log = [] # type: List[str]
472 while lines and not TEST_RESULT.match(lines.peek()) and not \
473 TEST_HEADER.match(lines.peek()):
474 log.append(lines.pop())
# Matches a whole diagnostic line of the form
# '# [text]: kunit test case crashed!'. parse_crash_in_log() checks each log
# line against it to detect a crashed test.
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^# .*?: kunit test case crashed!$')
479 def parse_crash_in_log(test: Test) -> bool:
481 Iterate through the lines of the log to parse for crash message.
482 If crash message found, set status to crashed and return True.
483 Otherwise return False.
486 test - Test object for current test being parsed
489 True if crash message found in log
491 for line in test.log:
492 if DIAGNOSTIC_CRASH_MESSAGE.match(line):
493 test.status = TestStatus.TEST_CRASHED
498 # Printing helper methods:
def red(text: str) -> str:
    """Returns inputted string with red color code.

    The text is prefixed with the red escape sequence and followed by
    RESET.
    """
    return ''.join(('\033[1;31m', text, RESET))
def yellow(text: str) -> str:
    """Returns inputted string with yellow color code.

    The text is prefixed with the yellow escape sequence and followed by
    RESET.
    """
    return ''.join(('\033[1;33m', text, RESET))
def green(text: str) -> str:
    """Returns inputted string with green color code.

    The text is prefixed with the green escape sequence and followed by
    RESET.
    """
    return ''.join(('\033[1;32m', text, RESET))
# Number of characters a color helper adds around its text (color code plus
# RESET), measured by coloring an empty string. print_test_footer() subtracts
# it so the color codes are not counted in the message length.
ANSI_LEN = len(red(''))
def print_with_timestamp(message: str) -> None:
    """Prints message with timestamp at beginning.

    The timestamp is the current time from datetime.datetime.now(),
    formatted as HH:MM:SS inside square brackets.

    Parameters:
    message - text to print after the timestamp
    """
    timestamp = datetime.datetime.now().strftime('%H:%M:%S')
    stamped_line = '[%s] %s' % (timestamp, message)
    print(stamped_line)
522 def format_test_divider(message: str, len_message: int) -> str:
524 Returns string with message centered in fixed width divider.
527 '===================== message example ====================='
530 message - message to be centered in divider line
531 len_message - length of the message to be printed such that
532 any characters of the color codes are not counted
535 String containing message centered in fixed width divider
537 default_count = 3 # default number of dashes
538 len_1 = default_count
539 len_2 = default_count
540 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
542 # calculate number of dashes for each side of the divider
543 len_1 = int(difference / 2)
544 len_2 = difference - len_1
545 return ('=' * len_1) + ' ' + message + ' ' + ('=' * len_2)
547 def print_test_header(test: Test) -> None:
549 Prints test header with test name and optionally the expected number
553 '=================== example (2 subtests) ==================='
556 test - Test object representing current test being printed
559 if test.expected_count:
560 if test.expected_count == 1:
561 message += (' (' + str(test.expected_count) +
564 message += (' (' + str(test.expected_count) +
566 print_with_timestamp(format_test_divider(message, len(message)))
568 def print_log(log: Iterable[str]) -> None:
570 Prints all strings in saved log for test in yellow.
573 log - Iterable object with all strings saved in log for test
576 print_with_timestamp(yellow(m))
578 def format_test_result(test: Test) -> str:
580 Returns string with formatted test result with colored status and test
587 test - Test object representing current test being printed
590 String containing formatted test result
592 if test.status == TestStatus.SUCCESS:
593 return (green('[PASSED] ') + test.name)
594 elif test.status == TestStatus.SKIPPED:
595 return (yellow('[SKIPPED] ') + test.name)
596 elif test.status == TestStatus.NO_TESTS:
597 return (yellow('[NO TESTS RUN] ') + test.name)
598 elif test.status == TestStatus.TEST_CRASHED:
600 return (red('[CRASHED] ') + test.name)
603 return (red('[FAILED] ') + test.name)
def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    The line is built by format_test_result() and printed through
    print_with_timestamp().

    Parameters:
    test - Test object representing current test being printed
    """
    result_line = format_test_result(test)
    print_with_timestamp(result_line)
def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    footer = format_test_result(test)
    # The divider needs the message length without the color codes,
    # so leave out the ANSI_LEN characters added by the color helper.
    uncolored_len = len(footer) - ANSI_LEN
    print_with_timestamp(format_test_divider(footer, uncolored_len))
631 def print_summary_line(test: Test) -> None:
633 Prints summary line of test object. Color of line is dependent on
634 status of test. Color is green if test passes, yellow if test is
635 skipped, and red if the test fails or crashes. Summary line contains
636 counts of the statuses of the tests subtests or the test itself if it
640 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
643 test - Test object representing current test being printed
645 if test.status == TestStatus.SUCCESS:
647 elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS:
652 print_with_timestamp(color('Testing complete. ' + str(counts)))
def print_error(error_message: str) -> None:
    """
    Prints error message with error format: a red '[ERROR] ' tag
    followed by the message, on a timestamped line.

    Example:
    "[ERROR] Test example: missing test plan!"

    Parameters:
    error_message - message describing error
    """
    error_tag = red('[ERROR] ')
    print_with_timestamp(error_tag + error_message)
668 def bubble_up_test_results(test: Test) -> None:
670 If the test has subtests, add the test counts of the subtests to the
671 test and check if any of the tests crashed and if so set the test
672 status to crashed. Otherwise if the test has no subtests add the
673 status of the test to the test counts.
676 test - Test object for current test being parsed
678 parse_crash_in_log(test)
679 subtests = test.subtests
683 counts.add_subtest_counts(t.counts)
684 if counts.total() == 0:
685 counts.add_status(status)
686 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
687 test.status = TestStatus.TEST_CRASHED
689 def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
691 Finds next test to parse in LineStream, creates new Test object,
692 parses any subtests of the test, populates Test object with all
693 information (status, name) about the test and the Test objects for
694 any subtests, and then returns the Test object. The method accepts
695 three formats of tests:
697 Accepted test formats:
699 - Main KTAP/TAP header
707 - Subtest header line
723 lines - LineStream of KTAP output to parse
724 expected_num - expected test number for test to be parsed
725 log - list of strings containing any preceding diagnostic lines
726 corresponding to the current test
729 Test object populated with characteristics and any subtests
734 main = parse_ktap_header(lines, test)
736 # If KTAP/TAP header is found, attempt to parse
739 parse_test_plan(lines, test)
742 # If KTAP/TAP header is not found, test must be subtest
743 # header or test result line so parse attempt to parser
745 parent_test = parse_test_header(lines, test)
747 # If subtest header is found, attempt to parse
748 # test plan and print header
749 parse_test_plan(lines, test)
750 print_test_header(test)
751 expected_count = test.expected_count
754 while parent_test and (expected_count is None or test_num <= expected_count):
755 # Loop to parse any subtests.
756 # Break after parsing expected number of tests or
757 # if expected number of tests is unknown break when test
758 # result line with matching name to subtest header is found
759 # or no more lines in stream.
760 sub_log = parse_diagnostic(lines)
762 if not lines or (peek_test_name_match(lines, test) and
764 if expected_count and test_num <= expected_count:
765 # If parser reaches end of test before
766 # parsing expected number of subtests, print
767 # crashed subtest and record error
768 test.add_error('missing expected subtest!')
769 sub_test.log.extend(sub_log)
770 test.counts.add_status(
771 TestStatus.TEST_CRASHED)
772 print_test_result(sub_test)
774 test.log.extend(sub_log)
777 sub_test = parse_test(lines, test_num, sub_log)
778 subtests.append(sub_test)
780 test.subtests = subtests
782 # If not main test, look for test result line
783 test.log.extend(parse_diagnostic(lines))
784 if (parent_test and peek_test_name_match(lines, test)) or \
786 parse_test_result(lines, test, expected_num)
788 test.add_error('missing subtest result line!')
790 # Check for there being no tests
791 if parent_test and len(subtests) == 0:
792 test.status = TestStatus.NO_TESTS
793 test.add_error('0 tests run!')
795 # Add statuses to TestCounts attribute in Test object
796 bubble_up_test_results(test)
797 if parent_test and not main:
798 # If test has subtests and is not the main test object, print
800 print_test_footer(test)
802 print_test_result(test)
805 def parse_run_tests(kernel_output: Iterable[str]) -> Test:
807 Using kernel output, extract KTAP lines, parse the lines for test
808 results and print condensed test results and summary line .
811 kernel_output - Iterable object contains lines of kernel output
814 Test - the main test object with all subtests.
816 print_with_timestamp(DIVIDER)
817 lines = extract_tap_lines(kernel_output)
820 test.add_error('invalid KTAP input!')
821 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
823 test = parse_test(lines, 0, [])
824 if test.status != TestStatus.NO_TESTS:
825 test.status = test.counts.get_status()
826 print_with_timestamp(DIVIDER)
827 print_summary_line(test)