Linux 6.9-rc1
[linux-2.6-microblaze.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
4 # results with reader-friendly format. Stores and returns test results in a
5 # Test object.
6 #
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
11
12 from __future__ import annotations
13 import re
14 import sys
15
16 from enum import Enum, auto
17 from typing import Iterable, Iterator, List, Optional, Tuple
18
19 from kunit_printer import stdout
20
21 class Test:
22         """
23         A class to represent a test parsed from KTAP results. All KTAP
24         results within a test log are stored in a main Test object as
25         subtests.
26
27         Attributes:
28         status : TestStatus - status of the test
29         name : str - name of the test
30         expected_count : int - expected number of subtests (0 if single
31                 test case and None if unknown expected number of subtests)
32         subtests : List[Test] - list of subtests
33         log : List[str] - log of KTAP lines that correspond to the test
34         counts : TestCounts - counts of the test statuses and errors of
35                 subtests or of the test itself if the test is a single
36                 test case.
37         """
38         def __init__(self) -> None:
39                 """Creates Test object with default attributes."""
40                 self.status = TestStatus.TEST_CRASHED
41                 self.name = ''
42                 self.expected_count = 0  # type: Optional[int]
43                 self.subtests = []  # type: List[Test]
44                 self.log = []  # type: List[str]
45                 self.counts = TestCounts()
46
47         def __str__(self) -> str:
48                 """Returns string representation of a Test class object."""
49                 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
50                         f'{self.subtests}, {self.log}, {self.counts})')
51
52         def __repr__(self) -> str:
53                 """Returns string representation of a Test class object."""
54                 return str(self)
55
56         def add_error(self, error_message: str) -> None:
57                 """Records an error that occurred while parsing this test."""
58                 self.counts.errors += 1
59                 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
60
61 class TestStatus(Enum):
62         """An enumeration class to represent the status of a test."""
63         SUCCESS = auto()
64         FAILURE = auto()
65         SKIPPED = auto()
66         TEST_CRASHED = auto()
67         NO_TESTS = auto()
68         FAILURE_TO_PARSE_TESTS = auto()
69
70 class TestCounts:
71         """
72         Tracks the counts of statuses of all test cases and any errors within
73         a Test.
74
75         Attributes:
76         passed : int - the number of tests that have passed
77         failed : int - the number of tests that have failed
78         crashed : int - the number of tests that have crashed
79         skipped : int - the number of tests that have skipped
80         errors : int - the number of errors in the test and subtests
81         """
82         def __init__(self):
83                 """Creates TestCounts object with counts of all test
84                 statuses and test errors set to 0.
85                 """
86                 self.passed = 0
87                 self.failed = 0
88                 self.crashed = 0
89                 self.skipped = 0
90                 self.errors = 0
91
92         def __str__(self) -> str:
93                 """Returns the string representation of a TestCounts object."""
94                 statuses = [('passed', self.passed), ('failed', self.failed),
95                         ('crashed', self.crashed), ('skipped', self.skipped),
96                         ('errors', self.errors)]
97                 return f'Ran {self.total()} tests: ' + \
98                         ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
99
100         def total(self) -> int:
101                 """Returns the total number of test cases within a test
102                 object, where a test case is a test with no subtests.
103                 """
104                 return (self.passed + self.failed + self.crashed +
105                         self.skipped)
106
107         def add_subtest_counts(self, counts: TestCounts) -> None:
108                 """
109                 Adds the counts of another TestCounts object to the current
110                 TestCounts object. Used to add the counts of a subtest to the
111                 parent test.
112
113                 Parameters:
114                 counts - a different TestCounts object whose counts
115                         will be added to the counts of the TestCounts object
116                 """
117                 self.passed += counts.passed
118                 self.failed += counts.failed
119                 self.crashed += counts.crashed
120                 self.skipped += counts.skipped
121                 self.errors += counts.errors
122
123         def get_status(self) -> TestStatus:
124                 """Returns the aggregated status of a Test using test
125                 counts.
126                 """
127                 if self.total() == 0:
128                         return TestStatus.NO_TESTS
129                 if self.crashed:
130                         # Crashes should take priority.
131                         return TestStatus.TEST_CRASHED
132                 if self.failed:
133                         return TestStatus.FAILURE
134                 if self.passed:
135                         # No failures or crashes, looks good!
136                         return TestStatus.SUCCESS
137                 # We have only skipped tests.
138                 return TestStatus.SKIPPED
139
140         def add_status(self, status: TestStatus) -> None:
141                 """Increments the count for `status`."""
142                 if status == TestStatus.SUCCESS:
143                         self.passed += 1
144                 elif status == TestStatus.FAILURE:
145                         self.failed += 1
146                 elif status == TestStatus.SKIPPED:
147                         self.skipped += 1
148                 elif status != TestStatus.NO_TESTS:
149                         self.crashed += 1
150
151 class LineStream:
152         """
153         A class to represent the lines of kernel output.
154         Provides a lazy peek()/pop() interface over an iterator of
155         (line#, text).
156         """
157         _lines: Iterator[Tuple[int, str]]
158         _next: Tuple[int, str]
159         _need_next: bool
160         _done: bool
161
162         def __init__(self, lines: Iterator[Tuple[int, str]]):
163                 """Creates a new LineStream that wraps the given iterator."""
164                 self._lines = lines
165                 self._done = False
166                 self._need_next = True
167                 self._next = (0, '')
168
169         def _get_next(self) -> None:
170                 """Advances the LineSteam to the next line, if necessary."""
171                 if not self._need_next:
172                         return
173                 try:
174                         self._next = next(self._lines)
175                 except StopIteration:
176                         self._done = True
177                 finally:
178                         self._need_next = False
179
180         def peek(self) -> str:
181                 """Returns the current line, without advancing the LineStream.
182                 """
183                 self._get_next()
184                 return self._next[1]
185
186         def pop(self) -> str:
187                 """Returns the current line and advances the LineStream to
188                 the next line.
189                 """
190                 s = self.peek()
191                 if self._done:
192                         raise ValueError(f'LineStream: going past EOF, last line was {s}')
193                 self._need_next = True
194                 return s
195
196         def __bool__(self) -> bool:
197                 """Returns True if stream has more lines."""
198                 self._get_next()
199                 return not self._done
200
201         # Only used by kunit_tool_test.py.
202         def __iter__(self) -> Iterator[str]:
203                 """Empties all lines stored in LineStream object into
204                 Iterator object and returns the Iterator object.
205                 """
206                 while bool(self):
207                         yield self.pop()
208
209         def line_number(self) -> int:
210                 """Returns the line number of the current line."""
211                 self._get_next()
212                 return self._next[0]
213
214 # Parsing helper methods:
215
216 KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
217 TAP_START = re.compile(r'TAP version ([0-9]+)$')
218 KTAP_END = re.compile('(List of all partitions:|'
219         'Kernel panic - not syncing: VFS:|reboot: System halted)')
220
221 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
222         """Extracts KTAP lines from the kernel output."""
223         def isolate_ktap_output(kernel_output: Iterable[str]) \
224                         -> Iterator[Tuple[int, str]]:
225                 line_num = 0
226                 started = False
227                 for line in kernel_output:
228                         line_num += 1
229                         line = line.rstrip()  # remove trailing \n
230                         if not started and KTAP_START.search(line):
231                                 # start extracting KTAP lines and set prefix
232                                 # to number of characters before version line
233                                 prefix_len = len(
234                                         line.split('KTAP version')[0])
235                                 started = True
236                                 yield line_num, line[prefix_len:]
237                         elif not started and TAP_START.search(line):
238                                 # start extracting KTAP lines and set prefix
239                                 # to number of characters before version line
240                                 prefix_len = len(line.split('TAP version')[0])
241                                 started = True
242                                 yield line_num, line[prefix_len:]
243                         elif started and KTAP_END.search(line):
244                                 # stop extracting KTAP lines
245                                 break
246                         elif started:
247                                 # remove prefix and any indention and yield
248                                 # line with line number
249                                 line = line[prefix_len:].lstrip()
250                                 yield line_num, line
251         return LineStream(lines=isolate_ktap_output(kernel_output))
252
253 KTAP_VERSIONS = [1]
254 TAP_VERSIONS = [13, 14]
255
256 def check_version(version_num: int, accepted_versions: List[int],
257                         version_type: str, test: Test) -> None:
258         """
259         Adds error to test object if version number is too high or too
260         low.
261
262         Parameters:
263         version_num - The inputted version number from the parsed KTAP or TAP
264                 header line
265         accepted_version - List of accepted KTAP or TAP versions
266         version_type - 'KTAP' or 'TAP' depending on the type of
267                 version line.
268         test - Test object for current test being parsed
269         """
270         if version_num < min(accepted_versions):
271                 test.add_error(f'{version_type} version lower than expected!')
272         elif version_num > max(accepted_versions):
273                 test.add_error(f'{version_type} version higer than expected!')
274
275 def parse_ktap_header(lines: LineStream, test: Test) -> bool:
276         """
277         Parses KTAP/TAP header line and checks version number.
278         Returns False if fails to parse KTAP/TAP header line.
279
280         Accepted formats:
281         - 'KTAP version [version number]'
282         - 'TAP version [version number]'
283
284         Parameters:
285         lines - LineStream of KTAP output to parse
286         test - Test object for current test being parsed
287
288         Return:
289         True if successfully parsed KTAP/TAP header line
290         """
291         ktap_match = KTAP_START.match(lines.peek())
292         tap_match = TAP_START.match(lines.peek())
293         if ktap_match:
294                 version_num = int(ktap_match.group(1))
295                 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
296         elif tap_match:
297                 version_num = int(tap_match.group(1))
298                 check_version(version_num, TAP_VERSIONS, 'TAP', test)
299         else:
300                 return False
301         test.log.append(lines.pop())
302         return True
303
304 TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
305
306 def parse_test_header(lines: LineStream, test: Test) -> bool:
307         """
308         Parses test header and stores test name in test object.
309         Returns False if fails to parse test header line.
310
311         Accepted format:
312         - '# Subtest: [test name]'
313
314         Parameters:
315         lines - LineStream of KTAP output to parse
316         test - Test object for current test being parsed
317
318         Return:
319         True if successfully parsed test header line
320         """
321         match = TEST_HEADER.match(lines.peek())
322         if not match:
323                 return False
324         test.log.append(lines.pop())
325         test.name = match.group(1)
326         return True
327
328 TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
329
330 def parse_test_plan(lines: LineStream, test: Test) -> bool:
331         """
332         Parses test plan line and stores the expected number of subtests in
333         test object. Reports an error if expected count is 0.
334         Returns False and sets expected_count to None if there is no valid test
335         plan.
336
337         Accepted format:
338         - '1..[number of subtests]'
339
340         Parameters:
341         lines - LineStream of KTAP output to parse
342         test - Test object for current test being parsed
343
344         Return:
345         True if successfully parsed test plan line
346         """
347         match = TEST_PLAN.match(lines.peek())
348         if not match:
349                 test.expected_count = None
350                 return False
351         test.log.append(lines.pop())
352         expected_count = int(match.group(1))
353         test.expected_count = expected_count
354         return True
355
356 TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
357
358 TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
359
360 def peek_test_name_match(lines: LineStream, test: Test) -> bool:
361         """
362         Matches current line with the format of a test result line and checks
363         if the name matches the name of the current test.
364         Returns False if fails to match format or name.
365
366         Accepted format:
367         - '[ok|not ok] [test number] [-] [test name] [optional skip
368                 directive]'
369
370         Parameters:
371         lines - LineStream of KTAP output to parse
372         test - Test object for current test being parsed
373
374         Return:
375         True if matched a test result line and the name matching the
376                 expected test name
377         """
378         line = lines.peek()
379         match = TEST_RESULT.match(line)
380         if not match:
381                 return False
382         name = match.group(4)
383         return name == test.name
384
385 def parse_test_result(lines: LineStream, test: Test,
386                         expected_num: int) -> bool:
387         """
388         Parses test result line and stores the status and name in the test
389         object. Reports an error if the test number does not match expected
390         test number.
391         Returns False if fails to parse test result line.
392
393         Note that the SKIP directive is the only direction that causes a
394         change in status.
395
396         Accepted format:
397         - '[ok|not ok] [test number] [-] [test name] [optional skip
398                 directive]'
399
400         Parameters:
401         lines - LineStream of KTAP output to parse
402         test - Test object for current test being parsed
403         expected_num - expected test number for current test
404
405         Return:
406         True if successfully parsed a test result line.
407         """
408         line = lines.peek()
409         match = TEST_RESULT.match(line)
410         skip_match = TEST_RESULT_SKIP.match(line)
411
412         # Check if line matches test result line format
413         if not match:
414                 return False
415         test.log.append(lines.pop())
416
417         # Set name of test object
418         if skip_match:
419                 test.name = skip_match.group(4)
420         else:
421                 test.name = match.group(4)
422
423         # Check test num
424         num = int(match.group(2))
425         if num != expected_num:
426                 test.add_error(f'Expected test number {expected_num} but found {num}')
427
428         # Set status of test object
429         status = match.group(1)
430         if skip_match:
431                 test.status = TestStatus.SKIPPED
432         elif status == 'ok':
433                 test.status = TestStatus.SUCCESS
434         else:
435                 test.status = TestStatus.FAILURE
436         return True
437
438 def parse_diagnostic(lines: LineStream) -> List[str]:
439         """
440         Parse lines that do not match the format of a test result line or
441         test header line and returns them in list.
442
443         Line formats that are not parsed:
444         - '# Subtest: [test name]'
445         - '[ok|not ok] [test number] [-] [test name] [optional skip
446                 directive]'
447
448         Parameters:
449         lines - LineStream of KTAP output to parse
450
451         Return:
452         Log of diagnostic lines
453         """
454         log = []  # type: List[str]
455         while lines and not TEST_RESULT.match(lines.peek()) and not \
456                         TEST_HEADER.match(lines.peek()):
457                 log.append(lines.pop())
458         return log
459
460
461 # Printing helper methods:
462
463 DIVIDER = '=' * 60
464
465 def format_test_divider(message: str, len_message: int) -> str:
466         """
467         Returns string with message centered in fixed width divider.
468
469         Example:
470         '===================== message example ====================='
471
472         Parameters:
473         message - message to be centered in divider line
474         len_message - length of the message to be printed such that
475                 any characters of the color codes are not counted
476
477         Return:
478         String containing message centered in fixed width divider
479         """
480         default_count = 3  # default number of dashes
481         len_1 = default_count
482         len_2 = default_count
483         difference = len(DIVIDER) - len_message - 2  # 2 spaces added
484         if difference > 0:
485                 # calculate number of dashes for each side of the divider
486                 len_1 = int(difference / 2)
487                 len_2 = difference - len_1
488         return ('=' * len_1) + f' {message} ' + ('=' * len_2)
489
490 def print_test_header(test: Test) -> None:
491         """
492         Prints test header with test name and optionally the expected number
493         of subtests.
494
495         Example:
496         '=================== example (2 subtests) ==================='
497
498         Parameters:
499         test - Test object representing current test being printed
500         """
501         message = test.name
502         if test.expected_count:
503                 if test.expected_count == 1:
504                         message += ' (1 subtest)'
505                 else:
506                         message += f' ({test.expected_count} subtests)'
507         stdout.print_with_timestamp(format_test_divider(message, len(message)))
508
509 def print_log(log: Iterable[str]) -> None:
510         """Prints all strings in saved log for test in yellow."""
511         for m in log:
512                 stdout.print_with_timestamp(stdout.yellow(m))
513
514 def format_test_result(test: Test) -> str:
515         """
516         Returns string with formatted test result with colored status and test
517         name.
518
519         Example:
520         '[PASSED] example'
521
522         Parameters:
523         test - Test object representing current test being printed
524
525         Return:
526         String containing formatted test result
527         """
528         if test.status == TestStatus.SUCCESS:
529                 return stdout.green('[PASSED] ') + test.name
530         if test.status == TestStatus.SKIPPED:
531                 return stdout.yellow('[SKIPPED] ') + test.name
532         if test.status == TestStatus.NO_TESTS:
533                 return stdout.yellow('[NO TESTS RUN] ') + test.name
534         if test.status == TestStatus.TEST_CRASHED:
535                 print_log(test.log)
536                 return stdout.red('[CRASHED] ') + test.name
537         print_log(test.log)
538         return stdout.red('[FAILED] ') + test.name
539
540 def print_test_result(test: Test) -> None:
541         """
542         Prints result line with status of test.
543
544         Example:
545         '[PASSED] example'
546
547         Parameters:
548         test - Test object representing current test being printed
549         """
550         stdout.print_with_timestamp(format_test_result(test))
551
552 def print_test_footer(test: Test) -> None:
553         """
554         Prints test footer with status of test.
555
556         Example:
557         '===================== [PASSED] example ====================='
558
559         Parameters:
560         test - Test object representing current test being printed
561         """
562         message = format_test_result(test)
563         stdout.print_with_timestamp(format_test_divider(message,
564                 len(message) - stdout.color_len()))
565
566 def print_summary_line(test: Test) -> None:
567         """
568         Prints summary line of test object. Color of line is dependent on
569         status of test. Color is green if test passes, yellow if test is
570         skipped, and red if the test fails or crashes. Summary line contains
571         counts of the statuses of the tests subtests or the test itself if it
572         has no subtests.
573
574         Example:
575         "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
576         Errors: 0"
577
578         test - Test object representing current test being printed
579         """
580         if test.status == TestStatus.SUCCESS:
581                 color = stdout.green
582         elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
583                 color = stdout.yellow
584         else:
585                 color = stdout.red
586         stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
587
588 # Other methods:
589
590 def bubble_up_test_results(test: Test) -> None:
591         """
592         If the test has subtests, add the test counts of the subtests to the
593         test and check if any of the tests crashed and if so set the test
594         status to crashed. Otherwise if the test has no subtests add the
595         status of the test to the test counts.
596
597         Parameters:
598         test - Test object for current test being parsed
599         """
600         subtests = test.subtests
601         counts = test.counts
602         status = test.status
603         for t in subtests:
604                 counts.add_subtest_counts(t.counts)
605         if counts.total() == 0:
606                 counts.add_status(status)
607         elif test.counts.get_status() == TestStatus.TEST_CRASHED:
608                 test.status = TestStatus.TEST_CRASHED
609
610 def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
611         """
612         Finds next test to parse in LineStream, creates new Test object,
613         parses any subtests of the test, populates Test object with all
614         information (status, name) about the test and the Test objects for
615         any subtests, and then returns the Test object. The method accepts
616         three formats of tests:
617
618         Accepted test formats:
619
620         - Main KTAP/TAP header
621
622         Example:
623
624         KTAP version 1
625         1..4
626         [subtests]
627
628         - Subtest header line
629
630         Example:
631
632         # Subtest: name
633         1..3
634         [subtests]
635         ok 1 name
636
637         - Test result line
638
639         Example:
640
641         ok 1 - test
642
643         Parameters:
644         lines - LineStream of KTAP output to parse
645         expected_num - expected test number for test to be parsed
646         log - list of strings containing any preceding diagnostic lines
647                 corresponding to the current test
648
649         Return:
650         Test object populated with characteristics and any subtests
651         """
652         test = Test()
653         test.log.extend(log)
654         parent_test = False
655         main = parse_ktap_header(lines, test)
656         if main:
657                 # If KTAP/TAP header is found, attempt to parse
658                 # test plan
659                 test.name = "main"
660                 parse_test_plan(lines, test)
661                 parent_test = True
662         else:
663                 # If KTAP/TAP header is not found, test must be subtest
664                 # header or test result line so parse attempt to parser
665                 # subtest header
666                 parent_test = parse_test_header(lines, test)
667                 if parent_test:
668                         # If subtest header is found, attempt to parse
669                         # test plan and print header
670                         parse_test_plan(lines, test)
671                         print_test_header(test)
672         expected_count = test.expected_count
673         subtests = []
674         test_num = 1
675         while parent_test and (expected_count is None or test_num <= expected_count):
676                 # Loop to parse any subtests.
677                 # Break after parsing expected number of tests or
678                 # if expected number of tests is unknown break when test
679                 # result line with matching name to subtest header is found
680                 # or no more lines in stream.
681                 sub_log = parse_diagnostic(lines)
682                 sub_test = Test()
683                 if not lines or (peek_test_name_match(lines, test) and
684                                 not main):
685                         if expected_count and test_num <= expected_count:
686                                 # If parser reaches end of test before
687                                 # parsing expected number of subtests, print
688                                 # crashed subtest and record error
689                                 test.add_error('missing expected subtest!')
690                                 sub_test.log.extend(sub_log)
691                                 test.counts.add_status(
692                                         TestStatus.TEST_CRASHED)
693                                 print_test_result(sub_test)
694                         else:
695                                 test.log.extend(sub_log)
696                                 break
697                 else:
698                         sub_test = parse_test(lines, test_num, sub_log)
699                 subtests.append(sub_test)
700                 test_num += 1
701         test.subtests = subtests
702         if not main:
703                 # If not main test, look for test result line
704                 test.log.extend(parse_diagnostic(lines))
705                 if (parent_test and peek_test_name_match(lines, test)) or \
706                                 not parent_test:
707                         parse_test_result(lines, test, expected_num)
708                 else:
709                         test.add_error('missing subtest result line!')
710
711         # Check for there being no tests
712         if parent_test and len(subtests) == 0:
713                 # Don't override a bad status if this test had one reported.
714                 # Assumption: no subtests means CRASHED is from Test.__init__()
715                 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
716                         test.status = TestStatus.NO_TESTS
717                         test.add_error('0 tests run!')
718
719         # Add statuses to TestCounts attribute in Test object
720         bubble_up_test_results(test)
721         if parent_test and not main:
722                 # If test has subtests and is not the main test object, print
723                 # footer.
724                 print_test_footer(test)
725         elif not main:
726                 print_test_result(test)
727         return test
728
729 def parse_run_tests(kernel_output: Iterable[str]) -> Test:
730         """
731         Using kernel output, extract KTAP lines, parse the lines for test
732         results and print condensed test results and summary line.
733
734         Parameters:
735         kernel_output - Iterable object contains lines of kernel output
736
737         Return:
738         Test - the main test object with all subtests.
739         """
740         stdout.print_with_timestamp(DIVIDER)
741         lines = extract_tap_lines(kernel_output)
742         test = Test()
743         if not lines:
744                 test.name = '<missing>'
745                 test.add_error('could not find any KTAP output!')
746                 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
747         else:
748                 test = parse_test(lines, 0, [])
749                 if test.status != TestStatus.NO_TESTS:
750                         test.status = test.counts.get_status()
751         stdout.print_with_timestamp(DIVIDER)
752         print_summary_line(test)
753         return test