Merge tag 'char-misc-5.9-rc5' of git://git.kernel.org/pub/scm/linux/kernel/git/gregkh...
[linux-2.6-microblaze.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List
16
# Outcome of parsing one log: the aggregate status, the list of parsed test
# suites, and the list of input lines as the parser left it.
TestResult = namedtuple('TestResult', ['status','suites','log'])
18
class TestSuite(object):
        """Record for one test suite: its status, its name and its test cases."""

        def __init__(self):
                self.status = None
                self.name = None
                self.cases = []

        def __str__(self):
                # status and name start out as None, and status is not a str
                # once it has been set, so format the fields instead of
                # concatenating them onto a str (which raises TypeError).
                return 'TestSuite(%s,%s,%s)' % (self.status, self.name, self.cases)

        def __repr__(self):
                return str(self)
30
class TestCase(object):
        """Record for one test case: its status, its name and its log lines."""

        def __init__(self):
                self.status = None
                self.name = ''
                self.log = []

        def __str__(self):
                # status starts out as None and is not a str once it has been
                # set, so format the fields instead of concatenating them
                # onto a str (which raises TypeError).
                return 'TestCase(%s,%s,%s)' % (self.status, self.name, self.log)

        def __repr__(self):
                return str(self)
42
class TestStatus(Enum):
        """Outcomes that a test case, a test suite or a whole run can have."""
        SUCCESS = 1
        FAILURE = 2
        TEST_CRASHED = 3
        NO_TESTS = 4
48
# A line ending in "TAP version <digits>" opens the KUnit section of the log.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Any of these messages closes it.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:|reboot: System halted)')

def isolate_kunit_output(kernel_output):
        """Yield only the KUnit section of a kernel log.

        Lines are passed through from the first one matching kunit_start_re
        until one matches kunit_end_re (which is not yielded and ends the
        generator, even if no start line was seen yet).  Whatever precedes
        'TAP version' on the start line fixes how many leading characters are
        cut from that line and from every line after it.
        """
        in_kunit_section = False
        for raw_line in kernel_output:
                if kunit_start_re.search(raw_line):
                        strip_len = len(raw_line.split('TAP version')[0])
                        in_kunit_section = True
                elif kunit_end_re.search(raw_line):
                        return
                elif not in_kunit_section:
                        continue
                # Slicing from 0 returns the whole line, so no special case
                # is needed for a log without a prefix.
                yield raw_line[strip_len:]
64
def raw_output(kernel_output):
        """Print each line of kernel_output as it is consumed and yield it unchanged."""
        for entry in kernel_output:
                print(entry)
                yield entry
69
# Horizontal rule printed between sections of the report.
DIVIDER = '=' * 60

# Escape sequence appended after coloured text.
RESET = '\033[0;0m'

def red(text):
        """Return text wrapped in the '1;31' escape sequence and RESET."""
        return ''.join(('\033[1;31m', text, RESET))

def yellow(text):
        """Return text wrapped in the '1;33' escape sequence and RESET."""
        return ''.join(('\033[1;33m', text, RESET))

def green(text):
        """Return text wrapped in the '1;32' escape sequence and RESET."""
        return ''.join(('\033[1;32m', text, RESET))
82
def print_with_timestamp(message):
        """Print message prefixed with the current time as '[HH:MM:SS] '."""
        stamp = datetime.now().strftime('%H:%M:%S')
        print('[%s] %s' % (stamp, message))
85
def format_suite_divider(message):
        """Return message framed as '======== message ========'."""
        return ''.join(('======== ', message, ' ========'))
88
def print_suite_divider(message):
        """Print the DIVIDER rule, then message framed as a suite banner."""
        print_with_timestamp(DIVIDER)
        banner = format_suite_divider(message)
        print_with_timestamp(banner)
92
def print_log(log):
        """Print every entry of log on its own timestamped line."""
        for entry in log:
                print_with_timestamp(entry)
96
# Lines the parser treats as TAP: the version header, (optionally indented)
# "ok" / "not ok" results, "N..M" plans and "#" diagnostics.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')

def consume_non_diagnositic(lines: List[str]) -> None:
        """Remove leading lines, in place, up to the first one matching TAP_ENTRIES."""
        while lines:
                if TAP_ENTRIES.match(lines[0]):
                        break
                del lines[0]
102
def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
        """Move leading lines that do not match TAP_ENTRIES from lines into test_case.log."""
        while lines:
                if TAP_ENTRIES.match(lines[0]):
                        return
                test_case.log.append(lines.pop(0))
107
# NOTE(review): not referenced anywhere in the visible part of this file.
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

# Indented "ok N - description" / "not ok N - description" line.
# Group 1 is the verdict ("ok" or "not ok"), group 2 the description.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# The same line shape without leading whitespace; same groups.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) [0-9]+ - (.*)$')
113
def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
        """Consume the result line of a test case and record its outcome.

        Leading non-TAP lines are first moved into test_case.log.  If nothing
        is left after that, the case is marked TEST_CRASHED and True is
        returned.  Otherwise lines are discarded until one matches
        OK_NOT_OK_SUBTEST; that line is moved into test_case.log, its
        description becomes test_case.name, and the status is set to SUCCESS
        or FAILURE unless it is already TEST_CRASHED.

        Returns True when an outcome was recorded, False when the input ran
        out before a result line was found.
        """
        save_non_diagnositic(lines, test_case)
        if not lines:
                test_case.status = TestStatus.TEST_CRASHED
                return True
        # Peek at the head of the list and only drop it when it is not a
        # result line.  The result line itself must still be at lines[0] when
        # it is moved into the log below; popping it while searching would
        # make that pop take the line after it instead (or fail on an empty
        # list).
        match = OK_NOT_OK_SUBTEST.match(lines[0])
        while not match:
                lines.pop(0)
                if not lines:
                        return False
                match = OK_NOT_OK_SUBTEST.match(lines[0])
        test_case.log.append(lines.pop(0))
        test_case.name = match.group(2)
        # A crash already recorded for this case takes precedence over the
        # verdict on the result line.
        if test_case.status == TestStatus.TEST_CRASHED:
                return True
        if match.group(1) == 'ok':
                test_case.status = TestStatus.SUCCESS
        else:
                test_case.status = TestStatus.FAILURE
        return True
136
# Indented "# <something>: <message>" line; group 1 is the message.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# .*?: (.*)$')
# Diagnostic message that marks the current test case as crashed.
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'

def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
        """Consume one subtest diagnostic line, if the next TAP line is one.

        Leading non-TAP lines are first moved into test_case.log.  A line
        matching SUBTEST_DIAGNOSTIC is then moved into test_case.log as well,
        and the case is marked TEST_CRASHED when its message equals
        DIAGNOSTIC_CRASH_MESSAGE.

        Returns True when a diagnostic line was consumed, False otherwise.
        """
        save_non_diagnositic(lines, test_case)
        if not lines:
                return False
        found = SUBTEST_DIAGNOSTIC.match(lines[0])
        if not found:
                return False
        test_case.log.append(lines.pop(0))
        crashed = found.group(1) == DIAGNOSTIC_CRASH_MESSAGE
        if crashed:
                test_case.status = TestStatus.TEST_CRASHED
        return True
153
def parse_test_case(lines: List[str]) -> TestCase:
        """Parse one test case from the head of lines.

        Consumes any diagnostics and then the result line.  Returns the
        populated TestCase, or None when no result could be parsed.
        """
        case = TestCase()
        save_non_diagnositic(lines, case)
        more_diagnostics = True
        while more_diagnostics:
                more_diagnostics = parse_diagnostic(lines, case)
        return case if parse_ok_not_ok_test_case(lines, case) else None
163
# Indented "# Subtest: <name>" line; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: List[str]) -> str:
        """Consume a subtest header line and return the name it carries.

        Leading non-TAP lines are dropped first.  Returns None, leaving the
        next line in place, when the input is exhausted or the next line is
        not a subtest header.
        """
        consume_non_diagnositic(lines)
        header = SUBTEST_HEADER.match(lines[0]) if lines else None
        if not header:
                return None
        lines.pop(0)
        return header.group(1)
176
# Indented "N..M" plan line; group 1 is M.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: List[str]) -> int:
        """Consume a subtest plan line and return its upper bound as an int.

        Leading non-TAP lines are dropped first.  Returns None, leaving the
        next line in place, when the input is exhausted or the next line is
        not a plan.
        """
        consume_non_diagnositic(lines)
        # The log may end right after the subtest header; report "no plan"
        # instead of indexing into an empty list.
        if not lines:
                return None
        match = SUBTEST_PLAN.match(lines[0])
        if match:
                lines.pop(0)
                return int(match.group(1))
        else:
                return None
187
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
        """Return the more severe of two statuses.

        TEST_CRASHED outranks FAILURE, which outranks everything else.  With
        neither present, the first argument that is not SUCCESS wins, and
        SUCCESS is returned only when both are SUCCESS.
        """
        for severe in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
                if left == severe or right == severe:
                        return severe
        if left != TestStatus.SUCCESS:
                return left
        if right != TestStatus.SUCCESS:
                return right
        return TestStatus.SUCCESS
199
def parse_ok_not_ok_test_suite(lines: List[str], test_suite: TestSuite) -> bool:
        """Consume the result line that closes a suite and record its verdict.

        Leading non-TAP lines are dropped first.  If nothing is left, the
        suite is marked TEST_CRASHED and False is returned.  If the next line
        matches OK_NOT_OK_MODULE it is consumed, the suite status is set to
        SUCCESS or FAILURE and True is returned; otherwise the line is left in
        place and False is returned.
        """
        consume_non_diagnositic(lines)
        if not lines:
                test_suite.status = TestStatus.TEST_CRASHED
                return False
        result = OK_NOT_OK_MODULE.match(lines[0])
        if not result:
                return False
        lines.pop(0)
        passed = result.group(1) == 'ok'
        test_suite.status = TestStatus.SUCCESS if passed else TestStatus.FAILURE
        return True
216
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
        """Fold the statuses of all containers into the most severe one.

        to_status extracts a status from each element of
        status_container_list.  The fold starts from SUCCESS, so an empty
        list yields SUCCESS.
        """
        worst = TestStatus.SUCCESS
        for container in status_container_list:
                worst = max_status(worst, to_status(container))
        return worst
220
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
        """Combine the statuses of a suite's cases with the suite's own status."""
        worst_case_status = bubble_up_errors(lambda case: case.status, test_suite.cases)
        return max_status(worst_case_status, test_suite.status)
224
def parse_test_suite(lines: List[str]) -> TestSuite:
        """Parse one test suite from the head of lines.

        Expects a subtest header, a plan, up to the planned number of test
        cases and a closing result line.  Returns the populated TestSuite, or
        None when there is no input, no header, no (non-zero) plan, or the
        closing line cannot be parsed.  When the input runs out before the
        closing line, an error is printed and the partial suite is returned.
        """
        if not lines:
                return None
        consume_non_diagnositic(lines)
        test_suite = TestSuite()
        test_suite.status = TestStatus.SUCCESS
        name = parse_subtest_header(lines)
        if not name:
                return None
        test_suite.name = name
        expected_test_case_num = parse_subtest_plan(lines)
        if not expected_test_case_num:
                return None
        # Stop early if a case cannot be parsed; fewer cases than planned is
        # tolerated here.
        while expected_test_case_num > 0:
                test_case = parse_test_case(lines)
                if not test_case:
                        break
                test_suite.cases.append(test_case)
                expected_test_case_num -= 1
        if parse_ok_not_ok_test_suite(lines, test_suite):
                test_suite.status = bubble_up_test_case_errors(test_suite)
                return test_suite
        elif not lines:
                print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
                return test_suite
        else:
                # Report in the same form as the error above, and separate
                # the message from the offending line.
                print_with_timestamp(
                        red('[ERROR] ') + 'failed to parse end of suite: ' + lines[0])
                return None
253
# Only a line that is exactly "TAP version 14" is accepted as the header.
TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: List[str]) -> bool:
        """Consume the TAP header line if it is next.

        Leading non-TAP lines are dropped first.  Returns True when the
        header was consumed, False when the input is exhausted or the next
        line is not the header (which is then left in place).
        """
        consume_non_diagnositic(lines)
        # Guard against exhausted input instead of relying on the caller to
        # have checked.
        if lines and TAP_HEADER.match(lines[0]):
                lines.pop(0)
                return True
        return False
263
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
        """Return the most severe status among the given suites."""
        def suite_status(suite):
                return suite.status
        return bubble_up_errors(suite_status, test_suite_list)
266
def parse_test_result(lines: List[str]) -> TestResult:
        """Parse a whole TAP log into a TestResult.

        Without a TAP header the result has status NO_TESTS and no suites.
        Otherwise suites are parsed until one cannot be, and the overall
        status is the most severe suite status.  The third field is the lines
        list itself, as left after parsing.
        """
        consume_non_diagnositic(lines)
        if not (lines and parse_tap_header(lines)):
                return TestResult(TestStatus.NO_TESTS, [], lines)
        suites = []
        while True:
                suite = parse_test_suite(lines)
                if not suite:
                        break
                suites.append(suite)
        return TestResult(bubble_up_suite_errors(suites), suites, lines)
277
def parse_run_tests(kernel_output) -> TestResult:
        """Parse kernel output, print a per-suite / per-case report, return the result.

        Passed cases get a single line.  Failed and crashed cases are followed
        by their log and an empty line.  A summary line with the number of
        cases run, failed and crashed closes the report.
        """
        test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
        if test_result.status == TestStatus.NO_TESTS:
                print_with_timestamp(red('[ERROR] ') + 'no kunit output detected')
        ran = failed = crashed = 0
        for suite in test_result.suites:
                if suite.status == TestStatus.SUCCESS:
                        banner = green('[PASSED] ') + suite.name
                elif suite.status == TestStatus.TEST_CRASHED:
                        banner = red('[CRASHED] ' + suite.name)
                else:
                        banner = red('[FAILED] ') + suite.name
                print_suite_divider(banner)
                for case in suite.cases:
                        ran += 1
                        if case.status == TestStatus.SUCCESS:
                                print_with_timestamp(green('[PASSED] ') + case.name)
                                continue
                        if case.status == TestStatus.TEST_CRASHED:
                                crashed += 1
                                headline = red('[CRASHED] ' + case.name)
                        else:
                                failed += 1
                                headline = red('[FAILED] ') + case.name
                        print_with_timestamp(headline)
                        print_log(map(yellow, case.log))
                        print_with_timestamp('')
        print_with_timestamp(DIVIDER)
        if test_result.status == TestStatus.SUCCESS:
                fmt = green
        else:
                fmt = red
        summary = ('Testing complete. %d tests run. %d failed. %d crashed.' %
                   (ran, failed, crashed))
        print_with_timestamp(fmt(summary))
        return test_result
310                     (total_tests, failed_tests, crashed_tests)))
311         return test_result