Merge branches 'fixes' and 'misc' into for-linus
[linux-2.6-microblaze.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
4 # results with reader-friendly format. Stores and returns test results in a
5 # Test object.
6 #
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
11
12 from __future__ import annotations
13 import re
14
15 import datetime
16 from enum import Enum, auto
17 from functools import reduce
18 from typing import Iterable, Iterator, List, Optional, Tuple
19
20 class Test(object):
21         """
22         A class to represent a test parsed from KTAP results. All KTAP
23         results within a test log are stored in a main Test object as
24         subtests.
25
26         Attributes:
27         status : TestStatus - status of the test
28         name : str - name of the test
29         expected_count : int - expected number of subtests (0 if single
30                 test case and None if unknown expected number of subtests)
31         subtests : List[Test] - list of subtests
32         log : List[str] - log of KTAP lines that correspond to the test
33         counts : TestCounts - counts of the test statuses and errors of
34                 subtests or of the test itself if the test is a single
35                 test case.
36         """
37         def __init__(self) -> None:
38                 """Creates Test object with default attributes."""
39                 self.status = TestStatus.TEST_CRASHED
40                 self.name = ''
41                 self.expected_count = 0  # type: Optional[int]
42                 self.subtests = []  # type: List[Test]
43                 self.log = []  # type: List[str]
44                 self.counts = TestCounts()
45
46         def __str__(self) -> str:
47                 """Returns string representation of a Test class object."""
48                 return ('Test(' + str(self.status) + ', ' + self.name +
49                         ', ' + str(self.expected_count) + ', ' +
50                         str(self.subtests) + ', ' + str(self.log) + ', ' +
51                         str(self.counts) + ')')
52
53         def __repr__(self) -> str:
54                 """Returns string representation of a Test class object."""
55                 return str(self)
56
57         def add_error(self, error_message: str) -> None:
58                 """Records an error that occurred while parsing this test."""
59                 self.counts.errors += 1
60                 print_error('Test ' + self.name + ': ' + error_message)
61
62 class TestStatus(Enum):
63         """An enumeration class to represent the status of a test."""
64         SUCCESS = auto()
65         FAILURE = auto()
66         SKIPPED = auto()
67         TEST_CRASHED = auto()
68         NO_TESTS = auto()
69         FAILURE_TO_PARSE_TESTS = auto()
70
71 class TestCounts:
72         """
73         Tracks the counts of statuses of all test cases and any errors within
74         a Test.
75
76         Attributes:
77         passed : int - the number of tests that have passed
78         failed : int - the number of tests that have failed
79         crashed : int - the number of tests that have crashed
80         skipped : int - the number of tests that have skipped
81         errors : int - the number of errors in the test and subtests
82         """
83         def __init__(self):
84                 """Creates TestCounts object with counts of all test
85                 statuses and test errors set to 0.
86                 """
87                 self.passed = 0
88                 self.failed = 0
89                 self.crashed = 0
90                 self.skipped = 0
91                 self.errors = 0
92
93         def __str__(self) -> str:
94                 """Returns the string representation of a TestCounts object.
95                 """
96                 return ('Passed: ' + str(self.passed) +
97                         ', Failed: ' + str(self.failed) +
98                         ', Crashed: ' + str(self.crashed) +
99                         ', Skipped: ' + str(self.skipped) +
100                         ', Errors: ' + str(self.errors))
101
102         def total(self) -> int:
103                 """Returns the total number of test cases within a test
104                 object, where a test case is a test with no subtests.
105                 """
106                 return (self.passed + self.failed + self.crashed +
107                         self.skipped)
108
109         def add_subtest_counts(self, counts: TestCounts) -> None:
110                 """
111                 Adds the counts of another TestCounts object to the current
112                 TestCounts object. Used to add the counts of a subtest to the
113                 parent test.
114
115                 Parameters:
116                 counts - a different TestCounts object whose counts
117                         will be added to the counts of the TestCounts object
118                 """
119                 self.passed += counts.passed
120                 self.failed += counts.failed
121                 self.crashed += counts.crashed
122                 self.skipped += counts.skipped
123                 self.errors += counts.errors
124
125         def get_status(self) -> TestStatus:
126                 """Returns the aggregated status of a Test using test
127                 counts.
128                 """
129                 if self.total() == 0:
130                         return TestStatus.NO_TESTS
131                 elif self.crashed:
132                         # If one of the subtests crash, the expected status
133                         # of the Test is crashed.
134                         return TestStatus.TEST_CRASHED
135                 elif self.failed:
136                         # Otherwise if one of the subtests fail, the
137                         # expected status of the Test is failed.
138                         return TestStatus.FAILURE
139                 elif self.passed:
140                         # Otherwise if one of the subtests pass, the
141                         # expected status of the Test is passed.
142                         return TestStatus.SUCCESS
143                 else:
144                         # Finally, if none of the subtests have failed,
145                         # crashed, or passed, the expected status of the
146                         # Test is skipped.
147                         return TestStatus.SKIPPED
148
149         def add_status(self, status: TestStatus) -> None:
150                 """
151                 Increments count of inputted status.
152
153                 Parameters:
154                 status - status to be added to the TestCounts object
155                 """
156                 if status == TestStatus.SUCCESS:
157                         self.passed += 1
158                 elif status == TestStatus.FAILURE:
159                         self.failed += 1
160                 elif status == TestStatus.SKIPPED:
161                         self.skipped += 1
162                 elif status != TestStatus.NO_TESTS:
163                         self.crashed += 1
164
165 class LineStream:
166         """
167         A class to represent the lines of kernel output.
168         Provides a lazy peek()/pop() interface over an iterator of
169         (line#, text).
170         """
171         _lines: Iterator[Tuple[int, str]]
172         _next: Tuple[int, str]
173         _need_next: bool
174         _done: bool
175
176         def __init__(self, lines: Iterator[Tuple[int, str]]):
177                 """Creates a new LineStream that wraps the given iterator."""
178                 self._lines = lines
179                 self._done = False
180                 self._need_next = True
181                 self._next = (0, '')
182
183         def _get_next(self) -> None:
184                 """Advances the LineSteam to the next line, if necessary."""
185                 if not self._need_next:
186                         return
187                 try:
188                         self._next = next(self._lines)
189                 except StopIteration:
190                         self._done = True
191                 finally:
192                         self._need_next = False
193
194         def peek(self) -> str:
195                 """Returns the current line, without advancing the LineStream.
196                 """
197                 self._get_next()
198                 return self._next[1]
199
200         def pop(self) -> str:
201                 """Returns the current line and advances the LineStream to
202                 the next line.
203                 """
204                 s = self.peek()
205                 if self._done:
206                         raise ValueError(f'LineStream: going past EOF, last line was {s}')
207                 self._need_next = True
208                 return s
209
210         def __bool__(self) -> bool:
211                 """Returns True if stream has more lines."""
212                 self._get_next()
213                 return not self._done
214
215         # Only used by kunit_tool_test.py.
216         def __iter__(self) -> Iterator[str]:
217                 """Empties all lines stored in LineStream object into
218                 Iterator object and returns the Iterator object.
219                 """
220                 while bool(self):
221                         yield self.pop()
222
223         def line_number(self) -> int:
224                 """Returns the line number of the current line."""
225                 self._get_next()
226                 return self._next[0]
227
228 # Parsing helper methods:
229
230 KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
231 TAP_START = re.compile(r'TAP version ([0-9]+)$')
232 KTAP_END = re.compile('(List of all partitions:|'
233         'Kernel panic - not syncing: VFS:|reboot: System halted)')
234
235 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
236         """Extracts KTAP lines from the kernel output."""
237         def isolate_ktap_output(kernel_output: Iterable[str]) \
238                         -> Iterator[Tuple[int, str]]:
239                 line_num = 0
240                 started = False
241                 for line in kernel_output:
242                         line_num += 1
243                         line = line.rstrip()  # remove trailing \n
244                         if not started and KTAP_START.search(line):
245                                 # start extracting KTAP lines and set prefix
246                                 # to number of characters before version line
247                                 prefix_len = len(
248                                         line.split('KTAP version')[0])
249                                 started = True
250                                 yield line_num, line[prefix_len:]
251                         elif not started and TAP_START.search(line):
252                                 # start extracting KTAP lines and set prefix
253                                 # to number of characters before version line
254                                 prefix_len = len(line.split('TAP version')[0])
255                                 started = True
256                                 yield line_num, line[prefix_len:]
257                         elif started and KTAP_END.search(line):
258                                 # stop extracting KTAP lines
259                                 break
260                         elif started:
261                                 # remove prefix and any indention and yield
262                                 # line with line number
263                                 line = line[prefix_len:].lstrip()
264                                 yield line_num, line
265         return LineStream(lines=isolate_ktap_output(kernel_output))
266
267 KTAP_VERSIONS = [1]
268 TAP_VERSIONS = [13, 14]
269
270 def check_version(version_num: int, accepted_versions: List[int],
271                         version_type: str, test: Test) -> None:
272         """
273         Adds error to test object if version number is too high or too
274         low.
275
276         Parameters:
277         version_num - The inputted version number from the parsed KTAP or TAP
278                 header line
279         accepted_version - List of accepted KTAP or TAP versions
280         version_type - 'KTAP' or 'TAP' depending on the type of
281                 version line.
282         test - Test object for current test being parsed
283         """
284         if version_num < min(accepted_versions):
285                 test.add_error(version_type +
286                         ' version lower than expected!')
287         elif version_num > max(accepted_versions):
288                 test.add_error(
289                         version_type + ' version higher than expected!')
290
291 def parse_ktap_header(lines: LineStream, test: Test) -> bool:
292         """
293         Parses KTAP/TAP header line and checks version number.
294         Returns False if fails to parse KTAP/TAP header line.
295
296         Accepted formats:
297         - 'KTAP version [version number]'
298         - 'TAP version [version number]'
299
300         Parameters:
301         lines - LineStream of KTAP output to parse
302         test - Test object for current test being parsed
303
304         Return:
305         True if successfully parsed KTAP/TAP header line
306         """
307         ktap_match = KTAP_START.match(lines.peek())
308         tap_match = TAP_START.match(lines.peek())
309         if ktap_match:
310                 version_num = int(ktap_match.group(1))
311                 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
312         elif tap_match:
313                 version_num = int(tap_match.group(1))
314                 check_version(version_num, TAP_VERSIONS, 'TAP', test)
315         else:
316                 return False
317         test.log.append(lines.pop())
318         return True
319
320 TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
321
322 def parse_test_header(lines: LineStream, test: Test) -> bool:
323         """
324         Parses test header and stores test name in test object.
325         Returns False if fails to parse test header line.
326
327         Accepted format:
328         - '# Subtest: [test name]'
329
330         Parameters:
331         lines - LineStream of KTAP output to parse
332         test - Test object for current test being parsed
333
334         Return:
335         True if successfully parsed test header line
336         """
337         match = TEST_HEADER.match(lines.peek())
338         if not match:
339                 return False
340         test.log.append(lines.pop())
341         test.name = match.group(1)
342         return True
343
344 TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
345
346 def parse_test_plan(lines: LineStream, test: Test) -> bool:
347         """
348         Parses test plan line and stores the expected number of subtests in
349         test object. Reports an error if expected count is 0.
350         Returns False and sets expected_count to None if there is no valid test
351         plan.
352
353         Accepted format:
354         - '1..[number of subtests]'
355
356         Parameters:
357         lines - LineStream of KTAP output to parse
358         test - Test object for current test being parsed
359
360         Return:
361         True if successfully parsed test plan line
362         """
363         match = TEST_PLAN.match(lines.peek())
364         if not match:
365                 test.expected_count = None
366                 return False
367         test.log.append(lines.pop())
368         expected_count = int(match.group(1))
369         test.expected_count = expected_count
370         return True
371
372 TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
373
374 TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
375
376 def peek_test_name_match(lines: LineStream, test: Test) -> bool:
377         """
378         Matches current line with the format of a test result line and checks
379         if the name matches the name of the current test.
380         Returns False if fails to match format or name.
381
382         Accepted format:
383         - '[ok|not ok] [test number] [-] [test name] [optional skip
384                 directive]'
385
386         Parameters:
387         lines - LineStream of KTAP output to parse
388         test - Test object for current test being parsed
389
390         Return:
391         True if matched a test result line and the name matching the
392                 expected test name
393         """
394         line = lines.peek()
395         match = TEST_RESULT.match(line)
396         if not match:
397                 return False
398         name = match.group(4)
399         return (name == test.name)
400
401 def parse_test_result(lines: LineStream, test: Test,
402                         expected_num: int) -> bool:
403         """
404         Parses test result line and stores the status and name in the test
405         object. Reports an error if the test number does not match expected
406         test number.
407         Returns False if fails to parse test result line.
408
409         Note that the SKIP directive is the only direction that causes a
410         change in status.
411
412         Accepted format:
413         - '[ok|not ok] [test number] [-] [test name] [optional skip
414                 directive]'
415
416         Parameters:
417         lines - LineStream of KTAP output to parse
418         test - Test object for current test being parsed
419         expected_num - expected test number for current test
420
421         Return:
422         True if successfully parsed a test result line.
423         """
424         line = lines.peek()
425         match = TEST_RESULT.match(line)
426         skip_match = TEST_RESULT_SKIP.match(line)
427
428         # Check if line matches test result line format
429         if not match:
430                 return False
431         test.log.append(lines.pop())
432
433         # Set name of test object
434         if skip_match:
435                 test.name = skip_match.group(4)
436         else:
437                 test.name = match.group(4)
438
439         # Check test num
440         num = int(match.group(2))
441         if num != expected_num:
442                 test.add_error('Expected test number ' +
443                         str(expected_num) + ' but found ' + str(num))
444
445         # Set status of test object
446         status = match.group(1)
447         if skip_match:
448                 test.status = TestStatus.SKIPPED
449         elif status == 'ok':
450                 test.status = TestStatus.SUCCESS
451         else:
452                 test.status = TestStatus.FAILURE
453         return True
454
455 def parse_diagnostic(lines: LineStream) -> List[str]:
456         """
457         Parse lines that do not match the format of a test result line or
458         test header line and returns them in list.
459
460         Line formats that are not parsed:
461         - '# Subtest: [test name]'
462         - '[ok|not ok] [test number] [-] [test name] [optional skip
463                 directive]'
464
465         Parameters:
466         lines - LineStream of KTAP output to parse
467
468         Return:
469         Log of diagnostic lines
470         """
471         log = []  # type: List[str]
472         while lines and not TEST_RESULT.match(lines.peek()) and not \
473                         TEST_HEADER.match(lines.peek()):
474                 log.append(lines.pop())
475         return log
476
477 DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^# .*?: kunit test case crashed!$')
478
479 def parse_crash_in_log(test: Test) -> bool:
480         """
481         Iterate through the lines of the log to parse for crash message.
482         If crash message found, set status to crashed and return True.
483         Otherwise return False.
484
485         Parameters:
486         test - Test object for current test being parsed
487
488         Return:
489         True if crash message found in log
490         """
491         for line in test.log:
492                 if DIAGNOSTIC_CRASH_MESSAGE.match(line):
493                         test.status = TestStatus.TEST_CRASHED
494                         return True
495         return False
496
497
498 # Printing helper methods:
499
500 DIVIDER = '=' * 60
501
502 RESET = '\033[0;0m'
503
504 def red(text: str) -> str:
505         """Returns inputted string with red color code."""
506         return '\033[1;31m' + text + RESET
507
508 def yellow(text: str) -> str:
509         """Returns inputted string with yellow color code."""
510         return '\033[1;33m' + text + RESET
511
512 def green(text: str) -> str:
513         """Returns inputted string with green color code."""
514         return '\033[1;32m' + text + RESET
515
516 ANSI_LEN = len(red(''))
517
518 def print_with_timestamp(message: str) -> None:
519         """Prints message with timestamp at beginning."""
520         print('[%s] %s' % (datetime.datetime.now().strftime('%H:%M:%S'), message))
521
522 def format_test_divider(message: str, len_message: int) -> str:
523         """
524         Returns string with message centered in fixed width divider.
525
526         Example:
527         '===================== message example ====================='
528
529         Parameters:
530         message - message to be centered in divider line
531         len_message - length of the message to be printed such that
532                 any characters of the color codes are not counted
533
534         Return:
535         String containing message centered in fixed width divider
536         """
537         default_count = 3  # default number of dashes
538         len_1 = default_count
539         len_2 = default_count
540         difference = len(DIVIDER) - len_message - 2  # 2 spaces added
541         if difference > 0:
542                 # calculate number of dashes for each side of the divider
543                 len_1 = int(difference / 2)
544                 len_2 = difference - len_1
545         return ('=' * len_1) + ' ' + message + ' ' + ('=' * len_2)
546
547 def print_test_header(test: Test) -> None:
548         """
549         Prints test header with test name and optionally the expected number
550         of subtests.
551
552         Example:
553         '=================== example (2 subtests) ==================='
554
555         Parameters:
556         test - Test object representing current test being printed
557         """
558         message = test.name
559         if test.expected_count:
560                 if test.expected_count == 1:
561                         message += (' (' + str(test.expected_count) +
562                                 ' subtest)')
563                 else:
564                         message += (' (' + str(test.expected_count) +
565                                 ' subtests)')
566         print_with_timestamp(format_test_divider(message, len(message)))
567
568 def print_log(log: Iterable[str]) -> None:
569         """
570         Prints all strings in saved log for test in yellow.
571
572         Parameters:
573         log - Iterable object with all strings saved in log for test
574         """
575         for m in log:
576                 print_with_timestamp(yellow(m))
577
578 def format_test_result(test: Test) -> str:
579         """
580         Returns string with formatted test result with colored status and test
581         name.
582
583         Example:
584         '[PASSED] example'
585
586         Parameters:
587         test - Test object representing current test being printed
588
589         Return:
590         String containing formatted test result
591         """
592         if test.status == TestStatus.SUCCESS:
593                 return (green('[PASSED] ') + test.name)
594         elif test.status == TestStatus.SKIPPED:
595                 return (yellow('[SKIPPED] ') + test.name)
596         elif test.status == TestStatus.NO_TESTS:
597                 return (yellow('[NO TESTS RUN] ') + test.name)
598         elif test.status == TestStatus.TEST_CRASHED:
599                 print_log(test.log)
600                 return (red('[CRASHED] ') + test.name)
601         else:
602                 print_log(test.log)
603                 return (red('[FAILED] ') + test.name)
604
605 def print_test_result(test: Test) -> None:
606         """
607         Prints result line with status of test.
608
609         Example:
610         '[PASSED] example'
611
612         Parameters:
613         test - Test object representing current test being printed
614         """
615         print_with_timestamp(format_test_result(test))
616
617 def print_test_footer(test: Test) -> None:
618         """
619         Prints test footer with status of test.
620
621         Example:
622         '===================== [PASSED] example ====================='
623
624         Parameters:
625         test - Test object representing current test being printed
626         """
627         message = format_test_result(test)
628         print_with_timestamp(format_test_divider(message,
629                 len(message) - ANSI_LEN))
630
631 def print_summary_line(test: Test) -> None:
632         """
633         Prints summary line of test object. Color of line is dependent on
634         status of test. Color is green if test passes, yellow if test is
635         skipped, and red if the test fails or crashes. Summary line contains
636         counts of the statuses of the tests subtests or the test itself if it
637         has no subtests.
638
639         Example:
640         "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
641         Errors: 0"
642
643         test - Test object representing current test being printed
644         """
645         if test.status == TestStatus.SUCCESS:
646                 color = green
647         elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS:
648                 color = yellow
649         else:
650                 color = red
651         counts = test.counts
652         print_with_timestamp(color('Testing complete. ' + str(counts)))
653
654 def print_error(error_message: str) -> None:
655         """
656         Prints error message with error format.
657
658         Example:
659         "[ERROR] Test example: missing test plan!"
660
661         Parameters:
662         error_message - message describing error
663         """
664         print_with_timestamp(red('[ERROR] ') + error_message)
665
666 # Other methods:
667
668 def bubble_up_test_results(test: Test) -> None:
669         """
670         If the test has subtests, add the test counts of the subtests to the
671         test and check if any of the tests crashed and if so set the test
672         status to crashed. Otherwise if the test has no subtests add the
673         status of the test to the test counts.
674
675         Parameters:
676         test - Test object for current test being parsed
677         """
678         parse_crash_in_log(test)
679         subtests = test.subtests
680         counts = test.counts
681         status = test.status
682         for t in subtests:
683                 counts.add_subtest_counts(t.counts)
684         if counts.total() == 0:
685                 counts.add_status(status)
686         elif test.counts.get_status() == TestStatus.TEST_CRASHED:
687                 test.status = TestStatus.TEST_CRASHED
688
689 def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
690         """
691         Finds next test to parse in LineStream, creates new Test object,
692         parses any subtests of the test, populates Test object with all
693         information (status, name) about the test and the Test objects for
694         any subtests, and then returns the Test object. The method accepts
695         three formats of tests:
696
697         Accepted test formats:
698
699         - Main KTAP/TAP header
700
701         Example:
702
703         KTAP version 1
704         1..4
705         [subtests]
706
707         - Subtest header line
708
709         Example:
710
711         # Subtest: name
712         1..3
713         [subtests]
714         ok 1 name
715
716         - Test result line
717
718         Example:
719
720         ok 1 - test
721
722         Parameters:
723         lines - LineStream of KTAP output to parse
724         expected_num - expected test number for test to be parsed
725         log - list of strings containing any preceding diagnostic lines
726                 corresponding to the current test
727
728         Return:
729         Test object populated with characteristics and any subtests
730         """
731         test = Test()
732         test.log.extend(log)
733         parent_test = False
734         main = parse_ktap_header(lines, test)
735         if main:
736                 # If KTAP/TAP header is found, attempt to parse
737                 # test plan
738                 test.name = "main"
739                 parse_test_plan(lines, test)
740                 parent_test = True
741         else:
742                 # If KTAP/TAP header is not found, test must be subtest
743                 # header or test result line so parse attempt to parser
744                 # subtest header
745                 parent_test = parse_test_header(lines, test)
746                 if parent_test:
747                         # If subtest header is found, attempt to parse
748                         # test plan and print header
749                         parse_test_plan(lines, test)
750                         print_test_header(test)
751         expected_count = test.expected_count
752         subtests = []
753         test_num = 1
754         while parent_test and (expected_count is None or test_num <= expected_count):
755                 # Loop to parse any subtests.
756                 # Break after parsing expected number of tests or
757                 # if expected number of tests is unknown break when test
758                 # result line with matching name to subtest header is found
759                 # or no more lines in stream.
760                 sub_log = parse_diagnostic(lines)
761                 sub_test = Test()
762                 if not lines or (peek_test_name_match(lines, test) and
763                                 not main):
764                         if expected_count and test_num <= expected_count:
765                                 # If parser reaches end of test before
766                                 # parsing expected number of subtests, print
767                                 # crashed subtest and record error
768                                 test.add_error('missing expected subtest!')
769                                 sub_test.log.extend(sub_log)
770                                 test.counts.add_status(
771                                         TestStatus.TEST_CRASHED)
772                                 print_test_result(sub_test)
773                         else:
774                                 test.log.extend(sub_log)
775                                 break
776                 else:
777                         sub_test = parse_test(lines, test_num, sub_log)
778                 subtests.append(sub_test)
779                 test_num += 1
780         test.subtests = subtests
781         if not main:
782                 # If not main test, look for test result line
783                 test.log.extend(parse_diagnostic(lines))
784                 if (parent_test and peek_test_name_match(lines, test)) or \
785                                 not parent_test:
786                         parse_test_result(lines, test, expected_num)
787                 else:
788                         test.add_error('missing subtest result line!')
789
790         # Check for there being no tests
791         if parent_test and len(subtests) == 0:
792                 test.status = TestStatus.NO_TESTS
793                 test.add_error('0 tests run!')
794
795         # Add statuses to TestCounts attribute in Test object
796         bubble_up_test_results(test)
797         if parent_test and not main:
798                 # If test has subtests and is not the main test object, print
799                 # footer.
800                 print_test_footer(test)
801         elif not main:
802                 print_test_result(test)
803         return test
804
805 def parse_run_tests(kernel_output: Iterable[str]) -> Test:
806         """
807         Using kernel output, extract KTAP lines, parse the lines for test
808         results and print condensed test results and summary line .
809
810         Parameters:
811         kernel_output - Iterable object contains lines of kernel output
812
813         Return:
814         Test - the main test object with all subtests.
815         """
816         print_with_timestamp(DIVIDER)
817         lines = extract_tap_lines(kernel_output)
818         test = Test()
819         if not lines:
820                 test.add_error('invalid KTAP input!')
821                 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
822         else:
823                 test = parse_test(lines, 0, [])
824                 if test.status != TestStatus.NO_TESTS:
825                         test.status = test.counts.get_status()
826         print_with_timestamp(DIVIDER)
827         print_summary_line(test)
828         return test