Merge tag 'memblock-v5.15-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/rppt...
[linux-2.6-microblaze.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
16
# Outcome of parsing one run: the aggregate status, the list of parsed suites,
# and the line stream the results were read from.
TestResult = namedtuple('TestResult', ('status', 'suites', 'log'))
18
class TestSuite:
        """A parsed test suite: its status, its name and its test cases."""

        def __init__(self) -> None:
                # A fresh suite is assumed successful until parsing says otherwise.
                self.status = TestStatus.SUCCESS
                self.name = ''
                self.cases: List[TestCase] = []

        def __str__(self) -> str:
                fields = [str(self.status), self.name, str(self.cases)]
                return 'TestSuite(' + ','.join(fields) + ')'

        def __repr__(self) -> str:
                # repr and str are deliberately the same text.
                return str(self)
30
class TestCase:
        """A parsed test case: its status, its name and the log lines kept for it."""

        def __init__(self) -> None:
                # A fresh case is assumed successful until parsing says otherwise.
                self.status = TestStatus.SUCCESS
                self.name = ''
                self.log: List[str] = []

        def __str__(self) -> str:
                fields = [str(self.status), self.name, str(self.log)]
                return 'TestCase(' + ','.join(fields) + ')'

        def __repr__(self) -> str:
                # repr and str are deliberately the same text.
                return str(self)
42
class TestStatus(Enum):
        """Status assigned to a test case, a test suite, or a whole run."""

        # Reported 'ok'.
        SUCCESS = auto()
        # Reported 'not ok'.
        FAILURE = auto()
        # Result line carried a '# SKIP' directive.
        SKIPPED = auto()
        # A crash diagnostic was seen, or the output ended before a result line.
        TEST_CRASHED = auto()
        # The plan announced zero suites, or no suite could be parsed.
        NO_TESTS = auto()
        # The TAP header or the test plan could not be parsed.
        FAILURE_TO_PARSE_TESTS = auto()
50
class LineStream:
        """Single-line lookahead over an iterator of (line number, text) pairs.

        One entry is always buffered. Once the underlying iterator runs out the
        stream becomes falsy, but the last buffered entry is kept, so peek(),
        pop() and line_number() keep returning that final entry.
        """
        _lines: Iterator[Tuple[int, str]]
        _next: Tuple[int, str]
        _done: bool

        def __init__(self, lines: Iterator[Tuple[int, str]]):
                self._lines = lines
                self._done = False
                # Placeholder entry, visible only if the iterator is empty.
                self._next = (0, '')
                self._get_next()

        def _get_next(self) -> None:
                """Buffer the following entry, or mark the stream as finished."""
                try:
                        upcoming = next(self._lines)
                except StopIteration:
                        # Leave the previous entry in place; just flag exhaustion.
                        self._done = True
                        return
                self._next = upcoming

        def peek(self) -> str:
                """Return the text of the buffered line without consuming it."""
                _, text = self._next
                return text

        def pop(self) -> str:
                """Return the text of the buffered line and advance the stream."""
                _, text = self._next
                self._get_next()
                return text

        def __bool__(self) -> bool:
                """True while the underlying iterator has not been exhausted."""
                return not self._done

        # Only used by kunit_tool_test.py.
        def __iter__(self) -> Iterator[str]:
                while self:
                        yield self.pop()

        def line_number(self) -> int:
                """Return the line number of the buffered line."""
                number, _ = self._next
                return number
87
# Header line that opens the TAP output inside the kernel log.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Log lines at which extraction stops.
kunit_end_re = re.compile(
        r'(List of all partitions:|'
        r'Kernel panic - not syncing: VFS:|'
        r'reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
        """Return a LineStream over the TAP portion of a kernel log.

        Lines are yielded from the 'TAP version N' header onwards, until a line
        matching kunit_end_re is seen or the input ends. Whatever precedes
        'TAP version' on the header line is treated as a per-line prefix, and
        that many leading characters are cut from every yielded line. Line
        numbers are 1-based positions in kernel_output.
        """
        def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
                in_tap = False
                for line_num, raw in enumerate(kernel_output, start=1):
                        text = raw.rstrip()  # line always has a trailing \n
                        if kunit_start_re.search(text):
                                # The regex matched, so the marker is present.
                                prefix_len = text.find('TAP version')
                                in_tap = True
                        elif kunit_end_re.search(text):
                                return
                        elif not in_tap:
                                continue
                        yield line_num, text[prefix_len:]
        return LineStream(lines=isolate_kunit_output(kernel_output))
108
def raw_output(kernel_output) -> None:
        """Print each line of kernel_output with trailing whitespace removed."""
        for raw_line in kernel_output:
                stripped = raw_line.rstrip()
                print(stripped)
112
# Horizontal rule printed between sections of the report.
DIVIDER = 60 * '='

# Escape sequence appended by red()/yellow()/green() to end the coloured span.
RESET = '\033[0;0m'
116
def red(text) -> str:
        """Wrap text in the bold-red escape sequence, followed by RESET."""
        return ''.join(('\033[1;31m', text, RESET))
119
def yellow(text) -> str:
        """Wrap text in the bold-yellow escape sequence, followed by RESET."""
        return ''.join(('\033[1;33m', text, RESET))
122
def green(text) -> str:
        """Wrap text in the bold-green escape sequence, followed by RESET."""
        return ''.join(('\033[1;32m', text, RESET))
125
def print_with_timestamp(message) -> None:
        """Print message prefixed with the current local time as [HH:MM:SS]."""
        stamp = datetime.now().strftime('%H:%M:%S')
        print('[%s] %s' % (stamp, message))
128
def format_suite_divider(message) -> str:
        """Return message framed by eight '=' characters and a space on each side."""
        bar = '========'
        return ' '.join((bar, message, bar))
131
def print_suite_divider(message) -> None:
        """Print the DIVIDER rule, then message formatted as a suite heading."""
        print_with_timestamp(DIVIDER)
        heading = format_suite_divider(message)
        print_with_timestamp(heading)
135
def print_log(log) -> None:
        """Print every entry of log on its own timestamped line."""
        for entry in log:
                print_with_timestamp(entry)
139
# Matches any line that belongs to the TAP stream: the version header, an
# (optionally indented) 'ok' / 'not ok' result, a plan such as '1..4', or a
# '#' diagnostic.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')

def consume_non_diagnostic(lines: LineStream) -> None:
        """Discard leading lines until the next TAP entry or the end of input."""
        while lines:
                if TAP_ENTRIES.match(lines.peek()):
                        return
                lines.pop()
145
def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
        """Move leading non-TAP lines from the stream into test_case.log.

        Stops at the first line matching TAP_ENTRIES, or at the end of input.
        """
        while lines:
                if TAP_ENTRIES.match(lines.peek()):
                        break
                # pop() returns the same text peek() just inspected.
                test_case.log.append(lines.pop())
150
OkNotOkResult = namedtuple('OkNotOkResult', ('is_ok', 'description', 'text'))

# Result line, at any indentation, whose description ends in a '# SKIP'
# directive. Groups: result word, description, text after 'SKIP'.
OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')

# Indented result line, i.e. a test case inside a suite.
# Groups: result word, description.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line, i.e. a whole suite.
# Groups: result word, index, description.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
158
def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
        """Parse the 'ok' / 'not ok' result line of a test case.

        Leading non-TAP lines are saved to test_case.log. Lines are then
        discarded until an indented result line (OK_NOT_OK_SUBTEST) is found;
        that line is consumed, appended to test_case.log, and used to set
        test_case.name and test_case.status.

        Status rules, in order:
          * '# SKIP' directive on the result line -> SKIPPED
          * status already TEST_CRASHED -> left untouched
          * 'ok' -> SUCCESS, anything else -> FAILURE

        Returns True if a result was recorded. This includes the case where the
        input is already exhausted, which is recorded as TEST_CRASHED. Returns
        False if the stream ran out while scanning for a result line.
        """
        save_non_diagnostic(lines, test_case)
        if not lines:
                test_case.status = TestStatus.TEST_CRASHED
                return True
        line = lines.peek()
        match = OK_NOT_OK_SUBTEST.match(line)
        # Skip forward to the result line. Inspect each candidate with peek()
        # before consuming anything, so that the pop() below always removes
        # the matched result line itself and never the line after it.
        while not match:
                lines.pop()
                if not lines:
                        return False
                line = lines.peek()
                match = OK_NOT_OK_SUBTEST.match(line)
        test_case.log.append(lines.pop())
        test_case.name = match.group(2)
        if OK_NOT_OK_SKIP.match(line):
                test_case.status = TestStatus.SKIPPED
                return True
        if test_case.status == TestStatus.TEST_CRASHED:
                # A crash diagnostic seen earlier outranks the ok/not ok word.
                return True
        if match.group(1) == 'ok':
                test_case.status = TestStatus.SUCCESS
        else:
                test_case.status = TestStatus.FAILURE
        return True
185
# Indented '# ...' diagnostic line belonging to a test case.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
# Diagnostic line reporting that the test case crashed.
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')

def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
        """Consume one indented diagnostic line into test_case.log.

        Leading non-TAP lines are saved to the log first. If the diagnostic is
        a crash message, test_case.status becomes TEST_CRASHED.

        Returns True if a diagnostic line was consumed, False if the next line
        is not one or the input is exhausted.
        """
        save_non_diagnostic(lines, test_case)
        if not lines:
                return False
        line = lines.peek()
        if SUBTEST_DIAGNOSTIC.match(line) is None:
                return False
        test_case.log.append(lines.pop())
        if DIAGNOSTIC_CRASH_MESSAGE.match(line):
                test_case.status = TestStatus.TEST_CRASHED
        return True
203
def parse_test_case(lines: LineStream) -> Optional[TestCase]:
        """Parse one test case: its diagnostics followed by its result line.

        Returns the populated TestCase, or None if no result could be recorded.
        """
        case = TestCase()
        save_non_diagnostic(lines, case)
        # Drain every consecutive diagnostic line into the case's log.
        more_diagnostics = True
        while more_diagnostics:
                more_diagnostics = parse_diagnostic(lines, case)
        if not parse_ok_not_ok_test_case(lines, case):
                return None
        return case
213
# Indented '# Subtest: <name>' line that opens a suite; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: LineStream) -> Optional[str]:
        """Consume a '# Subtest: <name>' line and return the suite name.

        Leading non-TAP lines are discarded first. Returns None, consuming
        nothing further, if the input is exhausted or the next line is not a
        subtest header.
        """
        consume_non_diagnostic(lines)
        header = SUBTEST_HEADER.match(lines.peek()) if lines else None
        if header is None:
                return None
        lines.pop()
        return header.group(1)
226
# Indented plan line such as '    1..4'; group 1 is the upper bound.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: LineStream) -> Optional[int]:
        """Consume a suite's plan line and return its number of test cases.

        Leading non-TAP lines are discarded first. Returns None, consuming
        nothing further, if the next line is not an indented plan.

        NOTE: there is no exhaustion check here; on a finished stream peek()
        returns the last line that was read.
        """
        consume_non_diagnostic(lines)
        plan = SUBTEST_PLAN.match(lines.peek())
        if plan is None:
                return None
        lines.pop()
        return int(plan.group(1))
237
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
        """Combine two statuses, keeping the more severe one.

        TEST_CRASHED outranks everything and FAILURE comes next. Otherwise the
        left operand is returned, unless it is SKIPPED, in which case the right
        operand is returned.
        """
        if left == right:
                return left
        pair = (left, right)
        for dominant in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
                if dominant in pair:
                        return dominant
        return right if left == TestStatus.SKIPPED else left
249
def parse_ok_not_ok_test_suite(lines: LineStream,
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
        """Parse the unindented 'ok' / 'not ok' line that closes a suite.

        Leading non-TAP lines are discarded. On a match the line is consumed
        and test_suite.status is set to SKIPPED if the line carries a '# SKIP'
        directive, otherwise to SUCCESS for 'ok' and FAILURE for 'not ok'. An
        error is printed if the index on the line is not expected_suite_index.

        Returns True if a result line was consumed. Returns False if the next
        line is not one, or if the input is exhausted, in which case the status
        is also set to TEST_CRASHED.
        """
        consume_non_diagnostic(lines)
        if not lines:
                test_suite.status = TestStatus.TEST_CRASHED
                return False
        line = lines.peek()
        result = OK_NOT_OK_MODULE.match(line)
        if result is None:
                return False
        lines.pop()
        if OK_NOT_OK_SKIP.match(line):
                test_suite.status = TestStatus.SKIPPED
        elif result.group(1) == 'ok':
                test_suite.status = TestStatus.SUCCESS
        else:
                test_suite.status = TestStatus.FAILURE
        suite_index = int(result.group(2))
        if suite_index != expected_suite_index:
                print_with_timestamp(
                        red('[ERROR] ') + 'expected_suite_index ' +
                        str(expected_suite_index) + ', but got ' +
                        str(suite_index))
        return True
277
def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
        """Fold statuses with max_status, starting from SKIPPED.

        An empty status_list therefore yields SKIPPED.
        """
        combined = TestStatus.SKIPPED
        for status in status_list:
                combined = max_status(combined, status)
        return combined
280
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
        """Combine the statuses of a suite's cases with the suite's own status."""
        case_statuses = (case.status for case in test_suite.cases)
        worst_case_status = bubble_up_errors(case_statuses)
        return max_status(worst_case_status, test_suite.status)
284
def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
        """Parse one suite: header, plan, test cases and closing result line.

        Returns the TestSuite when its closing line was parsed, with the status
        combined with those of its cases. The suite is also returned when the
        input ran out before the closing line. Returns None when the input is
        empty, the header or plan is missing, or the closing line cannot be
        parsed while input remains.
        """
        if not lines:
                return None
        consume_non_diagnostic(lines)
        suite = TestSuite()
        suite.status = TestStatus.SUCCESS
        name = parse_subtest_header(lines)
        if not name:
                return None
        suite.name = name
        planned_cases = parse_subtest_plan(lines)
        if planned_cases is None:
                return None
        # Read at most the planned number of cases; stop early on a parse failure.
        for _ in range(planned_cases):
                case = parse_test_case(lines)
                if not case:
                        break
                suite.cases.append(case)
        if parse_ok_not_ok_test_suite(lines, suite, expected_suite_index):
                suite.status = bubble_up_test_case_errors(suite)
                return suite
        if lines:
                print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
                return None
        print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
        return suite
313
# The only accepted header: exactly 'TAP version 14'.
TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: LineStream) -> bool:
        """Consume the 'TAP version 14' header line.

        Leading non-TAP lines are discarded first. Returns True if the header
        was found and consumed, False otherwise.
        """
        consume_non_diagnostic(lines)
        if TAP_HEADER.match(lines.peek()) is None:
                return False
        lines.pop()
        return True
323
# Top-level plan line such as '1..3'; group 1 is the upper bound.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')

def parse_test_plan(lines: LineStream) -> Optional[int]:
        """Consume the top-level plan line and return the number of suites.

        Leading non-TAP lines are discarded first. Returns None, consuming
        nothing further, if the next line is not a plan.
        """
        consume_non_diagnostic(lines)
        plan = TEST_PLAN.match(lines.peek())
        if plan is None:
                return None
        lines.pop()
        return int(plan.group(1))
334
def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
        """Combine the statuses of all suites into one overall status."""
        suite_statuses = (suite.status for suite in test_suites)
        return bubble_up_errors(suite_statuses)
337
def parse_test_result(lines: LineStream) -> TestResult:
        """Parse a complete TAP stream into a TestResult.

        Expects the TAP header, then a plan giving the number of suites, then
        that many suites. If a suite fails to parse, an error is printed and
        parsing of planned suites stops. One more parse is then attempted, and
        an error is printed if an unplanned suite is found.

        Returns a TestResult whose status is:
          * FAILURE_TO_PARSE_TESTS if the input is empty, or the header or the
            plan is missing;
          * NO_TESTS if the plan announces zero suites or no suite was parsed;
          * otherwise the combined status of the parsed suites.
        The 'log' field is always the lines stream itself.
        """
        consume_non_diagnostic(lines)
        if not lines or not parse_tap_header(lines):
                return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
        expected_test_suite_num = parse_test_plan(lines)
        if expected_test_suite_num == 0:
                return TestResult(TestStatus.NO_TESTS, [], lines)
        elif expected_test_suite_num is None:
                return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
        test_suites = []
        for i in range(1, expected_test_suite_num + 1):
                test_suite = parse_test_suite(lines, i)
                if test_suite:
                        test_suites.append(test_suite)
                else:
                        # Report how many suites were actually parsed. The loop
                        # index is 1-based, so arithmetic on it is error-prone.
                        print_with_timestamp(
                                red('[ERROR] ') + ' expected ' +
                                str(expected_test_suite_num) +
                                ' test suites, but got ' + str(len(test_suites)))
                        break
        # Anything that still parses as a suite was not announced by the plan.
        test_suite = parse_test_suite(lines, -1)
        if test_suite:
                print_with_timestamp(red('[ERROR] ') +
                        'got unexpected test suite: ' + test_suite.name)
        if test_suites:
                return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
        else:
                return TestResult(TestStatus.NO_TESTS, [], lines)
366
class TestCounts:
        """Tallies of test cases by outcome."""
        passed: int
        failed: int
        crashed: int
        skipped: int

        def __init__(self):
                self.passed = self.failed = self.crashed = self.skipped = 0

        def total(self) -> int:
                """Return the number of test cases counted across all outcomes."""
                return sum((self.passed, self.failed, self.crashed, self.skipped))
381
def print_and_count_results(test_result: TestResult) -> TestCounts:
        """Print a report of every suite and test case, and tally the outcomes.

        Each suite gets a heading with its status. Each case gets one status
        line; crashed and failed cases are followed by their log in yellow and
        an empty timestamped line.

        Returns a TestCounts with the number of cases per outcome.
        """
        counts = TestCounts()
        for suite in test_result.suites:
                suite_status = suite.status
                if suite_status == TestStatus.SUCCESS:
                        heading = green('[PASSED] ') + suite.name
                elif suite_status == TestStatus.SKIPPED:
                        heading = yellow('[SKIPPED] ') + suite.name
                elif suite_status == TestStatus.TEST_CRASHED:
                        # Here the name is coloured along with the tag.
                        heading = red('[CRASHED] ' + suite.name)
                else:
                        heading = red('[FAILED] ') + suite.name
                print_suite_divider(heading)
                for case in suite.cases:
                        case_status = case.status
                        if case_status == TestStatus.SUCCESS:
                                counts.passed += 1
                                print_with_timestamp(green('[PASSED] ') + case.name)
                                continue
                        if case_status == TestStatus.SKIPPED:
                                counts.skipped += 1
                                print_with_timestamp(yellow('[SKIPPED] ') + case.name)
                                continue
                        if case_status == TestStatus.TEST_CRASHED:
                                counts.crashed += 1
                                print_with_timestamp(red('[CRASHED] ' + case.name))
                        else:
                                counts.failed += 1
                                print_with_timestamp(red('[FAILED] ') + case.name)
                        # Crashed and failed cases both dump their saved log.
                        print_log(map(yellow, case.log))
                        print_with_timestamp('')
        return counts
411
def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
        """Parse kernel output, print a report and summary, return the result.

        If no tests ran or the output could not be parsed, an error is printed
        and the summary counts stay at zero. Otherwise every suite and case is
        printed and counted. The summary line is green on SUCCESS, yellow on
        SKIPPED and red for any other status.
        """
        counts = TestCounts()
        test_result = parse_test_result(extract_tap_lines(kernel_output))
        status = test_result.status
        if status == TestStatus.NO_TESTS:
                print(red('[ERROR] ') + yellow('no tests run!'))
        elif status == TestStatus.FAILURE_TO_PARSE_TESTS:
                print(red('[ERROR] ') + yellow('could not parse test results!'))
        else:
                counts = print_and_count_results(test_result)
        print_with_timestamp(DIVIDER)
        if status == TestStatus.SUCCESS:
                fmt = green
        elif status == TestStatus.SKIPPED:
                fmt = yellow
        else:
                fmt = red
        summary = ('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
                   (counts.total(), counts.failed, counts.crashed, counts.skipped))
        print_with_timestamp(fmt(summary))
        return test_result