Files
u-boot/test/py/spawn.py
Simon Glass c5cad939ad test/py: Move expect() function into console_base
This function is only used from console_base and it is always accessed
via a 'self.p.expect()' expression, which is confusing.

Checking for expected output doesn't really relate to spawning in any
case, so move this function to console_base

For now the spawn class is kept as it is, to reduce the change in each
patch.

Signed-off-by: Simon Glass <sjg@chromium.org>
2025-07-23 17:38:59 -06:00

233 lines
7.2 KiB
Python

# SPDX-License-Identifier: GPL-2.0
# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
"""
Logic to spawn a sub-process and interact with its stdio.
This is used by console_board and console_sandbox
- console_board (for real hardware): Spawns 'u-boot-test-console' and provides
access to the console input/output
- console_sandbox (for sandbox): Spawns 'u-boot' and provides access to the
console input/output
In both cases, Spawn provides a way to send console commands and receive the
response from U-Boot. An expect() function helps to simplify things for the
higher levels.
The code in this file should be generic, i.e. not specific to sandbox or real
hardware.
Within the console_*py files, self.p is used to refer to the Spawn() object,
perhaps short for 'process'.
"""
import io
import os
import re
import pty
import signal
import select
import sys
import termios
import time
import traceback
# Character to send (twice) to exit the terminal
EXIT_CHAR = 0x1d # FS (Ctrl + ])
class Spawn:
"""Represents the stdio of a freshly created sub-process. Commands may be
sent to the process, and responses waited for.
Members:
output: accumulated output from expect()
"""
def __init__(self, args, cwd=None, decode_signal=False):
"""Spawn (fork/exec) the sub-process.
Args:
args (list of str): processs arguments. argv[0] is the command to
execute.
cwd (str or None): the directory to run the process in, or None for
no change.
decode_signal (bool): True to indicate the exception number when
something goes wrong
Returns:
Nothing.
"""
self.decode_signal = decode_signal
self.waited = False
self.exit_code = 0
self.exit_info = ''
self.buf = ''
self.output = ''
self.logfile_read = None
self.before = ''
self.after = ''
self.timeout = None
# http://stackoverflow.com/questions/7857352/python-regex-to-match-vt100-escape-sequences
self.re_vt100 = re.compile(r'(\x1b\[|\x9b)[^@-_]*[@-_]|\x1b[@-_]', re.I)
(self.pid, self.fd) = pty.fork()
if self.pid == 0:
try:
# For some reason, SIGHUP is set to SIG_IGN at this point when
# run under "go" (www.go.cd). Perhaps this happens under any
# background (non-interactive) system?
signal.signal(signal.SIGHUP, signal.SIG_DFL)
if cwd:
os.chdir(cwd)
os.execvp(args[0], args)
except:
print('CHILD EXECEPTION:')
traceback.print_exc()
finally:
os._exit(255)
old = None
try:
isatty = False
try:
isatty = os.isatty(sys.stdout.fileno())
# with --capture=tee-sys we cannot call fileno()
except io.UnsupportedOperation:
pass
if isatty:
new = termios.tcgetattr(self.fd)
old = new
new[3] = new[3] & ~(termios.ICANON | termios.ISIG)
new[3] = new[3] & ~termios.ECHO
new[6][termios.VMIN] = 0
new[6][termios.VTIME] = 0
termios.tcsetattr(self.fd, termios.TCSANOW, new)
self.poll = select.poll()
self.poll.register(self.fd, select.POLLIN | select.POLLPRI | select.POLLERR |
select.POLLHUP | select.POLLNVAL)
except:
if old:
termios.tcsetattr(self.fd, termios.TCSANOW, old)
self.close()
raise
def kill(self, sig):
"""Send unix signal "sig" to the child process.
Args:
sig (int): The signal number to send
"""
os.kill(self.pid, sig)
def checkalive(self):
"""Determine whether the child process is still running.
Returns:
tuple:
True if process is alive, else False
0 if process is alive, else exit code of process
string describing what happened ('' or 'status/signal n')
"""
if self.waited:
return False, self.exit_code, self.exit_info
w = os.waitpid(self.pid, os.WNOHANG)
if w[0] == 0:
return True, 0, 'running'
status = w[1]
if os.WIFEXITED(status):
self.exit_code = os.WEXITSTATUS(status)
self.exit_info = f'status {self.exit_code}'
elif os.WIFSIGNALED(status):
signum = os.WTERMSIG(status)
self.exit_code = -signum
self.exit_info = f'signal {signum} ({signal.Signals(signum).name})'
self.waited = True
return False, self.exit_code, self.exit_info
def isalive(self):
"""Determine whether the child process is still running.
Returns:
bool: indicating whether process is alive
"""
return self.checkalive()[0]
def send(self, data):
"""Send data to the sub-process's stdin.
Args:
data (str): The data to send to the process.
"""
os.write(self.fd, data.encode(errors='replace'))
def receive(self, num_bytes):
"""Receive data from the sub-process's stdin.
Args:
num_bytes (int): Maximum number of bytes to read
Returns:
str: The data received
Raises:
ValueError if U-Boot died
"""
try:
c = os.read(self.fd, num_bytes).decode(errors='replace')
except OSError as err:
# With sandbox, try to detect when U-Boot exits when it
# shouldn't and explain why. This is much more friendly than
# just dying with an I/O error
if self.decode_signal and err.errno == 5: # I/O error
alive, _, info = self.checkalive()
if alive:
raise err
raise ValueError(f'U-Boot exited with {info}') from err
raise
return c
def close(self):
"""Close the stdio connection to the sub-process.
This also waits a reasonable time for the sub-process to stop running.
Args:
None.
Returns:
str: Type of closure completed
"""
# For Labgrid-sjg, ask it is exit gracefully, so it can transition the
# board to the final state (like 'off') before exiting.
if os.environ.get('USE_LABGRID_SJG'):
self.send(chr(EXIT_CHAR) * 2)
# Wait about 10 seconds for Labgrid to close and power off the board
for _ in range(100):
if not self.isalive():
return 'normal'
time.sleep(0.1)
# That didn't work, so try closing the PTY
os.close(self.fd)
for _ in range(100):
if not self.isalive():
return 'break'
time.sleep(0.1)
return 'timeout'
def get_expect_output(self):
"""Return the output read by expect()
Returns:
The output processed by expect(), as a string.
"""
return self.output