neurotron/src/disk_init.py
2025-11-15 04:20:00 +01:00

260 lines
9.4 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
💾 Módulo de Inicialização de Disco — Neurotron V0.1 (atualizado)
Detecta, avalia, prepara e monta o disco persistente do NFDOS.
- Não formata discos que já contenham um filesystem conhecido, a menos que forçado.
- Forçar formatação:
* EXPORT: export NFDOS_FORCE_FORMAT=1 (no ambiente do initramfs, se aplicável)
* Kernel cmdline: adicionar `nfdos_force_format=1` ao -append do QEMU
"""
import os
import subprocess
from pathlib import Path
from rich.console import Console
if __name__ == "__main__" and __package__ is None:
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
__package__ = "neurotron_core"
from .neurotron_config import (
MOUNT_POINT, DISK_CANDIDATES
)
console = Console()
def run(cmd: list[str]) -> bool:
"""Executa comando silenciosamente (retorna True se OK)."""
try:
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def detect_disk() -> str | None:
"""Procura por um dispositivo de disco válido (por ordem em DISK_CANDIDATES)."""
for dev in DISK_CANDIDATES:
p = Path(dev)
if p.exists():
console.print(f"[cyan]🔍 Detetado disco:[/] {dev}")
return dev
console.print("[yellow]⚠️ Nenhum disco detectado.[/yellow]")
return None
def blkid_check(device: str) -> str | None:
"""Tenta obter tipo com blkid (se disponível)."""
try:
out = subprocess.run(["blkid", device], stdout=subprocess.PIPE, text=True, check=False)
return out.stdout.strip() if out.stdout else None
except FileNotFoundError:
return None
def read_sig(device: str, size: int = 2048) -> bytes | None:
"""Lê os primeiros `size` bytes do device (se possível)."""
try:
with open(device, "rb") as f:
return f.read(size)
except Exception:
return None
def detect_fs_by_magic(device: str) -> str | None:
"""
Detecta assinaturas simples:
- ext4 superblock magic (0xEF53) @ offset 1024 + 56 = 1080
- NTFS -> 'NTFS ' @ offset 3
- FAT32 -> 'FAT32' nos offsets típicos do boot sector
- MBR partition table signature 0x55AA @ offset 510-511
Retorna string com o sistema ou None.
"""
buf = read_sig(device, size=4096)
if not buf:
return None
# MBR signature
if len(buf) >= 512 and buf[510:512] == b'\x55\xAA':
# detecta tabela de partições existente (MBR)
return "mbr-partition-table"
# ext magic at 1024+56 = 1080
if len(buf) >= 1082 and buf[1080:1082] == b'\x53\xEF':
return "ext (superblock)"
# NTFS signature at offset 3 (ASCII "NTFS ")
if len(buf) >= 11 and buf[3:11] == b'NTFS ':
return "ntfs"
# FAT32 signature at offset 82 or boot sector strings containing FAT
if b"FAT32" in buf or b"FAT16" in buf or b"FAT12" in buf:
return "fat"
return None
def parse_cmdline_flag() -> bool:
"""Lê /proc/cmdline para a flag nfdos_force_format=1"""
try:
with open("/proc/cmdline", "r") as f:
cmd = f.read()
return "nfdos_force_format=1" in cmd.split()
except Exception:
return False
def which(prog: str) -> str | None:
for p in os.environ.get("PATH", "/sbin:/bin:/usr/sbin:/usr/bin").split(":"):
cand = Path(p) / prog
if cand.exists() and os.access(cand, os.X_OK):
return str(cand)
return None
def format_ext4(device: str, label: str = "NFDOS_DATA") -> bool:
"""Formata o dispositivo com ext4, recolhendo logs de erro detalhados (BusyBox-safe)."""
mke2fs = which("mke2fs")
mkfs_ext4 = which("mkfs.ext4")
mkfs = which("mkfs")
candidates = []
if mkfs_ext4:
candidates.append(([mkfs_ext4, "-F", "-L", label, device], "mkfs.ext4"))
if mke2fs:
# o BusyBox mke2fs não aceita '-t', por isso ajustaremos dentro do loop
candidates.append(([mke2fs, "-F", "-t", "ext4", "-L", label, device], "mke2fs"))
if mkfs:
candidates.append(([mkfs, "-t", "ext4", "-F", "-L", label, device], "mkfs"))
if not candidates:
console.print("[red]❌ Nenhum utilitário mkfs disponível no initramfs![/red]")
return False
for cmd, name in candidates:
console.print(f"[yellow]⚙️ Formatando {device} com {name}...[/yellow]")
# 👉 se for o BusyBox mke2fs, removemos o argumento -t
if name == "mke2fs":
cmd = [c for c in cmd if c != "-t" and c != "ext4"]
try:
result = subprocess.run(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
if result.stdout:
console.print(result.stdout.strip())
console.print(f"[green]✔ Formatação concluída com {name}.[/green]")
return True
except subprocess.CalledProcessError as e:
console.print(f"[red]❌ {name} falhou (código {e.returncode}).[/red]")
if e.stdout:
console.print(f"[cyan]📜 STDOUT:[/cyan]\n{e.stdout.strip()}")
if e.stderr:
console.print(f"[magenta]⚠️ STDERR:[/magenta]\n{e.stderr.strip()}")
console.print("[red]❌ Nenhum método de formatação teve sucesso.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel suporta EXT4 e se o BusyBox inclui mke2fs.")
return False
def ensure_fs(device: str) -> bool:
"""
Verifica se existe sistema de ficheiros.
Se não existir e houver confirmação/flag, formata ext4 (ou fallback via mke2fs).
"""
# 1⃣ tentativa rápida com blkid
info = blkid_check(device)
if info:
console.print(f"[green]🧠 Disco já formatado (blkid):[/] {info}")
return True
# 2⃣ fallback por leituras de assinatura
sig = detect_fs_by_magic(device)
if sig:
console.print(f"[yellow]⚠ Assinatura detectada no disco:[/] {sig}")
console.print("[red]❗ O disco contém dados ou partições existentes. Abortando formatação.[/red]")
return False
# 3⃣ se nada detectado — disco virgem
forced_env = os.environ.get("NFDOS_FORCE_FORMAT") == "1"
forced_cmd = parse_cmdline_flag()
if not (forced_env or forced_cmd):
console.print("[yellow]⚠ Disco parece virgem, mas não há confirmação para formatar.[/yellow]")
console.print("Use `nfdos_force_format=1` no kernel cmdline ou export NFDOS_FORCE_FORMAT=1")
console.print("para permitir formatação automática.")
return False
# 4⃣ tentar formatação
console.print(f"[yellow]⚙️ Forçando formatação de {device} como ext4 (FLAG DETETADA)...[/yellow]")
ok = format_ext4(device)
if ok:
console.print("[green]✔ Formatação concluída com sucesso.[/green]")
return True
# 5⃣ se nada funcionou
console.print("[red]❌ Falha na formatação.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel inclui suporte para EXT4 ou se o mkfs/mke2fs está embutido no BusyBox.")
return False
def mount_disk(device: str) -> bool:
"""Monta o disco no ponto esperado (retorna True se OK)."""
os.makedirs(MOUNT_POINT, exist_ok=True)
return run(["mount", device, MOUNT_POINT])
def debug_env():
"""Mostra informações úteis quando nenhum disco é detectado (ou para debug)."""
console.print("[yellow]🩻 DEBUG: listando /dev/* e últimas mensagens do kernel[/yellow]")
devs = sorted(Path("/dev").glob("*"))
console.print("📂 Dispositivos disponíveis:", ", ".join([d.name for d in devs if d.is_char_device() or d.is_block_device()]))
os.system("dmesg | tail -n 20 || echo '(dmesg não disponível)'")
console.print("[yellow]───────────────────────────────[/yellow]")
os.system("echo '--- /proc/partitions ---'; cat /proc/partitions || true")
os.system("echo '--- dmesg | grep -i virtio ---'; dmesg | grep -i virtio || true")
console.print("[yellow]───────────────────────────────[/yellow]")
def initialize_persistence():
"""Fluxo completo de inicialização do hipocampo físico."""
device = detect_disk()
if not device:
debug_env()
console.print("[red]❌ Nenhum disco físico encontrado — usando modo RAM.[/red]")
return False
if not ensure_fs(device):
console.print("[red]❌ Preparação do sistema de ficheiros foi interrompida.[/red]")
return False
if not mount_disk(device):
console.print("[red]❌ Falha ao montar disco.[/red]")
return False
console.print(f"[green]✔ Disco montado em:[/] {MOUNT_POINT}")
telemetry_file = Path("/opt/kernel/neurotron/data/telemetry.json")
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
if not telemetry_file.exists():
telemetry_file.write_text("[]")
for d in ["data", "logs", "dna"]:
Path(MOUNT_POINT, d).mkdir(parents=True, exist_ok=True)
Path(MOUNT_POINT, "DNA_ID").write_text("NEUROTRON_HIPOCAMPUS_V1\n")
console.print("[cyan]👉 Hipocampo físico inicializado com sucesso.[/cyan]")
return True
if __name__ == "__main__":
initialize_persistence()