Files
u-boot/scripts/tkey_fde_key.py
Simon Glass 857835fbf2 scripts: Add tkey_fde_key.py for TKey disk encryption
Add a Python script for TKey-based full disk encryption key generation
and disk encryption operations:

- Generate hardware-backed encryption keys using TKey's Ed25519
  signature and SHA-256 hashing
- Encrypt disk images with LUKS using the derived keys
- Open LUKS encrypted disks using the derived keys
- Support for both interactive password input and file/stdin input
- Automatic TKey device detection via USB enumeration

The script derives deterministic encryption keys from a password and
the TKey's unique device identifier, suitable for unlocking encrypted
root filesystems at boot time.

Co-developed-by: Claude <noreply@anthropic.com>
Signed-off-by: Simon Glass <simon.glass@canonical.com>
2025-12-08 05:22:18 -07:00

2004 lines
62 KiB
Python
Executable File

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0+
# Copyright (C) 2025 Canonical Ltd
"""TKey Full Disk Encryption Key Generator
This script uses tkey-sign to generate encryption keys for full-disk encryption.
It prompts the user for a passphrase and uses the TKey's hardware-based key
derivation to create a consistent encryption key.
USAGE OVERVIEW:
==============
This tool provides three main functions:
1. Generate hardware-backed encryption keys using TKey
2. Encrypt disk images with LUKS using the derived keys
3. Open LUKS encrypted disks using the derived keys
BASIC USAGE:
-----------
# Generate a key interactively (prompts for password)
tkey-fde-key.py
# Generate key and save to file
tkey-fde-key.py -o /tmp/my-key.bin
# Read password from file instead of interactive prompt
tkey-fde-key.py -p /path/to/passfile
# Read password from stdin
echo 'mypassword' | tkey-fde-key.py -p -
DISK ENCRYPTION:
---------------
# Encrypt a disk image with LUKS (automatically resizes image for LUKS header)
tkey-fde-key.py -e /path/to/disk.img -p /path/to/passfile
# Encrypt disk with password from stdin
echo 'mypassword' | tkey-fde-key.py -e /path/to/disk.img -p -
# Encrypt disk and save backup key
tkey-fde-key.py -e /path/to/disk.img -p /path/to/passfile -o /tmp/backup.key
# Interactive encryption (will prompt for password)
tkey-fde-key.py -e /path/to/disk.img
DISK OPENING:
------------
# Open an encrypted disk (creates /dev/mapper/tkey-disk)
tkey-fde-key.py -O /path/to/encrypted.img -p /path/to/passfile
# Open with password from stdin
echo 'mypassword' | tkey-fde-key.py -O /path/to/encrypted.img -p -
# Open with custom mapper name
tkey-fde-key.py -O /path/to/encrypted.img -m my-disk -p /path/to/passfile
# Interactive opening (will prompt for password)
tkey-fde-key.py -O /path/to/encrypted.img
# After opening, mount the filesystem:
sudo mount /dev/mapper/tkey-disk /mnt
# When done, unmount and close:
sudo umount /mnt
sudo cryptsetup close tkey-disk
IMPORTANT NOTES:
===============
- The same password must be used to derive the same key
- TKey must be in firmware mode or will prompt for reinsertion
- Disk operations may require root privileges
- LUKS encryption uses AES-XTS-256 with SHA256
- Device mapper names must be unique system-wide
- Always backup important data before encryption
SECURITY:
========
- Keys are derived using TKey's hardware security module
- Temporary keyfiles are created with restrictive permissions (600)
- All temporary files are automatically cleaned up
- Password confirmation required for interactive input
- Empty passwords are not allowed
DEPENDENCIES:
============
- tkey-sign (TKey development tools)
- cryptsetup (for LUKS operations)
- truncate (for disk resizing)
- dmsetup (for device mapper operations)
For more examples, see the --help output.
"""
import argparse
import base64
import getpass
import glob
import hashlib
import os
import subprocess
import sys
import tempfile
import time
from types import SimpleNamespace
import serial
# Add the tools directory to the path for u_boot_pylib
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'tools'))
# pylint: disable=wrong-import-position,import-error
from u_boot_pylib import tools
from u_boot_pylib import tout
# TKey frame constants (from U-Boot tkey-uclass.c)
TKEY_FRAME_ID_CMD_V1 = 0
TKEY_ENDPOINT_FIRMWARE = 2
TKEY_STATUS_OK = 0
TKEY_LENGTH_1_BYTE = 0
TKEY_FW_CMD_NAME_VERSION = 0x01
def parse_args():
"""Parse command line arguments
Returns:
argparse.Namespace: Parsed command line arguments
"""
parser = argparse.ArgumentParser(
description='Generate full-disk encryption keys using TKey hardware',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
# Generate key interactively
tkey-fde-key.py
# Generate key and save to file
tkey-fde-key.py --output /tmp/disk.key
# Read password from file
tkey-fde-key.py --password-file /path/to/passfile
# Read password from stdin
echo 'mypassword' | tkey-fde-key.py --password-file -
# Output binary format instead of hex
tkey-fde-key.py --binary
# Encrypt a disk image with LUKS
tkey-fde-key.py -e /path/to/disk.img
# Encrypt specific partition (will prompt for selection)
tkey-fde-key.py -e /path/to/disk.img -P 2
# Open an encrypted disk image
tkey-fde-key.py -O /path/to/encrypted.img
# Open disk with custom mapper name
tkey-fde-key.py -O /path/to/encrypted.img -m my-disk
'''
)
parser.add_argument(
'--device',
help='TKey serial device (auto-detected if not specified)'
)
parser.add_argument(
'--output', '-o',
help='Output file for the key (prints to stdout if not specified)'
)
parser.add_argument(
'--binary',
action='store_true',
help='Output key in binary format (default is hex)'
)
parser.add_argument(
'--force', '-f',
action='store_true',
help='Overwrite existing output file'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--debug', '-d',
action='store_true',
help='Enable debug mode (passes --verbose to tkey-sign)'
)
parser.add_argument(
'--password-file', '-p',
help="Read password from file (use '-' for stdin)"
)
parser.add_argument(
'--encrypt-disk', '-e',
help='Disk image file to encrypt with LUKS using the derived key'
)
parser.add_argument(
'--open-disk', '-O',
help='LUKS encrypted disk image to open using the derived key'
)
parser.add_argument(
'--mapper-name', '-m',
default='tkey-disk',
help='Device mapper name for opened disk (default: tkey-disk)'
)
parser.add_argument(
'--partition', '-P',
type=int,
help='Partition number to encrypt (if not specified, will prompt or encrypt whole disk)'
)
return parser.parse_args()
def run_sudo(cmd, inp=None, timeout=60, capture=True):
"""Run a command with sudo, handling password prompts properly
Args:
cmd (list): Command and arguments to run with sudo
inp (str, optional): Data to pass to stdin. Defaults to None.
timeout (int, optional): Command timeout in seconds. Defaults to 60.
capture (bool, optional): Capture stdout/stderr. Defaults to True.
Returns:
subprocess.CompletedProcess: Result of the subprocess run
"""
# Check if we're already running as root
if not os.geteuid():
# Already root, run command directly
sudo_cmd = cmd
else:
# Not root, use sudo with environment preservation
# Preserve PATH and other important environment variables
env_vars = []
for var in ['PATH', 'HOME', 'USER']:
if var in os.environ:
env_vars.extend([f'{var}={os.environ[var]}'])
if env_vars:
sudo_cmd = ['sudo', 'env'] + env_vars + cmd
else:
sudo_cmd = ['sudo'] + cmd
cmd_str = ' '.join(sudo_cmd)
# Mask environment variables in verbose output for cleanliness
if 'env' in cmd_str:
cmd_str = cmd_str.replace('env PATH=', 'env PATH=...')
tout.detail(f'Running: {cmd_str}')
if capture:
return subprocess.run(
sudo_cmd,
input=inp,
capture_output=True,
text=True,
timeout=timeout,
check=True
)
# Allow interactive sudo (don't capture output)
return subprocess.run(
sudo_cmd,
input=inp,
text=True,
timeout=timeout,
check=False
)
def find_tkey_sign():
"""Find the full path to tkey-sign command
Returns:
str or None: Full path to tkey-sign or None if not found
"""
# Check if tkey-sign is in current PATH
try:
result = subprocess.run(['which', 'tkey-sign'], capture_output=True,
text=True, timeout=5, check=False)
if not result.returncode:
return result.stdout.strip()
except (subprocess.TimeoutExpired, OSError):
pass
# If running as root, check the original user's paths
original_user = os.environ.get('SUDO_USER')
if original_user and not os.geteuid():
import pwd # pylint: disable=import-outside-toplevel
try:
user_info = pwd.getpwnam(original_user)
user_home = user_info.pw_dir
user_paths = [
f'{user_home}/bin/tkey-sign',
f'{user_home}/.local/bin/tkey-sign',
]
for path in user_paths:
if os.path.exists(path) and os.access(path, os.X_OK):
return path
except KeyError:
pass
# Common system locations
common_paths = [
'/usr/local/bin/tkey-sign',
'/usr/bin/tkey-sign',
os.path.expanduser('~/bin/tkey-sign'),
os.path.expanduser('~/.local/bin/tkey-sign'),
]
for path in common_paths:
if os.path.exists(path) and os.access(path, os.X_OK):
return path
return None
def run_tkey_sign(args, inp=None):
"""Run tkey-sign command with given arguments
Args:
args (list): Command line arguments for tkey-sign
inp (str, optional): Data to pass to stdin. Defaults to None.
Returns:
subprocess.CompletedProcess: Result of the subprocess run
"""
# Find tkey-sign path
fname = find_tkey_sign()
if not fname:
tout.warning('tkey-sign not found, using PATH search')
fname = 'tkey-sign'
cmd = [fname] + args
# Show the command being run, but mask USS input for security
cmd_str = ' '.join(cmd)
if '--uss-file' in args:
tout.detail(f'Running: {cmd_str} (with USS file)')
elif inp and '--uss' in args:
tout.detail(f'Running: {cmd_str} (with USS input)')
else:
tout.detail(f'Running: {cmd_str}')
result = subprocess.run(
cmd,
input=inp,
capture_output=True,
text=True,
timeout=30,
check=True
)
tout.detail(f'Command exit code: {result.returncode}')
if result.stdout:
tout.detail(f'Command stdout: {result.stdout}')
if result.stderr:
tout.detail(f'Command stderr: {result.stderr}')
return result
def get_tkey_pubkey(device=None):
"""Get the public key from TKey with optional USS
Args:
device (str, optional): TKey device path. Defaults to None
(auto-detect).
Returns:
str or None: Public key string on success, None on failure
"""
with tempfile.NamedTemporaryFile(mode='w+', suffix='.pub',
delete=False) as f:
pubkey_file = f.name
args = ['--getkey', '--public', pubkey_file, '--force']
if device:
args.extend(['--port', device])
result = run_tkey_sign(args)
if not result or result.returncode:
err = result.stderr if result else 'Unknown error'
tout.error(f'Error getting public key: {err}')
if os.path.exists(pubkey_file):
os.unlink(pubkey_file)
return None
pubkey = tools.read_file(pubkey_file, binary=False).strip()
if os.path.exists(pubkey_file):
os.unlink(pubkey_file)
return pubkey
def check_tkey_mode(device=None):
"""Check if TKey is in firmware mode or has an app loaded
Uses the same logic as U-Boot's tkey_get_name_version() function to detect
mode. In firmware mode: returns name0='tk1 ' name1='mkdf'
In app mode: firmware commands return error status with error code 0x00
Args:
device (str, optional): TKey device path. Defaults to None
(auto-detect).
Returns:
str or None: 'firmware' if in firmware mode, 'app' if app loaded, None
on error
"""
# Try to get name/version to determine mode (same as U-Boot does)
with tempfile.NamedTemporaryFile(mode='w', suffix='.temp',
delete=False) as temp_f:
temp_file = temp_f.name
# Use a simple command that should work regardless of USS
args = ['--getkey', '--public', temp_file, '--force']
if device:
args.extend(['--port', device])
result = run_tkey_sign(args)
# Read content before cleanup
content = ''
if os.path.exists(temp_file):
content = tools.read_file(temp_file, binary=False).strip()
os.unlink(temp_file)
if not result:
return None
if result.returncode:
# Command failed - could indicate various issues
return None
# In app mode with USS, tkey-sign warns about app already loaded
if result.stderr and 'App already loaded' in result.stderr:
return 'app'
# In firmware mode, we get a public key without warnings
if content:
return 'firmware'
return None
def check_tkey_mode_raw(device='/dev/ttyACM0'):
"""Check TKey mode by directly communicating with the device
Sends a firmware NAME_VERSION command to determine if the TKey is in:
- firmware mode (responds with 'tk1 mkdf' + version)
- app mode (responds with error status)
Args:
device (str, optional): Serial device path. Defaults to '/dev/ttyACM0'.
Returns:
str or None: 'firmware', 'app' if app loaded, None on error
"""
try:
# Open serial connection with TKey baud rate
ser = serial.Serial(device, baudrate=62500, timeout=.5)
tout.info(f'Opened {device} at 62500 baud')
# Build frame header: cmd=0, endpoint=2(firmware), status=0, len=1byte
# Header format: [id:2][endpoint:2][status:1][len:2][reserved:1]
header = (((TKEY_FRAME_ID_CMD_V1 & 0x3) << 5) |
((TKEY_ENDPOINT_FIRMWARE & 0x3) << 3) |
((TKEY_STATUS_OK & 0x1) << 2) |
(TKEY_LENGTH_1_BYTE & 0x3))
# Frame: [header][command]
frame = bytes([header, TKEY_FW_CMD_NAME_VERSION])
tout.info(f'Sending NAME_VERSION: {frame.hex()}')
# Send command
ser.write(frame)
ser.flush()
# Read response (up to 64 bytes with timeout)
resp = ser.read(64)
ser.close()
tout.info(f'Received ({len(resp)} bytes): {resp.hex()}')
if not resp:
tout.info('No response received')
return None
# Parse response header
if len(resp) >= 1:
hdr = resp[0]
status_bit = (hdr >> 2) & 0x1
tout.info(f'Response header: 0x{hdr:02x}, status: {status_bit}')
if status_bit == 1:
# Error status - likely app mode responding to firmware cmd
tout.info('Error status bit set - device in app mode')
return 'app'
# Success status - check for firmware mode response pattern
if len(resp) >= 13: # Header + 4 + 4 + 4 bytes minimum
# Look for 'tk1 ' and 'mkdf' in resp
resp_str = resp[1:].decode('ascii', errors='ignore')
if 'tk1' in resp_str and 'mkdf' in resp_str:
tout.info('Found firmware identifiers')
return 'firmware'
# Got success resp but couldn't parse - assume firmware
tout.info('Success response - assuming firmware mode')
return 'firmware'
return None
except OSError as e:
tout.info(f'Error during TKey communication: {e}')
return None
def get_tkey_pubkey_uss(uss, device=None):
'''Get public key from TKey using a User Supplied Secret
Args:
uss (str): User Supplied Secret (password/passphrase) for key derivation
device (str, optional): TKey device path. Defaults to None (auto-detect)
Returns:
str or None: Public key string on success, None on failure
'''
# Check if TKey already has an app loaded first
# Try direct serial communication first, fall back to tkey-sign
mode = None
if not device or device == '/dev/ttyACM0':
mode = check_tkey_mode_raw('/dev/ttyACM0')
if mode is None:
# Fallback to tkey-sign method
mode = check_tkey_mode(device)
tout.info(f'TKey mode: {mode}')
if mode == 'app':
# App already loaded, need to wait for reinsertion
wait_tkey_replug()
else:
# Show 'Setting up TKey' only when we're about to do the USS operation
# (not when we need to wait for reinsertion)
tout.notice('Setting up TKey')
with tempfile.NamedTemporaryFile(mode='w+', suffix='.pub',
delete=False) as f:
pubfile = f.name
# Create temporary file for USS
with tempfile.NamedTemporaryFile(mode='w', suffix='.uss',
delete=False) as uss_f:
uss_file = uss_f.name
uss_f.write(uss)
args = ['--getkey', '--public', pubfile, '--uss-file', uss_file, '--force']
if device:
args.extend(['--port', device])
result = run_tkey_sign(args)
if not result or result.returncode:
err = result.stderr if result else 'Unknown error'
tout.error(f'Error getting public key with USS: {err}')
if os.path.exists(pubfile):
os.unlink(pubfile)
if os.path.exists(uss_file):
os.unlink(uss_file)
return None
pubkey = tools.read_file(pubfile, binary=False).strip()
if os.path.exists(pubfile):
os.unlink(pubfile)
if os.path.exists(uss_file):
os.unlink(uss_file)
return pubkey
def derive_fde_key(uss, device=None):
"""Derive a full-disk encryption key using TKey hardware and USS.
This uses the TKey's hardware-based key derivation where the USS (User
Supplied Secret) affects the internal key generation, producing different
keys for different USS values.
The key derivation matches U-Boot's tkey_derive_disk_key():
1. Get the 32-byte Ed25519 public key from TKey
2. Convert to lowercase hex string (64 characters)
3. SHA256 hash the hex string to produce the disk key
Args:
uss (str): User Supplied Secret (password/passphrase) for key derivation
device (str, optional): TKey device path. Defaults to None (auto-detect)
Returns:
bytes or None: 32-byte encryption key material on success, None on
failure
"""
# Get the public key using the USS
# Different USS values produce different private/public key pairs inside the
# TKey
pubkey_signify = get_tkey_pubkey_uss(uss, device)
if not pubkey_signify:
return None
tout.info(f'Signify public key:\n{pubkey_signify}')
# Parse signify format: skip comment line, decode base64 of second line
# Format: "untrusted comment: ...\n<base64-encoded-data>"
lines = pubkey_signify.strip().split('\n')
if len(lines) < 2:
tout.error('Invalid public key format')
return None
# Decode base64 data (second line)
try:
pubkey_data = base64.b64decode(lines[1])
except base64.binascii.Error as e:
tout.error(f'Error decoding public key: {e}')
return None
# Signify format: 2-byte algorithm + 8-byte keynum + 32-byte pubkey
# Extract the 32-byte Ed25519 public key (last 32 bytes)
if len(pubkey_data) < 32:
tout.error(f'Public key data too short ({len(pubkey_data)} bytes)')
return None
pubkey_bytes = pubkey_data[-32:]
tout.info(f'Ed25519 public key (hex): {pubkey_bytes.hex()}')
# Match U-Boot's tkey_derive_disk_key(): SHA256(hex_string_of_pubkey)
pubkey_hex = pubkey_bytes.hex()
key_material = hashlib.sha256(pubkey_hex.encode()).digest()
tout.info(f'Derived disk key (hex): {key_material.hex()}')
return key_material
def find_tkey_device():
"""Check if TKey USB device is present
Looks for TKey device (vendor ID 1207, product ID 8887) via USB enumeration.
Returns:
bool: True if TKey device found, False otherwise
"""
usb_devices = glob.glob('/sys/bus/usb/devices/*/idVendor')
for vendor_file in usb_devices:
try:
vendor_id = tools.read_file(vendor_file, binary=False).strip()
if vendor_id == '1207': # Tillitis vendor ID
product = vendor_file.replace('idVendor', 'idProduct')
if os.path.exists(product):
product_id = tools.read_file(product, binary=False).strip()
if product_id == '8887': # TKey product ID
return True
except (IOError, OSError):
continue
return False
def detect_tkey(device=None):
"""Check if TKey is present at startup and prompt for insertion if needed
Args:
device (str, optional): TKey device path. Defaults to None (auto-detect)
Returns:
bool: True if TKey is available, False on error
"""
# Check if specific device path exists
if device and os.path.exists(device):
tout.info(f'TKey device found at {device}')
return True
# Check for TKey via USB enumeration
if find_tkey_device():
tout.info('TKey detected via USB enumeration')
return True
# No TKey found - prompt for insertion
tout.notice('Please insert your TKey...')
# Wait for TKey to be inserted
while not find_tkey_device():
time.sleep(0.5)
tout.info('TKey detected')
# Give the device a moment to settle
time.sleep(1)
return True
def wait_tkey_replug():
"""Wait for TKey to be removed and then re-inserted"""
tout.info('Waiting for TKey removal and reinsertion...')
# Wait for device to be removed
if find_tkey_device():
tout.notice('Please remove your TKey...')
while find_tkey_device():
time.sleep(0.5)
tout.info('TKey removed')
# Wait for device to be inserted
tout.notice('Please insert your TKey...')
while not find_tkey_device():
time.sleep(0.5)
tout.info('TKey detected')
tout.notice('Setting up TKey')
# Give the device a moment to settle
time.sleep(.2)
def format_key_for_luks(key_material):
'''Format the key material as hex for LUKS
Args:
key_material (bytes): Raw key material bytes
Returns:
str: Hexadecimal representation of the key
'''
return key_material.hex()
def save_key_to_file(key_material, outfile, force=False):
"""Save the key material to a file
Args:
key_material (bytes): Raw key material to save
outfile (str): Path to output file
force (bool, optional): Overwrite existing files. Defaults to False.
Returns:
bool: True on success, False on failure
"""
if os.path.exists(outfile) and not force:
tout.error(f'Output file {outfile} already exists. '
'Use --force to overwrite.')
return False
tools.write_file(outfile, key_material)
# Set restrictive permissions
os.chmod(outfile, 0o600)
tout.notice(f'Key saved to {outfile}')
return True
def get_password(args):
"""Get password from user input or file
Args:
args (argparse.Namespace): Parsed command line arguments
Returns:
str or None: Password string on success, None on failure
"""
if args.password_file:
if args.password_file == '-':
# Read from stdin
tout.info('Reading password from stdin...')
password = sys.stdin.readline().rstrip('\n\r')
else:
# Read from file
tout.info(f'Reading password from file: {args.password_file}')
password = tools.read_file(args.password_file, binary=False).strip()
if not password:
tout.error('Empty password not allowed')
return None
tout.info(f'Password length: {len(password)}, repr: {repr(password)}')
else:
# Interactive input
password = getpass.getpass('Enter password for key derivation: ')
if not password:
tout.error('Empty password not allowed')
return None
# Confirm password only for interactive input
confirm = getpass.getpass('Confirm password: ')
if password != confirm:
tout.error('Passwords do not match')
return None
return password
def check_broken_luks(disk_path):
"""Check if disk has broken LUKS metadata
Args:
disk_path (str): Path to disk image file
Returns:
bool: True if broken LUKS metadata detected, False otherwise
"""
try:
# Use cryptsetup luksDump to check for broken metadata
result = subprocess.run(
['cryptsetup', 'luksDump', disk_path],
capture_output=True,
text=True,
timeout=30,
check=False
)
if result.returncode:
# Check if the error specifically mentions broken metadata
# But exclude common "not a LUKS device" messages
error_lower = result.stderr.lower()
# These indicate broken LUKS (partial/corrupted headers)
broken_hints = ['broken', 'invalid', 'corrupted', 'damaged']
# These indicate no LUKS at all (normal for fresh files)
no_luks_hints = ['not a luks device', 'no luks header',
'unrecognized']
# Check for "no LUKS" messages first (these are normal)
if any(hint in error_lower for hint in no_luks_hints):
tout.info('No LUKS header detected (normal for fresh disk)')
return False
# Check for broken LUKS hints
if any(hint in error_lower for hint in broken_hints):
tout.info(f'Detected broken LUKS metadata: {result.stderr}')
return True
return False
except subprocess.TimeoutExpired:
tout.info('Timeout checking for broken LUKS metadata')
return False
except OSError as e:
tout.info(f'Error checking for broken LUKS: {e}')
return False
def wipe_luks_header(disk_path):
"""Wipe broken LUKS header from disk
Args:
disk_path (str): Path to disk image file
Returns:
bool: True on success, False on failure
"""
tout.info(f'Wiping LUKS header from {disk_path}')
try:
# Use dd to zero out the first 32MB (typical LUKS header size)
result = subprocess.run(
['dd', 'if=/dev/zero', f'of={disk_path}', 'bs=1M', 'count=32',
'conv=notrunc'],
capture_output=True,
text=True,
timeout=60,
check=True
)
if result.returncode:
tout.error(f'Error wiping LUKS header: {result.stderr}')
return False
tout.info('LUKS header wiped successfully')
if result.stderr: # dd output goes to stderr
tout.detail(f'dd output: {result.stderr}')
return True
except subprocess.TimeoutExpired:
tout.error('LUKS header wipe operation timed out')
return False
except FileNotFoundError:
tout.error('dd command not found')
return False
except OSError as e:
tout.error(f'Error during LUKS header wipe: {e}')
return False
def parse_fdisk_line(line, disk_path):
"""Parse a single fdisk output line into partition info
Args:
line (str): A line from fdisk -l output
disk_path (str): Path to disk image file
Returns:
dict or None: Partition info dict, or None if line is not a partition
"""
if disk_path not in line or not line.strip() or line.startswith('Disk'):
return None
parts = line.split()
if len(parts) < 6 or not parts[0].startswith(disk_path):
return None
try:
partition_num = int(parts[0].replace(disk_path, '').replace('p', ''))
# Handle bootable flag - if '*' is present, it shifts other fields
if '*' in parts[1]:
bootable = True
start_sector = int(parts[2])
end_sector = int(parts[3])
sectors = int(parts[4])
size = parts[5]
part_type = ' '.join(parts[7:]) if len(parts) > 7 else 'Unknown'
else:
bootable = False
start_sector = int(parts[1])
end_sector = int(parts[2])
sectors = int(parts[3])
size = parts[4]
part_type = ' '.join(parts[6:]) if len(parts) > 6 else 'Unknown'
return {
'number': partition_num,
'start': start_sector,
'end': end_sector,
'sectors': sectors,
'size': size,
'type': part_type,
'bootable': bootable
}
except (ValueError, IndexError):
return None
def get_disk_parts(disk_path):
"""Get partition information from disk image
Args:
disk_path (str): Path to disk image file
Returns:
list or None: List of partition info dicts, None on error
"""
try:
result = subprocess.run(
['fdisk', '-l', disk_path],
capture_output=True,
text=True,
timeout=30,
check=False
)
if result.returncode:
tout.info(f'Error running fdisk: {result.stderr}')
return None
partitions = []
for line in result.stdout.split('\n'):
part_info = parse_fdisk_line(line, disk_path)
if part_info:
partitions.append(part_info)
if partitions:
tout.info(f'Found {len(partitions)} partitions:')
for p in partitions:
boot_flag = ' (bootable)' if p['bootable'] else ''
tout.info(f" Partition {p['number']}: {p['size']} "
f"{p['type']}{boot_flag}")
return partitions
except subprocess.TimeoutExpired:
tout.info('Timeout reading partition table')
return None
except FileNotFoundError:
tout.info('fdisk command not found')
return None
except OSError as e:
tout.info(f'Error reading partition table: {e}')
return None
def select_partition(disk_path, partitions, args):
"""Select which partition to encrypt
Args:
disk_path (str): Path to disk image
partitions (list): List of partition info dicts
args (Namespace): Command line arguments
Returns:
int or None: Selected partition number, None for whole disk
"""
# If partition specified on command line, use it
if args.partition:
if any(p['number'] == args.partition for p in partitions):
tout.info(f'Using partition {args.partition} from command line')
return args.partition
tout.error(f'Partition {args.partition} not found')
return False # Return False to indicate error
# If no partitions found, encrypt whole disk
if not partitions:
tout.info('No partitions detected, will encrypt whole disk')
return None
# Show partition table and prompt for selection
tout.notice(f'Disk {disk_path} contains partitions:')
for p in partitions:
boot_flag = ' (bootable)' if p['bootable'] else ''
tout.notice(f" {p['number']}: {p['size']} {p['type']}{boot_flag}")
tout.notice(' 0: Encrypt whole disk')
while True:
try:
resp = input('Select partition to encrypt (0 for whole disk): ')
choice = int(resp)
if not choice:
return None # Whole disk
if any(p['number'] == choice for p in partitions):
return choice
nums = [p["number"] for p in partitions]
tout.notice(f'Invalid choice. Please select 0 or one of: {nums}')
except (ValueError, KeyboardInterrupt):
tout.notice('\nOperation cancelled')
return None
def setup_loop_dev(disk_path):
"""Set up loop device for disk image with partition support
Args:
disk_path (str): Path to disk image
Returns:
str or None: Loop device path on success, None on failure
"""
try:
# First, ensure sudo credentials are cached with an interactive command
if os.geteuid() != 0:
tout.info('Requesting sudo privileges for loop device operations...')
result = subprocess.run(['sudo', '-v'], timeout=30, check=False)
if result.returncode:
tout.error('Could not obtain sudo privileges')
return None
# Find available loop device
result = run_sudo(
['losetup', '--find', '--show', '--partscan', disk_path],
timeout=30,
capture=True # We need the output for the loop device path
)
if result.returncode:
tout.error(f'Error setting up loop device: {result.stderr}')
return None
loop_device = result.stdout.strip()
tout.info(f'Set up loop device {loop_device} for {disk_path}')
# Force partition scan
run_sudo(['partprobe', loop_device], timeout=10)
return loop_device
except subprocess.TimeoutExpired:
tout.error('Loop device setup timed out')
return None
except OSError as e:
tout.error(f'Error setting up loop device: {e}')
return None
def cleanup_loop_dev(loop_device):
"""Clean up loop device
Args:
loop_device (str): Loop device path (e.g., /dev/loop0)
Returns:
bool: True on success, False on failure
"""
try:
result = run_sudo(
['losetup', '--detach', loop_device],
timeout=30
)
if result.returncode:
tout.error(f'Error cleaning up loop device: {result.stderr}')
return False
tout.info(f'Cleaned up loop device {loop_device}')
return True
except subprocess.TimeoutExpired:
tout.error('Loop device cleanup timed out')
return False
except OSError as e:
tout.error(f'Error cleaning up loop device: {e}')
return False
def get_part_dev(loop_device, partition_num):
"""Get the device path for a specific partition
Args:
loop_device (str): Loop device path (e.g., /dev/loop0)
partition_num (int): Partition number
Returns:
str: Partition device path (e.g., /dev/loop0p2)
"""
return f'{loop_device}p{partition_num}'
def check_disk_image(disk_path):
"""Check disk image file and determine if it needs space for LUKS header
Args:
disk_path (str): Path to disk image file
Returns:
SimpleNamespace or None: Namespace with disk info on success, None on
failure. Contains: size, needs_resize, luks_hdrsize,
already_encrypted
"""
if not os.path.exists(disk_path):
tout.error(f'Disk image {disk_path} does not exist')
return None
if not os.path.isfile(disk_path):
tout.error(f'{disk_path} is not a regular file')
return None
# Get current file size
stat_info = os.stat(disk_path)
current_size = stat_info.st_size
size_mb = current_size / (1024 * 1024)
tout.info(f'Disk image size: {current_size} bytes ({size_mb:.1f} MB)')
# LUKS header is typically 16 MB, but we'll use 32 MB for safety
luks_hdrsize = 32 * 1024 * 1024 # 32 MB
# Check if disk is already LUKS encrypted
# Read first few bytes to check for LUKS signature
try:
with open(disk_path, 'rb') as f:
header = f.read(16)
if header.startswith(b'LUKS'):
tout.info('Disk image appears to already be LUKS encrypted')
return SimpleNamespace(
size=current_size,
needs_resize=False,
luks_hdrsize=0,
already_encrypted=True,
broken_luks=False
)
except IOError as e:
tout.error(f'Error reading disk image: {e}')
return None
# Check for broken LUKS metadata using cryptsetup
broken_luks = check_broken_luks(disk_path)
if broken_luks:
return SimpleNamespace(
size=current_size,
needs_resize=False,
luks_hdrsize=0,
already_encrypted=False,
broken_luks=True
)
# For unencrypted disks, we need to add space for LUKS header
return SimpleNamespace(
size=current_size,
needs_resize=True,
luks_hdrsize=luks_hdrsize,
already_encrypted=False,
broken_luks=False
)
def resize_disk(disk_path, additional_size):
"""Resize disk image to add space for LUKS header
Args:
disk_path (str): Path to disk image file
additional_size (int): Additional bytes to add to the image
Returns:
bool: True on success, False on failure
"""
add_mb = additional_size / (1024 * 1024)
tout.info(f'Resizing disk image to add {additional_size} bytes '
f'({add_mb:.1f} MB)')
try:
# Use truncate to extend the file
# This is safer than dd as it doesn't actually write the data
current_size = os.path.getsize(disk_path)
new_size = current_size + additional_size
tout.info(f'Extending from {current_size} to {new_size} bytes')
# Use truncate command which is available on most systems
result = subprocess.run(
['truncate', '--size', str(new_size), disk_path],
capture_output=True,
text=True,
timeout=30,
check=True
)
if result.returncode:
tout.error(f'Error resizing disk image: {result.stderr}')
return False
tout.info('Disk image resized successfully')
return True
except subprocess.TimeoutExpired:
tout.error('Disk resize operation timed out')
return False
except OSError:
return False
def backup_part_data(devpath, orig_mount, backup_dir):
"""Mount partition read-only and backup all data
Args:
devpath (str): Path to partition device
orig_mount (str): Mount point for original partition
backup_dir (str): Directory to backup data to
Returns:
bool: True on success, False on failure
"""
tout.info(f'Mounting original partition at {orig_mount}')
result = run_sudo(['mount', '-o', 'ro', devpath, orig_mount],
timeout=30)
if result.returncode:
tout.error(f'Error mounting original partition: {result.stderr}')
return False
tout.info(f'Backing up data to {backup_dir}')
result = run_sudo(['cp', '-a', f'{orig_mount}/.', backup_dir], timeout=300)
if result.returncode:
tout.error(f'Error backing up data: {result.stderr}')
run_sudo(['umount', orig_mount], timeout=30)
return False
tout.info('Data backup completed successfully')
result = run_sudo(['du', '-sb', backup_dir], timeout=30)
if not result.returncode:
backup_size = int(result.stdout.split()[0])
size_mb = backup_size / (1024 * 1024)
tout.info(f'Backed up {backup_size} bytes ({size_mb:.1f} MB)')
run_sudo(['umount', orig_mount], timeout=30)
return True
def format_luks_part(devpath, keyfile):
"""Format a partition with LUKS2
Args:
devpath (str): Path to partition device
keyfile (str): Path to key file
Returns:
bool: True on success, False on failure
"""
tout.info(f'Creating LUKS partition on {devpath}')
cmd = [
'cryptsetup', 'luksFormat',
'--type', 'luks2',
'--cipher', 'aes-xts-plain64',
'--key-size', '512',
'--hash', 'sha256',
'--use-random',
'--key-file', keyfile,
'--batch-mode',
devpath
]
result = run_sudo(cmd, timeout=120)
if result.returncode:
tout.error(f'Error formatting partition with LUKS: {result.stderr}')
return False
return True
def restore_to_luks(dev_path, keyfile, backup_dir, enc_mount):
"""Open LUKS, create filesystem, and restore backed up data
Args:
dev_path (str): Path to LUKS partition device
keyfile (str): Path to key file
backup_dir (str): Directory containing backed up data
enc_mount (str): Mount point for encrypted partition
Returns:
bool: True on success, False on failure
"""
mapper = f'tkey-temp-{os.getpid()}'
tout.info(f'Opening LUKS partition as {mapper}')
result = run_sudo(['cryptsetup', 'open', '--key-file', keyfile, dev_path,
mapper], timeout=30)
if result.returncode:
tout.error(f'Error opening LUKS partition: {result.stderr}')
return False
try:
mapper_dev = f'/dev/mapper/{mapper}'
tout.info(f'Creating ext4 filesystem on {mapper_dev}')
result = run_sudo(['mkfs.ext4', '-F', mapper_dev], timeout=60)
if result.returncode:
tout.error(f'Error creating filesystem: {result.stderr}')
return False
tout.info(f'Mounting encrypted partition at {enc_mount}')
result = run_sudo(['mount', mapper_dev, enc_mount], timeout=30)
if result.returncode:
tout.error(f'Error mounting encrypted partition: {result.stderr}')
return False
try:
tout.info('Copying backed up data to encrypted partition')
result = run_sudo(['cp', '-a', f'{backup_dir}/.', enc_mount],
timeout=300)
if result.returncode:
tout.error(f'Error copying data: {result.stderr}')
return False
run_sudo(['sync'], timeout=30)
tout.info('Data successfully copied to encrypted partition')
return True
finally:
run_sudo(['umount', enc_mount], timeout=30)
finally:
run_sudo(['cryptsetup', 'close', mapper], timeout=30)
def encrypt_part_luks_copy(dev_path, key_material):
"""Encrypt a partition with LUKS and copy existing data
Args:
dev_path (str): Path to partition device (e.g., /dev/loop0p2)
key_material (bytes): Raw key material for encryption
Returns:
bool: True on success, False on failure
"""
tout.info(f'Encrypting partition {dev_path} with data preservation')
with tempfile.TemporaryDirectory() as temp_dir:
orig_mount = os.path.join(temp_dir, 'original')
enc_mount = os.path.join(temp_dir, 'encrypted')
backup_dir = os.path.join(temp_dir, 'backup')
os.makedirs(orig_mount)
os.makedirs(enc_mount)
os.makedirs(backup_dir)
with tempfile.NamedTemporaryFile(mode='wb', suffix='.key',
delete=False) as key_f:
keyfile = key_f.name
key_f.write(key_material)
try:
os.chmod(keyfile, 0o600)
if not backup_part_data(dev_path, orig_mount, backup_dir):
return False
if not format_luks_part(dev_path, keyfile):
return False
if not restore_to_luks(dev_path, keyfile, backup_dir, enc_mount):
return False
tout.info(f'LUKS partition created successfully on {dev_path}')
return True
finally:
try:
os.unlink(keyfile)
except OSError:
pass
def encrypt_part_luks(devpath, key_material):
"""Encrypt a partition with LUKS, destroying existing filesystem
Args:
devpath (str): Path to partition device (e.g., /dev/loop0p2)
key_material (bytes): Raw key material for encryption
Returns:
bool: True on success, False on failure
"""
tout.info(f'Formatting partition {devpath} with LUKS')
# Create temporary keyfile
with tempfile.NamedTemporaryFile(mode='wb', suffix='.key',
delete=False) as key_f:
keyfile = key_f.name
key_f.write(key_material)
try:
# Set restrictive permissions on keyfile
os.chmod(keyfile, 0o600)
# Use luksFormat to create fresh LUKS partition (destroys existing data)
cmd = [
'cryptsetup', 'luksFormat',
'--type', 'luks2',
'--cipher', 'aes-xts-plain64',
'--key-size', '512',
'--hash', 'sha256',
'--use-random',
'--key-file', keyfile,
'--batch-mode',
devpath
]
result = run_sudo(cmd, timeout=120)
if result.returncode:
tout.error(f'Error formatting partition with LUKS: {result.stderr}')
return False
tout.info(f'LUKS partition created successfully on {devpath}')
return True
finally:
# Clean up keyfile
try:
os.unlink(keyfile)
except OSError:
pass
def create_fs_in_luks(devpath, key_material, fstype='ext4'):
"""Open LUKS partition and create filesystem inside
Args:
devpath (str): Path to LUKS partition device
key_material (bytes): Key material to open LUKS partition
fstype (str): Type of filesystem to create (ext4, ext3, etc.)
Returns:
bool: True on success, False on failure
"""
# Create temporary keyfile
with tempfile.NamedTemporaryFile(mode='wb', suffix='.key',
delete=False) as key_f:
keyfile = key_f.name
key_f.write(key_material)
try:
# Set restrictive permissions on keyfile
os.chmod(keyfile, 0o600)
# Generate unique mapper name
name = f'tkey-fs-{os.getpid()}'
# Open LUKS partition
cmd = ['cryptsetup', 'open', '--key-file', keyfile, devpath, name]
result = run_sudo(cmd, timeout=30)
if result.returncode:
tout.error(f'Error opening LUKS partition: {result.stderr}')
return False
try:
# Create filesystem
mapper_device = f'/dev/mapper/{name}'
if fstype == 'ext4':
fs_cmd = ['mkfs.ext4', '-F', mapper_device]
elif fstype == 'ext3':
fs_cmd = ['mkfs.ext3', '-F', mapper_device]
else:
tout.error(f'Unsupported filesystem type: {fstype}')
return False
result = run_sudo(fs_cmd, timeout=60)
if result.returncode:
tout.error(f'Error creating {fstype} filesystem: '
f'{result.stderr}')
return False
tout.info(f'{fstype} filesystem created in LUKS partition')
return True # Success path for inner operation
finally:
# Close LUKS partition
close_cmd = ['cryptsetup', 'close', name]
run_sudo(close_cmd, timeout=10) # close mapper name
except (subprocess.SubprocessError, OSError) as e:
tout.error(f'Error during filesystem creation: {e}')
return False
finally:
# Clean up keyfile
if os.path.exists(keyfile):
try:
os.unlink(keyfile)
except OSError: # More specific exception for file operations
pass
def encrypt_disk_luks(disk_path, key_material):
"""Encrypt disk image with LUKS using cryptsetup reencrypt
Args:
disk_path (str): Path to disk image file
key_material (bytes): Raw key material for encryption
Returns:
bool: True on success, False on failure
"""
tout.info(f'Encrypting disk image {disk_path} with LUKS')
# Create temporary keyfile
with tempfile.NamedTemporaryFile(mode='wb', suffix='.key',
delete=False) as key_f:
keyfile = key_f.name
key_f.write(key_material)
try:
# Set restrictive permissions on keyfile
os.chmod(keyfile, 0o600)
# Assume fresh disk (no existing LUKS) for partition encryption
is_luks = False
if is_luks:
# Existing LUKS - use reencrypt
cmd = [
'cryptsetup', 'reencrypt',
'--encrypt',
'--type', 'luks2',
'--cipher', 'aes-xts-plain64',
'--key-size', '512',
'--hash', 'sha256',
'--use-random',
'--key-file', keyfile,
'--batch-mode',
disk_path
]
else:
# Fresh disk - use detached header to avoid corrupting filesystem
header_file = f'{disk_path}.luks'
cmd = [
'cryptsetup', 'reencrypt',
'--encrypt',
'--type', 'luks2',
'--cipher', 'aes-xts-plain64',
'--key-size', '512',
'--hash', 'sha256',
'--use-random',
'--key-file', keyfile,
'--header', header_file,
'--batch-mode',
disk_path
]
result = run_sudo(
cmd,
timeout=3600 # 1 hour timeout for large disks
)
if result.returncode:
tout.error(f'Error encrypting disk with LUKS: {result.stderr}')
return False
tout.info('Disk encryption completed successfully')
if result.stdout:
tout.detail(f'cryptsetup output: {result.stdout}')
return True
except subprocess.TimeoutExpired:
tout.error('Disk encryption operation timed out')
return False
except FileNotFoundError:
tout.error('cryptsetup command not found. Please install cryptsetup.')
return False
except OSError as e:
tout.error(f'Error during disk encryption: {e}')
return False
finally:
# Clean up keyfile
if os.path.exists(keyfile):
os.unlink(keyfile)
def check_mapper_status(mapper_name):
"""Check if a device mapper name is already in use
Args:
mapper_name (str): Device mapper name to check
Returns:
bool: True if mapper is active, False otherwise
"""
mapper_path = f'/dev/mapper/{mapper_name}'
if os.path.exists(mapper_path):
tout.info(f'Device mapper {mapper_name} is already active at '
f'{mapper_path}')
return True
# Also check via dmsetup
try:
result = run_sudo(
['dmsetup', 'info', mapper_name],
timeout=10
)
if not result.returncode:
tout.info(f'Device mapper {mapper_name} is active (dmsetup)')
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return False
def open_luks_disk(disk_path, mapper_name, key_material):
"""Open LUKS encrypted disk using cryptsetup
Args:
disk_path (str): Path to LUKS encrypted disk image
mapper_name (str): Device mapper name for the opened disk
key_material (bytes): Raw key material for decryption
Returns:
str or None: Path to opened device (/dev/mapper/name) on success, None
on failure
"""
tout.info(f'Opening LUKS disk {disk_path} as {mapper_name}')
# Check if already opened
if check_mapper_status(mapper_name):
mapper_path = f'/dev/mapper/{mapper_name}'
tout.notice(f'Disk is already opened at {mapper_path}')
return mapper_path
# Check if disk is LUKS encrypted or has detached header
header_file = f'{disk_path}.luks'
has_detached_header = os.path.exists(header_file)
if not has_detached_header:
try:
with open(disk_path, 'rb') as f:
header = f.read(16)
if not header.startswith(b'LUKS'):
tout.error(f'{disk_path} is not LUKS encrypted')
return None
except IOError as e:
tout.error(f'Error reading disk image: {e}')
return None
# Create temporary keyfile
with tempfile.NamedTemporaryFile(mode='wb', suffix='.key',
delete=False) as key_f:
keyfile = key_f.name
key_f.write(key_material)
try:
# Set restrictive permissions on keyfile
os.chmod(keyfile, 0o600)
# Use cryptsetup open (luksOpen) - requires root privileges
cmd = ['cryptsetup', 'open', '--type', 'luks', '--key-file', keyfile]
# Add detached header if it exists
if has_detached_header:
cmd.extend(['--header', header_file])
cmd.extend([disk_path, mapper_name])
result = run_sudo(cmd, timeout=60)
if result.returncode:
tout.error(f'Error opening LUKS disk: {result.stderr}')
return None
mapper_path = f'/dev/mapper/{mapper_name}'
# Verify the device was created
if not os.path.exists(mapper_path):
tout.error(f'Device {mapper_path} was not created')
return None
tout.info(f'LUKS disk opened successfully at {mapper_path}')
if result.stdout:
tout.detail(f'cryptsetup output: {result.stdout}')
return mapper_path
except subprocess.TimeoutExpired:
tout.error('Disk opening operation timed out')
except FileNotFoundError:
tout.error('cryptsetup command not found. Please install cryptsetup.')
except OSError as e:
tout.error(f'Error during disk opening: {e}')
finally:
# Clean up keyfile
if os.path.exists(keyfile):
os.unlink(keyfile)
return None
def output_key(key_material, args):
"""Output the derived key material
Args:
key_material (bytes): Raw key material to output
args (argparse.Namespace): Parsed command line arguments
Returns:
bool: True on success, False on failure
"""
if args.output:
if save_key_to_file(key_material, args.output, args.force):
tout.notice(f'Encryption key derived and saved to {args.output}')
if not args.binary:
tout.notice(f'Key (hex): {format_key_for_luks(key_material)}')
return True
return False
if args.binary:
# Output raw binary to stdout
sys.stdout.buffer.write(key_material)
else:
print(format_key_for_luks(key_material))
return True
def validate_args(args):
"""Validate command line arguments
Args:
args (argparse.Namespace): Parsed command line arguments
Returns:
bool: True if arguments are valid, False otherwise
"""
if args.encrypt_disk and args.binary and not args.output:
tout.error('--binary to stdout not compatible with --encrypt-disk')
tout.error('Use --output to save binary key when encrypting')
return False
if args.encrypt_disk and args.open_disk:
tout.error('Cannot encrypt and open disk in same operation')
return False
if args.open_disk and args.binary and not args.output:
tout.error('--binary to stdout not compatible with --open-disk')
tout.error('Use --output to save binary key when opening')
return False
return True
def do_encrypt_disk(args, key):
"""Handle disk encryption
Args:
args (Namespace): Command line arguments
key (bytes): Encryption key material
Returns:
bool: True on success, False on failure
"""
loop_device = None
try:
partitions = get_disk_parts(args.encrypt_disk)
sel_part = select_partition(args.encrypt_disk, partitions, args)
if sel_part is False:
return False
if sel_part is None:
target_device = args.encrypt_disk
disk_info = check_disk_image(args.encrypt_disk)
if not disk_info:
return False
if disk_info.already_encrypted:
tout.warning(f'{args.encrypt_disk} is already LUKS encrypted')
if input('Continue anyway? (y/N): ').lower() != 'y':
tout.notice('Cancelled')
return False
if hasattr(disk_info, 'broken_luks') and disk_info.broken_luks:
tout.error(f'{args.encrypt_disk} has broken LUKS metadata')
tout.notice('Previous encryption may have been interrupted.')
if input('Wipe broken LUKS header? (y/N): ').lower() != 'y':
tout.notice('Cancelled')
tout.notice('Manual fix: dd if=/dev/zero of=disk.img '
'bs=1M count=32 conv=notrunc')
return False
if not wipe_luks_header(args.encrypt_disk):
tout.error('Failed to wipe LUKS header')
return False
disk_info = check_disk_image(args.encrypt_disk)
if not disk_info:
return False
if disk_info.needs_resize and disk_info.luks_hdrsize:
if not resize_disk(args.encrypt_disk, disk_info.luks_hdrsize):
return False
else:
tout.notice(f'Setting up loop device for partition {sel_part}...')
loop_device = setup_loop_dev(args.encrypt_disk)
if not loop_device:
return False
target_device = get_part_dev(loop_device, sel_part)
time.sleep(1)
if not os.path.exists(target_device):
tout.error(f'Partition {target_device} not found')
tout.info(f'Available devices: {glob.glob(f"{loop_device}*")}')
return False
tout.info(f'Will encrypt partition {sel_part} at {target_device}')
if sel_part:
if not encrypt_part_luks_copy(target_device, key):
return False
tout.notice(f'Partition {sel_part} of {args.encrypt_disk} '
'encrypted with LUKS')
else:
if not encrypt_disk_luks(target_device, key):
return False
tout.notice(f'{args.encrypt_disk} encrypted with LUKS')
return True
finally:
if loop_device:
cleanup_loop_dev(loop_device)
def do_open_disk(args, key):
"""Handle disk opening
Args:
args (Namespace): Command line arguments
key (bytes): Encryption key material
Returns:
bool: True on success, False on failure
"""
loop_device = None
try:
partitions = get_disk_parts(args.open_disk)
if partitions:
tout.notice('Checking partitions for LUKS...')
loop_device = setup_loop_dev(args.open_disk)
if not loop_device:
return False
enc_part = None
for part in partitions:
part_dev = get_part_dev(loop_device, part['number'])
time.sleep(1)
if os.path.exists(part_dev):
try:
with tempfile.NamedTemporaryFile() as temp_f:
cmd = ['dd', f'if={part_dev}', f'of={temp_f.name}',
'bs=16', 'count=1']
result = run_sudo(cmd, timeout=10)
if not result.returncode:
temp_f.seek(0)
if temp_f.read(16).startswith(b'LUKS'):
enc_part = part
target_device = part_dev
tout.info(f'Found LUKS partition '
f'{part["number"]}')
break
except (OSError, IOError) as e:
tout.info(f'Error checking partition '
f'{part["number"]}: {e}')
if not enc_part:
tout.error('No LUKS partitions found')
return False
mapper_path = open_luks_disk(target_device, args.mapper_name, key)
if not mapper_path:
return False
tout.notice(f'LUKS partition {enc_part["number"]} opened at '
f'{mapper_path}')
else:
mapper_path = open_luks_disk(args.open_disk, args.mapper_name, key)
if not mapper_path:
return False
tout.notice(f'LUKS disk opened at {mapper_path}')
tout.notice(f'Mount with: sudo mount {mapper_path} /mnt')
tout.notice(f'Close with: sudo cryptsetup close {args.mapper_name}')
return True
finally:
if loop_device:
tout.info(f'Loop device {loop_device} left active for LUKS access')
def main():
"""Main function
Returns:
int: Exit code (0 for success, 1 for failure)
"""
args = None
try:
args = parse_args()
if args.debug:
tout.init(tout.DEBUG)
elif args.verbose:
tout.init(tout.INFO)
else:
tout.init(tout.NOTICE)
if not validate_args(args):
return 1
password = get_password(args)
if not password:
return 1
if not detect_tkey(args.device):
tout.error('Failed to detect TKey')
return 1
key = derive_fde_key(password, args.device)
if not key:
tout.error('Failed to derive encryption key')
return 1
password = None
if args.encrypt_disk:
if not do_encrypt_disk(args, key):
return 1
tout.notice('Disk encryption completed successfully')
elif args.open_disk:
if not do_open_disk(args, key):
return 1
tout.notice('Disk opening completed successfully')
else:
if not output_key(key, args):
return 1
tout.notice('Key derivation completed successfully')
if args.output:
if not output_key(key, args):
return 1
return 0
except KeyboardInterrupt:
tout.notice('\nCancelled')
return 1
except subprocess.TimeoutExpired:
tout.error('Operation timed out')
return 1
except FileNotFoundError as e:
if 'tkey-sign' in str(e):
tout.error('tkey-sign not found')
if os.environ.get('SUDO_USER') and not os.geteuid():
tout.notice('Note: tkey-sign may not be in root PATH.')
tout.notice('Try: sudo env PATH=$PATH ./scripts/tkey-fde-key.py')
elif 'cryptsetup' in str(e):
tout.error('cryptsetup not found. Install cryptsetup.')
elif 'truncate' in str(e):
tout.error('truncate not found')
elif 'dmsetup' in str(e):
tout.error('dmsetup not found')
else:
tout.error(f'File not found - {e}')
return 1
except PermissionError as e:
tout.error(f'Permission denied - {e}')
tout.notice('Note: Disk operations may require root privileges')
return 1
except OSError as e:
tout.error(f'OS/IO operation failed - {e}')
return 1
except Exception as e: # pylint: disable=broad-exception-caught
if args and args.debug:
import traceback # pylint: disable=import-outside-toplevel
tout.error('Full traceback:')
traceback.print_exc()
else:
tout.error(f'Unexpected error: {e}')
return 1
if __name__ == '__main__':
sys.exit(main())