"Auto-commit via make git"
Some checks are pending
Build NFDOS ISO / build (push) Waiting to run

This commit is contained in:
neo.webmaster.2@gmail.com 2025-11-16 23:07:22 +01:00
parent a005ac4e13
commit f56aab8370
12 changed files with 656 additions and 365 deletions

View File

@ -9,6 +9,93 @@ cat -A configure.ac | grep '\^I'
nl -ba Makefile | sed -n '770,790p' nl -ba Makefile | sed -n '770,790p'
grep -n "^[ ]" Makefile | head grep -n "^[ ]" Makefile | head
agora vamos descansar um pouco 😘 ja temos o kernel neurotron nativo sem panicar o kernel linux...ja avancamos bastante. agora chegou a hora de descansarmos 😍😘 ajustei assim:
``` ```
``` # Criar disco (se não existir)
data_disk = nfdos_dir / "nfdos_data.img"
if not data_disk.exists():
console.print("[cyan]💽 Criando disco persistente (hipocampo físico)...[/cyan]")
safe_run(f"dd if=/dev/zero of={data_disk} bs=1M count=512", shell=True)
else:
console.print("[green]✔ Disco persistente já existe.[/green]")
time.sleep(5)
# Testar no QEMU
bz_image = linux_dir / "arch" / "x86" / "boot" / "bzImage"
data_disk = nfdos_dir / "nfdos_data.img"
if bz_image.exists():
console.print("\n[bold blue]Iniciando QEMU (modo kernel direto)...[/bold blue]")
# 🧠 Monta a linha base do QEMU
kernel_params = (
"console=ttyS0 earlyprintk=serial,ttyS0,115200 "
"keep_bootcon loglevel=8"
)
qemu_cmd = (
f"qemu-system-x86_64 "
f"-machine q35,accel=kvm "
f"-cpu qemu64 "
f"-kernel {bz_image} "
f"-initrd {nfdos_dir}/initramfs.cpio.gz "
f"-append '{kernel_params}' "
f"-drive file={data_disk},if=virtio,format=raw "
f"-m 1024 "
f"-nographic "
f"-no-reboot"
)
# 🚀 Executa o QEMU
safe_run(qemu_cmd, shell=True)
else:
console.print("[red]✗ bzImage não encontrado! Compile o kernel primeiro.[/red]")
```
creio que agora sim temos uma base solida e limpa.
o ultimo detalhe ... limpar a tree do neurotron:
```
src/_nfdos/kernel/neurotron
├── aclocal.m4
├── autom4te.cache
│ ├── output.0
│ ├── output.1
│ ├── requests
│ ├── traces.0
│ └── traces.1
├── config.log
├── config.status
├── configure
├── configure.ac
├── data # Estamos a usar em algum lado? ou podemos remover?
│ ├── configs # Estamos a usar em algum lado? ou podemos remover?
│ └── logs # Estamos a usar em algum lado? ou podemos remover?
├── install-sh
├── Makefile
├── Makefile.am
├── Makefile.in
├── MANIFEST.in
├── missing
├── neurotron
├── neurotron.in
├── pyproject.toml
├── README.md
├── Setup.py
└── src
└── neurotron
├── autodiagnostic.py
├── cortex.py
├── disk_init.py
├── hippocampus.py
├── __init__.py
├── logbus.py
├── __main__.py
├── main_waiting.py # podemos remover
├── motor.py
├── neuron.py
├── neurotron_config.py
├── perception.py
└── telemetry_tail.py # podemos remover
```
e ja agora aproveitar e adicionar um .gitignore

30
src/_nfdos/kernel/neurotron/.gitignore vendored Normal file
View File

@ -0,0 +1,30 @@
# Autotools
Makefile
Makefile.in
aclocal.m4
autom4te.cache/
config.log
config.status
configure
install-sh
missing
# Python build artefacts
__pycache__/
*.pyc
*.pyo
*.pyd
*.egg-info/
build/
dist/
# QEMU files
*.img
*.qcow2
# Editor junk
*.swp
*.swo
*~
.idea/
.vscode/

View File

@ -38,6 +38,7 @@ from .neurotron_config import (
) )
from .cortex import Cortex from .cortex import Cortex
from .autodiagnostic import AutoDiagnostic # se for usado dentro do Cortex, ok mesmo assim from .autodiagnostic import AutoDiagnostic # se for usado dentro do Cortex, ok mesmo assim
from neurotron.logbus import logbus
# ======================================================================================= # =======================================================================================
@ -66,19 +67,18 @@ def detect_persistent_mode():
if _mounted(mount_point): if _mounted(mount_point):
os.environ["NEUROTRON_MODE"] = "persistent" os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data" os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG"] = "/var/neurotron/logs" os.environ["NEUROTRON_LOG_DIR"] = "/var/neurotron/logs"
else: else:
os.environ["NEUROTRON_MODE"] = "volatile" os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data" os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG"] = "/tmp/neurotron_logs" os.environ["NEUROTRON_LOG_DIR"] = "/tmp/neurotron_logs"
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"]) runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG"]) log_dir = Path(os.environ["NEUROTRON_LOG_DIR"])
runtime_dir.mkdir(parents=True, exist_ok=True) runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True) log_dir.mkdir(parents=True, exist_ok=True)
return runtime_dir, log_dir return runtime_dir, log_dir
def read_system_metrics(): def read_system_metrics():
""" """
CPU, memória e loadavg a partir de /proc. CPU, memória e loadavg a partir de /proc.

View File

@ -1,67 +1,87 @@
from __future__ import annotations from __future__ import annotations
import json, os import json
from datetime import datetime, timezone from datetime import datetime, timezone
from rich.console import Console
from rich.table import Table
from pathlib import Path from pathlib import Path
from neurotron.logbus import logbus
from .neurotron_config import ( from .neurotron_config import (
NEUROTRON_DATASET_PATH, NEUROTRON_HISTORY_KEEP, NEUROTRON_DIAG_SCHEMA, NEUROTRON_DATASET_PATH,
HOMEOSTASIS_CPU_WARN, HOMEOSTASIS_CPU_ALERT, NEUROTRON_HISTORY_KEEP,
HOMEOSTASIS_MEM_WARN, HOMEOSTASIS_MEM_ALERT, NEUROTRON_DIAG_SCHEMA,
HOMEOSTASIS_LOAD_WARN, HOMEOSTASIS_LOAD_ALERT, HOMEOSTASIS_CPU_WARN,
HOMEOSTASIS_CPU_ALERT,
HOMEOSTASIS_MEM_WARN,
HOMEOSTASIS_MEM_ALERT,
HOMEOSTASIS_LOAD_WARN,
HOMEOSTASIS_LOAD_ALERT,
) )
from .perception import Perception from .perception import Perception
console = Console()
def _now_iso(): def _now_iso():
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
class AutoDiagnostic: class AutoDiagnostic:
"""
Subsistema silencioso de diagnóstico do Neurotron.
Responsabilidades:
- recolher perceções internas (CPU, MEM, LOAD)
- comparar com diagnóstico anterior
- atualizar last_diagnostic.json
- manter histórico (rolling)
- atualizar telemetria contínua
- emitir eventos para logbus (não para stdout)
"""
def __init__(self, runtime_dir: str, log_dir: str): def __init__(self, runtime_dir: str, log_dir: str):
self.runtime_dir = runtime_dir self.runtime_dir = Path(runtime_dir)
self.log_dir = log_dir self.log_dir = Path(log_dir)
self.data_dir = Path(NEUROTRON_DATASET_PATH) self.data_dir = Path(NEUROTRON_DATASET_PATH)
self.data_dir.mkdir(parents=True, exist_ok=True) self.data_dir.mkdir(parents=True, exist_ok=True)
self.last_file = self.data_dir / "last_diagnostic.json" self.last_file = self.data_dir / "last_diagnostic.json"
self.telemetry_file = self.data_dir / "telemetry.json"
self.perception = Perception() self.perception = Perception()
self.current = None
self.previous = None # ----------------------------------------------------------------------
# Utilitários internos
# ----------------------------------------------------------------------
def _load_previous(self): def _load_previous(self):
if not self.last_file.exists():
return None
try: try:
with open(self.last_file, "r") as f: if self.last_file.exists():
return json.load(f) return json.loads(self.last_file.read_text())
except Exception: except Exception as e:
return None logbus.emit(f"[diag.warn] falha ao ler último diagnóstico: {e}")
return None
def _save_current(self, payload: dict): def _save_current(self, payload):
history = [] history = []
if self.last_file.exists(): prev = self._load_previous()
try:
with open(self.last_file, "r") as f: if prev:
prev = json.load(f) history = prev.get("history", [])
history = prev.get("history", []) history.append({
history.append({ "timestamp": prev.get("timestamp"),
"timestamp": prev.get("timestamp"), "cpu_percent": prev.get("cpu_percent"),
"cpu_percent": prev.get("cpu_percent"), "mem_percent": prev.get("mem_percent"),
"mem_percent": prev.get("mem_percent"), "loadavg": prev.get("loadavg"),
"loadavg": prev.get("loadavg"), "state": prev.get("state", "UNKNOWN"),
"state": prev.get("state", "UNKNOWN"), })
}) history = history[-NEUROTRON_HISTORY_KEEP:]
history = history[-NEUROTRON_HISTORY_KEEP:]
except Exception:
history = []
payload["history"] = history payload["history"] = history
with open(self.last_file, "w") as f:
json.dump(payload, f, indent=2) try:
self.last_file.write_text(json.dumps(payload, indent=2))
except Exception as e:
logbus.emit(f"[diag.error] falha ao gravar diagnóstico: {e}")
def _classify_state(self, cpu, mem, l1): def _classify_state(self, cpu, mem, l1):
# valores podem ser "?"
try: try:
cpu = float(cpu) cpu = float(cpu)
mem = float(mem) mem = float(mem)
@ -69,81 +89,32 @@ class AutoDiagnostic:
except Exception: except Exception:
return "UNKNOWN" return "UNKNOWN"
# ALERT/CRITICAL
if cpu >= HOMEOSTASIS_CPU_ALERT or mem >= HOMEOSTASIS_MEM_ALERT or l1 >= HOMEOSTASIS_LOAD_ALERT: if cpu >= HOMEOSTASIS_CPU_ALERT or mem >= HOMEOSTASIS_MEM_ALERT or l1 >= HOMEOSTASIS_LOAD_ALERT:
return "CRITICAL" return "CRITICAL"
if cpu >= HOMEOSTASIS_CPU_WARN or mem >= HOMEOSTASIS_MEM_WARN or l1 >= HOMEOSTASIS_LOAD_WARN: if cpu >= HOMEOSTASIS_CPU_WARN or mem >= HOMEOSTASIS_MEM_WARN or l1 >= HOMEOSTASIS_LOAD_WARN:
return "ALERT" return "ALERT"
# OKs
return "STABLE" return "STABLE"
def _delta(self, a, b): # ----------------------------------------------------------------------
try: # Execução principal
if isinstance(a, list) and isinstance(b, list) and len(a) == len(b): # ----------------------------------------------------------------------
return [round(float(x) - float(y), 2) for x, y in zip(a, b)]
return round(float(a) - float(b), 2)
except Exception:
return "?"
def _render_mini_trend(self, values, width=24, charset="▁▂▃▄▅▆▇█"):
if not values:
return ""
lo = min(values); hi = max(values)
if not isinstance(lo, (int, float)) or not isinstance(hi, (int, float)):
return ""
span = (hi - lo) or 1.0
levels = len(charset) - 1
bars = []
for v in values[-width:]:
if not isinstance(v, (int, float)):
bars.append("·")
continue
i = int(round((v - lo) / span * levels))
bars.append(charset[i])
return "".join(bars)
def run_exam(self): def run_exam(self):
console.print("\n[bold]🤖 Iniciando rotina de Auto-Diagnóstico Evolutivo...[/bold]\n") """
Realiza um diagnóstico silencioso, atualiza os ficheiros,
e retorna (state, payload).
"""
snap = self.perception.snapshot() snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = snap.get("mem_percent", "?") cpu = snap.get("cpu_percent", "?")
mem = snap.get("mem_percent", "?")
load = snap.get("loadavg", ["?", "?", "?"]) load = snap.get("loadavg", ["?", "?", "?"])
l1 = load[0] if isinstance(load, list) and load else "?"
prev = self._load_previous()
self.previous = prev
# deltas
cpu_prev = prev.get("cpu_percent") if prev else "?"
mem_prev = prev.get("mem_percent") if prev else "?"
load_prev = prev.get("loadavg") if prev else ["?", "?", "?"]
d_cpu = self._delta(cpu, cpu_prev)
d_mem = self._delta(mem, mem_prev)
d_load = self._delta(load, load_prev)
# estado
l1 = load[0] if isinstance(load, list) and load else "?"
state = self._classify_state(cpu, mem, l1) state = self._classify_state(cpu, mem, l1)
# tabela
table = Table(title="🩺 Exame Clínico Evolutivo", show_lines=True)
table.add_column("Sinal Vital")
table.add_column("Atual", justify="right")
table.add_column("Δ", justify="center")
table.add_column("Anterior", justify="right")
def fmt(v):
if isinstance(v, list):
return str(v)
return str(v)
table.add_row("CPU (%)", fmt(cpu), fmt(d_cpu), fmt(cpu_prev))
table.add_row("Memória (%)", fmt(mem), fmt(d_mem), fmt(mem_prev))
table.add_row("Carga média (1/5/15)", fmt(load), "" if d_load == "?" else fmt(d_load), fmt(load_prev))
console.print(table)
payload = { payload = {
"schema": NEUROTRON_DIAG_SCHEMA, "schema": NEUROTRON_DIAG_SCHEMA,
"timestamp": _now_iso(), "timestamp": _now_iso(),
@ -156,17 +127,22 @@ class AutoDiagnostic:
"term": snap.get("env_term"), "term": snap.get("env_term"),
}, },
} }
self._save_current(payload) self._save_current(payload)
console.print(f"[green]✔ Histórico evolutivo atualizado em:[/green] \n{self.last_file}") self._update_telemetry(payload)
# Atualiza telemetria contínua # Falamos apenas através do logbus
logbus.emit(f"[diag] estado={state} cpu={cpu} mem={mem} load1={l1}")
return state, payload
# ----------------------------------------------------------------------
def _update_telemetry(self, payload):
try: try:
telemetry_file = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
telemetry = [] telemetry = []
if telemetry_file.exists(): if self.telemetry_file.exists():
telemetry = json.loads(telemetry_file.read_text() or "[]") telemetry = json.loads(self.telemetry_file.read_text() or "[]")
telemetry.append({ telemetry.append({
"timestamp": payload["timestamp"], "timestamp": payload["timestamp"],
@ -176,10 +152,9 @@ class AutoDiagnostic:
"state": payload.get("state"), "state": payload.get("state"),
}) })
telemetry = telemetry[-128:] # manter últimas 128 amostras telemetry = telemetry[-128:]
telemetry_file.write_text(json.dumps(telemetry, indent=2)) self.telemetry_file.write_text(json.dumps(telemetry, indent=2))
except Exception as e: except Exception as e:
console.print(f"[yellow]⚠️ Falha ao atualizar telemetria:[/] {e}") logbus.emit(f"[diag.warn] falha ao atualizar telemetria: {e}")
return state, payload

View File

@ -1,27 +1,36 @@
import json import json
import time import time
from collections import defaultdict, deque
from pathlib import Path from pathlib import Path
from collections import defaultdict, deque
from time import sleep from time import sleep
from rich.console import Console
from neurotron.logbus import logbus
from .neuron import Neuron from .neuron import Neuron
from .hippocampus import Hippocampus from .hippocampus import Hippocampus
from .perception import Perception from .perception import Perception
from .motor import Motor from .motor import Motor
from .neurotron_config import (
NEUROTRON_MODE, NEUROTRON_TICK, NEUROTRON_TICK_MIN, NEUROTRON_TICK_MAX, NEUROTRON_TICK_STEP,
NEUROTRON_DIAG_EVERY_TICKS, NEUROTRON_DATASET_PATH,
HEARTBEAT_ENABLED, HEARTBEAT_STYLE, NEUROTRON_THRESHOLDS,
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
from .autodiagnostic import AutoDiagnostic from .autodiagnostic import AutoDiagnostic
from .neurotron_config import (
NEUROTRON_MODE, NEUROTRON_TICK,
NEUROTRON_TICK_MIN, NEUROTRON_TICK_MAX, NEUROTRON_TICK_STEP,
NEUROTRON_DIAG_EVERY_TICKS,
NEUROTRON_DATASET_PATH,
HEARTBEAT_ENABLED, HEARTBEAT_STYLE,
NEUROTRON_THRESHOLDS,
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
# =====================================================================
# Neurónios básicos
# =====================================================================
class VitalSigns(Neuron): class VitalSigns(Neuron):
name = "VitalSigns" name = "VitalSigns"
def observe(self) -> None:
def observe(self):
snap = self.ctx.perception.snapshot() snap = self.ctx.perception.snapshot()
self.publish("vitals", snap) self.publish("vitals", snap)
self.ctx.memory.remember("observe.vitals", snap) self.ctx.memory.remember("observe.vitals", snap)
@ -29,123 +38,152 @@ class VitalSigns(Neuron):
class EchoAgent(Neuron): class EchoAgent(Neuron):
name = "EchoAgent" name = "EchoAgent"
def think(self) -> None:
def think(self):
msg = self.consume("vitals") msg = self.consume("vitals")
if msg: if msg:
self.publish("actions", {"action": "echo", "text": f"CPU {msg.get('cpu_percent', '?')}%"}) self.publish("actions", {
"action": "echo",
"text": f"CPU {msg.get('cpu_percent', '?')}%"
})
# =====================================================================
# C O R T E X — versão V6 (Pure LogBus Edition)
# =====================================================================
class Cortex: class Cortex:
""" """
Orquestrador: liga neurónios, bus de mensagens, memória, IO e ciclo cognitivo. Orquestrador cognitivo do Neurotron.
Agora com Telemetria Contínua (V5): heartbeat, microalertas e flush periódico. Totalmente silencioso toda saída vai para o logbus.
""" """
def __init__(self, runtime_dir, log_dir, tick_seconds=NEUROTRON_TICK): def __init__(self, runtime_dir, log_dir, tick_seconds=NEUROTRON_TICK):
self.runtime_dir = runtime_dir self.runtime_dir = Path(runtime_dir)
self.log_dir = log_dir self.log_dir = Path(log_dir)
self.tick = float(tick_seconds) self.tick = float(tick_seconds)
self.mode = NEUROTRON_MODE self.mode = NEUROTRON_MODE
self._tick_count = 0 self._tick_count = 0
self.diagnostic = AutoDiagnostic(runtime_dir, log_dir)
self.console = Console() self.diagnostic = AutoDiagnostic(runtime_dir, log_dir)
self.memory = Hippocampus(log_dir=log_dir) self.memory = Hippocampus(log_dir=log_dir)
self.perception = Perception() self.perception = Perception()
self.motor = Motor() self.motor = Motor()
# Message bus simples: channels → deque # Message bus interno
self.bus = defaultdict(lambda: deque(maxlen=32)) self.bus = defaultdict(lambda: deque(maxlen=32))
# Telemetria em memória (curto prazo) # Telemetria curta em RAM
self.telemetry = deque(maxlen=TELEMETRY_MAXLEN) self.telemetry = deque(maxlen=TELEMETRY_MAXLEN)
# Regista neurónios (podes adicionar mais à medida) # Neurónios ativos
self.neurons: list[Neuron] = [ self.neurons = [
VitalSigns(self), VitalSigns(self),
EchoAgent(self), EchoAgent(self),
] ]
self._booted = False self._booted = False
# Caminho para gravar a telemetria # Caminho telemetria longa
self.telemetry_path = Path(NEUROTRON_DATASET_PATH) / "telemetry.json" self.telemetry_path = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
self.telemetry_path.parent.mkdir(parents=True, exist_ok=True) self.telemetry_path.parent.mkdir(parents=True, exist_ok=True)
# ——— ciclo de vida ——— # =================================================================
def boot(self) -> None: # BOOT / SHUTDOWN
# =================================================================
def boot(self):
if self._booted: if self._booted:
return return
self.console.print("[bold cyan]🧠 Neurotron[/] — boot") logbus.emit("🧠 boot() — inicializando Neurotron")
self.memory.remember("boot", {"version": "0.1", "tick": self.tick}) self.memory.remember("boot", {"version": "0.1", "tick": self.tick})
self._booted = True self._booted = True
#state, _ = self.diagnostic.run_exam()
#self._apply_homeostasis(state) def shutdown(self, reason=""):
logbus.emit(f"⚠️ shutdown — {reason}")
self.memory.remember("shutdown", {"reason": reason})
def fatal(self, e: Exception):
logbus.emit(f"🔥 fatal error: {repr(e)}")
self.memory.remember("fatal", {"error": repr(e)})
raise e
# =================================================================
# CICLO COGNITIVO
# =================================================================
def observe(self):
for n in self.neurons:
n.observe()
def think(self):
for n in self.neurons:
n.think()
def act(self):
action = self.bus_consume("actions")
if not action:
return
if action.get("action") == "echo":
res = self.motor.run("echo", [action.get("text", "")])
self.memory.remember("act.echo", res)
# Redireciona stdout para logbus
if res.get("stdout"):
logbus.emit(f"[motor.echo] {res['stdout'].strip()}")
if res.get("stderr"):
logbus.emit(f"[motor.echo.err] {res['stderr'].strip()}")
def rest(self):
# Heartbeat + microalertas
if HEARTBEAT_ENABLED:
self._heartbeat_and_telemetry()
sleep(self.tick)
self._tick_count += 1
# Diagnóstico periódico
if self._tick_count % NEUROTRON_DIAG_EVERY_TICKS == 0:
state, _ = self.diagnostic.run_exam()
self._apply_homeostasis(state)
# Flush periódico telemetria
if self._tick_count % TELEMETRY_FLUSH_EVERY_TICKS == 0:
self._flush_telemetry()
# =================================================================
# HOMEOSTASE
# =================================================================
def _apply_homeostasis(self, state): def _apply_homeostasis(self, state):
old = self.tick
if state == "CRITICAL": if state == "CRITICAL":
self.mode = "diagnostic"
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP) self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
elif state == "ALERT": elif state == "ALERT":
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP / 2) self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP / 2)
elif state == "STABLE": elif state == "STABLE":
self.tick = max(NEUROTRON_TICK_MIN, self.tick - NEUROTRON_TICK_STEP / 2) self.tick = max(NEUROTRON_TICK_MIN, self.tick - NEUROTRON_TICK_STEP / 2)
# UNKNOWN → não mexe
def shutdown(self, reason: str = ""): if self.tick != old:
self.console.print(f"[yellow]shutdown:[/] {reason}") logbus.emit(f"[homeostasis] tick ajustado {old:.2f}s → {self.tick:.2f}s (state={state})")
self.memory.remember("shutdown", {"reason": reason})
def fatal(self, e: Exception): # =================================================================
self.console.print(f"[red]fatal:[/] {e!r}") # HEARTBEAT + MICROALERTAS
self.memory.remember("fatal", {"error": repr(e)}) # =================================================================
print(f"fatal: {repr(e)}")
raise
# ——— loop ———
def observe(self) -> None:
for n in self.neurons:
n.observe()
def think(self) -> None:
for n in self.neurons:
n.think()
def act(self) -> None:
# Consumir ações agregadas e executar
action = self.bus_consume("actions")
if action and action.get("action") == "echo":
res = self.motor.run("echo", [action.get("text", "")])
self.memory.remember("act.echo", res)
if res.get("stdout"):
self.console.print(f"[green]{res['stdout'].strip()}[/]")
def rest(self):
# Heartbeat e microalertas antes de dormir
if HEARTBEAT_ENABLED:
self._heartbeat_and_telemetry()
# Pausa regulada
sleep(self.tick)
# Contador e rotinas periódicas
self._tick_count += 1
if self._tick_count % NEUROTRON_DIAG_EVERY_TICKS == 0:
#state, _ = self.diagnostic.run_exam()
#self._apply_homeostasis(state)
pass
if self._tick_count % TELEMETRY_FLUSH_EVERY_TICKS == 0:
self._flush_telemetry()
# ——— telemetria/alertas ———
def _heartbeat_and_telemetry(self): def _heartbeat_and_telemetry(self):
snap = self.perception.snapshot() snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = (snap.get("mem") or {}).get("percent", "?")
load = snap.get("loadavg") or []
# Adiciona ao buffer de telemetria cpu = snap.get("cpu_percent")
mem = snap.get("mem_percent")
load = snap.get("loadavg")
load1 = load[0] if isinstance(load, list) and load else None
# Salva telemetria curta
self.telemetry.append({ self.telemetry.append({
"ts": time.time(), "ts": time.time(),
"cpu": cpu, "cpu": cpu,
@ -154,23 +192,13 @@ class Cortex:
"tick": self.tick, "tick": self.tick,
}) })
# Microalertas com base nos limiares self._evaluate_microalerts(cpu, mem, load1)
self._evaluate_microalerts(cpu, mem, load)
# Heartbeat visual # Heartbeat → logbus
color = self._color_for_levels(cpu, mem, load) logbus.emit(f"💓 cpu={cpu}% mem={mem}% tick={self.tick:.2f}s")
if HEARTBEAT_STYLE == "compact":
self.console.print(f"[bold {color}]💓[/] CPU: {cpu}% | MEM: {mem}% | TICK: {self.tick:.2f}s")
else:
self.console.print(
f"[bold {color}]💓 [Heartbeat][/bold {color}] "
f"CPU: {cpu}% | MEM: {mem}% | LOAD: {load} | TICK: {self.tick:.2f}s | MODE: {self.mode}"
)
def _evaluate_microalerts(self, cpu, mem, load): def _evaluate_microalerts(self, cpu, mem, load1):
alerts = [] alerts = []
# Normaliza
load1 = load[0] if (isinstance(load, (list, tuple)) and load) else None
try: try:
if isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]: if isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]:
@ -179,15 +207,15 @@ class Cortex:
alerts.append(("mem", mem)) alerts.append(("mem", mem))
if isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"]: if isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"]:
alerts.append(("load1", load1)) alerts.append(("load1", load1))
except KeyError: except Exception:
pass # thresholds incompletos → sem microalertas return
if not alerts: if not alerts:
return return
for (metric, value) in alerts: for metric, value in alerts:
self.console.print(f"[yellow]⚠️ Microalerta:[/] {metric.upper()} {value} — ajustando homeostase (tick +{NEUROTRON_TICK_STEP:.2f}s)") logbus.emit(f"⚠️ microalerta: {metric}={value} — ajustando tick")
# Ajuste simples de segurança
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP) self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
self.memory.remember("microalert", { self.memory.remember("microalert", {
@ -196,36 +224,28 @@ class Cortex:
"new_tick": self.tick, "new_tick": self.tick,
}) })
def _color_for_levels(self, cpu, mem, load): # =================================================================
# Heurística simples de cor # TELEMETRIA LONGA
try: # =================================================================
load1 = load[0] if (isinstance(load, (list, tuple)) and load) else 0.0
high = (
(isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]) or
(isinstance(mem, (int, float)) and mem >= NEUROTRON_THRESHOLDS["mem_high"]) or
(isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"])
)
if high:
return "yellow"
except Exception:
pass
return "green"
def _flush_telemetry(self): def _flush_telemetry(self):
# Grava o buffer de telemetria em JSON (mantendo histórico curto)
try: try:
data = list(self.telemetry) data = list(self.telemetry)
with self.telemetry_path.open("w") as f: self.telemetry_path.write_text(json.dumps(data))
json.dump(data, f) self.memory.remember("telemetry.flush", {
self.memory.remember("telemetry.flush", {"count": len(data), "path": str(self.telemetry_path)}) "count": len(data),
"path": str(self.telemetry_path)
})
except Exception as e: except Exception as e:
self.console.print(f"[red]✖ Falha ao gravar telemetria:[/] {e!r}") logbus.emit(f"[telemetry.error] {e}")
self.memory.remember("telemetry.error", {"error": repr(e)})
# ——— bus ——— # =================================================================
def bus_publish(self, channel: str, payload: dict) -> None: # BUS
# =================================================================
def bus_publish(self, channel, payload):
self.bus[channel].append(payload) self.bus[channel].append(payload)
def bus_consume(self, channel: str) -> dict | None: def bus_consume(self, channel):
q = self.bus[channel] q = self.bus[channel]
return q.popleft() if q else None return q.popleft() if q else None

View File

@ -3,7 +3,7 @@ from datetime import datetime
try: try:
import orjson as json import orjson as json
except Exception: # fallback leve except Exception:
import json # type: ignore import json # type: ignore
class Hippocampus: class Hippocampus:
@ -11,9 +11,11 @@ class Hippocampus:
Memória contextual simples (JSON Lines): append-only. Memória contextual simples (JSON Lines): append-only.
Guarda perceções, decisões e ações para replays futuros. Guarda perceções, decisões e ações para replays futuros.
""" """
def __init__(self, log_dir: Path): def __init__(self, log_dir: Path):
self.log_dir = log_dir self.log_dir = log_dir
self.events_file = log_dir / "events.jsonl" self.log_dir.mkdir(parents=True, exist_ok=True)
self.events_file = self.log_dir / "events.jsonl"
def remember(self, kind: str, data: dict) -> None: def remember(self, kind: str, data: dict) -> None:
rec = { rec = {
@ -21,14 +23,17 @@ class Hippocampus:
"kind": kind, "kind": kind,
"data": data, "data": data,
} }
try: try:
if "orjson" in json.__name__: # Compatível com orjson e stdlib json
blob = json.dumps(rec) blob = json.dumps(rec)
else: if isinstance(blob, str):
blob = json.dumps(rec) # type: ignore blob = blob.encode("utf-8")
with self.events_file.open("ab") as f: with self.events_file.open("ab") as f:
f.write(blob if isinstance(blob, bytes) else blob.encode("utf-8")) f.write(blob)
f.write(b"\n") f.write(b"\n")
except Exception: except Exception:
# evitar crash por IO em early boot # Evitar crash em early boot ou IO quebrado
pass pass

View File

@ -0,0 +1,83 @@
"""
Neurotron LogBus Sistema Centralizado de Logging
==================================================
Objetivo:
- Centralizar *toda* a saída de logs do Neurotron
- Funcionar tanto no dashboard curses quanto em modo headless
- Manter logs persistentes quando o hipocampo físico está montado
- Fornecer uma API simples:
from neurotron.logbus import log
log("mensagem")
Componentes:
- LogBus: roteador de mensagens (fila stdout ficheiro)
- log(): função global simplificada
"""
from __future__ import annotations
import json
import time
from queue import Queue
from pathlib import Path
class LogBus:
def __init__(self):
self.queue: Queue | None = None # usado pelo dashboard
self.stdout_enabled = True # imprime em modo headless
self.persist_enabled = False # grava em disco
self.persist_file: Path | None = None
# ---------------------------------------------------------
# Inicialização de destinos
# ---------------------------------------------------------
def enable_dashboard(self, q: Queue):
"""Liga o LogBus ao dashboard (curses)."""
self.queue = q
def enable_persist(self, path: Path):
"""Ativa salvamento permanente da stream de logs."""
self.persist_file = path
self.persist_enabled = True
path.parent.mkdir(parents=True, exist_ok=True)
if not path.exists():
path.write_text("") # cria ficheiro vazio
# ---------------------------------------------------------
# Emissão centralizada
# ---------------------------------------------------------
def log(self, message: str):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] {message}"
# 1) Dashboard (queue)
if self.queue:
try:
self.queue.put(line)
except Exception:
pass
# 2) STDOUT
if self.stdout_enabled:
try:
print(line)
except Exception:
pass
# 3) Persistência
if self.persist_enabled and self.persist_file:
try:
with open(self.persist_file, "a") as f:
f.write(json.dumps({"ts": ts, "msg": message}) + "\n")
except Exception:
pass
# Instância global — usada em todo o Neurotron
logbus = LogBus()
def log(msg: str):
"""Função simples: apenas delega ao LogBus global."""
logbus.log(msg)

View File

@ -1,27 +1,77 @@
import subprocess import subprocess
from neurotron.logbus import logbus
class Motor: class Motor:
""" """
Ator do sistema: executa comandos controlados (whitelist). Subsistema 'ator' do Neurotron.
Mantém-se minimal até termos política de segurança mais rica.
Executa comandos controlados (whitelist rígida),
sem side-effects e com logging centralizado.
FUTURO:
- níveis de permissão
- sandboxes
- ações abstratas (ex: mover, sinalizar, emitir evento)
""" """
SAFE_CMDS = { SAFE_CMDS = {
"echo": ["echo"], "echo": ["echo"],
"sh": ["/bin/sh"], # shell interativo (init) "sh": ["/bin/sh"], # shell básico — útil para debug interno
} }
def __init__(self):
pass
def run(self, cmd: str, args: list[str] | None = None) -> dict: def run(self, cmd: str, args: list[str] | None = None) -> dict:
prog = self.SAFE_CMDS.get(cmd) """
if not prog: Executa um comando permitido e retorna:
return {"ok": False, "error": f"cmd '{cmd}' não permitido"} {
try: "ok": bool,
full = prog + (args or []) "code": int | None,
res = subprocess.run(full, capture_output=True, text=True) "stdout": str,
return { "stderr": str,
"ok": res.returncode == 0,
"code": res.returncode,
"stdout": res.stdout,
"stderr": res.stderr,
} }
"""
if cmd not in self.SAFE_CMDS:
logbus.emit(f"[motor.warn] comando bloqueado: '{cmd}'")
return {
"ok": False,
"code": None,
"stdout": "",
"stderr": f"cmd '{cmd}' não permitido",
}
try:
full = self.SAFE_CMDS[cmd] + (args or [])
res = subprocess.run(
full,
capture_output=True,
text=True,
env={}, # ambiente neutro → evita leaks
)
ok = (res.returncode == 0)
if not ok:
logbus.emit(
f"[motor.warn] '{cmd}' retornou código {res.returncode}"
)
return {
"ok": ok,
"code": res.returncode,
"stdout": res.stdout or "",
"stderr": res.stderr or "",
}
except Exception as e: except Exception as e:
return {"ok": False, "error": str(e)} logbus.emit(f"[motor.error] exceção ao executar '{cmd}': {e}")
return {
"ok": False,
"code": None,
"stdout": "",
"stderr": str(e),
}

View File

@ -1,30 +1,58 @@
from typing import Any, Dict from typing import Any, Dict, Optional
from neurotron.logbus import logbus
class Neuron: class Neuron:
""" """
Classe-base de um neurónio-agente. Classe-base de um neurónio-agente.
Cada neurónio pode observar/agir e trocar mensagens via o bus do Cortex.
Cada neurónio participa no ciclo cognitivo do Cortex:
observe() think() act()
Pode comunicar via:
- publish(channel, payload)
- consume(channel)
Implementações devem respeitar esta interface sem causar efeitos colaterais
externos (sem prints, sem IO não controlado).
""" """
name = "Neuron"
name: str = "Neuron"
def __init__(self, ctx: "Cortex"): def __init__(self, ctx: "Cortex"):
self.ctx = ctx self.ctx = ctx
# ----------------------------------------------------------------------
# Ciclo cognitivo — para override por neurónios concretos
# ----------------------------------------------------------------------
def observe(self) -> None: def observe(self) -> None:
"""Ler estado do mundo (sensores, /proc, eventos).""" """Ler estado do mundo (sensores, perceções, eventos internos)."""
return return
def think(self) -> None: def think(self) -> None:
"""Processar/planejar usando o estado atual.""" """Processar informação, planear, atualizar estado interno."""
return return
def act(self) -> None: def act(self) -> None:
"""Executar uma ação (opcional).""" """Executar uma ação, enviar comandos, publicar eventos."""
return return
# Utilitários # ----------------------------------------------------------------------
def publish(self, channel: str, payload: Dict[str, Any]) -> None: # Comunicação inter-neurónios via Cortex Bus
self.ctx.bus_publish(channel, payload) # ----------------------------------------------------------------------
def consume(self, channel: str) -> Dict[str, Any] | None: def publish(self, channel: str, payload: Dict[str, Any]) -> None:
return self.ctx.bus_consume(channel) """Publica mensagem num canal do bus cognitivo."""
try:
self.ctx.bus_publish(channel, payload)
except Exception as e:
logbus.emit(f"[warn] {self.name}.publish falhou: {e}")
def consume(self, channel: str) -> Optional[Dict[str, Any]]:
"""Lê (e remove) última mensagem disponível num canal."""
try:
return self.ctx.bus_consume(channel)
except Exception as e:
logbus.emit(f"[warn] {self.name}.consume falhou: {e}")
return None

View File

@ -1,42 +1,37 @@
""" """
🧠 neurotron_config.py 🧠 neurotron_config.py
NFDOS Núcleo de parâmetros vitais do Neurotron NFDOS Núcleo de parâmetros vitais estáticos do Neurotron
------------------------------------------------
Nova versão para o layout: Este módulo define apenas:
.../neurotron/ - parâmetros cognitivos estáticos
src/ - thresholds
data/ - limites
- caminhos locais relativos ao package (ex: DATA_DIR interno)
Ele NÃO define:
- diretórios de runtime (/var/neurotron, /tmp)
- diretórios de logs dinâmicos
- configurações voláteis
Essas pertencem ao runtime e são avaliadas no __main__.
""" """
from pathlib import Path from pathlib import Path
# ====================================== # ============================================================================
# 🌐 Diretórios e Caminhos # 📁 Diretórios internos do package (não confundir com runtime!)
# ====================================== # ============================================================================
# Diretório deste ficheiro → .../neurotron/src/neurotron_config.py
THIS_FILE = Path(__file__).resolve() THIS_FILE = Path(__file__).resolve()
SRC_DIR = THIS_FILE.parent # .../neurotron/src SRC_DIR = THIS_FILE.parent # .../neurotron/src/neurotron
BASE_DIR = SRC_DIR.parent # .../neurotron/ BASE_DIR = SRC_DIR.parent # .../neurotron/src
# Onde vivem as configs/logs da “instalação” # Diretórios que fazem parte da instalação do package
DATA_DIR = BASE_DIR / "data" DATA_DIR = BASE_DIR / "data"
CONFIG_DIR = DATA_DIR / "configs" CONFIG_DIR = DATA_DIR / "configs"
LOG_DIR = DATA_DIR / "logs" PACKAGE_LOG_DIR = DATA_DIR / "logs" # logs do próprio package (não runtime!)
# Modo persistente do NFDOS (quando /var/neurotron está montado) # ============================================================================
RUNTIME_DIR = Path("/var/run/neurotron") # ⚙️ Parâmetros Cognitivos
MOUNT_POINT = "/var/neurotron" # ============================================================================
# Candidatos para disco persistente do hipocampo
DISK_CANDIDATES = [
"/dev/vda", "/dev/vdb",
"/dev/sda", "/dev/hda"
]
# ======================================
# ⚙️ Parâmetros Cognitivos Principais
# ======================================
NEUROTRON_TICK = 1.0 NEUROTRON_TICK = 1.0
NEUROTRON_VERBOSITY = 1 NEUROTRON_VERBOSITY = 1
@ -59,9 +54,9 @@ NEUROTRON_TICK_STEP = 0.25
NEUROTRON_SEED = 42 NEUROTRON_SEED = 42
NEUROTRON_MEMORY_SIZE = 256 # KB NEUROTRON_MEMORY_SIZE = 256 # KB
# ====================================== # ============================================================================
# 🧩 Parâmetros de Subsistemas # 🔌 Subsistemas
# ====================================== # ============================================================================
CORTEX_MAX_THREADS = 1 CORTEX_MAX_THREADS = 1
CORTEX_LOOP_DELAY = 0.1 CORTEX_LOOP_DELAY = 0.1
@ -76,14 +71,13 @@ PERCEPTION_CPU_SOURCE = "/proc/stat"
PERCEPTION_MEM_SOURCE = "/proc/meminfo" PERCEPTION_MEM_SOURCE = "/proc/meminfo"
PERCEPTION_UPDATE_INTERVAL = 2.0 PERCEPTION_UPDATE_INTERVAL = 2.0
# ====================================== # ============================================================================
# 🧠 Parâmetros futuros # 📊 Telemetria e diagnóstico
# ====================================== # ============================================================================
NEUROTRON_EXPANSION_MODE = "none" NEUROTRON_EXPANSION_MODE = "none"
NEUROTRON_DATASET_PATH = DATA_DIR NEUROTRON_DATASET_PATH = DATA_DIR
NEUROTRON_HISTORY_KEEP = 8 NEUROTRON_HISTORY_KEEP = 8
NEUROTRON_DIAG_SCHEMA = "v4" NEUROTRON_DIAG_SCHEMA = "v4"
HEARTBEAT_ENABLED = True HEARTBEAT_ENABLED = True
@ -97,22 +91,3 @@ NEUROTRON_THRESHOLDS = {
TELEMETRY_MAXLEN = 64 TELEMETRY_MAXLEN = 64
TELEMETRY_FLUSH_EVERY_TICKS = 5 TELEMETRY_FLUSH_EVERY_TICKS = 5
# ======================================
# 🧭 Utilitário
# ======================================
def show_config():
"""Mostra a configuração atual do Neurotron (apenas NEUROTRON_*)"""
import json
cfg = {
k: v
for k, v in globals().items()
if k.startswith("NEUROTRON_")
}
print(json.dumps(cfg, indent=2, default=str))
if __name__ == "__main__":
show_config()

View File

@ -1,52 +1,75 @@
import os import os
from time import sleep from time import sleep
from neurotron.logbus import logbus
class Perception: class Perception:
""" """
Sensores internos via /proc: Sensores internos via /proc:
- CPU % calculado por delta de /proc/stat - CPU % por delta de /proc/stat
- Memória % via /proc/meminfo - Memória % via /proc/meminfo
- Carga média via /proc/loadavg - Carga média /proc/loadavg ou os.getloadavg()
Sem dependências externas (psutil). Totalmente silencioso (sem prints).
""" """
# ----------------------------------------------------------------------
# CPU
# ----------------------------------------------------------------------
def _read_proc_stat(self): def _read_proc_stat(self):
"""Lê a linha 'cpu ' de /proc/stat. Retorna dict ou None."""
try: try:
with open("/proc/stat", "r") as f: with open("/proc/stat", "r") as f:
line = f.readline() line = f.readline()
if not line.startswith("cpu "): if not line.startswith("cpu "):
return None return None
parts = line.strip().split()[1:] parts = line.strip().split()[1:]
vals = list(map(int, parts[:10])) # user nice system idle iowait irq softirq steal guest guest_nice vals = list(map(int, parts[:10]))
return { return {
"user": vals[0], "nice": vals[1], "system": vals[2], "idle": vals[3], "user": vals[0], "nice": vals[1], "system": vals[2], "idle": vals[3],
"iowait": vals[4], "irq": vals[5], "softirq": vals[6], "steal": vals[7], "iowait": vals[4], "irq": vals[5], "softirq": vals[6], "steal": vals[7],
"guest": vals[8], "guest_nice": vals[9], "guest": vals[8], "guest_nice": vals[9],
} }
except Exception: except Exception:
return None return None
def _cpu_percent(self, interval=0.05): def _cpu_percent(self, interval=0.05):
"""Computa CPU% entre duas leituras."""
a = self._read_proc_stat() a = self._read_proc_stat()
if not a: if not a:
return "?" return "?"
sleep(interval) # micro-janelinha
sleep(interval)
b = self._read_proc_stat() b = self._read_proc_stat()
if not b: if not b:
return "?" return "?"
# Totais
idle_a = a["idle"] + a["iowait"] idle_a = a["idle"] + a["iowait"]
idle_b = b["idle"] + b["iowait"] idle_b = b["idle"] + b["iowait"]
non_a = sum(a.values()) - idle_a non_a = sum(a.values()) - idle_a
non_b = sum(b.values()) - idle_b non_b = sum(b.values()) - idle_b
total_a = idle_a + non_a total_a = idle_a + non_a
total_b = idle_b + non_b total_b = idle_b + non_b
totald = total_b - total_a totald = total_b - total_a
idled = idle_b - idle_a idled = idle_b - idle_a
if totald <= 0: if totald <= 0:
return "?" return "?"
usage = (totald - idled) * 100.0 / totald usage = (totald - idled) * 100.0 / totald
return round(usage, 1) return round(usage, 1)
# ----------------------------------------------------------------------
# Memória
# ----------------------------------------------------------------------
def _mem_percent(self): def _mem_percent(self):
try: try:
info = {} info = {}
@ -54,36 +77,73 @@ class Perception:
for line in f: for line in f:
k, v = line.split(":", 1) k, v = line.split(":", 1)
info[k.strip()] = v.strip() info[k.strip()] = v.strip()
def kB(key): def kB(key):
if key not in info: return None return float(info[key].split()[0]) if key in info else None
return float(info[key].split()[0]) # kB
mem_total = kB("MemTotal") mem_total = kB("MemTotal")
mem_avail = kB("MemAvailable") mem_avail = kB("MemAvailable")
if not mem_total or mem_avail is None:
if mem_total is None or mem_avail is None:
return "?" return "?"
used = mem_total - mem_avail used = mem_total - mem_avail
return round(used * 100.0 / mem_total, 1) return round(used * 100.0 / mem_total, 1)
except Exception: except Exception:
return "?" return "?"
# ----------------------------------------------------------------------
# Loadavg
# ----------------------------------------------------------------------
def _loadavg(self): def _loadavg(self):
try: try:
if hasattr(os, "getloadavg"): if hasattr(os, "getloadavg"):
l1, l5, l15 = os.getloadavg() l1, l5, l15 = os.getloadavg()
return [round(l1, 2), round(l5, 2), round(l15, 2)] return [round(l1, 2), round(l5, 2), round(l15, 2)]
with open("/proc/loadavg", "r") as f: with open("/proc/loadavg", "r") as f:
parts = f.read().strip().split() parts = f.read().strip().split()
l1, l5, l15 = map(float, parts[:3]) l1, l5, l15 = map(float, parts[:3])
return [round(l1, 2), round(l5, 2), round(l15, 2)] return [round(l1, 2), round(l5, 2), round(l15, 2)]
except Exception: except Exception:
return ["?", "?", "?"] return ["?", "?", "?"]
def snapshot(self) -> dict: # ----------------------------------------------------------------------
return { # Snapshot principal
"env_user": os.environ.get("USER") or "root", # ----------------------------------------------------------------------
"env_term": os.environ.get("TERM") or "unknown",
"cpu_percent": self._cpu_percent(), def snapshot(self) -> dict:
"mem_percent": self._mem_percent(), """
"loadavg": self._loadavg(), Retorna snapshot interno:
} {
"env_user": "...",
"env_term": "...",
"cpu_percent": n | "?",
"mem_percent": n | "?",
"loadavg": [l1, l5, l15]
}
Sempre seguro. Nunca lança exceção.
"""
try:
return {
"env_user": os.environ.get("USER") or "root",
"env_term": os.environ.get("TERM") or "unknown",
"cpu_percent": self._cpu_percent(),
"mem_percent": self._mem_percent(),
"loadavg": self._loadavg(),
}
except Exception as e:
# fallback extremo — nunca quebrar o Neurotron
logbus.emit(f"[warn] Perception.snapshot falhou: {e}")
return {
"env_user": "unknown",
"env_term": "unknown",
"cpu_percent": "?",
"mem_percent": "?",
"loadavg": ["?", "?", "?"],
}

View File

@ -537,10 +537,8 @@ def run():
else: else:
console.print("[green]✔ Disco persistente já existe.[/green]") console.print("[green]✔ Disco persistente já existe.[/green]")
time.sleep(5) time.sleep(5)
# Testar no QEMU # Testar no QEMU
bz_image = linux_dir / "arch" / "x86" / "boot" / "bzImage" bz_image = linux_dir / "arch" / "x86" / "boot" / "bzImage"
data_disk = nfdos_dir / "nfdos_data.img" data_disk = nfdos_dir / "nfdos_data.img"
@ -548,32 +546,12 @@ def run():
if bz_image.exists(): if bz_image.exists():
console.print("\n[bold blue]Iniciando QEMU (modo kernel direto)...[/bold blue]") console.print("\n[bold blue]Iniciando QEMU (modo kernel direto)...[/bold blue]")
# 🔒 Pergunta ao utilizador se deve permitir formatação automática do disco
console.print(
"\n[yellow]O Neurotron detetará o disco físico e poderá formatá-lo se estiver vazio.[/yellow]"
)
console.print(
"[bright_yellow]Deseja permitir formatação automática do disco?[/bright_yellow] "
"([green]y[/green]/[red]N[/red]): ",
end=""
)
choice = input().strip().lower()
allow_format = choice in ("y", "yes", "sim", "s")
# 🧠 Monta a linha base do QEMU # 🧠 Monta a linha base do QEMU
kernel_params = ( kernel_params = (
"console=ttyS0 earlyprintk=serial,ttyS0,115200 " "console=ttyS0 earlyprintk=serial,ttyS0,115200 "
"keep_bootcon loglevel=8" "keep_bootcon loglevel=8"
) )
# 🧩 Injeta flag de formatação se permitido
if allow_format:
kernel_params += " nfdos_force_format=1"
console.print("[green]✔ Formatação automática permitida neste boot.[/green]")
else:
console.print("[cyan] Formatação automática desativada (modo seguro).[/cyan]")
qemu_cmd = ( qemu_cmd = (
f"qemu-system-x86_64 " f"qemu-system-x86_64 "
f"-machine q35,accel=kvm " f"-machine q35,accel=kvm "