141 lines
4.4 KiB
Python
141 lines
4.4 KiB
Python
import subprocess
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def run_command(cmd, check=True):
|
|
logger.info(f"Running command: {' '.join(cmd)}")
|
|
try:
|
|
result = subprocess.run(cmd, check=check, capture_output=True, text=True)
|
|
return result
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f"Command failed: {e.stderr}")
|
|
raise
|
|
|
|
def get_partition_device(disk_device, partition_number):
|
|
"""
|
|
Returns the partition device path.
|
|
Handles NVMe style (p1) vs sd style (1).
|
|
"""
|
|
if disk_device[-1].isdigit():
|
|
return f"{disk_device}p{partition_number}"
|
|
return f"{disk_device}{partition_number}"
|
|
|
|
def auto_partition_disk(disk_device):
|
|
"""
|
|
Automatically partitions the disk with a standard layout:
|
|
1. EFI System Partition (1GB)
|
|
2. Swap (4GB) - simpler fixed size for now
|
|
3. Root (Remaining)
|
|
"""
|
|
logger.info(f"Starting auto-partitioning on {disk_device}")
|
|
|
|
# 1. Zap the disk (destroy all data)
|
|
run_command(["sgdisk", "-Z", disk_device])
|
|
|
|
# 2. Create new GPT table
|
|
run_command(["sgdisk", "-o", disk_device])
|
|
|
|
# 3. Create EFI Partition (Part 1, 1GB, Type EF00)
|
|
# -n <partnum>:<start>:<end>
|
|
run_command(["sgdisk", "-n", "1:0:+1024M", "-t", "1:ef00", "-c", "1:EFI System", disk_device])
|
|
|
|
# 4. Create Swap Partition (Part 2, 4GB, Type 8200)
|
|
run_command(["sgdisk", "-n", "2:0:+4096M", "-t", "2:8200", "-c", "2:Swap", disk_device])
|
|
|
|
# 5. Create Root Partition (Part 3, Rest, Type 8300)
|
|
run_command(["sgdisk", "-n", "3:0:0", "-t", "3:8300", "-c", "3:Root", disk_device])
|
|
|
|
# Inform kernel of changes
|
|
run_command(["partprobe", disk_device])
|
|
|
|
# Wait a bit for nodes to appear? Usually partprobe handles it but sometimes there's a race.
|
|
import time
|
|
time.sleep(1)
|
|
|
|
# 6. Format Partitions
|
|
efi_part = get_partition_device(disk_device, 1)
|
|
swap_part = get_partition_device(disk_device, 2)
|
|
root_part = get_partition_device(disk_device, 3)
|
|
|
|
logger.info("Formatting EFI partition...")
|
|
run_command(["mkfs.vfat", "-F32", efi_part])
|
|
|
|
logger.info("Formatting Swap partition...")
|
|
run_command(["mkswap", swap_part])
|
|
|
|
logger.info("Formatting Root partition...")
|
|
run_command(["mkfs.ext4", "-F", root_part])
|
|
|
|
logger.info("Partitioning and formatting complete.")
|
|
|
|
return {
|
|
"efi": efi_part,
|
|
"swap": swap_part,
|
|
"root": root_part
|
|
}
|
|
|
|
def mount_partitions(partition_info, mount_root="/mnt"):
|
|
"""
|
|
Mounts the partitions into mount_root.
|
|
partition_info is the dict returned by auto_partition_disk.
|
|
"""
|
|
import os
|
|
|
|
# 1. Mount Root
|
|
if not os.path.exists(mount_root):
|
|
os.makedirs(mount_root)
|
|
|
|
run_command(["mount", partition_info["root"], mount_root])
|
|
|
|
# 2. Mount EFI
|
|
efi_mount = os.path.join(mount_root, "boot/efi")
|
|
if not os.path.exists(efi_mount):
|
|
os.makedirs(efi_mount, exist_ok=True)
|
|
|
|
run_command(["mount", partition_info["efi"], efi_mount])
|
|
|
|
# 3. Enable Swap (optional, but might as well)
|
|
run_command(["swapon", partition_info["swap"]])
|
|
|
|
logger.info(f"Partitions mounted at {mount_root}")
|
|
|
|
def create_partition(disk_device, size_mb, type_code="8300", name="Linux filesystem"):
|
|
"""
|
|
Creates a new partition on the disk.
|
|
type_code: ef00 (EFI), 8200 (Swap), 8300 (Linux)
|
|
"""
|
|
# Find next available partition number
|
|
res = run_command(["sgdisk", "-p", disk_device])
|
|
# Very basic parsing to find the next number
|
|
existing_nums = []
|
|
for line in res.stdout.splitlines():
|
|
parts = line.split()
|
|
if parts and parts[0].isdigit():
|
|
existing_nums.append(int(parts[0]))
|
|
|
|
next_num = 1
|
|
while next_num in existing_nums:
|
|
next_num += 1
|
|
|
|
run_command([
|
|
"sgdisk",
|
|
"-n", f"{next_num}:0:+{size_mb}M",
|
|
"-t", f"{next_num}:{type_code}",
|
|
"-c", f"{next_num}:{name}",
|
|
disk_device
|
|
])
|
|
run_command(["partprobe", disk_device])
|
|
return next_num
|
|
|
|
def delete_partition(disk_device, part_num):
|
|
"""Deletes a partition by number."""
|
|
run_command(["sgdisk", "-d", str(part_num), disk_device])
|
|
run_command(["partprobe", disk_device])
|
|
|
|
def wipe_disk(disk_device):
|
|
"""Zaps the disk and creates a new GPT table."""
|
|
run_command(["sgdisk", "-Z", disk_device])
|
|
run_command(["sgdisk", "-o", disk_device])
|
|
run_command(["partprobe", disk_device])
|