Merge tag 'xtensa-20201119' of git://github.com/jcmvbkbc/linux-xtensa
[linux-2.6-microblaze.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List, Optional, Tuple
16
# Overall outcome of a parse: the aggregated TestStatus, the list of parsed
# TestSuite objects, and the lines that were left unconsumed by the parser.
TestResult = namedtuple('TestResult', ('status', 'suites', 'log'))
18
class TestSuite(object):
        """Parsed result of one test suite (a top-level TAP subtest group)."""

        def __init__(self):
                # Suite status; None until the parser assigns a TestStatus.
                self.status = None
                # Suite name; None until the parser assigns it.
                self.name = None
                # TestCase objects, in the order they were parsed.
                self.cases = []

        def __str__(self):
                # status is an enum member (or None) and name may still be None,
                # so both must be converted explicitly: concatenating them
                # directly to a str raises TypeError.
                return ('TestSuite(' + str(self.status) + ',' + str(self.name) +
                        ',' + str(self.cases) + ')')

        def __repr__(self):
                return str(self)
30
class TestCase(object):
        """Parsed result of a single test case inside a suite."""

        def __init__(self):
                # Case status; None until the parser assigns a TestStatus.
                self.status = None
                # Case name; empty until the result line has been parsed.
                self.name = ''
                # Raw output lines collected while parsing this case.
                self.log = []

        def __str__(self):
                # status is an enum member (or None); convert it explicitly,
                # because concatenating it directly to a str raises TypeError.
                return ('TestCase(' + str(self.status) + ',' + str(self.name) +
                        ',' + str(self.log) + ')')

        def __repr__(self):
                return str(self)
42
class TestStatus(Enum):
        """Outcome of a test case, a test suite, or a whole run."""

        # The result line said 'ok'.
        SUCCESS = auto()
        # The result line said 'not ok'.
        FAILURE = auto()
        # A crash diagnostic was seen, or the output ended before a result line.
        TEST_CRASHED = auto()
        # No TAP header was found, or no suite could be parsed.
        NO_TESTS = auto()
        # A TAP header was found but the test plan could not be parsed.
        FAILURE_TO_PARSE_TESTS = auto()
49
# A line ending in 'TAP version <N>' marks the start of KUnit output.
kunit_start_re = re.compile(r'TAP version [0-9]+$')

# Kernel messages that mark the point where KUnit output is over.
kunit_end_re = re.compile('(List of all partitions:|Kernel panic - not syncing: VFS:)')
53
def isolate_kunit_output(kernel_output):
        """Yield only the KUnit portion of a kernel log.

        Output starts at the first line matching kunit_start_re and stops at
        the first line matching kunit_end_re. Whatever precedes 'TAP version'
        on the most recent start line is treated as a prefix, and that many
        characters are stripped from every yielded line.
        """
        strip_len = 0
        in_kunit_output = False
        for raw in kernel_output:
                text = raw.rstrip()  # drop the trailing newline
                if kunit_start_re.search(text):
                        strip_len = len(text.split('TAP version')[0])
                        in_kunit_output = True
                elif kunit_end_re.search(text):
                        return
                elif not in_kunit_output:
                        continue
                # Slicing from 0 returns the whole line, so no special case is
                # needed for an empty prefix.
                yield text[strip_len:]
66
def raw_output(kernel_output):
        """Print every line of kernel_output with trailing whitespace removed."""
        for raw in kernel_output:
                stripped = raw.rstrip()
                print(stripped)
70
# Horizontal rule printed between sections of the report.
DIVIDER = '=' * 60

# Escape sequence appended after coloured text by red()/yellow()/green().
RESET = '\033[0;0m'
74
def red(text):
        """Return text wrapped in the '1;31' escape sequence and RESET."""
        return ''.join(('\033[1;31m', text, RESET))
77
def yellow(text):
        """Return text wrapped in the '1;33' escape sequence and RESET."""
        return ''.join(('\033[1;33m', text, RESET))
80
def green(text):
        """Return text wrapped in the '1;32' escape sequence and RESET."""
        return ''.join(('\033[1;32m', text, RESET))
83
def print_with_timestamp(message):
        """Print message prefixed with the current local time as [HH:MM:SS]."""
        stamp = datetime.now().strftime('%H:%M:%S')
        print('[%s] %s' % (stamp, message))
86
def format_suite_divider(message):
        """Return message framed by eight '=' characters on each side."""
        return ''.join(('======== ', message, ' ========'))
89
def print_suite_divider(message):
        """Print the divider rule followed by the framed suite headline."""
        headline = format_suite_divider(message)
        print_with_timestamp(DIVIDER)
        print_with_timestamp(headline)
93
def print_log(log):
        """Print each entry of log on its own timestamped line."""
        for entry in log:
                print_with_timestamp(entry)
97
# Lines the parser treats as TAP syntax: the 'TAP' header, 'ok' / 'not ok'
# results, 'N..M' plans and '#' diagnostics (the last four optionally
# indented). Anything else is handled as non-TAP log output.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
99
def consume_non_diagnositic(lines: List[str]) -> None:
        """Drop leading lines, in place, that do not match TAP_ENTRIES."""
        skip = 0
        for entry in lines:
                if TAP_ENTRIES.match(entry):
                        break
                skip += 1
        # Remove the whole non-TAP prefix with one in-place slice deletion.
        del lines[:skip]
103
def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
        """Move leading lines that do not match TAP_ENTRIES into test_case.log.

        The moved lines are removed from lines in place and keep their order.
        """
        skip = 0
        for entry in lines:
                if TAP_ENTRIES.match(entry):
                        break
                skip += 1
        test_case.log.extend(lines[:skip])
        del lines[:skip]
108
OkNotOkResult = namedtuple('OkNotOkResult', ('is_ok', 'description', 'text'))

# Indented result line of a test case: '<ws>ok|not ok <num> - <name>'.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line of a suite: 'ok|not ok <num> - <name>'.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
114
def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
        """Parse the 'ok' / 'not ok' result line of a test case.

        Leading non-TAP lines are saved to test_case.log. TAP lines that are
        not a subtest result are discarded until a result line is at the head
        of the list. That line is then logged, consumed and used to set the
        case name and status.

        Returns True when a result line was parsed, or when the input ran out
        immediately (the case is then marked TEST_CRASHED). Returns False when
        lines were discarded until the list ran out without a result line.
        A status of TEST_CRASHED set earlier is never overwritten.
        """
        save_non_diagnositic(lines, test_case)
        if not lines:
                test_case.status = TestStatus.TEST_CRASHED
                return True
        # Peek at the head of the list instead of popping before matching:
        # the matching line must still be in the list afterwards so that it
        # (and not the line after it) is what gets logged and consumed.
        match = OK_NOT_OK_SUBTEST.match(lines[0])
        while not match:
                lines.pop(0)
                if not lines:
                        return False
                match = OK_NOT_OK_SUBTEST.match(lines[0])
        test_case.log.append(lines.pop(0))
        test_case.name = match.group(2)
        if test_case.status == TestStatus.TEST_CRASHED:
                return True
        if match.group(1) == 'ok':
                test_case.status = TestStatus.SUCCESS
        else:
                test_case.status = TestStatus.FAILURE
        return True
137
# Indented diagnostic of the form '<ws># <something>: <message>'; group 1 is
# the message after the first ': '.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# .*?: (.*)$')
# Diagnostic message that marks a test case as crashed.
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'
140
def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
        """Consume one diagnostic line belonging to test_case, if present.

        Leading non-TAP lines are saved to the case log first. When the next
        line matches SUBTEST_DIAGNOSTIC it is logged and consumed, and the
        case is marked TEST_CRASHED if it carries the crash message.

        Returns True if a diagnostic line was consumed, False otherwise.
        """
        save_non_diagnositic(lines, test_case)
        if not lines:
                return False
        found = SUBTEST_DIAGNOSTIC.match(lines[0])
        if not found:
                return False
        test_case.log.append(lines.pop(0))
        if found.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
                test_case.status = TestStatus.TEST_CRASHED
        return True
154
def parse_test_case(lines: List[str]) -> Optional[TestCase]:
        """Parse one test case: its log lines, diagnostics and result line.

        Returns the populated TestCase, or None when no result could be parsed.
        """
        case = TestCase()
        save_non_diagnositic(lines, case)
        # Keep consuming diagnostics until the next line is not one.
        more_diagnostics = True
        while more_diagnostics:
                more_diagnostics = parse_diagnostic(lines, case)
        return case if parse_ok_not_ok_test_case(lines, case) else None
164
# Indented suite header, '<ws># Subtest: <name>'; group 1 is the suite name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
166
def parse_subtest_header(lines: List[str]) -> Optional[str]:
        """Consume a '# Subtest: <name>' header and return the suite name.

        Leading non-TAP lines are dropped first. Returns None, leaving the
        next line in place, when the input is exhausted or the next line is
        not a subtest header.
        """
        consume_non_diagnositic(lines)
        header = SUBTEST_HEADER.match(lines[0]) if lines else None
        if not header:
                return None
        lines.pop(0)
        return header.group(1)
177
# Indented test-case plan, '<ws>N..M'; group 1 is the upper bound M.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
179
def parse_subtest_plan(lines: List[str]) -> Optional[int]:
        """Consume an indented 'N..M' plan line and return M as an int.

        Leading non-TAP lines are dropped first. Returns None, leaving the
        next line in place, when the input is exhausted or the next line is
        not a subtest plan.
        """
        consume_non_diagnositic(lines)
        # The log may end right after the subtest header (e.g. a truncated or
        # crashed run); indexing lines[0] would then raise IndexError.
        if not lines:
                return None
        match = SUBTEST_PLAN.match(lines[0])
        if match:
                lines.pop(0)
                return int(match.group(1))
        return None
188
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
        """Return the more severe of two statuses.

        TEST_CRASHED outranks FAILURE, which outranks everything else. If
        neither applies, the first operand that is not SUCCESS wins (left
        before right), and SUCCESS is returned only when both are SUCCESS.
        """
        pair = (left, right)
        for dominant in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
                if dominant in pair:
                        return dominant
        for candidate in pair:
                if candidate != TestStatus.SUCCESS:
                        return candidate
        return TestStatus.SUCCESS
200
def parse_ok_not_ok_test_suite(lines: List[str],
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
        """Parse the unindented 'ok' / 'not ok' line that closes a suite.

        Leading non-TAP lines are dropped first. If the input is exhausted the
        suite is marked TEST_CRASHED and False is returned. If the next line
        is a suite result it is consumed, the suite status is set from it, and
        an error is printed when its index differs from expected_suite_index.

        Returns True only when a suite result line was consumed.
        """
        consume_non_diagnositic(lines)
        if not lines:
                test_suite.status = TestStatus.TEST_CRASHED
                return False
        result = OK_NOT_OK_MODULE.match(lines[0])
        if not result:
                return False
        lines.pop(0)
        verdict, index_text = result.group(1), result.group(2)
        test_suite.status = (TestStatus.SUCCESS if verdict == 'ok'
                             else TestStatus.FAILURE)
        actual_index = int(index_text)
        if actual_index != expected_suite_index:
                print_with_timestamp(
                        red('[ERROR] ') + 'expected_suite_index ' +
                        str(expected_suite_index) + ', but got ' +
                        str(actual_index))
        return True
225
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
        """Fold the statuses of all containers into one using max_status.

        to_status extracts the status from each container. An empty list
        yields TestStatus.SUCCESS.
        """
        worst = TestStatus.SUCCESS
        for container in status_container_list:
                worst = max_status(worst, to_status(container))
        return worst
229
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
        """Combine the suite's own status with the statuses of its cases."""
        def case_status(case):
                return case.status

        worst_case_status = bubble_up_errors(case_status, test_suite.cases)
        return max_status(worst_case_status, test_suite.status)
233
def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
        """Parse one suite: header, plan, test cases and closing result line.

        Args:
                lines: remaining output lines; consumed in place.
                expected_suite_index: index the closing result line should carry.

        Returns:
                The parsed TestSuite. A suite is also returned, after printing
                an error, when the input runs out before the closing line.
                None is returned when there is no input, no subtest header, no
                subtest plan, or the closing line cannot be parsed.
        """
        if not lines:
                return None
        consume_non_diagnositic(lines)
        test_suite = TestSuite()
        test_suite.status = TestStatus.SUCCESS
        name = parse_subtest_header(lines)
        if not name:
                return None
        test_suite.name = name
        # A log truncated right after the header has no plan to parse; bail
        # out here rather than letting the plan parser index an empty list.
        consume_non_diagnositic(lines)
        if not lines:
                return None
        expected_test_case_num = parse_subtest_plan(lines)
        if expected_test_case_num is None:
                return None
        while expected_test_case_num > 0:
                test_case = parse_test_case(lines)
                if not test_case:
                        break
                test_suite.cases.append(test_case)
                expected_test_case_num -= 1
        if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
                test_suite.status = bubble_up_test_case_errors(test_suite)
                return test_suite
        elif not lines:
                print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
                return test_suite
        else:
                # Report in the same timestamped [ERROR] format as the branch
                # above, with a separator before the offending line.
                print_with_timestamp(
                        red('[ERROR] ') + 'failed to parse end of suite: ' + lines[0])
                return None
262
# Only TAP version 14 is accepted as a header, and it must be the whole line.
TAP_HEADER = re.compile(r'^TAP version 14$')
264
def parse_tap_header(lines: List[str]) -> bool:
        """Consume the 'TAP version 14' header line if it is next.

        Leading non-TAP lines are dropped first. Returns True when the header
        was consumed; False when the input is exhausted or the next line is
        not the header (that line is left in place).
        """
        consume_non_diagnositic(lines)
        # Guard against empty input: lines[0] would raise IndexError.
        if lines and TAP_HEADER.match(lines[0]):
                lines.pop(0)
                return True
        return False
272
# Unindented suite plan, 'N..M'; group 1 is the upper bound M.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
274
def parse_test_plan(lines: List[str]) -> Optional[int]:
        """Consume an unindented 'N..M' plan line and return M as an int.

        Leading non-TAP lines are dropped first. Returns None, leaving the
        next line in place, when the input is exhausted or the next line is
        not a test plan.
        """
        consume_non_diagnositic(lines)
        # The output may end right after the TAP header; indexing lines[0]
        # would then raise IndexError.
        if not lines:
                return None
        match = TEST_PLAN.match(lines[0])
        if match:
                lines.pop(0)
                return int(match.group(1))
        return None
283
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
        """Return the most severe status among the given suites."""
        def suite_status(suite):
                return suite.status

        return bubble_up_errors(suite_status, test_suite_list)
286
def parse_test_result(lines: List[str]) -> TestResult:
        """Parse isolated KUnit output into a TestResult.

        Args:
                lines: KUnit output lines; consumed in place. Whatever is left
                        over is returned as the result's log.

        Returns:
                A TestResult whose status is NO_TESTS when there is no TAP
                header, the plan announces zero suites, or no suite could be
                parsed; FAILURE_TO_PARSE_TESTS when the test plan is missing
                or unparsable; otherwise the most severe status among the
                parsed suites.
        """
        consume_non_diagnositic(lines)
        if not lines or not parse_tap_header(lines):
                return TestResult(TestStatus.NO_TESTS, [], lines)
        # The output may end right after the header; check before parsing the
        # plan so that a truncated log is reported instead of crashing.
        consume_non_diagnositic(lines)
        if not lines:
                return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
        expected_test_suite_num = parse_test_plan(lines)
        if expected_test_suite_num is None:
                return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
        if expected_test_suite_num == 0:
                # A plan of '1..0' parsed fine; it simply announces no suites.
                return TestResult(TestStatus.NO_TESTS, [], lines)
        test_suites = []
        for i in range(1, expected_test_suite_num + 1):
                test_suite = parse_test_suite(lines, i)
                if test_suite:
                        test_suites.append(test_suite)
                else:
                        # Report the number of suites actually parsed so far.
                        print_with_timestamp(
                                red('[ERROR] ') + ' expected ' +
                                str(expected_test_suite_num) +
                                ' test suites, but got ' + str(len(test_suites)))
                        break
        # Anything that still parses as a suite was not announced by the plan.
        test_suite = parse_test_suite(lines, -1)
        if test_suite:
                print_with_timestamp(red('[ERROR] ') +
                        'got unexpected test suite: ' + test_suite.name)
        if test_suites:
                return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
        else:
                return TestResult(TestStatus.NO_TESTS, [], lines)
313
def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
        """Print a per-suite, per-case report and tally the outcomes.

        For failed and crashed cases the case log is printed in yellow,
        followed by an empty timestamped line.

        Returns:
                (total_tests, failed_tests, crashed_tests)
        """
        total = 0
        failed = 0
        crashed = 0
        for suite in test_result.suites:
                if suite.status == TestStatus.SUCCESS:
                        banner = green('[PASSED] ') + suite.name
                elif suite.status == TestStatus.TEST_CRASHED:
                        banner = red('[CRASHED] ' + suite.name)
                else:
                        banner = red('[FAILED] ') + suite.name
                print_suite_divider(banner)
                for case in suite.cases:
                        total += 1
                        if case.status == TestStatus.SUCCESS:
                                print_with_timestamp(green('[PASSED] ') + case.name)
                                continue
                        if case.status == TestStatus.TEST_CRASHED:
                                crashed += 1
                                headline = red('[CRASHED] ' + case.name)
                        else:
                                failed += 1
                                headline = red('[FAILED] ') + case.name
                        print_with_timestamp(headline)
                        print_log(map(yellow, case.log))
                        print_with_timestamp('')
        return total, failed, crashed
340
def parse_run_tests(kernel_output) -> TestResult:
        """Parse kernel output, print a report and return the TestResult.

        The KUnit section is isolated from kernel_output and parsed. Either an
        error (no tests / unparsable results) or the detailed per-suite report
        is printed, followed by a summary line that is green only when the
        overall status is SUCCESS.
        """
        # (total, failed, crashed); stays at zero when nothing could be parsed.
        counts = (0, 0, 0)
        test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
        status = test_result.status
        if status == TestStatus.NO_TESTS:
                print(red('[ERROR] ') + yellow('no tests run!'))
        elif status == TestStatus.FAILURE_TO_PARSE_TESTS:
                print(red('[ERROR] ') + yellow('could not parse test results!'))
        else:
                counts = print_and_count_results(test_result)
        print_with_timestamp(DIVIDER)
        colorize = green if status == TestStatus.SUCCESS else red
        summary = 'Testing complete. %d tests run. %d failed. %d crashed.' % counts
        print_with_timestamp(colorize(summary))
        return test_result