Files
installer/iridium_installer/backend/os_install.py

422 lines
15 KiB
Python

import logging
import os
import subprocess
import sys
from contextlib import contextmanager
logger = logging.getLogger(__name__)
# Import network logging for critical operations
from .network_logging import log_to_discord
def log_os_install(operation, status="start", details=""):
log_to_discord(
"INFO", f"OSINSTALL_{operation}_{status}: {details}", module="os_install"
)
class CommandResult:
def __init__(self, stdout, stderr, returncode):
self.stdout = stdout
self.stderr = stderr
self.returncode = returncode
def run_command(cmd, check=True):
logger.info(f"Running command: {' '.join(cmd)}")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
)
stdout_lines = []
stderr_lines = []
# Helper to read stream
def read_stream(stream, line_list, log_level):
for line in stream:
line_clean = line.strip()
if line_clean:
log_level(line_clean)
line_list.append(line)
import threading
t1 = threading.Thread(
target=read_stream, args=(process.stdout, stdout_lines, logger.info)
)
t2 = threading.Thread(
target=read_stream, args=(process.stderr, stderr_lines, logger.error)
)
t1.start()
t2.start()
t1.join()
t2.join()
returncode = process.wait()
stdout_str = "".join(stdout_lines)
stderr_str = "".join(stderr_lines)
if check and returncode != 0:
raise subprocess.CalledProcessError(
returncode, cmd, output=stdout_str, stderr=stderr_str
)
return CommandResult(stdout_str, stderr_str, returncode)
@contextmanager
def mount_pseudo_fs(mount_root):
"""
Context manager to bind mount /dev, /proc, /sys, /run, and efivarfs into mount_root.
"""
logger.info(f"Mounting pseudo-filesystems to {mount_root}...")
# dev/pts and dev/shm are often needed by scriptlets
mounts = ["dev", "proc", "sys", "run", "dev/pts", "dev/shm"]
mounted_paths = []
try:
for fs in mounts:
target = os.path.join(mount_root, fs)
os.makedirs(target, exist_ok=True)
run_command(["mount", "--bind", f"/{fs}", target])
mounted_paths.append(target)
# Bind mount RPM GPG keys from host
gpg_keys_host = "/etc/pki/rpm-gpg"
if os.path.exists(gpg_keys_host):
gpg_keys_target = os.path.join(mount_root, "etc/pki/rpm-gpg")
os.makedirs(gpg_keys_target, exist_ok=True)
run_command(["mount", "--bind", gpg_keys_host, gpg_keys_target])
mounted_paths.append(gpg_keys_target)
# Mount efivarfs if it exists on the host
efivars_path = "/sys/firmware/efi/efivars"
if os.path.exists(efivars_path):
target = os.path.join(mount_root, "sys/firmware/efi/efivars")
os.makedirs(target, exist_ok=True)
try:
run_command(["mount", "-t", "efivarfs", "efivarfs", target])
mounted_paths.append(target)
except Exception as e:
logger.warning(f"Failed to mount efivarfs: {e}")
yield
finally:
logger.info(f"Unmounting pseudo-filesystems from {mount_root}...")
for path in reversed(mounted_paths):
try:
run_command(["umount", "-l", path])
except Exception as e:
logger.warning(f"Failed to unmount {path}: {e}")
def has_rpms(path):
"""Checks if a directory contains any .rpm files."""
if not os.path.isdir(path):
return False
try:
for f in os.listdir(path):
if f.endswith(".rpm"):
return True
except Exception:
pass
return False
def find_iso_repo():
"""
Attempts to find a local repository on the ISO or other mounted media.
"""
possible_paths = [
"/run/install/repo",
"/run/install/source",
"/mnt/install/repo",
"/run/initramfs/live",
"/run/initramfs/isoscan",
]
def check_path(p):
if os.path.exists(os.path.join(p, "repodata")) or os.path.exists(os.path.join(p, "media.repo")):
return p
for sub in ["os", "Packages", "BaseOS", "AppStream"]:
sub_p = os.path.join(p, sub)
if os.path.exists(os.path.join(sub_p, "repodata")) or os.path.exists(os.path.join(sub_p, "media.repo")):
return sub_p
return None
# 1. Check known paths
for path in possible_paths:
res = check_path(path)
if res: return res
# 2. Check all mounted filesystems
try:
res = subprocess.run(["findmnt", "-nlo", "TARGET"], capture_output=True, text=True)
if res.returncode == 0:
for mount in res.stdout.splitlines():
if mount.startswith("/proc") or mount.startswith("/sys") or mount.startswith("/dev"):
continue
res = check_path(mount)
if res: return res
except Exception as e:
logger.debug(f"Error searching mount points: {e}")
# 3. Try to mount /dev/sr0 or /dev/cdrom
for dev in ["/dev/sr0", "/dev/cdrom"]:
try:
if os.path.exists(dev):
res = subprocess.run(["findmnt", "-n", dev], capture_output=True)
if res.returncode != 0:
try:
os.makedirs("/mnt/iso", exist_ok=True)
subprocess.run(["mount", "-o", "ro", dev, "/mnt/iso"], check=True)
res = check_path("/mnt/iso")
if res: return res
except Exception: pass
except Exception: pass
# 4. Check /run/media deeper
if os.path.exists("/run/media"):
try:
for root, dirs, files in os.walk("/run/media"):
if "repodata" in dirs or "media.repo" in files:
return root
# Limit depth for performance
if root.count(os.sep) > 5:
del dirs[:]
except Exception: pass
# 5. Last resort: any directory with .rpm files
for path in possible_paths:
if has_rpms(path): return path
for sub in ["os", "Packages", "BaseOS", "AppStream"]:
if has_rpms(os.path.join(path, sub)): return os.path.join(path, sub)
return None
def install_minimal_os(mount_root, releasever=None):
"""
Installs minimal Fedora packages to mount_root.
"""
if not releasever:
# Try to detect from host
try:
with open("/etc/os-release", "r") as f:
for line in f:
if line.startswith("VERSION_ID="):
releasever = line.split("=")[1].strip().strip('"')
break
except Exception:
pass
if not releasever:
releasever = "43" # Fallback
logger.info(f"Installing minimal Fedora {releasever} to {mount_root}...")
log_os_install("INSTALL", "start", f"Target: {mount_root}, Release: {releasever}")
packages = [
"basesystem",
"bash",
"coreutils",
"kernel",
"systemd",
"systemd-boot",
"dnf",
"shadow-utils",
"util-linux",
"efibootmgr",
"passwd",
"rootfiles",
"vim-minimal",
]
# Offline installation logic
iso_repo = find_iso_repo()
# Create a temporary dnf.conf to be strictly local if repo found
dnf_conf_path = "/tmp/iridium_dnf.conf"
with open(dnf_conf_path, "w") as f:
f.write("[main]\ngpgcheck=0\ninstallroot_managed_by_dnf=True\n")
dnf_args = [
f"--config={dnf_conf_path}",
"--setopt=cachedir=/tmp/dnf-cache",
"--setopt=install_weak_deps=False",
"--nodocs",
]
if iso_repo:
logger.info(f"Found ISO repository at {iso_repo}. Using strictly offline mode.")
dnf_args += [
"--disablerepo=*",
f"--repofrompath=iridium-iso,{iso_repo}",
"--enablerepo=iridium-iso",
"--nogpgcheck",
"--offline", # Supported by dnf5
"--setopt=iridium-iso.gpgcheck=0",
"--setopt=metadata_expire=-1",
]
else:
# Check if we are on a Live ISO but no repo found
if os.path.exists("/run/initramfs/live/LiveOS/squashfs.img"):
logger.warning("Detected Fedora Live environment, but no RPM repository was found on the media.")
logger.warning("Workstation Live ISOs usually do not contain a DNF repository for offline installation.")
logger.warning("ISO repository not found. DNF will attempt to use network.")
dnf_args.append("--use-host-config")
cmd = [
"dnf",
"install",
"-y",
f"--installroot={mount_root}",
f"--releasever={releasever}",
]
cmd += dnf_args + packages
# Copy resolv.conf if it exists (some scriptlets might need it for UID/GID lookups)
try:
os.makedirs(os.path.join(mount_root, "etc"), exist_ok=True)
if os.path.exists("/etc/resolv.conf"):
subprocess.run(["cp", "/etc/resolv.conf", os.path.join(mount_root, "etc/resolv.conf")])
except Exception: pass
with mount_pseudo_fs(mount_root):
run_command(cmd)
logger.info("Base system installation complete.")
log_os_install("INSTALL", "complete", f"Installed to {mount_root}")
def configure_system(mount_root, partition_info, user_info=None):
"""
Basic configuration: fstab, systemd-boot, and user creation.
"""
logger.info("Configuring system...")
log_os_install("CONFIGURE", "start", f"Configuring system in {mount_root}")
# 1. Generate fstab
def get_uuid(dev):
res = run_command(["blkid", "-s", "UUID", "-o", "value", dev])
return res.stdout.strip()
root_uuid = get_uuid(partition_info["root"])
efi_uuid = get_uuid(partition_info["efi"])
swap_entry = ""
if partition_info.get("swap"):
swap_uuid = get_uuid(partition_info["swap"])
swap_entry = f"UUID={swap_uuid} none swap defaults 0 0\n"
fstab_content = f"""
UUID={root_uuid} / ext4 defaults 1 1
UUID={efi_uuid} /boot vfat defaults 0 2
{swap_entry}
"""
os.makedirs(os.path.join(mount_root, "etc"), exist_ok=True)
with open(os.path.join(mount_root, "etc/fstab"), "w") as f:
f.write(fstab_content)
with mount_pseudo_fs(mount_root):
# 2. Configure User
if user_info:
logger.info(f"Creating user {user_info['username']}...")
# Create user and add to wheel group (sudoer)
run_command(["chroot", mount_root, "useradd", "-m", "-G", "wheel", user_info["username"]])
# Ensure changes to /etc/passwd and /etc/shadow are flushed
run_command(["sync"])
# Ensure wheel group has sudo privileges
sudoers_dir = os.path.join(mount_root, "etc/sudoers.d")
os.makedirs(sudoers_dir, exist_ok=True)
with open(os.path.join(sudoers_dir, "wheel"), "w") as f:
f.write("%wheel ALL=(ALL) ALL\n")
os.chmod(os.path.join(sudoers_dir, "wheel"), 0o440)
# Set user and root password using hashed passwords and usermod
try:
logger.info(f"Setting hashed passwords for {user_info['username']} and root...")
# Generate SHA512 hash using openssl on the host
res = subprocess.run(
["openssl", "passwd", "-6", user_info["password"]],
capture_output=True,
text=True,
check=True
)
hashed_pass = res.stdout.strip()
# Apply hash to user and root using usermod -p (takes encrypted password)
run_command(["chroot", mount_root, "usermod", "-p", hashed_pass, user_info["username"]])
run_command(["chroot", mount_root, "usermod", "-p", hashed_pass, "root"])
run_command(["sync"])
except Exception as e:
logger.error(f"Failed to set passwords: {e}")
raise e
# Set hostname
with open(os.path.join(mount_root, "etc/hostname"), "w") as f:
f.write(user_info["hostname"] + "\n")
# 3. Configure systemd-boot
# Ensure machine-id exists for kernel-install
if not os.path.exists(os.path.join(mount_root, "etc/machine-id")):
run_command(["chroot", mount_root, "systemd-machine-id-setup"])
# Set kernel command line
os.makedirs(os.path.join(mount_root, "etc/kernel"), exist_ok=True)
with open(os.path.join(mount_root, "etc/kernel/cmdline"), "w") as f:
f.write(f"root=UUID={root_uuid} rw quiet\n")
# Set kernel layout to BLS for systemd-boot
with open(os.path.join(mount_root, "etc/kernel/layout"), "w") as f:
f.write("bls\n")
# Install systemd-boot to the ESP (mounted at /boot)
# Note: removed --force as it's not supported by all bootctl versions
run_command(["chroot", mount_root, "bootctl", "install", "--path=/boot"])
# Add kernel entries
modules_dir = os.path.join(mount_root, "lib/modules")
if os.path.exists(modules_dir):
kvers = [d for d in os.listdir(modules_dir) if os.path.isdir(os.path.join(modules_dir, d))]
for kver in kvers:
logger.info(f"Adding kernel entry for {kver}...")
# Ensure initramfs exists. If not, generate it.
initrd_path_rel = f"boot/initramfs-{kver}.img"
initrd_full_path = os.path.join(mount_root, initrd_path_rel)
if not os.path.exists(initrd_full_path):
logger.info(f"Generating initramfs for {kver}...")
run_command(["chroot", mount_root, "dracut", "--force", f"/boot/initramfs-{kver}.img", kver])
kernel_image = f"/lib/modules/{kver}/vmlinuz"
initrd_arg = f"/boot/initramfs-{kver}.img"
# kernel-install add <version> <image> [initrd]
cmd = ["chroot", mount_root, "kernel-install", "add", kver, kernel_image]
if os.path.exists(os.path.join(mount_root, initrd_arg.lstrip("/"))):
cmd.append(initrd_arg)
run_command(cmd)
# Ensure all data is synced to disk
run_command(["sync"])
logger.info("System configuration complete.")
log_os_install("CONFIGURE", "complete", "systemd-boot and user configured successfully")