- Add module-level disable for too-many-lines - Suppress consider-using-with for NamedTemporaryFile (intentional) - Change unused linenum to _ in parse_file() - Suppress too-many-arguments for _check_board() - Add missing param documentation for parse_extended() Cover-letter: buildman: Fix pylint warnings in board.py and boards.py This series addresses pylint warnings in the buildman board modules, achieving a 10/10 pylint score. For board.py, unavoidable warnings (too-many-instance-attributes, too-few-public-methods, too-many-arguments) are suppressed since the Board class is a legitimate data container. For boards.py, the series: - Fixes line-too-long and missing return documentation warnings - Refactors complex functions by extracting helper methods to reduce too-many-branches and too-many-locals warnings - Adds missing parameter documentation - Suppresses remaining unavoidable warnings END Co-developed-by: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Simon Glass <simon.glass@canonical.com>
1270 lines
42 KiB
Python
1270 lines
42 KiB
Python
# SPDX-License-Identifier: GPL-2.0+
|
|
# Copyright (c) 2012 The Chromium OS Authors.
|
|
# Author: Simon Glass <sjg@chromium.org>
|
|
# Author: Masahiro Yamada <yamada.m@jp.panasonic.com>
|
|
|
|
# pylint: disable=too-many-lines
|
|
|
|
"""Maintains a list of boards and allows them to be selected"""
|
|
|
|
from collections import OrderedDict, namedtuple
|
|
import errno
|
|
import fnmatch
|
|
import glob
|
|
import multiprocessing
|
|
import os
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
from buildman import board
|
|
from buildman import kconfiglib
|
|
|
|
import qconfig
|
|
from u_boot_pylib import command
|
|
from u_boot_pylib.terminal import print_clear, tprint
|
|
from u_boot_pylib import tools
|
|
from u_boot_pylib import tout
|
|
|
|
### constant variables ###
|
|
CONFIG_DIR = 'configs'
|
|
SLEEP_TIME = 0.03
|
|
COMMENT_BLOCK = f'''#
|
|
# List of boards
|
|
# Automatically generated by {__file__}: don't edit
|
|
#
|
|
# Status, Arch, CPU, SoC, Vendor, Board, Target, Config, Maintainers
|
|
|
|
'''
|
|
|
|
Extended = namedtuple('Extended', 'name,desc,fragments,targets')
|
|
|
|
|
|
def try_remove(fname):
|
|
"""Remove a file ignoring 'No such file or directory' error.
|
|
|
|
Args:
|
|
fname (str): Filename to remove
|
|
|
|
Raises:
|
|
OSError: output file exists but could not be removed
|
|
"""
|
|
try:
|
|
os.remove(fname)
|
|
except OSError as exception:
|
|
# Ignore 'No such file or directory' error
|
|
if exception.errno != errno.ENOENT:
|
|
raise
|
|
|
|
|
|
def _check_srcdir_is_current(ctime, srcdir):
|
|
"""Check if any Kconfig or MAINTAINERS files are newer than ctime
|
|
|
|
Args:
|
|
ctime (float): Reference time to compare against
|
|
srcdir (str): Directory containing Kconfig and MAINTAINERS files
|
|
|
|
Returns:
|
|
bool: True if all files are older than ctime
|
|
"""
|
|
for (dirpath, _, filenames) in os.walk(srcdir):
|
|
for filename in filenames:
|
|
if (fnmatch.fnmatch(filename, '*~') or
|
|
not fnmatch.fnmatch(filename, 'Kconfig*') and
|
|
not filename == 'MAINTAINERS'):
|
|
continue
|
|
filepath = os.path.join(dirpath, filename)
|
|
if ctime < os.path.getctime(filepath):
|
|
return False
|
|
return True
|
|
|
|
|
|
def _check_output_is_current(output, config_dir):
|
|
"""Check if output references any removed boards
|
|
|
|
Args:
|
|
output (str): Path to the output file
|
|
config_dir (str): Directory containing defconfig files
|
|
|
|
Returns:
|
|
bool: True if all referenced boards still exist
|
|
"""
|
|
with open(output, encoding="utf-8") as inf:
|
|
for line in inf:
|
|
if 'Options,' in line:
|
|
return False
|
|
if line[0] == '#' or line == '\n':
|
|
continue
|
|
defconfig = line.split()[6] + '_defconfig'
|
|
if not os.path.exists(os.path.join(config_dir, defconfig)):
|
|
return False
|
|
return True
|
|
|
|
|
|
def output_is_new(output, config_dir, srcdir):
|
|
"""Check if the output file is up to date.
|
|
|
|
Looks at defconfig and Kconfig files to make sure none is newer than the
|
|
output file. Also ensures that the boards.cfg does not mention any removed
|
|
boards.
|
|
|
|
Args:
|
|
output (str): Filename to check
|
|
config_dir (str): Directory containing defconfig files
|
|
srcdir (str): Directory containing Kconfig and MAINTAINERS files
|
|
|
|
Returns:
|
|
bool: True if the given output file exists and is newer than any of
|
|
*_defconfig, MAINTAINERS and Kconfig*. False otherwise.
|
|
|
|
Raises:
|
|
OSError: output file exists but could not be opened
|
|
"""
|
|
try:
|
|
ctime = os.path.getctime(output)
|
|
except OSError as exception:
|
|
if exception.errno == errno.ENOENT:
|
|
# return False on 'No such file or directory' error
|
|
return False
|
|
raise
|
|
|
|
for (dirpath, _, filenames) in os.walk(config_dir):
|
|
for filename in fnmatch.filter(filenames, '*_defconfig'):
|
|
if fnmatch.fnmatch(filename, '.*'):
|
|
continue
|
|
filepath = os.path.join(dirpath, filename)
|
|
if ctime < os.path.getctime(filepath):
|
|
return False
|
|
|
|
if not _check_srcdir_is_current(ctime, srcdir):
|
|
return False
|
|
|
|
if not _check_output_is_current(output, config_dir):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class Expr:
|
|
"""A single regular expression for matching boards to build"""
|
|
|
|
def __init__(self, expr):
|
|
"""Set up a new Expr object.
|
|
|
|
Args:
|
|
expr (str): String containing regular expression to store
|
|
"""
|
|
self._expr = expr
|
|
self._re = re.compile(expr)
|
|
|
|
def matches(self, props):
|
|
"""Check if any of the properties match the regular expression.
|
|
|
|
Args:
|
|
props (list of str): List of properties to check
|
|
|
|
Returns:
|
|
bool: True if any of the properties match the regular expression
|
|
"""
|
|
for prop in props:
|
|
if self._re.match(prop):
|
|
return True
|
|
return False
|
|
|
|
def __str__(self):
|
|
return self._expr
|
|
|
|
class Term:
|
|
"""A list of expressions each of which must match with properties.
|
|
|
|
This provides a list of 'AND' expressions, meaning that each must
|
|
match the board properties for that board to be built.
|
|
"""
|
|
def __init__(self):
|
|
self._expr_list = []
|
|
self._board_count = 0
|
|
|
|
def add_expr(self, expr):
|
|
"""Add an Expr object to the list to check.
|
|
|
|
Args:
|
|
expr (Expr): New Expr object to add to the list of those that must
|
|
match for a board to be built.
|
|
"""
|
|
self._expr_list.append(Expr(expr))
|
|
|
|
def __str__(self):
|
|
"""Return some sort of useful string describing the term"""
|
|
return '&'.join([str(expr) for expr in self._expr_list])
|
|
|
|
def matches(self, props):
|
|
"""Check if any of the properties match this term
|
|
|
|
Each of the expressions in the term is checked. All must match.
|
|
|
|
Args:
|
|
props (list of str): List of properties to check
|
|
|
|
Returns:
|
|
bool: True if all of the expressions in the Term match, else False
|
|
"""
|
|
for expr in self._expr_list:
|
|
if not expr.matches(props):
|
|
return False
|
|
return True
|
|
|
|
|
|
class KconfigScanner:
|
|
|
|
"""Kconfig scanner."""
|
|
|
|
### constant variable only used in this class ###
|
|
_SYMBOL_TABLE = {
|
|
'arch' : 'SYS_ARCH',
|
|
'cpu' : 'SYS_CPU',
|
|
'soc' : 'SYS_SOC',
|
|
'vendor' : 'SYS_VENDOR',
|
|
'board' : 'SYS_BOARD',
|
|
'config' : 'SYS_CONFIG_NAME',
|
|
# 'target' is added later
|
|
}
|
|
|
|
def __init__(self, srctree):
|
|
"""Scan all the Kconfig files and create a Kconfig object."""
|
|
# Define environment variables referenced from Kconfig
|
|
os.environ['srctree'] = srctree
|
|
os.environ['UBOOTVERSION'] = 'dummy'
|
|
os.environ['KCONFIG_OBJDIR'] = ''
|
|
self._tmpfile = None
|
|
self._conf = kconfiglib.Kconfig(warn=False)
|
|
self._srctree = srctree
|
|
|
|
def __del__(self):
|
|
"""Delete a leftover temporary file before exit.
|
|
|
|
The scan() method of this class creates a temporay file and deletes
|
|
it on success. If scan() method throws an exception on the way,
|
|
the temporary file might be left over. In that case, it should be
|
|
deleted in this destructor.
|
|
"""
|
|
if self._tmpfile:
|
|
try_remove(self._tmpfile)
|
|
|
|
def _load_defconfig(self, defconfig):
|
|
"""Load a defconfig file, preprocessing if needed
|
|
|
|
If the defconfig contains #include directives, run the C
|
|
preprocessor to expand them before loading.
|
|
|
|
Args:
|
|
defconfig (str): Path to the defconfig file
|
|
"""
|
|
temp = None
|
|
if b'#include' in tools.read_file(defconfig):
|
|
cpp = os.getenv('CPP', 'cpp').split()
|
|
cmd = cpp + [
|
|
'-nostdinc', '-P',
|
|
'-I', self._srctree,
|
|
'-undef',
|
|
'-x', 'assembler-with-cpp',
|
|
defconfig]
|
|
stdout = command.output(*cmd, capture_stderr=True)
|
|
# pylint: disable=consider-using-with
|
|
temp = tempfile.NamedTemporaryFile(prefix='buildman-')
|
|
tools.write_file(temp.name, stdout, False)
|
|
fname = temp.name
|
|
tout.info(f'Processing #include to produce {defconfig}')
|
|
else:
|
|
fname = defconfig
|
|
|
|
self._conf.load_config(fname)
|
|
if temp:
|
|
del temp
|
|
self._tmpfile = None
|
|
|
|
def scan(self, defconfig, warn_targets):
|
|
"""Load a defconfig file to obtain board parameters.
|
|
|
|
Args:
|
|
defconfig (str): path to the defconfig file to be processed
|
|
warn_targets (bool): True to warn about missing or duplicate
|
|
CONFIG_TARGET options
|
|
|
|
Returns:
|
|
tuple: dictionary of board parameters. It has a form of:
|
|
{
|
|
'arch': <arch_name>,
|
|
'cpu': <cpu_name>,
|
|
'soc': <soc_name>,
|
|
'vendor': <vendor_name>,
|
|
'board': <board_name>,
|
|
'target': <target_name>,
|
|
'config': <config_header_name>,
|
|
}
|
|
warnings (list of str): list of warnings found
|
|
"""
|
|
leaf = os.path.basename(defconfig)
|
|
expect_target, match, rear = leaf.partition('_defconfig')
|
|
assert match and not rear, f'{leaf} : invalid defconfig'
|
|
|
|
self._load_defconfig(defconfig)
|
|
|
|
params = {}
|
|
warnings = []
|
|
|
|
# Get the value of CONFIG_SYS_ARCH, CONFIG_SYS_CPU, ... etc.
|
|
# Set '-' if the value is empty.
|
|
for key, symbol in list(self._SYMBOL_TABLE.items()):
|
|
value = self._conf.syms.get(symbol).str_value
|
|
if value:
|
|
params[key] = value
|
|
else:
|
|
params[key] = '-'
|
|
|
|
# Check there is exactly one TARGET_xxx set
|
|
if warn_targets:
|
|
warnings += self._check_targets(leaf, expect_target)
|
|
|
|
params['target'] = expect_target
|
|
|
|
self._fixup_arch(params)
|
|
|
|
return params, warnings
|
|
|
|
def _fixup_arch(self, params):
|
|
"""Fix up architecture names
|
|
|
|
Handle cases where the architecture name needs adjustment based on
|
|
CPU type or other configuration.
|
|
|
|
Args:
|
|
params (dict): Board parameters to update in place
|
|
"""
|
|
# fix-up for aarch64
|
|
if params['arch'] == 'arm' and params['cpu'] == 'armv8':
|
|
params['arch'] = 'aarch64'
|
|
|
|
# fix-up for riscv
|
|
if params['arch'] == 'riscv':
|
|
try:
|
|
value = self._conf.syms.get('ARCH_RV32I').str_value
|
|
except AttributeError:
|
|
value = ''
|
|
if value == 'y':
|
|
params['arch'] = 'riscv32'
|
|
else:
|
|
params['arch'] = 'riscv64'
|
|
|
|
def _check_targets(self, leaf, expect_target):
|
|
"""Check that exactly one TARGET_xxx option is set
|
|
|
|
Args:
|
|
leaf (str): Leaf name of defconfig file (for warnings)
|
|
expect_target (str): Expected target name
|
|
|
|
Returns:
|
|
list of str: List of warnings found
|
|
"""
|
|
warnings = []
|
|
target = None
|
|
for name, sym in self._conf.syms.items():
|
|
if name.startswith('TARGET_') and sym.str_value == 'y':
|
|
tname = name[7:].lower()
|
|
if target:
|
|
warnings.append(
|
|
f'WARNING: {leaf}: Duplicate TARGET_xxx: '
|
|
f'{target} and {tname}')
|
|
else:
|
|
target = tname
|
|
|
|
if not target:
|
|
cfg_name = expect_target.replace('-', '_').upper()
|
|
warnings.append(
|
|
f'WARNING: {leaf}: No TARGET_{cfg_name} enabled')
|
|
|
|
return warnings
|
|
|
|
|
|
class MaintainersDatabase:
|
|
|
|
"""The database of board status and maintainers.
|
|
|
|
Properties:
|
|
database: dict:
|
|
key: Board-target name (e.g. 'snow')
|
|
value: tuple:
|
|
str: Board status (e.g. 'Active')
|
|
str: List of maintainers, separated by :
|
|
warnings (list of str): List of warnings due to missing status, etc.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Create an empty database."""
|
|
self.database = {}
|
|
self.warnings = []
|
|
|
|
def get_status(self, target):
|
|
"""Return the status of the given board.
|
|
|
|
The board status is generally either 'Active' or 'Orphan'.
|
|
Display a warning message and return '-' if status information
|
|
is not found.
|
|
|
|
Args:
|
|
target (str): Build-target name
|
|
|
|
Returns:
|
|
str: 'Active', 'Orphan' or '-'.
|
|
"""
|
|
if not target in self.database:
|
|
self.warnings.append(f"WARNING: no status info for '{target}'")
|
|
return '-'
|
|
|
|
tmp = self.database[target][0]
|
|
if tmp.startswith('Maintained'):
|
|
return 'Active'
|
|
if tmp.startswith('Supported'):
|
|
return 'Active'
|
|
if tmp.startswith('Orphan'):
|
|
return 'Orphan'
|
|
self.warnings.append(f"WARNING: {tmp}: unknown status for '{target}'")
|
|
return '-'
|
|
|
|
def get_maintainers(self, target):
|
|
"""Return the maintainers of the given board.
|
|
|
|
Args:
|
|
target (str): Build-target name
|
|
|
|
Returns:
|
|
str: Maintainers of the board. If the board has two or more
|
|
maintainers, they are separated with colons.
|
|
"""
|
|
entry = self.database.get(target)
|
|
if entry:
|
|
status, maint_list = entry
|
|
if not status.startswith('Orphan'):
|
|
if len(maint_list) > 1 or (maint_list and maint_list[0] != '-'):
|
|
return ':'.join(maint_list)
|
|
|
|
self.warnings.append(f"WARNING: no maintainers for '{target}'")
|
|
return ''
|
|
|
|
def _add_targets(self, targets, status, maintainers):
|
|
"""Add targets to the database
|
|
|
|
Args:
|
|
targets (list of str): List of target names
|
|
status (str): Board status
|
|
maintainers (list of str): List of maintainers
|
|
"""
|
|
for target in targets:
|
|
self.database[target] = (status, maintainers)
|
|
|
|
@staticmethod
|
|
def _handle_f_tag(srcdir, rest, targets):
|
|
"""Handle F: tag - expand wildcard and filter by defconfig
|
|
|
|
Args:
|
|
srcdir (str): Source directory
|
|
rest (str): Remainder of line after 'F:'
|
|
targets (list of str): List to append targets to
|
|
"""
|
|
glob_path = os.path.join(srcdir, rest)
|
|
for item in glob.glob(glob_path):
|
|
front, match, rear = item.partition('configs/')
|
|
if front.endswith('/'):
|
|
front = front[:-1]
|
|
if front == srcdir and match:
|
|
front, match, rear = rear.rpartition('_defconfig')
|
|
if match and not rear:
|
|
targets.append(front)
|
|
|
|
@staticmethod
|
|
def _handle_n_tag(srcdir, rest, targets):
|
|
"""Handle N: tag - scan configs dir and match with regex
|
|
|
|
Args:
|
|
srcdir (str): Source directory
|
|
rest (str): Remainder of line after 'N:'
|
|
targets (list of str): List to append targets to
|
|
"""
|
|
walk_path = os.walk(os.path.join(srcdir, 'configs'))
|
|
for dirpath, _, fnames in walk_path:
|
|
for cfg in fnames:
|
|
path = os.path.join(dirpath, cfg)[len(srcdir) + 1:]
|
|
front, match, rear = path.partition('configs/')
|
|
if front or not match:
|
|
continue
|
|
front, match, rear = rear.rpartition('_defconfig')
|
|
|
|
# Use this entry if it matches the defconfig file
|
|
# without the _defconfig suffix. For example
|
|
# 'am335x.*' matches am335x_guardian_defconfig
|
|
if match and not rear and re.search(rest, front):
|
|
targets.append(front)
|
|
|
|
def parse_file(self, srcdir, fname):
|
|
"""Parse a MAINTAINERS file.
|
|
|
|
Parse a MAINTAINERS file and accumulate board status and maintainers
|
|
information in the self.database dict.
|
|
|
|
defconfig files are used to specify the target, e.g. xxx_defconfig is
|
|
used for target 'xxx'. If there is no defconfig file mentioned in the
|
|
MAINTAINERS file F: entries, then this function does nothing.
|
|
|
|
The N: name entries can be used to specify a defconfig file using
|
|
wildcards.
|
|
|
|
Args:
|
|
srcdir (str): Directory containing source code (Kconfig files)
|
|
fname (str): MAINTAINERS file to be parsed
|
|
"""
|
|
targets = []
|
|
maintainers = []
|
|
status = '-'
|
|
with open(fname, encoding="utf-8") as inf:
|
|
for _, line in enumerate(inf):
|
|
# Check also commented maintainers
|
|
if line[:3] == '#M:':
|
|
line = line[1:]
|
|
tag, rest = line[:2], line[2:].strip()
|
|
if tag == 'M:':
|
|
maintainers.append(rest)
|
|
elif tag == 'F:':
|
|
self._handle_f_tag(srcdir, rest, targets)
|
|
elif tag == 'S:':
|
|
status = rest
|
|
elif tag == 'N:':
|
|
self._handle_n_tag(srcdir, rest, targets)
|
|
elif line == '\n':
|
|
self._add_targets(targets, status, maintainers)
|
|
targets = []
|
|
maintainers = []
|
|
status = '-'
|
|
self._add_targets(targets, status, maintainers)
|
|
|
|
|
|
class Boards:
|
|
"""Manage a list of boards."""
|
|
def __init__(self):
|
|
self._boards = []
|
|
|
|
def add_board(self, brd):
|
|
"""Add a new board to the list.
|
|
|
|
The board's target member must not already exist in the board list.
|
|
|
|
Args:
|
|
brd (Board): board to add
|
|
"""
|
|
self._boards.append(brd)
|
|
|
|
def read_boards(self, fname):
|
|
"""Read a list of boards from a board file.
|
|
|
|
Create a Board object for each and add it to our _boards list.
|
|
|
|
Args:
|
|
fname (str): Filename of boards.cfg file
|
|
"""
|
|
with open(fname, 'r', encoding='utf-8') as inf:
|
|
for line in inf:
|
|
if line[0] == '#':
|
|
continue
|
|
fields = line.split()
|
|
if not fields:
|
|
continue
|
|
for upto, field in enumerate(fields):
|
|
if field == '-':
|
|
fields[upto] = ''
|
|
while len(fields) < 8:
|
|
fields.append('')
|
|
if len(fields) > 8:
|
|
fields = fields[:8]
|
|
|
|
brd = board.Board(*fields)
|
|
self.add_board(brd)
|
|
|
|
|
|
def get_list(self):
|
|
"""Return a list of available boards.
|
|
|
|
Returns:
|
|
list of Board: List of Board objects
|
|
"""
|
|
return self._boards
|
|
|
|
def get_dict(self):
|
|
"""Build a dictionary containing all the boards.
|
|
|
|
Returns:
|
|
Dictionary:
|
|
key is board.target
|
|
value is board
|
|
"""
|
|
board_dict = OrderedDict()
|
|
for brd in self._boards:
|
|
board_dict[brd.target] = brd
|
|
return board_dict
|
|
|
|
def get_selected_dict(self):
|
|
"""Return a dictionary containing the selected boards
|
|
|
|
Returns:
|
|
OrderedDict: Boards that are marked selected (key=target,
|
|
value=Board)
|
|
"""
|
|
board_dict = OrderedDict()
|
|
for brd in self._boards:
|
|
if brd.build_it:
|
|
board_dict[brd.target] = brd
|
|
return board_dict
|
|
|
|
def get_selected(self):
|
|
"""Return a list of selected boards
|
|
|
|
Returns:
|
|
list of Board: Board objects that are marked selected
|
|
"""
|
|
return [brd for brd in self._boards if brd.build_it]
|
|
|
|
def get_selected_names(self):
|
|
"""Return a list of selected boards
|
|
|
|
Returns:
|
|
list of str: Board names that are marked selected
|
|
"""
|
|
return [brd.target for brd in self._boards if brd.build_it]
|
|
|
|
@classmethod
|
|
def _build_terms(cls, args):
|
|
"""Convert command line arguments to a list of terms.
|
|
|
|
This deals with parsing of the arguments. It handles the '&'
|
|
operator, which joins several expressions into a single Term.
|
|
|
|
For example:
|
|
['arm & freescale sandbox', 'tegra']
|
|
|
|
will produce 3 Terms containing expressions as follows:
|
|
arm, freescale
|
|
sandbox
|
|
tegra
|
|
|
|
The first Term has two expressions, both of which must match for
|
|
a board to be selected.
|
|
|
|
Args:
|
|
args (list of str): List of command line arguments
|
|
|
|
Returns:
|
|
list of Term: A list of Term objects
|
|
"""
|
|
syms = []
|
|
for arg in args:
|
|
for word in arg.split():
|
|
sym_build = []
|
|
for term in word.split('&'):
|
|
if term:
|
|
sym_build.append(term)
|
|
sym_build.append('&')
|
|
syms += sym_build[:-1]
|
|
terms = []
|
|
term = None
|
|
oper = None
|
|
for sym in syms:
|
|
if sym == '&':
|
|
oper = sym
|
|
elif oper:
|
|
term.add_expr(sym)
|
|
oper = None
|
|
else:
|
|
if term:
|
|
terms.append(term)
|
|
term = Term()
|
|
term.add_expr(sym)
|
|
if term:
|
|
terms.append(term)
|
|
return terms
|
|
|
|
@staticmethod
|
|
# pylint: disable=too-many-arguments
|
|
def _check_board(brd, terms, brds, found, exclude_list, result):
|
|
"""Check whether to include or exclude a board
|
|
|
|
Checks the various terms and decides whether to build it or not.
|
|
|
|
If it is built, add the board to the result[term] list so we know
|
|
which term caused it to be built. Add it to result['all'] also.
|
|
|
|
Keep a list of boards we found in 'found', so we can report boards
|
|
which appear in self._boards but not in brds.
|
|
|
|
Args:
|
|
brd (Board): Board to check
|
|
terms (list of Term): Terms to match against
|
|
brds (list of str): List of board names to build, or None
|
|
found (list of str): List to append found board names to
|
|
exclude_list (list of Expr): Expressions for boards to exclude
|
|
result (OrderedDict): Dict to store results in
|
|
"""
|
|
matching_term = None
|
|
build_it = False
|
|
if terms:
|
|
for term in terms:
|
|
if term.matches(brd.props):
|
|
matching_term = str(term)
|
|
build_it = True
|
|
break
|
|
elif brds:
|
|
if brd.target in brds:
|
|
build_it = True
|
|
found.append(brd.target)
|
|
else:
|
|
build_it = True
|
|
|
|
# Check that it is not specifically excluded
|
|
for expr in exclude_list:
|
|
if expr.matches(brd.props):
|
|
build_it = False
|
|
break
|
|
|
|
if build_it:
|
|
brd.build_it = True
|
|
if matching_term:
|
|
result[matching_term].append(brd.target)
|
|
result['all'].append(brd.target)
|
|
|
|
def select_boards(self, args, exclude=None, brds=None):
|
|
"""Mark boards selected based on args
|
|
|
|
Normally either boards (an explicit list of boards) or args (a list of
|
|
terms to match against) is used. It is possible to specify both, in
|
|
which case they are additive.
|
|
|
|
If brds and args are both empty, all boards are selected.
|
|
|
|
Args:
|
|
args (list of str): List of strings specifying boards to include,
|
|
either named, or by their target, architecture, cpu, vendor or
|
|
soc. If empty, all boards are selected.
|
|
exclude (list of str): List of boards to exclude, regardless of
|
|
'args', or None for none
|
|
brds (list of Board): List of boards to build, or None/[] for all
|
|
|
|
Returns:
|
|
tuple:
|
|
OrderedDict: Boards selected due to each argument, keyed by
|
|
argument
|
|
list of str: Errors/warnings found
|
|
"""
|
|
result = OrderedDict()
|
|
warnings = []
|
|
terms = self._build_terms(args)
|
|
|
|
result['all'] = []
|
|
for term in terms:
|
|
result[str(term)] = []
|
|
|
|
exclude_list = []
|
|
if exclude:
|
|
for expr in exclude:
|
|
exclude_list.append(Expr(expr))
|
|
|
|
found = []
|
|
for brd in self._boards:
|
|
self._check_board(brd, terms, brds, found, exclude_list, result)
|
|
|
|
if brds:
|
|
remaining = set(brds) - set(found)
|
|
if remaining:
|
|
warnings.append(f"Boards not found: {', '.join(remaining)}\n")
|
|
|
|
return result, warnings
|
|
|
|
@classmethod
|
|
def scan_defconfigs_for_multiprocess(cls, srcdir, queue, defconfigs,
|
|
warn_targets):
|
|
"""Scan defconfig files and queue their board parameters
|
|
|
|
This function is intended to be passed to multiprocessing.Process()
|
|
constructor.
|
|
|
|
Args:
|
|
srcdir (str): Directory containing source code
|
|
queue (multiprocessing.Queue): The resulting board parameters are
|
|
written into this.
|
|
defconfigs (sequence of str): A sequence of defconfig files to be
|
|
scanned.
|
|
warn_targets (bool): True to warn about missing or duplicate
|
|
CONFIG_TARGET options
|
|
"""
|
|
kconf_scanner = KconfigScanner(srcdir)
|
|
for defconfig in defconfigs:
|
|
queue.put(kconf_scanner.scan(defconfig, warn_targets))
|
|
|
|
@classmethod
|
|
def read_queues(cls, queues, params_list, warnings):
|
|
"""Read the queues and append the data to the paramers list
|
|
|
|
Args:
|
|
queues (list of multiprocessing.Queue): Queues to read
|
|
params_list (list of dict): List to add params too
|
|
warnings (set of str): Set to add warnings to
|
|
"""
|
|
for que in queues:
|
|
while not que.empty():
|
|
params, warn = que.get()
|
|
params_list.append(params)
|
|
warnings.update(warn)
|
|
|
|
@staticmethod
|
|
def _collect_defconfigs(config_dir):
|
|
"""Collect all defconfig files from a directory
|
|
|
|
Args:
|
|
config_dir (str): Directory containing the defconfig files
|
|
|
|
Returns:
|
|
list of str: Paths to all defconfig files found
|
|
"""
|
|
all_defconfigs = []
|
|
for (dirpath, _, filenames) in os.walk(config_dir):
|
|
for filename in fnmatch.filter(filenames, '*_defconfig'):
|
|
if fnmatch.fnmatch(filename, '.*'):
|
|
continue
|
|
all_defconfigs.append(os.path.join(dirpath, filename))
|
|
return all_defconfigs
|
|
|
|
def _start_scanners(self, all_defconfigs, srcdir, jobs, warn_targets):
|
|
"""Start parallel defconfig scanning processes
|
|
|
|
Args:
|
|
all_defconfigs (list of str): Paths to defconfig files to scan
|
|
srcdir (str): Directory containing source code (Kconfig files)
|
|
jobs (int): The number of jobs to run simultaneously
|
|
warn_targets (bool): True to warn about missing or duplicate
|
|
CONFIG_TARGET options
|
|
|
|
Returns:
|
|
tuple:
|
|
list of Process: Running scanner processes
|
|
list of Queue: Queues for receiving results
|
|
"""
|
|
total_boards = len(all_defconfigs)
|
|
processes = []
|
|
queues = []
|
|
for i in range(jobs):
|
|
defconfigs = all_defconfigs[total_boards * i // jobs :
|
|
total_boards * (i + 1) // jobs]
|
|
que = multiprocessing.Queue(maxsize=-1)
|
|
proc = multiprocessing.Process(
|
|
target=self.scan_defconfigs_for_multiprocess,
|
|
args=(srcdir, que, defconfigs, warn_targets))
|
|
proc.start()
|
|
processes.append(proc)
|
|
queues.append(que)
|
|
|
|
return processes, queues
|
|
|
|
def scan_defconfigs(self, config_dir, srcdir, jobs=1, warn_targets=False):
|
|
"""Collect board parameters for all defconfig files.
|
|
|
|
This function invokes multiple processes for faster processing.
|
|
|
|
Args:
|
|
config_dir (str): Directory containing the defconfig files
|
|
srcdir (str): Directory containing source code (Kconfig files)
|
|
jobs (int): The number of jobs to run simultaneously
|
|
warn_targets (bool): True to warn about missing or duplicate
|
|
CONFIG_TARGET options
|
|
|
|
Returns:
|
|
tuple:
|
|
list of dict: List of board parameters, each a dict:
|
|
key: 'arch', 'cpu', 'soc', 'vendor', 'board', 'target',
|
|
'config'
|
|
value: string value of the key
|
|
list of str: List of warnings recorded
|
|
"""
|
|
all_defconfigs = self._collect_defconfigs(config_dir)
|
|
processes, queues = self._start_scanners(all_defconfigs, srcdir, jobs,
|
|
warn_targets)
|
|
|
|
# The resulting data should be accumulated to these lists
|
|
params_list = []
|
|
warnings = set()
|
|
|
|
# Data in the queues should be retrieved preriodically. Otherwise,
|
|
# the queues would become full and subprocesses would get stuck.
|
|
while any(p.is_alive() for p in processes):
|
|
self.read_queues(queues, params_list, warnings)
|
|
# sleep for a while until the queues are filled
|
|
time.sleep(SLEEP_TIME)
|
|
|
|
# Joining subprocesses just in case
|
|
# (All subprocesses should already have been finished)
|
|
for proc in processes:
|
|
proc.join()
|
|
|
|
# retrieve leftover data
|
|
self.read_queues(queues, params_list, warnings)
|
|
|
|
return params_list, sorted(list(warnings))
|
|
|
|
@classmethod
|
|
def insert_maintainers_info(cls, srcdir, params_list):
|
|
"""Add Status and Maintainers information to the board parameters list.
|
|
|
|
Args:
|
|
srcdir (str): Directory containing source code (MAINTAINERS files)
|
|
params_list (list of dict): A list of the board parameters
|
|
|
|
Returns:
|
|
list of str: List of warnings collected due to missing status, etc.
|
|
"""
|
|
database = MaintainersDatabase()
|
|
for (dirpath, _, filenames) in os.walk(srcdir):
|
|
if 'MAINTAINERS' in filenames and 'tools/buildman' not in dirpath:
|
|
database.parse_file(srcdir,
|
|
os.path.join(dirpath, 'MAINTAINERS'))
|
|
|
|
for i, params in enumerate(params_list):
|
|
target = params['target']
|
|
maintainers = database.get_maintainers(target)
|
|
params['maintainers'] = maintainers
|
|
if maintainers:
|
|
params['status'] = database.get_status(target)
|
|
else:
|
|
params['status'] = '-'
|
|
params_list[i] = params
|
|
return sorted(database.warnings)
|
|
|
|
@classmethod
|
|
def format_and_output(cls, params_list, output):
|
|
"""Write board parameters into a file.
|
|
|
|
Columnate the board parameters, sort lines alphabetically,
|
|
and then write them to a file.
|
|
|
|
Args:
|
|
params_list (list of dict): The list of board parameters
|
|
output (str): The path to the output file
|
|
"""
|
|
fields = ('status', 'arch', 'cpu', 'soc', 'vendor', 'board', 'target',
|
|
'config', 'maintainers')
|
|
|
|
# First, decide the width of each column
|
|
max_length = {f: 0 for f in fields}
|
|
for params in params_list:
|
|
for field in fields:
|
|
max_length[field] = max(max_length[field], len(params[field]))
|
|
|
|
output_lines = []
|
|
for params in params_list:
|
|
line = ''
|
|
for field in fields:
|
|
# insert two spaces between fields like column -t would
|
|
line += ' ' + params[field].ljust(max_length[field])
|
|
output_lines.append(line.strip())
|
|
|
|
# ignore case when sorting
|
|
output_lines.sort(key=str.lower)
|
|
|
|
with open(output, 'w', encoding="utf-8") as outf:
|
|
outf.write(COMMENT_BLOCK + '\n'.join(output_lines) + '\n')
|
|
|
|
def build_board_list(self, config_dir=CONFIG_DIR, srcdir='.', jobs=1,
|
|
warn_targets=False):
|
|
"""Generate a board-database file
|
|
|
|
This works by reading the Kconfig, then loading each board's defconfig
|
|
in to get the setting for each option. In particular, CONFIG_TARGET_xxx
|
|
is typically set by the defconfig, where xxx is the target to build.
|
|
|
|
Args:
|
|
config_dir (str): Directory containing the defconfig files
|
|
srcdir (str): Directory containing source code (Kconfig files)
|
|
jobs (int): The number of jobs to run simultaneously
|
|
warn_targets (bool): True to warn about missing or duplicate
|
|
CONFIG_TARGET options
|
|
|
|
Returns:
|
|
tuple:
|
|
list of dict: List of board parameters, each a dict:
|
|
key: 'arch', 'cpu', 'soc', 'vendor', 'board', 'config',
|
|
'target'
|
|
value: string value of the key
|
|
list of str: Warnings that came up
|
|
"""
|
|
params_list, warnings = self.scan_defconfigs(config_dir, srcdir, jobs,
|
|
warn_targets)
|
|
m_warnings = self.insert_maintainers_info(srcdir, params_list)
|
|
return params_list, warnings + m_warnings
|
|
|
|
def ensure_board_list(self, output, jobs=1, force=False, quiet=False):
|
|
"""Generate a board database file if needed.
|
|
|
|
This is intended to check if Kconfig has changed since the boards.cfg
|
|
files was generated.
|
|
|
|
Args:
|
|
output (str): The name of the output file
|
|
jobs (int): The number of jobs to run simultaneously
|
|
force (bool): Force to generate the output even if it is new
|
|
quiet (bool): True to avoid printing a message if nothing needs
|
|
doing
|
|
|
|
Returns:
|
|
bool: True if all is well, False if there were warnings
|
|
"""
|
|
if not force:
|
|
if not quiet:
|
|
tprint('\rChecking for Kconfig changes...', newline=False)
|
|
is_new = output_is_new(output, CONFIG_DIR, '.')
|
|
print_clear()
|
|
if is_new:
|
|
if not quiet:
|
|
print(f'{output} is up to date. Nothing to do.')
|
|
return True
|
|
if not quiet:
|
|
tprint('\rGenerating board list...', newline=False)
|
|
params_list, warnings = self.build_board_list(CONFIG_DIR, '.', jobs)
|
|
print_clear()
|
|
for warn in warnings:
|
|
print(warn, file=sys.stderr)
|
|
self.format_and_output(params_list, output)
|
|
return not warnings
|
|
|
|
def parse_all_extended(self, dbase):
|
|
"""Parse any .buildman files to find boards composed of fragments
|
|
|
|
Args:
|
|
dbase (tuple):
|
|
set of all config options seen (each a str)
|
|
set of all defconfigs seen (each a str)
|
|
dict of configs for each defconfig:
|
|
key: defconfig name, e.g. "MPC8548CDS_legacy_defconfig"
|
|
value: dict:
|
|
key: CONFIG option
|
|
value: Value of option
|
|
dict of defconfigs for each config:
|
|
key: CONFIG option
|
|
value: set of boards using that option
|
|
"""
|
|
for fname in glob.glob('configs/*.buildman'):
|
|
self.parse_extended(dbase, fname)
|
|
|
|
def find_by_target(self, target):
|
|
"""Find a board given its target name
|
|
|
|
Args:
|
|
target (str): Target string to search for
|
|
|
|
Return:
|
|
Board: board found
|
|
|
|
Raises:
|
|
ValueError: Board was not found
|
|
"""
|
|
for b in self._boards:
|
|
if b.target == target:
|
|
return b
|
|
|
|
targets = [b.target for b in self._boards]
|
|
for t in sorted(targets):
|
|
print(t)
|
|
raise ValueError(f"Board '{target}' not found")
|
|
|
|
def parse_extended(self, dbase, fname):
|
|
"""Parse a single 'extended' file
|
|
|
|
Args:
|
|
dbase (tuple): Database of defconfigs from qconfig
|
|
fname (str): Path to the extended-board file to parse
|
|
"""
|
|
result = ExtendedParser.parse_file(fname)
|
|
for ext in result:
|
|
ext_boards = self.scan_extended(dbase, ext)
|
|
for name in ext_boards:
|
|
# Find the base board
|
|
brd = self.find_by_target(name)
|
|
newb = board.Board(brd.status, brd.arch, brd.cpu, brd.soc,
|
|
brd.vendor, brd.board_name,
|
|
f'{ext.name},{brd.target}',
|
|
brd.cfg_name, ext, brd.target)
|
|
|
|
self.add_board(newb)
|
|
|
|
def scan_extended(self, dbase, ext):
|
|
"""Scan for extended boards
|
|
|
|
Args:
|
|
dbase (tuple): Database of defconfigs
|
|
ext (Extended): Extended-board definition
|
|
|
|
Returns:
|
|
set of str: Set of board names matching the extended definition
|
|
"""
|
|
# First check the fragments
|
|
frags = []
|
|
for frag in ext.fragments:
|
|
fname = os.path.join(f'configs/{frag}.config')
|
|
frags.append(tools.read_file(fname, binary=False))
|
|
|
|
# Now get a list of defconfigs (without the _defconfig suffix)
|
|
defconfigs = set()
|
|
cfg_list = []
|
|
for first, val in ext.targets:
|
|
if first == 'regex':
|
|
pattern = f'configs/{val}'
|
|
fnames = glob.glob(pattern)
|
|
if not fnames:
|
|
print(f"'Warning: No configs matching '{pattern}'")
|
|
for fname in fnames:
|
|
m_cfg = re.match(r'^configs/(.*)_defconfig$', fname)
|
|
defconfigs.add(m_cfg.group(1))
|
|
else:
|
|
if val == 'n':
|
|
cfg_list.append(f'~{first}')
|
|
elif val == 'y':
|
|
cfg_list.append(f'{first}')
|
|
else:
|
|
cfg_list.append(f'{first}={val}')
|
|
|
|
# Search for boards with the given configs
|
|
boards = qconfig.find_config(dbase, cfg_list)
|
|
if defconfigs:
|
|
boards &= defconfigs
|
|
|
|
return boards
|
|
|
|
|
|
class ExtendedParser:
|
|
"""Parser for extended-board (.buildman) files"""
|
|
def __init__(self):
|
|
self.extended = []
|
|
self.name = None
|
|
self.fragments = []
|
|
self.targets = []
|
|
self.in_targets = False
|
|
self.desc = None
|
|
|
|
def start(self):
|
|
"""Start a new extended board"""
|
|
self.name = None
|
|
self.fragments = []
|
|
self.targets = []
|
|
self.in_targets = False
|
|
self.desc = None
|
|
|
|
def finish(self):
|
|
"""Finish any pending extended board"""
|
|
if self.name:
|
|
self.extended.append(Extended(self.name, self.desc, self.fragments,
|
|
self.targets))
|
|
self.start()
|
|
|
|
@staticmethod
|
|
def parse_file(fname):
|
|
"""Parse a file and return the result
|
|
|
|
Args:
|
|
fname (str): Filename to parse
|
|
|
|
Returns:
|
|
list of Extended: List of extended-board definitions
|
|
"""
|
|
return ExtendedParser.parse_data(fname,
|
|
tools.read_file(fname, binary=False))
|
|
|
|
@staticmethod
|
|
def parse_data(fname, data):
|
|
"""Parse a file and return the result
|
|
|
|
Args:
|
|
fname (str): Filename (for error messages)
|
|
data (str): Contents of the file
|
|
|
|
Returns:
|
|
list of Extended: List of extended-board definitions
|
|
"""
|
|
parser = ExtendedParser()
|
|
parser.parse(fname, data)
|
|
return parser.extended
|
|
|
|
def parse(self, fname, data):
|
|
"""Parse the file
|
|
|
|
Args:
|
|
fname (str): Filename to parse (used for error messages)
|
|
data (str): Contents of the file
|
|
|
|
Returns:
|
|
list of Extended: List of extended-board definitions
|
|
|
|
Raises:
|
|
ValueError: Invalid syntax in file
|
|
"""
|
|
self.start()
|
|
for seq, line in enumerate(data.splitlines()):
|
|
linenum = seq + 1
|
|
if not line.strip() or line[0] == '#':
|
|
continue
|
|
if line[0] == ' ':
|
|
if not self.in_targets:
|
|
raise ValueError(f'{fname}:{linenum}: Unexpected indent')
|
|
if '=' in line:
|
|
pair = line.split('=')
|
|
if len(pair) != 2:
|
|
raise ValueError(
|
|
f'{fname}:{linenum}: Invalid CONFIG syntax')
|
|
first, rest = pair
|
|
cfg = first.strip()
|
|
value = rest.strip()
|
|
self.targets.append([cfg, value])
|
|
else:
|
|
target = line.strip()
|
|
if ' ' in target:
|
|
raise ValueError(
|
|
f'{fname}:{linenum}: Invalid target regex')
|
|
self.targets.append(['regex', line.strip()])
|
|
else:
|
|
self._parse_tag(fname, linenum, line)
|
|
|
|
self.finish()
|
|
return self.extended
|
|
|
|
def _parse_tag(self, fname, linenum, line):
|
|
"""Parse a tag line (one not starting with a space)
|
|
|
|
Args:
|
|
fname (str): Filename (for error messages)
|
|
linenum (int): Line number (for error messages)
|
|
line (str): Line to parse
|
|
|
|
Raises:
|
|
ValueError: Invalid syntax
|
|
"""
|
|
pair = line.split(':')
|
|
if len(pair) != 2:
|
|
raise ValueError(f'{fname}:{linenum}: Invalid tag')
|
|
tag, rest = pair
|
|
value = rest.strip()
|
|
if tag == 'name':
|
|
self.finish()
|
|
if ' ' in value:
|
|
raise ValueError(f'{fname}:{linenum}: Invalid name')
|
|
self.name = value
|
|
elif tag == 'desc':
|
|
self.desc = value
|
|
elif tag == 'fragment':
|
|
self.fragments.append(value)
|
|
elif tag == 'targets':
|
|
self.in_targets = True
|
|
else:
|
|
raise ValueError(f"{fname}:{linenum}: Unknown tag '{tag}'")
|