Files
installer/iridium_installer/backend/disk.py

261 lines
7.8 KiB
Python

import subprocess
import logging
import os
logger = logging.getLogger(__name__)
class CommandResult:
    """Plain container for the outcome of a finished command."""

    def __init__(self, stdout, stderr, returncode):
        # stdout / stderr: captured output of the command.
        # returncode: exit status reported by the process.
        self.stdout, self.stderr, self.returncode = stdout, stderr, returncode
def run_command(cmd, check=True):
    """
    Run ``cmd`` as a subprocess, logging its output line by line as it arrives.

    Non-blank stdout lines are logged at INFO and non-blank stderr lines at
    ERROR. Both streams are also captured in full and returned.

    Args:
        cmd: Argument list handed to ``subprocess.Popen`` (no shell involved).
        check: When true, a non-zero exit status raises instead of returning.

    Returns:
        CommandResult with the captured stdout, stderr and the return code.

    Raises:
        subprocess.CalledProcessError: if ``check`` is true and the command
            exits non-zero; ``output`` and ``stderr`` carry the captured text.
    """
    import threading

    # str() each argument so path-like arguments cannot break the log line.
    printable = " ".join(str(arg) for arg in cmd)
    logger.info(f"Running command: {printable}")

    stdout_lines = []
    stderr_lines = []

    def read_stream(stream, line_list, log_level):
        # Drain one pipe until EOF: log non-blank lines, keep every line.
        for line in stream:
            line_clean = line.strip()
            if line_clean:
                log_level(line_clean)
            line_list.append(line)

    # The context manager closes both pipes even if something below raises.
    # errors="replace" keeps an undecodable byte from killing a reader thread,
    # which would leave the pipe undrained and could block the child forever.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
        bufsize=1,
    ) as process:
        # One thread per pipe so neither stream can fill up and stall the child.
        readers = [
            threading.Thread(
                target=read_stream,
                args=(process.stdout, stdout_lines, logger.info),
            ),
            threading.Thread(
                target=read_stream,
                args=(process.stderr, stderr_lines, logger.error),
            ),
        ]
        for reader in readers:
            reader.start()
        for reader in readers:
            reader.join()
        returncode = process.wait()

    stdout_str = "".join(stdout_lines)
    stderr_str = "".join(stderr_lines)

    if check and returncode != 0:
        raise subprocess.CalledProcessError(
            returncode, cmd, output=stdout_str, stderr=stderr_str
        )
    return CommandResult(stdout_str, stderr_str, returncode)
def get_partition_device(disk_device, partition_number):
    """
    Build the device path of partition ``partition_number`` on ``disk_device``.

    Disk names that end in a digit (NVMe style) get a "p" between the disk
    name and the partition number; all other names (sd style) get the
    number appended directly.
    """
    separator = "p" if disk_device[-1].isdigit() else ""
    return f"{disk_device}{separator}{partition_number}"
def get_total_memory():
    """
    Returns total system memory in bytes, or 0 if it cannot be determined.

    Tries ``os.sysconf`` first and falls back to the ``MemTotal`` line of
    ``/proc/meminfo``. Never raises: every failure ends in the 0 sentinel.
    """
    try:
        page_size = os.sysconf("SC_PAGE_SIZE")
        page_count = os.sysconf("SC_PHYS_PAGES")
    except (AttributeError, ValueError, OSError):
        # sysconf is unavailable or does not know these names here.
        page_size = page_count = -1

    # sysconf may report -1 for an indeterminate value; multiplying such
    # values would yield a bogus (negative, or even 1-byte) total.
    if page_size > 0 and page_count > 0:
        return page_size * page_count

    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if line.startswith("MemTotal:"):
                    # The value is multiplied by 1024 to convert it to bytes.
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        # Missing file or malformed line: fall through to the sentinel
        # instead of crashing the caller.
        pass
    return 0
def get_disk_size(disk_device):
    """
    Returns the size of ``disk_device`` in bytes as reported by
    ``blockdev --getsize64``.

    Any failure (command error, unparsable output) is logged and reported
    as a size of 0 rather than raised.
    """
    query = ["blockdev", "--getsize64", disk_device]
    try:
        reported = run_command(query).stdout
        size_bytes = int(reported.strip())
    except Exception as err:
        logger.error(f"Failed to get disk size: {err}")
        size_bytes = 0
    return size_bytes
def auto_partition_disk(disk_device):
    """
    Wipes ``disk_device`` and creates and formats a default GPT layout.

    WARNING: destroys all data on the disk.

    Layout (GPT partition numbers):
      1. EFI System Partition at the start of the disk (2048M, or 1024M
         when the disk is too small for the standard layout), FAT32.
      2. Root, filling the space left between partition 1 and the swap
         partition (or the end of the disk when there is no swap), ext4.
      3. Swap at the end of the disk (RAM + 2048M); skipped when the disk
         is too small for the standard layout.

    Returns:
        dict with keys "efi", "root" and "swap" mapping to partition device
        paths. "swap" is None when no swap partition was created, so
        consumers of this dict must handle a missing swap device.

    Raises:
        Exception: if the disk cannot hold the reduced EFI partition plus a
            10240M root partition.
        subprocess.CalledProcessError: if any partitioning or formatting
            command exits non-zero.
    """
    import time

    logger.info(f"Starting auto-partitioning on {disk_device}")

    bytes_per_mb = 1024 * 1024

    # Calculate sizes
    disk_mb = get_disk_size(disk_device) / bytes_per_mb
    ram_size = get_total_memory()

    # Defaults for the standard layout
    efi_mb = 2048
    swap_mb = int((ram_size / bytes_per_mb) + 2048)
    min_root_mb = 10240  # 10GB
    use_swap = True

    if disk_mb < efi_mb + swap_mb + min_root_mb:
        logger.warning("Disk too small for standard layout. Adjusting...")
        efi_mb = 1024
        use_swap = False

    # Check minimal viability before touching the disk.
    if disk_mb < (efi_mb + min_root_mb):
        raise Exception("Disk too small for installation (Need ~11GB)")

    # 1. Zap the disk (destroy all data)
    run_command(["sgdisk", "-Z", disk_device])

    # 2. Create new GPT table
    run_command(["sgdisk", "-o", disk_device])

    # 3. Create EFI Partition (Part 1, Start)
    run_command([
        "sgdisk",
        "-n", f"1:0:+{efi_mb}M",
        "-t", "1:ef00",
        "-c", "1:EFI System",
        disk_device,
    ])

    # 4. Create Swap Partition (Part 3, End) - If enabled.
    # A negative start value in sgdisk is measured from the end of the disk.
    if use_swap:
        run_command([
            "sgdisk",
            "-n", f"3:-{swap_mb}M:0",
            "-t", "3:8200",
            "-c", "3:Swap",
            disk_device,
        ])

    # 5. Create Root Partition (Part 2, Fill Gap): takes the space between
    # P1 and P3, or up to the end of the disk when there is no swap.
    run_command([
        "sgdisk",
        "-n", "2:0:0",
        "-t", "2:8300",
        "-c", "2:Root",
        disk_device,
    ])

    # Inform kernel of changes, then wait for udev to create the partition
    # nodes; a fixed sleep alone can lose the race against mkfs below.
    run_command(["partprobe", disk_device])
    try:
        run_command(["udevadm", "settle", "--timeout=5"])
    except Exception:
        # Best effort: udevadm missing or timed out, fall back to a pause.
        time.sleep(1)

    # 6. Format Partitions
    efi_part = get_partition_device(disk_device, 1)
    root_part = get_partition_device(disk_device, 2)
    swap_part = get_partition_device(disk_device, 3) if use_swap else None

    logger.info("Formatting EFI partition...")
    run_command(["mkfs.vfat", "-F32", efi_part])

    if use_swap:
        logger.info("Formatting Swap partition...")
        run_command(["mkswap", swap_part])

    logger.info("Formatting Root partition...")
    run_command(["mkfs.ext4", "-F", root_part])

    logger.info("Partitioning and formatting complete.")

    # The "swap" key is always present; it is None when swap was skipped.
    return {
        "efi": efi_part,
        "root": root_part,
        "swap": swap_part,
    }
def mount_partitions(partition_info, mount_root="/mnt"):
    """
    Mounts the installation target described by ``partition_info``.

    The "root" device is mounted on ``mount_root``, the "efi" device on
    ``<mount_root>/boot/efi``, and the "swap" device (when the entry is
    present and truthy) is activated with ``swapon``. Missing mount point
    directories are created first.
    """
    import os

    # Root goes first: the EFI mount point lives inside it.
    root_missing = not os.path.exists(mount_root)
    if root_missing:
        os.makedirs(mount_root)
    run_command(["mount", partition_info["root"], mount_root])

    # EFI system partition under <root>/boot/efi.
    esp_dir = os.path.join(mount_root, "boot/efi")
    if not os.path.exists(esp_dir):
        os.makedirs(esp_dir, exist_ok=True)
    run_command(["mount", partition_info["efi"], esp_dir])

    # Swap is optional.
    swap_dev = partition_info.get("swap")
    if swap_dev:
        run_command(["swapon", swap_dev])

    logger.info(f"Partitions mounted at {mount_root}")
def create_partition(disk_device, size_mb, type_code="8300", name="Linux filesystem", fstype=None):
    """
    Creates a new partition on the disk and optionally formats it.

    The partition gets the lowest partition number not already listed by
    ``sgdisk -p``.

    Args:
        disk_device: Disk to modify.
        size_mb: Partition size (passed to sgdisk as ``+<size_mb>M``);
            0 means use all available space in the gap.
        type_code: sgdisk type code: ef00 (EFI), 8200 (Swap), 8300 (Linux).
        name: Partition name stored in the partition table.
        fstype: "ext4", "fat32" or "swap" to format the new partition;
            None (or any falsy value) leaves it unformatted. An unsupported
            value also leaves it unformatted, and a warning is logged.

    Returns:
        The number of the partition that was created.

    Raises:
        subprocess.CalledProcessError: if sgdisk, partprobe or the format
            command exits non-zero.
    """
    import time

    # Find next available partition number. Partition rows in the listing
    # are the lines whose first column is a number.
    listing = run_command(["sgdisk", "-p", disk_device])
    used_numbers = set()
    for row in listing.stdout.splitlines():
        fields = row.split()
        if fields and fields[0].isdigit():
            used_numbers.add(int(fields[0]))

    next_num = 1
    while next_num in used_numbers:
        next_num += 1

    # size_mb=0 means use all available space in the gap
    size_spec = f"+{size_mb}M" if size_mb > 0 else "0"

    # Use -g (mbrtogpt) to ensure we can work on the disk if it was MBR
    run_command([
        "sgdisk",
        "-g",
        "-n", f"{next_num}:0:{size_spec}",
        "-t", f"{next_num}:{type_code}",
        "-c", f"{next_num}:{name}",
        disk_device,
    ])

    run_command(["partprobe", disk_device])

    # Wait for partition node to appear
    try:
        run_command(["udevadm", "settle", "--timeout=5"])
    except Exception:
        # Best effort: udevadm missing or timed out, fall back to a pause.
        time.sleep(2)

    part_dev = get_partition_device(disk_device, next_num)

    format_commands = {
        "fat32": ["mkfs.vfat", "-F32"],
        "ext4": ["mkfs.ext4", "-F"],
        "swap": ["mkswap"],
    }
    if fstype in format_commands:
        run_command(format_commands[fstype] + [part_dev])
    elif fstype:
        # Previously this case was silent, leaving an unformatted partition
        # with no hint as to why.
        logger.warning(
            f"Unsupported fstype {fstype!r}; leaving {part_dev} unformatted"
        )

    return next_num
def delete_partition(disk_device, part_num):
    """
    Removes partition ``part_num`` from ``disk_device`` with ``sgdisk -d``
    and then runs ``partprobe`` on the disk.
    """
    steps = (
        ["sgdisk", "-d", str(part_num), disk_device],
        ["partprobe", disk_device],
    )
    for argv in steps:
        run_command(argv)
def wipe_disk(disk_device):
    """
    Zaps ``disk_device`` (``sgdisk -Z``), writes a new empty GPT table
    (``sgdisk -o``) and then runs ``partprobe`` on the disk.
    """
    steps = (
        ["sgdisk", "-Z", disk_device],
        ["sgdisk", "-o", disk_device],
        ["partprobe", disk_device],
    )
    for argv in steps:
        run_command(argv)