import subprocess import logging import os logger = logging.getLogger(__name__) class CommandResult: def __init__(self, stdout, stderr, returncode): self.stdout = stdout self.stderr = stderr self.returncode = returncode def run_command(cmd, check=True): logger.info(f"Running command: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) stdout_lines = [] stderr_lines = [] def read_stream(stream, line_list, log_level): for line in stream: line_clean = line.strip() if line_clean: log_level(line_clean) line_list.append(line) import threading t1 = threading.Thread(target=read_stream, args=(process.stdout, stdout_lines, logger.info)) t2 = threading.Thread(target=read_stream, args=(process.stderr, stderr_lines, logger.error)) t1.start() t2.start() t1.join() t2.join() returncode = process.wait() stdout_str = "".join(stdout_lines) stderr_str = "".join(stderr_lines) if check and returncode != 0: raise subprocess.CalledProcessError(returncode, cmd, output=stdout_str, stderr=stderr_str) return CommandResult(stdout_str, stderr_str, returncode) def get_partition_device(disk_device, partition_number): """ Returns the partition device path. Handles NVMe style (p1) vs sd style (1). """ if disk_device[-1].isdigit(): return f"{disk_device}p{partition_number}" return f"{disk_device}{partition_number}" def get_total_memory(): """Returns total system memory in bytes.""" try: return os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") except (ValueError, OSError): with open("/proc/meminfo", "r") as f: for line in f: if line.startswith("MemTotal:"): parts = line.split() return int(parts[1]) * 1024 return 0 def get_disk_size(disk_device): """Returns disk size in bytes using blockdev.""" try: res = run_command(["blockdev", "--getsize64", disk_device]) return int(res.stdout.strip()) except Exception as e: logger.error(f"Failed to get disk size: {e}") return 0 def auto_partition_disk(disk_device): """ Automatically partitions the disk: 1. EFI System Partition (2GB standard, 1GB small) 2. Root (Remaining) 3. Swap (RAM + 2GB, or 0 if small) Layout: P1=EFI, P3=Swap (End), P2=Root (Middle) """ logger.info(f"Starting auto-partitioning on {disk_device}") # Calculate sizes disk_size = get_disk_size(disk_device) ram_size = get_total_memory() # Defaults efi_mb = 2048 swap_mb = int((ram_size / (1024*1024)) + 2048) min_root_mb = 10240 # 10GB total_required_mb = efi_mb + swap_mb + min_root_mb disk_mb = disk_size / (1024*1024) use_swap = True if disk_mb < total_required_mb: logger.warning("Disk too small for standard layout. Adjusting...") efi_mb = 1024 use_swap = False # Check minimal viability if disk_mb < (efi_mb + min_root_mb): raise Exception("Disk too small for installation (Need ~11GB)") # 1. Zap the disk (destroy all data) run_command(["sgdisk", "-Z", disk_device]) # 2. Create new GPT table run_command(["sgdisk", "-o", disk_device]) # 3. Create EFI Partition (Part 1, Start) run_command(["sgdisk", "-n", f"1:0:+{efi_mb}M", "-t", "1:ef00", "-c", "1:EFI System", disk_device]) # 4. Create Swap Partition (Part 3, End) - If enabled if use_swap: # sgdisk negative start is from end of disk # We use partition 3 for Swap run_command(["sgdisk", "-n", f"3:-{swap_mb}M:0", "-t", "3:8200", "-c", "3:Swap", disk_device]) # 5. Create Root Partition (Part 2, Fill Gap) # This fills the space between P1 and P3 (or end if no swap) run_command(["sgdisk", "-n", "2:0:0", "-t", "2:8300", "-c", "2:Root", disk_device]) # Inform kernel of changes run_command(["partprobe", disk_device]) import time time.sleep(1) # 6. Format Partitions efi_part = get_partition_device(disk_device, 1) root_part = get_partition_device(disk_device, 2) swap_part = get_partition_device(disk_device, 3) if use_swap else None logger.info("Formatting EFI partition...") run_command(["mkfs.vfat", "-F32", efi_part]) if use_swap: logger.info("Formatting Swap partition...") run_command(["mkswap", swap_part]) logger.info("Formatting Root partition...") run_command(["mkfs.ext4", "-F", root_part]) logger.info("Partitioning and formatting complete.") result = { "efi": efi_part, "root": root_part } if use_swap: result["swap"] = swap_part else: # If no swap, we should probably return None or handle it in fstab generation # backend/os_install.py expects "swap" key for UUID. # We should probably pass a dummy or None, and update os_install to handle it. result["swap"] = None return result def mount_partitions(partition_info, mount_root="/mnt"): """ Mounts the partitions into mount_root. """ import os # 1. Mount Root if not os.path.exists(mount_root): os.makedirs(mount_root) run_command(["mount", partition_info["root"], mount_root]) # 2. Mount EFI efi_mount = os.path.join(mount_root, "boot/efi") if not os.path.exists(efi_mount): os.makedirs(efi_mount, exist_ok=True) run_command(["mount", partition_info["efi"], efi_mount]) # 3. Enable Swap if partition_info.get("swap"): run_command(["swapon", partition_info["swap"]]) logger.info(f"Partitions mounted at {mount_root}") def create_partition(disk_device, size_mb, type_code="8300", name="Linux filesystem", fstype=None): """ Creates a new partition on the disk and formats it. type_code: ef00 (EFI), 8200 (Swap), 8300 (Linux) fstype: ext4, fat32, swap """ # Find next available partition number res = run_command(["sgdisk", "-p", disk_device]) # Very basic parsing to find the next number existing_nums = [] for line in res.stdout.splitlines(): parts = line.split() if parts and parts[0].isdigit(): existing_nums.append(int(parts[0])) next_num = 1 while next_num in existing_nums: next_num += 1 # Use -g (mbrtogpt) to ensure we can work on the disk if it was MBR # size_mb=0 means use all available space in the gap size_spec = f"+{size_mb}M" if size_mb > 0 else "0" run_command([ "sgdisk", "-g", "-n", f"{next_num}:0:{size_spec}", "-t", f"{next_num}:{type_code}", "-c", f"{next_num}:{name}", disk_device ]) run_command(["partprobe", disk_device]) # Wait for partition node to appear try: run_command(["udevadm", "settle", "--timeout=5"]) except Exception: import time time.sleep(2) part_dev = get_partition_device(disk_device, next_num) if fstype == "fat32": run_command(["mkfs.vfat", "-F32", part_dev]) elif fstype == "ext4": run_command(["mkfs.ext4", "-F", part_dev]) elif fstype == "swap": run_command(["mkswap", part_dev]) return next_num def delete_partition(disk_device, part_num): """Deletes a partition by number.""" run_command(["sgdisk", "-d", str(part_num), disk_device]) run_command(["partprobe", disk_device]) def wipe_disk(disk_device): """Zaps the disk and creates a new GPT table.""" run_command(["sgdisk", "-Z", disk_device]) run_command(["sgdisk", "-o", disk_device]) run_command(["partprobe", disk_device])