Files
u-boot/tools/codman/lsp.py
Simon Glass 50e9e75d44 codman: Begin an experimental lsp analyser
It is possible to use an LSP to determine which code is used, at least
to some degree.

Make a start on this, in the hope that future work may prove out the
concept.

So far I have not found this to be particularly useful, since it does
not seem to handle IS_ENABLED() and similar macros when working out
inactive regions.

Co-developed-by: Claude <noreply@anthropic.com>
Signed-off-by: Simon Glass <simon.glass@canonical.com>
2025-11-24 06:47:07 -07:00

320 lines
12 KiB
Python

# SPDX-License-Identifier: GPL-2.0
#
# Copyright 2025 Canonical Ltd
#
"""LSP-based line-level analysis for source code.
This module provides functionality to analyse which lines in source files
are active vs inactive based on preprocessor conditionals, using clangd's
inactive regions feature via the Language Server Protocol (LSP).
"""
import concurrent.futures
import json
import multiprocessing
import os
import re
import tempfile
import time
from u_boot_pylib import tools, tout
from analyser import Analyser, FileResult
from lsp_client import LspClient
def create_compile_commands(build_dir, srcdir):
    """Create compile_commands.json using gen_compile_commands.py.

    Args:
        build_dir (str): Build directory path
        srcdir (str): Source directory path

    Returns:
        list: List of compile command entries, each a dict with
            'directory', 'file' and 'command' keys
    """
    # Use the same pattern as gen_compile_commands.py
    line_pattern = re.compile(
        r'^(saved)?cmd_[^ ]*\.o := (?P<command_prefix>.* )'
        r'(?P<file_path>[^ ]*\.[cS]) *(;|$)')
    compile_commands = []

    # Walk through build directory looking for .cmd files
    filename_matcher = re.compile(r'^\..*\.cmd$')
    exclude_dirs = ['.git', 'Documentation', 'include', 'tools']
    for dirpath, dirnames, filenames in os.walk(build_dir, topdown=True):
        # Prune unwanted directories: must assign to the slice
        # (dirnames[:] = ...) so os.walk() sees the change; rebinding the
        # local name would leave the walk unpruned
        dirnames[:] = [d for d in dirnames if d not in exclude_dirs]
        for filename in filenames:
            if not filename_matcher.match(filename):
                continue
            cmd_file = os.path.join(dirpath, filename)
            try:
                with open(cmd_file, 'rt', encoding='utf-8') as f:
                    result = line_pattern.match(f.readline())
                if result:
                    command_prefix = result.group('command_prefix')
                    file_path = result.group('file_path')

                    # Clean up command prefix (handle escaped #)
                    prefix = command_prefix.replace(r'\#', '#').replace(
                        '$(pound)', '#')

                    # Get absolute path to source file; skip entries whose
                    # source no longer exists
                    abs_path = os.path.realpath(
                        os.path.join(srcdir, file_path))
                    if os.path.exists(abs_path):
                        compile_commands.append({
                            'directory': srcdir,
                            'file': abs_path,
                            'command': prefix + file_path,
                        })
            except OSError:
                # Unreadable .cmd file: best-effort, move on
                continue
    return compile_commands
def worker(args):
    """Analyse a single source file using clangd LSP.

    Args:
        args (tuple): Tuple of (source_file, client)
            where client is a shared LspClient instance

    Returns:
        tuple: (source_file, inactive_regions, error_msg) where
            inactive_regions is a list (possibly empty) on success or None
            on failure, and error_msg is None on success
    """
    source_file, client = args
    uri = f'file://{source_file}'
    try:
        # Read file content and open the document in clangd
        content = tools.read_file(source_file, binary=False)
        client.notify('textDocument/didOpen', {
            'textDocument': {
                'uri': uri,
                'languageId': 'c',
                'version': 1,
                'text': content
            }
        })

        # Wait for clangd to process and send notifications; poll for the
        # inactive-regions notification for this specific file
        max_wait = 10  # seconds
        start_time = time.time()
        inactive_regions = None
        try:
            while time.time() - start_time < max_wait:
                time.sleep(0.1)
                with client.lock:
                    remaining = []
                    for notif in client.notifications:
                        if (notif.get('method', '') !=
                                'textDocument/clangd.inactiveRegions'):
                            # Drop notifications no worker consumes (e.g.
                            # diagnostics) to avoid buildup
                            continue
                        params = notif.get('params', {})
                        if (inactive_regions is None and
                                params.get('uri', '') == uri):
                            # This notification is for our file: consume it
                            inactive_regions = params.get(
                                'inactiveRegions', [])
                        else:
                            # Belongs to another file: leave it for the
                            # worker thread handling that file, instead of
                            # discarding it (which would make that worker
                            # time out)
                            remaining.append(notif)
                    client.notifications = remaining
                if inactive_regions is not None:
                    break
        finally:
            # Always close the document so clangd frees its resources,
            # even if reading or polling raised
            client.notify('textDocument/didClose', {
                'textDocument': {
                    'uri': uri
                }
            })

        if inactive_regions is None:
            # No inactive regions notification received; this could mean
            # the file simply has no inactive code
            inactive_regions = []
        return (source_file, inactive_regions, None)
    except Exception as e:  # pylint: disable=broad-except
        # Report the failure to the caller rather than killing the pool
        return (source_file, None, str(e))
class LspAnalyser(Analyser): # pylint: disable=too-few-public-methods
    """Analyser that uses clangd LSP to determine active lines.

    This analyser uses the Language Server Protocol (LSP) with clangd to
    identify inactive preprocessor regions in source files.
    """
    def __init__(self, build_dir, srcdir, used_sources, keep_temps=False):
        """Set up the LSP analyser.

        Args:
            build_dir (str): Build directory containing .o and .cmd files
            srcdir (str): Path to source root directory
            used_sources (set): Set of source files that are compiled
            keep_temps (bool): If True, keep temporary files for debugging
        """
        super().__init__(srcdir, keep_temps)
        self.build_dir = build_dir
        self.used_sources = used_sources

    def extract_inactive_regions(self, jobs=None):
        """Extract inactive regions from source files using clangd.

        Args:
            jobs (int): Number of parallel jobs (None = use all CPUs)

        Returns:
            dict: Mapping of source file paths to lists of inactive regions
        """
        # Create compile commands database
        tout.progress('Building compile commands database...')
        compile_commands = create_compile_commands(self.build_dir,
                                                   self.srcdir)

        # Filter to only .c and .S files that we need to analyse
        filtered_files = [
            cmd['file'] for cmd in compile_commands
            if cmd['file'] in self.used_sources and
            cmd['file'].endswith(('.c', '.S'))]
        tout.progress(f'Found {len(filtered_files)} source files to analyse')
        if not filtered_files:
            return {}

        inactive = {}
        errors = []

        # Create a single clangd instance and use it for all files
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write the compile commands database where clangd will find it
            # via --compile-commands-dir
            compile_db = os.path.join(tmpdir, 'compile_commands.json')
            with open(compile_db, 'w', encoding='utf-8') as f:
                json.dump(compile_commands, f)

            # Start a single clangd server
            tout.progress('Starting clangd server...')
            with LspClient(['clangd', '--log=error',
                            f'--compile-commands-dir={tmpdir}']) as client:
                result = client.init(f'file://{self.srcdir}')
                if not result:
                    tout.error('Failed to start clangd')
                    return {}

                # Determine number of workers
                if jobs is None:
                    jobs = min(multiprocessing.cpu_count(),
                               len(filtered_files))
                elif jobs <= 0:
                    jobs = 1
                tout.progress(f'Processing files with {jobs} workers...')

                # Use ThreadPoolExecutor to process files in parallel
                # (threads share the same clangd client)
                with concurrent.futures.ThreadPoolExecutor(
                        max_workers=jobs) as executor:
                    # Submit all tasks
                    future_to_file = {
                        executor.submit(worker, (source_file, client)):
                            source_file
                        for source_file in filtered_files
                    }

                    # Collect results as they complete
                    completed = 0
                    for future in concurrent.futures.as_completed(
                            future_to_file):
                        source_file = future_to_file[future]
                        completed += 1
                        tout.progress(
                            f'Processing {completed}/{len(filtered_files)}: '
                            f'{os.path.basename(source_file)}...')
                        try:
                            fname, regions, error_msg = future.result()
                            if error_msg:
                                errors.append(f'{source_file}: {error_msg}')
                            elif regions is not None:
                                inactive[fname] = regions
                        except Exception as exc:  # pylint: disable=broad-except
                            errors.append(f'{source_file}: {exc}')

        # Report any errors
        if errors:
            for error in errors[:10]:  # Show first 10 errors
                tout.error(error)
            if len(errors) > 10:
                tout.error(f'... and {len(errors) - 10} more errors')
            tout.warning(f'Failed to analyse {len(errors)} file(s) with LSP')
        return inactive

    def process(self, jobs=None):
        """Perform line-level analysis using clangd LSP.

        Args:
            jobs (int): Number of parallel jobs (None = use all CPUs)

        Returns:
            dict: Mapping of source file paths to FileResult named tuples
        """
        tout.progress('Extracting inactive regions using clangd LSP...')
        inactive_regions_map = self.extract_inactive_regions(jobs)

        file_results = {}
        for source_file in self.used_sources:
            # Only process .c and .S files
            if not source_file.endswith(('.c', '.S')):
                continue
            abs_path = os.path.realpath(source_file)
            inactive_regions = inactive_regions_map.get(abs_path, [])

            # Count total lines in the file
            total_lines = self.count_lines(abs_path)

            # Start with every line active, then mark inactive regions
            line_status = {i: 'active' for i in range(1, total_lines + 1)}

            # Mark inactive lines based on regions; LSP uses 0-indexed line
            # numbers, and the ranges are inclusive
            for region in inactive_regions:
                start_line = region['start']['line'] + 1
                end_line = region['end']['line'] + 1
                for line_num in range(start_line, end_line + 1):
                    if line_num <= total_lines:
                        line_status[line_num] = 'inactive'

            inactive_lines = sum(
                1 for status in line_status.values() if status == 'inactive')
            active_lines = total_lines - inactive_lines
            file_results[abs_path] = FileResult(
                total_lines=total_lines,
                active_lines=active_lines,
                inactive_lines=inactive_lines,
                line_status=line_status
            )
        tout.info(f'Analysed {len(file_results)} files using clangd LSP')
        return file_results