"""Helpers for partitioning, formatting and mounting disks.

All work is delegated to external tools (``sgdisk``, ``parted``, ``lsblk``,
``mkfs.*``, ``mount`` ...) that are executed through :func:`run_command`.
"""

import logging
import os
import subprocess
import threading
import time

# Import network logging for critical disk operations
from .network_logging import log_to_discord

logger = logging.getLogger(__name__)


def log_disk_operation(operation, status="start", details=""):
    """Report a disk operation event through ``log_to_discord``.

    The message has the form ``DISK_<operation>_<status>: <details>`` and is
    sent at level ``"INFO"`` with ``module="disk"``.
    """
    log_to_discord("INFO", f"DISK_{operation}_{status}: {details}", module="disk")


class CommandResult:
    """Captured outcome of a finished command."""

    def __init__(self, stdout, stderr, returncode):
        self.stdout = stdout
        self.stderr = stderr
        self.returncode = returncode

    def __repr__(self):
        return (
            f"{type(self).__name__}(stdout={self.stdout!r}, "
            f"stderr={self.stderr!r}, returncode={self.returncode!r})"
        )


def run_command(cmd, check=True):
    """Run ``cmd`` and stream its output to the logger while capturing it.

    Args:
        cmd: Command as a list of strings (no shell is involved).
        check: When true, a non-zero exit status raises.

    Returns:
        A :class:`CommandResult` holding the full stdout, stderr and the
        return code.

    Raises:
        subprocess.CalledProcessError: If ``check`` is true and the command
            exits with a non-zero status.
        OSError: If the executable cannot be started.
    """
    logger.info(f"Running command: {' '.join(cmd)}")

    stdout_lines = []
    stderr_lines = []

    def read_stream(stream, line_list, log_level):
        # Log non-empty lines as they arrive, but keep every raw line so the
        # captured output is byte-for-byte what the process printed.
        for line in stream:
            line_clean = line.strip()
            if line_clean:
                log_level(line_clean)
            line_list.append(line)

    # The context manager guarantees both pipes are closed even if a reader
    # thread or the wait below fails.
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1
    ) as process:
        # Both pipes are drained concurrently so that a chatty stderr cannot
        # block the child while we are still reading stdout (and vice versa).
        readers = [
            threading.Thread(
                target=read_stream, args=(process.stdout, stdout_lines, logger.info)
            ),
            threading.Thread(
                target=read_stream, args=(process.stderr, stderr_lines, logger.error)
            ),
        ]
        for reader in readers:
            reader.start()
        for reader in readers:
            reader.join()
        returncode = process.wait()

    stdout_str = "".join(stdout_lines)
    stderr_str = "".join(stderr_lines)

    if check and returncode != 0:
        raise subprocess.CalledProcessError(
            returncode, cmd, output=stdout_str, stderr=stderr_str
        )

    return CommandResult(stdout_str, stderr_str, returncode)


def get_partition_device(disk_device, partition_number):
    """
    Returns the partition device path.
    Handles NVMe style (p1) vs sd style (1).
    """
    # Disks whose name ends in a digit (e.g. /dev/nvme0n1) need a "p"
    # separator before the partition number.
    if disk_device[-1].isdigit():
        return f"{disk_device}p{partition_number}"
    return f"{disk_device}{partition_number}"


def get_total_memory():
    """Returns total system memory in bytes, or 0 if it cannot be determined."""
    try:
        return os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    except (ValueError, OSError):
        pass

    # Fallback: MemTotal in /proc/meminfo is reported in kB.
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if line.startswith("MemTotal:"):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError) as e:
        logger.warning(f"Failed to read total memory from /proc/meminfo: {e}")
    return 0


def get_disk_size(disk_device):
    """Returns disk size in bytes using blockdev, or 0 on failure."""
    try:
        res = run_command(["blockdev", "--getsize64", disk_device])
        return int(res.stdout.strip())
    except Exception as e:
        # Deliberately best-effort: callers treat 0 as "unknown size".
        logger.error(f"Failed to get disk size: {e}")
        return 0


def ensure_unmounted(disk_device):
    """
    Finds all mounted partitions belonging to disk_device and unmounts them.

    Active swap areas on the disk are disabled with ``swapoff``. Failures are
    logged and never raised; a failure on one device does not prevent the
    remaining devices from being released.
    """
    logger.info(f"Ensuring all partitions on {disk_device} are unmounted...")
    try:
        # -p makes lsblk print full device paths, so each line looks like
        # "/dev/vda1 /mnt/boot" or just "/dev/vda1" when nothing is mounted.
        res = run_command(["lsblk", "-lpno", "NAME,MOUNTPOINT", disk_device])
    except Exception as e:
        logger.warning(f"Error during pre-partition unmount: {e}")
        return

    for line in res.stdout.splitlines():
        parts = line.split()
        if len(parts) < 2:
            continue
        dev_path, mount_point = parts[0], parts[1]
        if not mount_point or mount_point == "None":
            continue
        try:
            if mount_point == "[SWAP]":
                # lsblk reports active swap as "[SWAP]"; umount cannot
                # release it, swapoff is required.
                logger.info(f"Disabling swap on {dev_path}...")
                run_command(["swapoff", dev_path])
            else:
                logger.info(f"Unmounting {dev_path} from {mount_point}...")
                run_command(["umount", "-l", dev_path])
        except Exception as e:
            # Best-effort: keep going so the other partitions still get released.
            logger.warning(f"Error during pre-partition unmount: {e}")


def is_uefi():
    """Checks if the system is booted in UEFI mode."""
    return os.path.exists("/sys/firmware/efi")


def auto_partition_disk(disk_device):
    """
    Automatically partitions the disk based on boot mode:
    UEFI (GPT): ESP (1GB), Root (Remaining), Swap (RAM+2GB)
    BIOS (MBR): Root (Remaining, Bootable), Swap (RAM+2GB)

    Swap is limited to 4GB on disks smaller than 100GB and is skipped
    entirely unless at least 10GB would remain for the root partition.

    Returns:
        dict with keys ``"efi"``, ``"root"`` and ``"swap"`` mapping to the
        partition device paths (``None`` for partitions that were not
        created).

    Raises:
        subprocess.CalledProcessError: If any partitioning or formatting
            command fails.
    """
    uefi = is_uefi()
    logger.info(f"Starting auto-partitioning on {disk_device} (UEFI: {uefi})")
    log_disk_operation("PARTITION", "start", f"Disk: {disk_device}, UEFI: {uefi}")

    ensure_unmounted(disk_device)

    disk_size = get_disk_size(disk_device)
    ram_size = get_total_memory()
    if disk_size == 0:
        logger.warning(
            f"Could not determine size of {disk_device}; no swap partition will be created"
        )

    # Calculate sizes (MiB): swap is RAM + 2GB ...
    swap_mb = int((ram_size / (1024 * 1024)) + 2048)
    # ... but capped at 4GB on disks smaller than 100GB.
    if disk_size < 100 * 1024**3:
        swap_mb = min(swap_mb, 4096)

    disk_mb = disk_size / (1024 * 1024)
    # Only use swap if we have at least 10GB for root
    use_swap = disk_mb > (10240 + swap_mb)

    if uefi:
        # GPT Layout
        logger.info("Using GPT layout for UEFI")
        run_command(["sgdisk", "-Z", disk_device])
        run_command(["sgdisk", "-o", disk_device])

        # 1. ESP (1GB)
        run_command(
            ["sgdisk", "-n", "1:0:+1024M", "-t", "1:ef00", "-c", "1:EFI System", disk_device]
        )

        # 3. Swap (if enabled) is carved from the END of the disk first, so
        # that root can simply take whatever is left in between.
        if use_swap:
            run_command(
                [
                    "sgdisk",
                    "-n",
                    f"3:-{swap_mb}M:0",
                    "-t",
                    "3:8200",
                    "-c",
                    "3:Swap",
                    disk_device,
                ]
            )

        # 2. Root
        run_command(["sgdisk", "-n", "2:0:0", "-t", "2:8300", "-c", "2:Root", disk_device])
    else:
        # MBR Layout for BIOS
        logger.info("Using MBR layout for BIOS")
        # Wipe disk using dd to be sure MBR is cleared
        run_command(["dd", "if=/dev/zero", f"of={disk_device}", "bs=512", "count=100"])

        # Use parted for MBR
        run_command(["parted", "-s", disk_device, "mklabel", "msdos"])

        if use_swap:
            # Root first, then swap at end
            root_end = int(disk_mb - swap_mb)
            run_command(
                [
                    "parted",
                    "-s",
                    disk_device,
                    "mkpart",
                    "primary",
                    "ext4",
                    "1MiB",
                    f"{root_end}MiB",
                ]
            )
            run_command(
                [
                    "parted",
                    "-s",
                    disk_device,
                    "mkpart",
                    "primary",
                    "linux-swap",
                    f"{root_end}MiB",
                    "100%",
                ]
            )
        else:
            run_command(
                ["parted", "-s", disk_device, "mkpart", "primary", "ext4", "1MiB", "100%"]
            )
        # Root is always partition 1 in the MBR layout and must be bootable.
        run_command(["parted", "-s", disk_device, "set", "1", "boot", "on"])

    run_command(["partprobe", disk_device])
    # Give the kernel a moment to create the new partition device nodes.
    time.sleep(2)

    # Identify partitions
    if uefi:
        efi_part = get_partition_device(disk_device, 1)
        root_part = get_partition_device(disk_device, 2)
        swap_part = get_partition_device(disk_device, 3) if use_swap else None
    else:
        efi_part = None  # BIOS doesn't use ESP
        root_part = get_partition_device(disk_device, 1)
        swap_part = get_partition_device(disk_device, 2) if use_swap else None

    # Format
    if uefi:
        logger.info(f"Formatting EFI partition {efi_part}...")
        run_command(["mkfs.vfat", "-F32", efi_part])

    if use_swap:
        logger.info(f"Formatting Swap partition {swap_part}...")
        run_command(["mkswap", swap_part])

    logger.info(f"Formatting Root partition {root_part}...")
    run_command(["mkfs.ext4", "-F", root_part])

    result = {"efi": efi_part, "root": root_part, "swap": swap_part}
    log_disk_operation("PARTITION", "complete", str(result))
    return result


def mount_partitions(partition_info, mount_root="/mnt"):
    """
    Mounts the partitions into mount_root.

    Args:
        partition_info: dict with a ``"root"`` entry and optional ``"efi"``
            and ``"swap"`` entries holding partition device paths.
        mount_root: Directory where the root partition is mounted; it is
            created if missing.

    Raises:
        subprocess.CalledProcessError: If ``mount`` or ``swapon`` fails.
    """
    log_disk_operation("MOUNT", "start", f"Mounting to {mount_root}")

    # 1. Mount Root
    os.makedirs(mount_root, exist_ok=True)
    run_command(["mount", partition_info["root"], mount_root])

    # 2. Mount EFI (if exists). For BIOS, /boot is just a directory on root.
    if partition_info.get("efi"):
        efi_mount = os.path.join(mount_root, "boot/efi")
        os.makedirs(efi_mount, exist_ok=True)
        run_command(["mount", partition_info["efi"], efi_mount])

    # 3. Enable Swap
    if partition_info.get("swap"):
        run_command(["swapon", partition_info["swap"]])

    logger.info(f"Partitions mounted at {mount_root}")
    log_disk_operation("MOUNT", "complete", f"Root: {partition_info['root']}")


def create_partition(
    disk_device, size_mb, type_code="8300", name="Linux filesystem", fstype=None
):
    """
    Creates a new partition on the disk and formats it.
    type_code: ef00 (EFI), 8200 (Swap), 8300 (Linux)
    fstype: ext4, fat32, swap

    ``size_mb`` of 0 (or less) means "use all available space in the gap".
    Any other ``fstype`` value (including ``None``) leaves the partition
    unformatted.

    Returns:
        The number assigned to the new partition (lowest unused number).
    """
    # Find next available partition number: in `sgdisk -p` output, partition
    # rows are the lines whose first column is a number.
    res = run_command(["sgdisk", "-p", disk_device])
    existing_nums = set()
    for line in res.stdout.splitlines():
        parts = line.split()
        if parts and parts[0].isdigit():
            existing_nums.add(int(parts[0]))

    next_num = 1
    while next_num in existing_nums:
        next_num += 1

    # Use -g (mbrtogpt) to ensure we can work on the disk if it was MBR
    # size_mb=0 means use all available space in the gap
    size_spec = f"+{size_mb}M" if size_mb > 0 else "0"
    run_command(
        [
            "sgdisk",
            "-g",
            "-n",
            f"{next_num}:0:{size_spec}",
            "-t",
            f"{next_num}:{type_code}",
            "-c",
            f"{next_num}:{name}",
            disk_device,
        ]
    )
    run_command(["partprobe", disk_device])

    # Wait for partition node to appear; fall back to a fixed delay when
    # udevadm is missing or reports a failure.
    try:
        run_command(["udevadm", "settle", "--timeout=5"])
    except (subprocess.CalledProcessError, OSError):
        time.sleep(2)

    part_dev = get_partition_device(disk_device, next_num)

    if fstype == "fat32":
        run_command(["mkfs.vfat", "-F32", part_dev])
    elif fstype == "ext4":
        run_command(["mkfs.ext4", "-F", part_dev])
    elif fstype == "swap":
        run_command(["mkswap", part_dev])

    return next_num


def delete_partition(disk_device, part_num):
    """Deletes a partition by number."""
    run_command(["sgdisk", "-d", str(part_num), disk_device])
    run_command(["partprobe", disk_device])


def wipe_disk(disk_device):
    """Zaps the disk and creates a new GPT table."""
    run_command(["sgdisk", "-Z", disk_device])
    run_command(["sgdisk", "-o", disk_device])
    run_command(["partprobe", disk_device])