codman: Begin an experimental LSP analyser

It is possible to use an LSP to determine which code is used, at least
to some degree.

Make a start on this, in the hope that future work may prove out the
concept.

So far I have not found this to be particularly useful, since it does not
seem to handle IS_ENABLED() and similar macros when working out inactive
regions: presumably this is because clangd only reports regions disabled at
the preprocessor level (e.g. #ifdef blocks), whereas code guarded by
if (IS_ENABLED(CONFIG_FOO)) is still compiled.

Co-developed-by: Claude <noreply@anthropic.com>
Signed-off-by: Simon Glass <simon.glass@canonical.com>
Author: Simon Glass
Date: 2025-11-21 09:07:33 -07:00
parent 683d1578ae
commit 50e9e75d44
3 changed files with 697 additions and 0 deletions

tools/codman/lsp.py (new file, 319 lines)

@@ -0,0 +1,319 @@
# SPDX-License-Identifier: GPL-2.0
#
# Copyright 2025 Canonical Ltd
#
"""LSP-based line-level analysis for source code.
This module provides functionality to analyse which lines in source files
are active vs inactive based on preprocessor conditionals, using clangd's
inactive regions feature via the Language Server Protocol (LSP).
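A minimal usage sketch (build_dir, srcdir and used_sources are assumed to
be supplied by the surrounding codman tool):

    analyser = LspAnalyser(build_dir, srcdir, used_sources)
    results = analyser.process(jobs=4)
    for path, res in results.items():
        print(path, res.active_lines, res.inactive_lines)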
"""
import concurrent.futures
import json
import multiprocessing
import os
import re
import tempfile
import time
from u_boot_pylib import tools, tout
from analyser import Analyser, FileResult
from lsp_client import LspClient
def create_compile_commands(build_dir, srcdir):
"""Create compile_commands.json using gen_compile_commands.py.
Args:
build_dir (str): Build directory path
srcdir (str): Source directory path
Returns:
list: List of compile command entries
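Each entry follows the compile_commands.json schema read by clangd; an
illustrative entry (hypothetical paths):

    {
        'directory': '/path/to/srcdir',
        'file': '/path/to/srcdir/lib/foo.c',
        'command': 'gcc ... -c lib/foo.c',
    }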
"""
# Use the same pattern as gen_compile_commands.py
line_pattern = re.compile(
r'^(saved)?cmd_[^ ]*\.o := (?P<command_prefix>.* )'
r'(?P<file_path>[^ ]*\.[cS]) *(;|$)')
compile_commands = []
# Walk through build directory looking for .cmd files
filename_matcher = re.compile(r'^\..*\.cmd$')
exclude_dirs = ['.git', 'Documentation', 'include', 'tools']
for dirpath, dirnames, filenames in os.walk(build_dir, topdown=True):
# Prune unwanted directories in place so os.walk() skips them
dirnames[:] = [d for d in dirnames if d not in exclude_dirs]
for filename in filenames:
if not filename_matcher.match(filename):
continue
cmd_file = os.path.join(dirpath, filename)
try:
with open(cmd_file, 'rt', encoding='utf-8') as f:
result = line_pattern.match(f.readline())
if result:
command_prefix = result.group('command_prefix')
file_path = result.group('file_path')
# Clean up command prefix (handle escaped #)
prefix = command_prefix.replace(r'\#', '#').replace(
'$(pound)', '#')
# Get absolute path to source file
abs_path = os.path.realpath(
os.path.join(srcdir, file_path))
if os.path.exists(abs_path):
compile_commands.append({
'directory': srcdir,
'file': abs_path,
'command': prefix + file_path,
})
except OSError:
continue
return compile_commands
def worker(args):
"""Analyse a single source file using clangd LSP.
Args:
args (tuple): Tuple of (source_file, client)
where client is a shared LspClient instance
Returns:
tuple: (source_file, inactive_regions, error_msg)
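Each inactive region is an LSP Range with 0-indexed line numbers, e.g.:

    {'start': {'line': 10, 'character': 0},
     'end': {'line': 14, 'character': 0}}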
"""
source_file, client = args
try:
# Read file content
content = tools.read_file(source_file, binary=False)
# Open the document
client.notify('textDocument/didOpen', {
'textDocument': {
'uri': f'file://{source_file}',
'languageId': 'c',
'version': 1,
'text': content
}
})
# Wait for clangd to process and send notifications
# Poll for inactive regions notification for this specific file
max_wait = 10 # seconds
start_time = time.time()
inactive_regions = None
while time.time() - start_time < max_wait:
time.sleep(0.1)
with client.lock:
notifications = list(client.notifications)
# Clear processed notifications to avoid buildup
client.notifications = []
for notif in notifications:
method = notif.get('method', '')
if method == 'textDocument/clangd.inactiveRegions':
params = notif.get('params', {})
uri = params.get('uri', '')
# Check if this notification is for our file
if uri == f'file://{source_file}':
inactive_regions = params.get('inactiveRegions', [])
break
if inactive_regions is not None:
break
# Close the document to free resources
client.notify('textDocument/didClose', {
'textDocument': {
'uri': f'file://{source_file}'
}
})
if inactive_regions is None:
# No inactive regions notification received
# This could mean the file has no inactive code
inactive_regions = []
return (source_file, inactive_regions, None)
except Exception as e:
return (source_file, None, str(e))
class LspAnalyser(Analyser): # pylint: disable=too-few-public-methods
"""Analyser that uses clangd LSP to determine active lines.
This analyser uses the Language Server Protocol (LSP) with clangd to
identify inactive preprocessor regions in source files.
"""
def __init__(self, build_dir, srcdir, used_sources, keep_temps=False):
"""Set up the LSP analyser.
Args:
build_dir (str): Build directory containing .o and .cmd files
srcdir (str): Path to source root directory
used_sources (set): Set of source files that are compiled
keep_temps (bool): If True, keep temporary files for debugging
"""
super().__init__(srcdir, keep_temps)
self.build_dir = build_dir
self.used_sources = used_sources
def extract_inactive_regions(self, jobs=None):
"""Extract inactive regions from source files using clangd.
Args:
jobs (int): Number of parallel jobs (None = use all CPUs)
Returns:
dict: Mapping of source file paths to lists of inactive regions
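For example (keys are absolute paths; line numbers are 0-indexed):

    {'/abs/path/lib/foo.c': [
        {'start': {'line': 10, 'character': 0},
         'end': {'line': 14, 'character': 0}}]}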
"""
# Create compile commands database
tout.progress('Building compile commands database...')
compile_commands = create_compile_commands(self.build_dir, self.srcdir)
# Filter to only .c and .S files that we need to analyse
filtered_files = []
for cmd in compile_commands:
source_file = cmd['file']
if source_file in self.used_sources:
if source_file.endswith('.c') or source_file.endswith('.S'):
filtered_files.append(source_file)
tout.progress(f'Found {len(filtered_files)} source files to analyse')
if not filtered_files:
return {}
inactive = {}
errors = []
# Create a single clangd instance and use it for all files
with tempfile.TemporaryDirectory() as tmpdir:
# Write compile commands database
compile_db = os.path.join(tmpdir, 'compile_commands.json')
with open(compile_db, 'w', encoding='utf-8') as f:
json.dump(compile_commands, f)
# Start a single clangd server
tout.progress('Starting clangd server...')
with LspClient(['clangd', '--log=error',
f'--compile-commands-dir={tmpdir}']) as client:
result = client.init(f'file://{self.srcdir}')
if not result:
tout.error('Failed to start clangd')
return {}
# Determine number of workers
if jobs is None:
jobs = min(multiprocessing.cpu_count(), len(filtered_files))
elif jobs <= 0:
jobs = 1
tout.progress(f'Processing files with {jobs} workers...')
# Use ThreadPoolExecutor to process files in parallel
# (threads share the same clangd client)
with concurrent.futures.ThreadPoolExecutor(
max_workers=jobs) as executor:
# Submit all tasks
future_to_file = {
executor.submit(worker, (source_file, client)):
source_file
for source_file in filtered_files
}
# Collect results as they complete
completed = 0
for future in concurrent.futures.as_completed(future_to_file):
source_file = future_to_file[future]
completed += 1
tout.progress(
f'Processing {completed}/{len(filtered_files)}: ' +
f'{os.path.basename(source_file)}...')
try:
source_file_result, inactive_regions, error_msg = (
future.result())
if error_msg:
errors.append(f'{source_file}: {error_msg}')
elif inactive_regions is not None:
inactive[source_file_result] = (
inactive_regions)
except Exception as exc:
errors.append(f'{source_file}: {exc}')
# Report any errors
if errors:
for error in errors[:10]: # Show first 10 errors
tout.error(error)
if len(errors) > 10:
tout.error(f'... and {len(errors) - 10} more errors')
tout.warning(f'Failed to analyse {len(errors)} file(s) with LSP')
return inactive
def process(self, jobs=None):
"""Perform line-level analysis using clangd LSP.
Args:
jobs (int): Number of parallel jobs (None = use all CPUs)
Returns:
dict: Mapping of source file paths to FileResult named tuples
"""
tout.progress('Extracting inactive regions using clangd LSP...')
inactive_regions_map = self.extract_inactive_regions(jobs)
file_results = {}
for source_file in self.used_sources:
# Only process .c and .S files
if not (source_file.endswith('.c') or source_file.endswith('.S')):
continue
abs_path = os.path.realpath(source_file)
inactive_regions = inactive_regions_map.get(abs_path, [])
# Count total lines in the file
total_lines = self.count_lines(abs_path)
# Create line status dict
line_status = {}
# Set up all lines as active
for i in range(1, total_lines + 1):
line_status[i] = 'active'
# Mark inactive lines based on regions
# LSP uses 0-indexed line numbers
for region in inactive_regions:
start_line = region['start']['line'] + 1
end_line = region['end']['line'] + 1
# Mark lines as inactive (inclusive range)
for line_num in range(start_line, end_line + 1):
if line_num <= total_lines:
line_status[line_num] = 'inactive'
inactive_lines = len([s for s in line_status.values()
if s == 'inactive'])
active_lines = total_lines - inactive_lines
file_results[abs_path] = FileResult(
total_lines=total_lines,
active_lines=active_lines,
inactive_lines=inactive_lines,
line_status=line_status
)
tout.info(f'Analysed {len(file_results)} files using clangd LSP')
return file_results

tools/codman/lsp_client.py (new file, 225 lines)

@@ -0,0 +1,225 @@
# SPDX-License-Identifier: GPL-2.0
#
# Copyright 2025 Canonical Ltd
#
"""Minimal LSP (Language Server Protocol) client for clangd.
This module provides a simple JSON-RPC 2.0 client for communicating with
LSP servers like clangd. It focuses on the specific functionality needed
for analysing inactive preprocessor regions.
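A minimal usage sketch (clangd must be installed; the workspace path is
hypothetical):

    with LspClient(['clangd', '--log=error']) as client:
        client.init('file:///path/to/workspace')
        client.notify('textDocument/didOpen', {
            'textDocument': {
                'uri': 'file:///path/to/workspace/foo.c',
                'languageId': 'c',
                'version': 1,
                'text': 'int main(void) { return 0; }',
            }})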
"""
import json
import subprocess
import threading
import time
from typing import Any, Dict, Optional
class LspClient:
"""Minimal LSP client for JSON-RPC 2.0 communication.
This client handles the basic LSP protocol communication over
stdin/stdout with a language server process.
Attributes:
process: The language server subprocess
next_id: Counter for JSON-RPC request IDs
responses: Dict mapping request IDs to response data
lock: Thread lock for response dictionary
reader_thread: Background thread reading server responses
"""
def __init__(self, server_command):
"""Init the LSP client and start the server.
Args:
server_command (list): Command to start the LSP server
(e.g., ['clangd', '--log=error'])
"""
self.process = subprocess.Popen(
server_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0
)
self.next_id = 1
self.responses = {}
self.notifications = []
self.lock = threading.Lock()
self.running = True
# Start background thread to read responses
self.reader_thread = threading.Thread(target=self._read_responses)
self.reader_thread.daemon = True
self.reader_thread.start()
def _read_responses(self):
"""Background thread to read responses from the server"""
while self.running and self.process.poll() is None:
try:
# Read headers
headers = {}
while True:
line = self.process.stdout.readline()
if not line or line == '\r\n' or line == '\n':
break
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
if 'Content-Length' not in headers:
continue
# Read content
content_length = int(headers['Content-Length'])
content = self.process.stdout.read(content_length)
if not content:
break
# Parse JSON
message = json.loads(content)
# Store response or notification
with self.lock:
if 'id' in message:
# Response to a request
self.responses[message['id']] = message
else:
# Notification from server
self.notifications.append(message)
except (json.JSONDecodeError, ValueError):
continue
except Exception:
break
def _send_message(self, message: Dict[str, Any]):
"""Send a JSON-RPC message to the server.
Args:
message: JSON-RPC message dictionary
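Messages are framed with the LSP base-protocol header (header and body
separated by '\r\n\r\n'), e.g.:

    Content-Length: 51

    {"jsonrpc": "2.0", "id": 1, "method": "initialize"}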
"""
content = json.dumps(message)
headers = f'Content-Length: {len(content)}\r\n\r\n'
self.process.stdin.write(headers + content)
self.process.stdin.flush()
def request(self, method: str, params: Optional[Dict] = None,
timeout: int = 30) -> Optional[Dict]:
"""Send a JSON-RPC request and wait for response.
Args:
method: LSP method name (e.g., 'initialize')
params: Method parameters dictionary
timeout: Timeout in seconds (default: 30)
Returns:
Response dictionary, or None on timeout/error
"""
request_id = self.next_id
self.next_id += 1
message = {
'jsonrpc': '2.0',
'id': request_id,
'method': method,
}
if params:
message['params'] = params
self._send_message(message)
# Wait for response
start_time = time.time()
while time.time() - start_time < timeout:
with self.lock:
if request_id in self.responses:
response = self.responses.pop(request_id)
if 'result' in response:
return response['result']
if 'error' in response:
raise RuntimeError(
f"LSP error: {response['error']}")
return response
time.sleep(0.01)
return None
def notify(self, method: str, params: Optional[Dict] = None):
"""Send a JSON-RPC notification (no response expected).
Args:
method: LSP method name
params: Method parameters dictionary
"""
message = {
'jsonrpc': '2.0',
'method': method,
}
if params:
message['params'] = params
self._send_message(message)
def init(self, root_uri: str, capabilities: Optional[Dict] = None) -> Dict:
"""Send initialize request to the server.
Args:
root_uri: Workspace root URI (e.g., 'file:///path/to/workspace')
capabilities: Client capabilities dict
Returns:
Server capabilities from initialize response
"""
if capabilities is None:
capabilities = {
'textDocument': {
'semanticTokens': {
'requests': {
'full': True
}
},
'publishDiagnostics': {},
'inactiveRegions': {
'refreshSupport': False
}
}
}
result = self.request('initialize', {
'processId': None,
'rootUri': root_uri,
'capabilities': capabilities
})
# Send initialized notification
self.notify('initialized', {})
return result
def shutdown(self):
"""Shutdown the language server"""
self.request('shutdown')
self.notify('exit')
self.running = False
if self.process:
self.process.wait(timeout=5)
# Close file descriptors to avoid ResourceWarnings
if self.process.stdin:
self.process.stdin.close()
if self.process.stdout:
self.process.stdout.close()
if self.process.stderr:
self.process.stderr.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - ensure cleanup"""
self.shutdown()

tools/codman/test_lsp.py (new executable file, 153 lines)

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0+
#
# Copyright 2025 Canonical Ltd
#
"""Test script for LSP client with clangd"""
import json
import os
import sys
import tempfile
import time
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from lsp_client import LspClient # pylint: disable=wrong-import-position
def test_clangd():
"""Test basic clangd functionality"""
# Create a temporary directory with a simple C file
with tempfile.TemporaryDirectory() as tmpdir:
# Create a C file with CONFIG-style inactive code
test_file = os.path.join(tmpdir, 'test.c')
with open(test_file, 'w', encoding='utf-8') as f:
f.write('''#include <stdio.h>
// Simulate U-Boot style CONFIG options
#define CONFIG_FEATURE_A 1
void always_compiled(void)
{
printf("Always here\\n");
}
#ifdef CONFIG_FEATURE_A
void feature_a_code(void)
{
printf("Feature A enabled\\n");
}
#endif
#ifdef CONFIG_FEATURE_B
void feature_b_code(void)
{
printf("Feature B enabled (THIS SHOULD BE INACTIVE)\\n");
}
#endif
#if 0
void disabled_debug_code(void)
{
printf("Debug code (INACTIVE)\\n");
}
#endif
''')
# Create compile_commands.json
compile_commands = [
{
'directory': tmpdir,
'command': f'gcc -c {test_file}',
'file': test_file
}
]
compile_db = os.path.join(tmpdir, 'compile_commands.json')
with open(compile_db, 'w', encoding='utf-8') as f:
json.dump(compile_commands, f)
# Create .clangd config to enable inactive regions
clangd_config = os.path.join(tmpdir, '.clangd')
with open(clangd_config, 'w', encoding='utf-8') as f:
f.write('''InactiveRegions:
Opacity: 0.55
''')
print(f'Created test file: {test_file}')
print(f'Created compile DB: {compile_db}')
print(f'Created clangd config: {clangd_config}')
# Start clangd
print('\nStarting clangd...')
with LspClient(['clangd', '--log=error',
f'--compile-commands-dir={tmpdir}']) as client:
print('Initialising...')
result = client.init(f'file://{tmpdir}')
print(f'Server capabilities: {result.get("capabilities", {}).keys()}')
# Open the document
print(f'\nOpening document: {test_file}')
with open(test_file, 'r', encoding='utf-8') as f:
content = f.read()
client.notify('textDocument/didOpen', {
'textDocument': {
'uri': f'file://{test_file}',
'languageId': 'c',
'version': 1,
'text': content
}
})
# Wait for clangd to index the file
print('\nWaiting for clangd to index file...')
time.sleep(3)
# Check for inactive regions notification
print('\nChecking for inactive regions notification...')
with client.lock:
notifications = list(client.notifications)
print(f'Received {len(notifications)} notifications:')
inactive_regions = None
for notif in notifications:
method = notif.get('method', 'unknown')
print(f' - {method}')
# Look for the clangd inactive regions extension
if method == 'textDocument/clangd.inactiveRegions':
params = notif.get('params', {})
inactive_regions = params.get('inactiveRegions', [])
print(f' Found {len(inactive_regions)} inactive regions!')
if inactive_regions:
print('\nInactive regions:')
for region in inactive_regions:
start = region['start']
end = region['end']
start_line = start['line'] + 1 # LSP is 0-indexed
end_line = end['line'] + 1
print(f' Lines {start_line}-{end_line}')
else:
print('\nNo inactive regions received (feature may not be enabled)')
# Also show the file with line numbers for reference
print('\nFile contents:')
for i, line in enumerate(content.split('\n'), 1):
print(f'{i:3}: {line}')
print('\nTest completed!')
# Check clangd stderr for any errors
print('\n=== Clangd stderr output ===')
stderr_output = client.process.stderr.read()
if stderr_output:
print(stderr_output[:1000])
else:
print('(no stderr output)')
if __name__ == '__main__':
test_clangd()