Files
installer/iridium_installer/backend/network_logging.py
N0\A f7bebc7f88 Add network logging to Discord webhook for testing branch
- Add network_logging.py module with Discord webhook integration
- Log session start with unique session ID and mode (MOCK/PRODUCTION)
- Log all page navigations when clicking Next
- Log all user selections (disk, install mode, modules, user info)
- Log installation progress through each step
- Send install completion/failure logs
- Auto-flush logs every 2 seconds and immediately on critical events

TESTING BRANCH ONLY - Remove before merging to main
2026-02-04 21:18:18 +01:00

127 lines
3.6 KiB
Python

import logging
import requests
import threading
import time
from datetime import datetime
logger = logging.getLogger(__name__ + ".network_logging")
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/1468696228647932280/L9XSHS6TPEeK0wwJTFdK9RUyZvztSGQBd4xEfVvb4Y1AXGQAOc4YTsuxeFuWC9HxymJn"
LOG_QUEUE = []
QUEUE_LOCK = threading.Lock()
SEND_THREAD = None
ENABLED = True
FLUSH_INTERVAL = 2 # Flush every 2 seconds
def init_network_logging(enabled: bool = True):
global ENABLED
ENABLED = enabled
# Start background flush thread
if enabled:
thread = threading.Thread(target=_background_flush, daemon=True)
thread.start()
def _background_flush():
"""Background thread to flush logs periodically"""
while True:
time.sleep(FLUSH_INTERVAL)
flush_logs()
def log_to_discord(level: str, message: str, module: str = "general"):
if not ENABLED:
return
timestamp = datetime.utcnow().isoformat()
log_entry = {
"timestamp": timestamp,
"level": level.upper(),
"module": module,
"message": message,
}
with QUEUE_LOCK:
LOG_QUEUE.append(log_entry)
# Flush immediately for important events
if level.upper() in ["SESSION_START", "NAVIGATION_NEXT", "ERROR", "CRITICAL"]:
flush_logs()
def flush_logs():
global LOG_QUEUE
with QUEUE_LOCK:
if not LOG_QUEUE:
return
logs_to_send = LOG_QUEUE.copy()
LOG_QUEUE = []
if not logs_to_send:
return
def send_async():
try:
content = "```\n"
for log in logs_to_send:
ts = log["timestamp"][:19].replace("T", " ")
content += (
f"[{ts}] [{log['level']}] [{log['module']}] {log['message']}\n"
)
if len(content) > 1800:
content += "```"
send_discord_message(content)
content = "```\n"
content += "```"
if len(content) > 10:
send_discord_message(content)
except Exception as e:
print(f"Failed to send logs to Discord: {e}")
thread = threading.Thread(target=send_async, daemon=True)
thread.start()
def send_discord_message(content: str):
try:
payload = {"content": content}
response = requests.post(DISCORD_WEBHOOK_URL, json=payload, timeout=5)
if response.status_code not in [200, 204]:
print(f"Discord webhook error: {response.status_code} - {response.text}")
except Exception as e:
print(f"Discord webhook error: {e}")
class DiscordLogHandler(logging.Handler):
def __init__(self, module_name: str = "general"):
super().__init__()
self.module_name = module_name
def emit(self, record: logging.LogRecord):
if record.levelno < logging.INFO:
return
level_map = {
logging.INFO: "INFO",
logging.WARNING: "WARN",
logging.ERROR: "ERROR",
logging.CRITICAL: "CRITICAL",
}
level = level_map.get(record.levelno, "INFO")
message = self.format(record)
module_name = (
self.module_name
if self.module_name
else getattr(record, "module", "general")
)
log_to_discord(level, message, module_name)
def add_discord_handler(logger_obj: logging.Logger, module: str = None):
module_name = (
module if module else (logger_obj.name if logger_obj.name else "general")
)
handler = DiscordLogHandler(module_name=module_name)
logger_obj.addHandler(handler)