Merge tag 'for-linus-5.11-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/rw/uml
[linux-2.6-microblaze.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List, Optional, Tuple
16
17 TestResult = namedtuple('TestResult', ['status','suites','log'])
18
19 class TestSuite(object):
20         def __init__(self):
21                 self.status = None
22                 self.name = None
23                 self.cases = []
24
25         def __str__(self):
26                 return 'TestSuite(' + self.status + ',' + self.name + ',' + str(self.cases) + ')'
27
28         def __repr__(self):
29                 return str(self)
30
31 class TestCase(object):
32         def __init__(self):
33                 self.status = None
34                 self.name = ''
35                 self.log = []
36
37         def __str__(self):
38                 return 'TestCase(' + self.status + ',' + self.name + ',' + str(self.log) + ')'
39
40         def __repr__(self):
41                 return str(self)
42
43 class TestStatus(Enum):
44         SUCCESS = auto()
45         FAILURE = auto()
46         TEST_CRASHED = auto()
47         NO_TESTS = auto()
48         FAILURE_TO_PARSE_TESTS = auto()
49
50 kunit_start_re = re.compile(r'TAP version [0-9]+$')
51 kunit_end_re = re.compile('(List of all partitions:|'
52                           'Kernel panic - not syncing: VFS:)')
53
54 def isolate_kunit_output(kernel_output):
55         started = False
56         for line in kernel_output:
57                 line = line.rstrip()  # line always has a trailing \n
58                 if kunit_start_re.search(line):
59                         prefix_len = len(line.split('TAP version')[0])
60                         started = True
61                         yield line[prefix_len:] if prefix_len > 0 else line
62                 elif kunit_end_re.search(line):
63                         break
64                 elif started:
65                         yield line[prefix_len:] if prefix_len > 0 else line
66
67 def raw_output(kernel_output):
68         for line in kernel_output:
69                 print(line.rstrip())
70
71 DIVIDER = '=' * 60
72
73 RESET = '\033[0;0m'
74
75 def red(text):
76         return '\033[1;31m' + text + RESET
77
78 def yellow(text):
79         return '\033[1;33m' + text + RESET
80
81 def green(text):
82         return '\033[1;32m' + text + RESET
83
84 def print_with_timestamp(message):
85         print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
86
87 def format_suite_divider(message):
88         return '======== ' + message + ' ========'
89
90 def print_suite_divider(message):
91         print_with_timestamp(DIVIDER)
92         print_with_timestamp(format_suite_divider(message))
93
94 def print_log(log):
95         for m in log:
96                 print_with_timestamp(m)
97
98 TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
99
100 def consume_non_diagnositic(lines: List[str]) -> None:
101         while lines and not TAP_ENTRIES.match(lines[0]):
102                 lines.pop(0)
103
104 def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
105         while lines and not TAP_ENTRIES.match(lines[0]):
106                 test_case.log.append(lines[0])
107                 lines.pop(0)
108
109 OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
110
111 OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
112
113 OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
114
115 def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
116         save_non_diagnositic(lines, test_case)
117         if not lines:
118                 test_case.status = TestStatus.TEST_CRASHED
119                 return True
120         line = lines[0]
121         match = OK_NOT_OK_SUBTEST.match(line)
122         while not match and lines:
123                 line = lines.pop(0)
124                 match = OK_NOT_OK_SUBTEST.match(line)
125         if match:
126                 test_case.log.append(lines.pop(0))
127                 test_case.name = match.group(2)
128                 if test_case.status == TestStatus.TEST_CRASHED:
129                         return True
130                 if match.group(1) == 'ok':
131                         test_case.status = TestStatus.SUCCESS
132                 else:
133                         test_case.status = TestStatus.FAILURE
134                 return True
135         else:
136                 return False
137
138 SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
139 DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
140
141 def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
142         save_non_diagnositic(lines, test_case)
143         if not lines:
144                 return False
145         line = lines[0]
146         match = SUBTEST_DIAGNOSTIC.match(line)
147         if match:
148                 test_case.log.append(lines.pop(0))
149                 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
150                 if crash_match:
151                         test_case.status = TestStatus.TEST_CRASHED
152                 return True
153         else:
154                 return False
155
156 def parse_test_case(lines: List[str]) -> Optional[TestCase]:
157         test_case = TestCase()
158         save_non_diagnositic(lines, test_case)
159         while parse_diagnostic(lines, test_case):
160                 pass
161         if parse_ok_not_ok_test_case(lines, test_case):
162                 return test_case
163         else:
164                 return None
165
166 SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
167
168 def parse_subtest_header(lines: List[str]) -> Optional[str]:
169         consume_non_diagnositic(lines)
170         if not lines:
171                 return None
172         match = SUBTEST_HEADER.match(lines[0])
173         if match:
174                 lines.pop(0)
175                 return match.group(1)
176         else:
177                 return None
178
179 SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
180
181 def parse_subtest_plan(lines: List[str]) -> Optional[int]:
182         consume_non_diagnositic(lines)
183         match = SUBTEST_PLAN.match(lines[0])
184         if match:
185                 lines.pop(0)
186                 return int(match.group(1))
187         else:
188                 return None
189
190 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
191         if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
192                 return TestStatus.TEST_CRASHED
193         elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
194                 return TestStatus.FAILURE
195         elif left != TestStatus.SUCCESS:
196                 return left
197         elif right != TestStatus.SUCCESS:
198                 return right
199         else:
200                 return TestStatus.SUCCESS
201
202 def parse_ok_not_ok_test_suite(lines: List[str],
203                                test_suite: TestSuite,
204                                expected_suite_index: int) -> bool:
205         consume_non_diagnositic(lines)
206         if not lines:
207                 test_suite.status = TestStatus.TEST_CRASHED
208                 return False
209         line = lines[0]
210         match = OK_NOT_OK_MODULE.match(line)
211         if match:
212                 lines.pop(0)
213                 if match.group(1) == 'ok':
214                         test_suite.status = TestStatus.SUCCESS
215                 else:
216                         test_suite.status = TestStatus.FAILURE
217                 suite_index = int(match.group(2))
218                 if suite_index != expected_suite_index:
219                         print_with_timestamp(
220                                 red('[ERROR] ') + 'expected_suite_index ' +
221                                 str(expected_suite_index) + ', but got ' +
222                                 str(suite_index))
223                 return True
224         else:
225                 return False
226
227 def bubble_up_errors(to_status, status_container_list) -> TestStatus:
228         status_list = map(to_status, status_container_list)
229         return reduce(max_status, status_list, TestStatus.SUCCESS)
230
231 def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
232         max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases)
233         return max_status(max_test_case_status, test_suite.status)
234
235 def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
236         if not lines:
237                 return None
238         consume_non_diagnositic(lines)
239         test_suite = TestSuite()
240         test_suite.status = TestStatus.SUCCESS
241         name = parse_subtest_header(lines)
242         if not name:
243                 return None
244         test_suite.name = name
245         expected_test_case_num = parse_subtest_plan(lines)
246         if expected_test_case_num is None:
247                 return None
248         while expected_test_case_num > 0:
249                 test_case = parse_test_case(lines)
250                 if not test_case:
251                         break
252                 test_suite.cases.append(test_case)
253                 expected_test_case_num -= 1
254         if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
255                 test_suite.status = bubble_up_test_case_errors(test_suite)
256                 return test_suite
257         elif not lines:
258                 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
259                 return test_suite
260         else:
261                 print('failed to parse end of suite' + lines[0])
262                 return None
263
264 TAP_HEADER = re.compile(r'^TAP version 14$')
265
266 def parse_tap_header(lines: List[str]) -> bool:
267         consume_non_diagnositic(lines)
268         if TAP_HEADER.match(lines[0]):
269                 lines.pop(0)
270                 return True
271         else:
272                 return False
273
274 TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
275
276 def parse_test_plan(lines: List[str]) -> Optional[int]:
277         consume_non_diagnositic(lines)
278         match = TEST_PLAN.match(lines[0])
279         if match:
280                 lines.pop(0)
281                 return int(match.group(1))
282         else:
283                 return None
284
285 def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
286         return bubble_up_errors(lambda x: x.status, test_suite_list)
287
288 def parse_test_result(lines: List[str]) -> TestResult:
289         consume_non_diagnositic(lines)
290         if not lines or not parse_tap_header(lines):
291                 return TestResult(TestStatus.NO_TESTS, [], lines)
292         expected_test_suite_num = parse_test_plan(lines)
293         if not expected_test_suite_num:
294                 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
295         test_suites = []
296         for i in range(1, expected_test_suite_num + 1):
297                 test_suite = parse_test_suite(lines, i)
298                 if test_suite:
299                         test_suites.append(test_suite)
300                 else:
301                         print_with_timestamp(
302                                 red('[ERROR] ') + ' expected ' +
303                                 str(expected_test_suite_num) +
304                                 ' test suites, but got ' + str(i - 2))
305                         break
306         test_suite = parse_test_suite(lines, -1)
307         if test_suite:
308                 print_with_timestamp(red('[ERROR] ') +
309                         'got unexpected test suite: ' + test_suite.name)
310         if test_suites:
311                 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
312         else:
313                 return TestResult(TestStatus.NO_TESTS, [], lines)
314
315 def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
316         total_tests = 0
317         failed_tests = 0
318         crashed_tests = 0
319         for test_suite in test_result.suites:
320                 if test_suite.status == TestStatus.SUCCESS:
321                         print_suite_divider(green('[PASSED] ') + test_suite.name)
322                 elif test_suite.status == TestStatus.TEST_CRASHED:
323                         print_suite_divider(red('[CRASHED] ' + test_suite.name))
324                 else:
325                         print_suite_divider(red('[FAILED] ') + test_suite.name)
326                 for test_case in test_suite.cases:
327                         total_tests += 1
328                         if test_case.status == TestStatus.SUCCESS:
329                                 print_with_timestamp(green('[PASSED] ') + test_case.name)
330                         elif test_case.status == TestStatus.TEST_CRASHED:
331                                 crashed_tests += 1
332                                 print_with_timestamp(red('[CRASHED] ' + test_case.name))
333                                 print_log(map(yellow, test_case.log))
334                                 print_with_timestamp('')
335                         else:
336                                 failed_tests += 1
337                                 print_with_timestamp(red('[FAILED] ') + test_case.name)
338                                 print_log(map(yellow, test_case.log))
339                                 print_with_timestamp('')
340         return total_tests, failed_tests, crashed_tests
341
342 def parse_run_tests(kernel_output) -> TestResult:
343         total_tests = 0
344         failed_tests = 0
345         crashed_tests = 0
346         test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
347         if test_result.status == TestStatus.NO_TESTS:
348                 print(red('[ERROR] ') + yellow('no tests run!'))
349         elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
350                 print(red('[ERROR] ') + yellow('could not parse test results!'))
351         else:
352                 (total_tests,
353                  failed_tests,
354                  crashed_tests) = print_and_count_results(test_result)
355         print_with_timestamp(DIVIDER)
356         fmt = green if test_result.status == TestStatus.SUCCESS else red
357         print_with_timestamp(
358                 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
359                     (total_tests, failed_tests, crashed_tests)))
360         return test_result