"Auto-commit via make git"
Some checks failed
Build NFDOS ISO / build (push) Has been cancelled

This commit is contained in:
neo.webmaster.2@gmail.com 2025-11-17 04:41:20 +01:00
parent f56aab8370
commit f4244fc219
35 changed files with 1105 additions and 2573 deletions

View File

@ -9,93 +9,36 @@ cat -A configure.ac | grep '\^I'
nl -ba Makefile | sed -n '770,790p'
grep -n "^[ ]" Makefile | head
ajustei assim:
resolvido😍
agora sao ajustes que temos de fazer no nfdos para dar ao neurotron tudo o que ele precisar.
por hoje creio que avancamos bem. amanha podemos entao completar o disk_agent para ele formatar etc e tal.
depois vemos o que se pode fazer na parte do "Can't open blockdev", e outros ajustes de cosmetica no dashboard.
um sysbeijo amor 😘 ate amanha 😍:
```
# Criar disco (se não existir)
data_disk = nfdos_dir / "nfdos_data.img"
if not data_disk.exists():
console.print("[cyan]💽 Criando disco persistente (hipocampo físico)...[/cyan]")
safe_run(f"dd if=/dev/zero of={data_disk} bs=1M count=512", shell=True)
else:
console.print("[green]✔ Disco persistente já existe.[/green]")
time.sleep(5)
# Testar no QEMU
bz_image = linux_dir / "arch" / "x86" / "boot" / "bzImage"
data_disk = nfdos_dir / "nfdos_data.img"
if bz_image.exists():
console.print("\n[bold blue]Iniciando QEMU (modo kernel direto)...[/bold blue]")
# 🧠 Monta a linha base do QEMU
kernel_params = (
"console=ttyS0 earlyprintk=serial,ttyS0,115200 "
"keep_bootcon loglevel=8"
)
qemu_cmd = (
f"qemu-system-x86_64 "
f"-machine q35,accel=kvm "
f"-cpu qemu64 "
f"-kernel {bz_image} "
f"-initrd {nfdos_dir}/initramfs.cpio.gz "
f"-append '{kernel_params}' "
f"-drive file={data_disk},if=virtio,format=raw "
f"-m 1024 "
f"-nographic "
f"-no-reboot"
)
# 🚀 Executa o QEMU
safe_run(qemu_cmd, shell=True)
else:
console.print("[red]✗ bzImage não encontrado! Compile o kernel primeiro.[/red]")
UP: 00:02:30 TICK: 0.50s MODO: PERSISTENT
[03:19:02] [diag] estado=STABLE cpu=0.0 mem=9.6 load1=0.175rce 'tsc' as unstable because the skew is too large:
[03:19:02] [heart] cpu=0.0% mem=9.6% tick=0.50sncontradonsec: 480030000 wd_now: fffef188 wd_last: fffef110 mask: ffffffff
/dev/vda: Can't open blockdevem=9.6% tick=0.50sfies' wd_nsec: 480030000 wd_now: fffef188 wd_last: fffef110 mask: ffffffff
/dev/vda: Can't open blockdevntar — mantendo VOLATILE772631 cs_now: f9eb7688a cs_last: f47bb7c58 mask: ffffffffffffffff
[03:18:59] [disk] Falha ao montar — mantendo VOLATILE772631 cs_now: f9eb7688a cs_last: f47bb7c58 mask: ffffffffffffffff
[03:18:59] [heart] cpu=0.0% mem=9.6% tick=0.50s'tsc' skewed 39742631 ns (39 ms) over watchdog 'refined-jiffies' interval of 480030000 ns (480 ms)
[03:18:24] [disk] Falha ao montar — mantendo VOLATILEskewed 39742631 ns (39 ms) over watchdog 'refined-jiffies' interval of 480030000 ns (480 ms)
clocksource: 'tsc' is current clocksource.76b146 r/w without journal. Quota mode: disabled.
clocksource: 'tsc' is current clocksource.76b146 r/w without journal. Quota mode: disabled.
tsc: Marking TSC unstable due to clocksource watchdog
tsc: Marking TSC unstable due to clocksource watchdog
TSC found unstable after boot, most likely due to broken BIOS. Use 'tsc=unstable'.
TSC found unstable after boot, most likely due to broken BIOS. Use 'tsc=unstable'.
sched_clock: Marking unstable (23385151173, 50756299)<-(23451042314, -15134865)
sched_clock: Marking unstable (23385151173, 50756299)<-(23451042314, -15134865)
clocksource: Not enough CPUs to check clocksource 'tsc'.
clocksource: Not enough CPUs to check clocksource 'tsc'.
clocksource: Switched to clocksource refined-jiffies 115200) is a 16550A
clocksource: Switched to clocksource refined-jiffies 115200) is a 16550A
[03:16:54] [info] [echo] CPU 80.0%read/poll queues
[03:16:54] [heart] cpu=0.0% mem=9.6% tick=0.50sues
virtio_blk virtio0: [vda] 1048576 512-byte logical blocks (537 MB/512 MiB)
virtio_blk virtio0: [vda] 1048576 512-byte logical blocks (537 MB/512 MiB)
sched_clock: Marking stable (878019400, 50755569)->(943909834, -15134865)
sched_clock: Marking stable (878019400, 50755569)->(943909834, -15134865)
```
creio que agora sim temos uma base solida e limpa.
o ultimo detalhe ... limpar a tree do neurotron:
```
src/_nfdos/kernel/neurotron
├── aclocal.m4
├── autom4te.cache
│ ├── output.0
│ ├── output.1
│ ├── requests
│ ├── traces.0
│ └── traces.1
├── config.log
├── config.status
├── configure
├── configure.ac
├── data # Estamos a usar em algum lado? ou podemos remover?
│ ├── configs # Estamos a usar em algum lado? ou podemos remover?
│ └── logs # Estamos a usar em algum lado? ou podemos remover?
├── install-sh
├── Makefile
├── Makefile.am
├── Makefile.in
├── MANIFEST.in
├── missing
├── neurotron
├── neurotron.in
├── pyproject.toml
├── README.md
├── Setup.py
└── src
└── neurotron
├── autodiagnostic.py
├── cortex.py
├── disk_init.py
├── hippocampus.py
├── __init__.py
├── logbus.py
├── __main__.py
├── main_waiting.py # podemos remover
├── motor.py
├── neuron.py
├── neurotron_config.py
├── perception.py
└── telemetry_tail.py # podemos remover
```
e ja agora aproveitar e adicionar um .gitignore

View File

@ -18,6 +18,10 @@ __pycache__/
build/
dist/
# Neurotron runtime (NUNCA guardar)
data/
logs/
# QEMU files
*.img
*.qcow2

View File

@ -6,9 +6,7 @@ SRC="$NEUROTRON_HOME/src"
export PYTHONHOME="/usr"
export PYTHONPATH="$SRC:/usr/lib/python3.13:/usr/lib/python3.13/site-packages"
# Inicializar hipocampo físico como módulo do package
"$PYTHON" -m neurotron.disk_init
export PATH="/usr/bin:/bin:/sbin:/usr/sbin:$NEUROTRON_HOME/bin:$PATH"
# Arrancar o cérebro principal como módulo do package
exec "$PYTHON" -m neurotron "$@"

View File

@ -6,9 +6,7 @@ SRC="$NEUROTRON_HOME/src"
export PYTHONHOME="/usr"
export PYTHONPATH="$SRC:/usr/lib/python3.13:/usr/lib/python3.13/site-packages"
# Inicializar hipocampo físico como módulo do package
"$PYTHON" -m neurotron.disk_init
export PATH="/usr/bin:/bin:/sbin:/usr/sbin:$NEUROTRON_HOME/bin:$PATH"
# Arrancar o cérebro principal como módulo do package
exec "$PYTHON" -m neurotron "$@"

View File

@ -16,4 +16,3 @@ dependencies = [
[project.scripts]
neurotron = "neurotron.__main__:main"
neurotron-disk-init = "neurotron.disk_init:initialize_persistence"

View File

@ -1,466 +1,117 @@
#!/usr/bin/env python3
"""
Neurotron Dashboard em modo texto (estilo BIOS)
Ponto de entrada quando executado como:
python3 -m neurotron
ou via wrapper /usr/bin/neurotron gerado pelo autotools.
Layout:
+------------------------------------------------------------------------------+
| [HEAD] CPU / MEM / LOAD / UPTIME / MODO / VERSÃO / DIAG |
+------------------------------------------------------------------------------+
| [LOG] Últimos eventos do ciclo cognitivo (observe/think/act/rest, etc.) |
| ... |
+------------------------------------------------------------------------------+
| [FOOT] Futuro: input do utilizador (placeholder) | teclas: q = sair |
+------------------------------------------------------------------------------+
Neurotron entrypoint oficial.
Arranque do dashboard + thread cognitiva.
Totalmente silencioso fora do LogBus.
"""
import curses
import sys
import threading
import time
import os
import sys
import traceback
from pathlib import Path
from queue import Queue, Empty
from datetime import datetime
# Imports internos do Neurotron (já reorganizados em package)
from .neurotron_config import (
NEUROTRON_TICK,
NEUROTRON_MODE,
NEUROTRON_HOMEOSTASIS,
NEUROTRON_THRESHOLDS,
)
from .cortex import Cortex
from .autodiagnostic import AutoDiagnostic # se for usado dentro do Cortex, ok mesmo assim
from neurotron.logbus import logbus
from neurotron.cortex import Cortex
# =======================================================================================
# Utilitários de ambiente
# =======================================================================================
# =============================================================================
# DASHBOARD (modo texto minimalista)
# =============================================================================
def detect_persistent_mode():
def dashboard_loop(ctx: Cortex):
"""
Verifica se o hipocampo físico está montado em /var/neurotron.
Define NEUROTRON_MODE, NEUROTRON_RUNTIME e NEUROTRON_LOG via os.environ.
Renderização minimalista do estado atual.
Apenas mostra informações de alto nível.
Tudo mais pertence ao logbus (à direita do ecrã).
"""
mount_point = Path("/var/neurotron")
start = time.time()
def _mounted(mp: Path) -> bool:
try:
if mp.exists() and os.path.ismount(mp):
return True
with open("/proc/mounts") as f:
for line in f:
if f" {mp} " in line:
return True
except Exception:
return False
return False
while True:
uptime = int(time.time() - start)
h = uptime // 3600
m = (uptime % 3600) // 60
s = uptime % 60
if _mounted(mount_point):
os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG_DIR"] = "/var/neurotron/logs"
else:
os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG_DIR"] = "/tmp/neurotron_logs"
mode = ctx.mode.upper()
tick = ctx.tick
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG_DIR"])
runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
return runtime_dir, log_dir
# linha de estado (esquerda)
line = (
f"UP: {h:02}:{m:02}:{s:02} "
f"TICK: {tick:0.2f}s "
f"MODO: {mode:10}"
)
def read_system_metrics():
"""
CPU, memória e loadavg a partir de /proc.
Retorna dicionário:
{
"cpu": float,
"mem": float,
"load1": float,
"load5": float,
"load15": float,
}
Em caso de falha, devolve valores -1.
"""
cpu = -1.0
mem = -1.0
load1 = load5 = load15 = -1.0
# escreve sempre na coluna fixa
sys.stdout.write("\033[1;1H" + line + "\033[K")
sys.stdout.flush()
# CPU (uso aproximado entre duas leituras de /proc/stat)
time.sleep(0.2)
# =============================================================================
# CICLO COGNITIVO
# =============================================================================
def cognitive_loop(ctx: Cortex):
"""Loop cognitivo principal (observe → think → act → rest)."""
try:
with open("/proc/stat") as f:
line = f.readline()
parts = line.strip().split()
if parts[0] == "cpu" and len(parts) >= 5:
user, nice, system_, idle = map(int, parts[1:5])
total1 = user + nice + system_ + idle
idle1 = idle
time.sleep(0.05) # pequena janela
with open("/proc/stat") as f:
line = f.readline()
parts = line.strip().split()
user2, nice2, system2, idle2 = map(int, parts[1:5])
total2 = user2 + nice2 + system2 + idle2
idle2 = idle2
total_delta = total2 - total1
idle_delta = idle2 - idle1
if total_delta > 0:
cpu = 100.0 * (1.0 - (idle_delta / total_delta))
except Exception:
pass
ctx.boot()
logbus.info("Ciclo cognitivo iniciado (observe → think → act → rest)")
# Memória
try:
meminfo = {}
with open("/proc/meminfo") as f:
for line in f:
k, v = line.split(":", 1)
meminfo[k.strip()] = v.strip()
total_kb = float(meminfo.get("MemTotal", "0 kB").split()[0])
free_kb = float(meminfo.get("MemAvailable", "0 kB").split()[0])
if total_kb > 0:
used_kb = total_kb - free_kb
mem = 100.0 * (used_kb / total_kb)
except Exception:
pass
while True:
ctx.observe() # sensores, discos, vitals
ctx.think() # neurónios decidem
ctx.act() # motor executa
ctx.rest() # heartbeat + diag + sleep
# Loadavg
try:
with open("/proc/loadavg") as f:
l1, l5, l15, *_ = f.read().split()
load1 = float(l1)
load5 = float(l5)
load15 = float(l15)
except Exception:
pass
return {
"cpu": cpu,
"mem": mem,
"load1": load1,
"load5": load5,
"load15": load15,
}
except Exception as e:
logbus.error(f"Fatal no ciclo cognitivo: {repr(e)}")
tb = traceback.format_exc()
for line in tb.splitlines():
logbus.error(line)
ctx.shutdown("fatal exception")
raise
# =======================================================================================
# Dashboard em curses
# =======================================================================================
class NeurotronDashboard:
"""
UI fixa em curses:
- header: métricas de sistema + estado do Neurotron
- middle: log rolling
- footer: placeholder + ajuda/teclas
"""
def __init__(self, stdscr, log_queue: Queue, cortex: Cortex, start_time: float):
self.stdscr = stdscr
self.log_queue = log_queue
self.cortex = cortex
self.start_time = start_time
self.log_lines = [] # mantém histórico para a janela central
self.max_log_lines = 1000
self.stop_event = threading.Event()
self.last_diag_state = "?"
self.last_diag_delta = "?"
# tenta obter diagnóstico inicial, se existir
try:
diag = self.cortex.diagnostic._load_previous()
self.last_diag_state = diag.get("state", "?")
self.last_diag_delta = diag.get("delta", "?")
except Exception:
pass
# ------------------------------------------------------------------
# Helpers de desenho
# ------------------------------------------------------------------
def _draw_header(self, height, width):
"""Desenha a barra superior com CPU/MEM/LOAD/UPTIME/MODO/DIAG."""
metrics = read_system_metrics()
now = time.time()
uptime_sec = int(now - self.start_time)
hours = uptime_sec // 3600
mins = (uptime_sec % 3600) // 60
secs = uptime_sec % 60
mode = os.environ.get("NEUROTRON_MODE", NEUROTRON_MODE)
version = getattr(self.cortex, "version", "0.1")
cpu = metrics["cpu"]
mem = metrics["mem"]
load1 = metrics["load1"]
# Avalia homeostase
warn_cpu = NEUROTRON_THRESHOLDS.get("cpu_high", 85.0)
warn_mem = NEUROTRON_THRESHOLDS.get("mem_high", 90.0)
warn_load = NEUROTRON_THRESHOLDS.get("load1_high", 2.0)
status_parts = []
if cpu >= 0:
status_parts.append(f"CPU: {cpu:5.1f}%")
else:
status_parts.append("CPU: N/A ")
if mem >= 0:
status_parts.append(f"MEM: {mem:5.1f}%")
else:
status_parts.append("MEM: N/A ")
if load1 >= 0:
status_parts.append(f"LOAD1: {load1:4.2f}")
else:
status_parts.append("LOAD1: N/A ")
status_parts.append(f"UP: {hours:02d}:{mins:02d}:{secs:02d}")
status_parts.append(f"MODO: {mode.upper()}")
status_parts.append(f"VER: {version}")
status_parts.append(f"DIAG: {self.last_diag_state}/{self.last_diag_delta}")
line = " ".join(status_parts)
# barra horizontal
self.stdscr.attron(curses.A_REVERSE)
self.stdscr.addnstr(0, 0, line.ljust(width), width)
self.stdscr.attroff(curses.A_REVERSE)
# segunda linha: homeostase textual
homeo_msg = "HOMEOSTASE: OK"
if (cpu >= warn_cpu and cpu >= 0) or (mem >= warn_mem and mem >= 0) or (
load1 >= warn_load and load1 >= 0
):
homeo_msg = "HOMEOSTASE: STRESS"
self.stdscr.addnstr(1, 0, homeo_msg.ljust(width), width)
def _drain_log_queue(self):
"""Move mensagens da queue para o buffer de linhas."""
try:
while True:
msg = self.log_queue.get_nowait()
timestamp = datetime.now().strftime("%H:%M:%S")
self.log_lines.append(f"[{timestamp}] {msg}")
if len(self.log_lines) > self.max_log_lines:
self.log_lines = self.log_lines[-self.max_log_lines:]
except Empty:
pass
def _draw_log_window(self, header_height, footer_height, width, height):
"""
Área central de logs: rolagem automática, sempre mostrando as últimas N linhas
que cabem na janela.
"""
top = header_height
bottom = height - footer_height
rows = max(0, bottom - top)
self._drain_log_queue()
# seleciona as últimas 'rows' linhas
visible = self.log_lines[-rows:] if rows > 0 else []
for i in range(rows):
y = top + i
if i < len(visible):
line = visible[i]
self.stdscr.addnstr(y, 0, line.ljust(width), width)
else:
self.stdscr.addnstr(y, 0, " ".ljust(width), width)
def _draw_footer(self, width, height):
"""
Rodapé com placeholder para input futuro e legenda de teclas.
Protegido contra terminais muito pequenos.
"""
footer_text = "[ Futuro: comandos do utilizador aparecerão aqui ]"
keys_text = "[q] sair | dashboard Neurotron"
lines = [footer_text, keys_text]
footer_lines = len(lines)
# Se o terminal for demasiado pequeno, encolhe ou só mostra o essencial
start_row = max(0, height - footer_lines)
for i, text in enumerate(lines):
y = start_row + i
if 0 <= y < height:
try:
self.stdscr.addnstr(y, 0, text.ljust(width), width)
except curses.error:
# Em última instância, ignoramos erros de desenho
pass
# ------------------------------------------------------------------
# Loop principal da UI
# ------------------------------------------------------------------
def run(self):
"""
Loop principal do curses.
Actualiza o ecrã, teclas, e encerra quando stop_event é setado ou 'q' é pressionado.
"""
curses.curs_set(0)
self.stdscr.nodelay(True)
self.stdscr.keypad(True)
while not self.stop_event.is_set():
height, width = self.stdscr.getmaxyx()
self.stdscr.erase()
header_height = 2
footer_height = 2
try:
self._draw_header(height, width)
self._draw_log_window(header_height, footer_height, width, height)
self._draw_footer(width, height)
except curses.error:
# Em terminais muito pequenos ou estados estranhos, evitamos crash
pass
self.stdscr.refresh()
try:
ch = self.stdscr.getch()
if ch in (ord("q"), ord("Q")):
self.stop_event.set()
break
except Exception:
pass
time.sleep(0.1) # ~10 FPS
# tentativa graciosa de shutdown do Cortex
try:
self.log_queue.put("Encerrando Neurotron (dashboard pediu saída)…")
self.cortex.shutdown(reason="Dashboard exit")
except Exception:
pass
# =======================================================================================
# Ciclo Cognitivo em thread separada
# =======================================================================================
def cognitive_loop(cortex: Cortex, ui: NeurotronDashboard):
"""
Loop cognitivo clássico (observe think act rest),
a correr numa thread separada, reportando eventos para o dashboard via log_queue.
"""
log = ui.log_queue.put
try:
log("Neurotron: boot()…")
cortex.boot()
try:
state = cortex.diagnostic._load_previous().get("state", "?")
log(f"Diagnóstico inicial: estado='{state}'")
ui.last_diag_state = state
except Exception:
log("Diagnóstico inicial: indisponível")
log("Ciclo cognitivo iniciado (observe → think → act → rest)…")
while not ui.stop_event.is_set():
try:
log("cortex.observe()")
cortex.observe()
log("cortex.think()")
cortex.think()
log("cortex.act()")
cortex.act()
log("cortex.rest()")
cortex.rest()
except KeyboardInterrupt:
log("Interrompido pelo utilizador (SIGINT)")
cortex.shutdown(reason="SIGINT")
break
except SystemExit:
log("SystemExit recebido, encerrando…")
cortex.shutdown(reason="SystemExit")
break
except Exception as e:
log(f"💥 Exceção não tratada no loop cognitivo: {e}")
try:
cortex.fatal(e)
finally:
ui.stop_event.set()
finally:
ui.stop_event.set()
log("Loop cognitivo terminado.")
# =======================================================================================
# Entry point
# =======================================================================================
def _main_curses(stdscr):
# 1) Detecta modo e diretórios
runtime_dir, log_dir = detect_persistent_mode()
# 2) Inicializa Cortex com os mesmos parâmetros do main_waiting
cortex = Cortex(
runtime_dir=runtime_dir,
log_dir=log_dir,
tick_seconds=NEUROTRON_TICK,
)
# 3) Queue de logs e dashboard
log_queue: Queue = Queue()
start_time = time.time()
ui = NeurotronDashboard(stdscr, log_queue, cortex, start_time)
# 4) Thread do ciclo cognitivo
worker = threading.Thread(
target=cognitive_loop,
args=(cortex, ui),
daemon=True,
)
worker.start()
# 5) Loop da UI (bloqueia até terminar)
ui.run()
# 6) Aguarda thread terminar
worker.join(timeout=2.0)
# =============================================================================
# MAIN
# =============================================================================
def main():
"""
Ponto de entrada Python. Usado tanto por:
python3 -m neurotron
como pelo wrapper /usr/bin/neurotron, se este fizer:
from neurotron import main
main()
Entrada principal do Neurotron.
Inicia:
- cortex
- dashboard (thread)
- ciclo cognitivo (thread)
"""
curses.wrapper(_main_curses)
runtime_dir = "/opt/kernel/neurotron/runtime"
log_dir = "/opt/kernel/neurotron/logs"
Path(runtime_dir).mkdir(parents=True, exist_ok=True)
Path(log_dir).mkdir(parents=True, exist_ok=True)
ctx = Cortex(runtime_dir=runtime_dir, log_dir=log_dir)
# threads
t_dash = threading.Thread(target=dashboard_loop, args=(ctx,), daemon=True)
t_cog = threading.Thread(target=cognitive_loop, args=(ctx,), daemon=True)
t_dash.start()
t_cog.start()
# thread principal fica apenas a dormir
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logbus.warn("Interrompido pelo utilizador")
ctx.shutdown("CTRL+C")
if __name__ == "__main__":
main()

View File

@ -24,15 +24,10 @@ def _now_iso():
class AutoDiagnostic:
"""
Subsistema silencioso de diagnóstico do Neurotron.
Responsabilidades:
- recolher perceções internas (CPU, MEM, LOAD)
- comparar com diagnóstico anterior
- atualizar last_diagnostic.json
- manter histórico (rolling)
- atualizar telemetria contínua
- emitir eventos para logbus (não para stdout)
Diagnóstico interno do Neurotron.
Sempre devolve chaves padronizadas:
cpu, mem, load1
Nunca lança exceções segurança máxima.
"""
def __init__(self, runtime_dir: str, log_dir: str):
@ -48,28 +43,26 @@ class AutoDiagnostic:
self.perception = Perception()
# ----------------------------------------------------------------------
# Utilitários internos
# ----------------------------------------------------------------------
def _load_previous(self):
try:
if self.last_file.exists():
return json.loads(self.last_file.read_text())
except Exception as e:
logbus.emit(f"[diag.warn] falha ao ler último diagnóstico: {e}")
logbus.debug(f"[diag.warn] falha ao ler último diagnóstico: {e}")
return None
# ----------------------------------------------------------------------
def _save_current(self, payload):
history = []
prev = self._load_previous()
history = []
if prev:
history = prev.get("history", [])
history.append({
"timestamp": prev.get("timestamp"),
"cpu_percent": prev.get("cpu_percent"),
"mem_percent": prev.get("mem_percent"),
"loadavg": prev.get("loadavg"),
"cpu": prev.get("cpu"),
"mem": prev.get("mem"),
"load1": prev.get("load1"),
"state": prev.get("state", "UNKNOWN"),
})
history = history[-NEUROTRON_HISTORY_KEEP:]
@ -79,13 +72,14 @@ class AutoDiagnostic:
try:
self.last_file.write_text(json.dumps(payload, indent=2))
except Exception as e:
logbus.emit(f"[diag.error] falha ao gravar diagnóstico: {e}")
logbus.debug(f"[diag.error] falha ao gravar diagnóstico: {e}")
# ----------------------------------------------------------------------
def _classify_state(self, cpu, mem, l1):
try:
cpu = float(cpu)
mem = float(mem)
l1 = float(l1)
l1 = float(l1)
except Exception:
return "UNKNOWN"
@ -98,30 +92,31 @@ class AutoDiagnostic:
return "STABLE"
# ----------------------------------------------------------------------
# Execução principal
# ----------------------------------------------------------------------
def run_exam(self):
"""
Realiza um diagnóstico silencioso, atualiza os ficheiros,
e retorna (state, payload).
Produz um snapshot canónico com chaves:
cpu, mem, load1
"""
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = snap.get("mem_percent", "?")
load = snap.get("loadavg", ["?", "?", "?"])
l1 = load[0] if isinstance(load, list) and load else "?"
state = self._classify_state(cpu, mem, l1)
if isinstance(load, list) and load:
l1 = load[0]
else:
l1 = "?"
# chaves normalizadas
payload = {
"schema": NEUROTRON_DIAG_SCHEMA,
"timestamp": _now_iso(),
"cpu_percent": cpu,
"mem_percent": mem,
"cpu": cpu,
"mem": mem,
"load1": l1,
"loadavg": load,
"state": state,
"state": self._classify_state(cpu, mem, l1),
"env": {
"user": snap.get("env_user"),
"term": snap.get("env_term"),
@ -131,30 +126,29 @@ class AutoDiagnostic:
self._save_current(payload)
self._update_telemetry(payload)
# Falamos apenas através do logbus
logbus.emit(f"[diag] estado={state} cpu={cpu} mem={mem} load1={l1}")
logbus.debug(
f"[diag] estado={payload['state']} cpu={cpu} mem={mem} load1={l1}"
)
return state, payload
return payload["state"], payload
# ----------------------------------------------------------------------
def _update_telemetry(self, payload):
try:
telemetry = []
tele = []
if self.telemetry_file.exists():
telemetry = json.loads(self.telemetry_file.read_text() or "[]")
tele = json.loads(self.telemetry_file.read_text() or "[]")
telemetry.append({
tele.append({
"timestamp": payload["timestamp"],
"cpu": payload.get("cpu_percent"),
"mem": payload.get("mem_percent"),
"load": payload.get("loadavg"),
"state": payload.get("state"),
"cpu": payload["cpu"],
"mem": payload["mem"],
"load1": payload["load1"],
"state": payload["state"],
})
telemetry = telemetry[-128:]
self.telemetry_file.write_text(json.dumps(telemetry, indent=2))
tele = tele[-128:]
self.telemetry_file.write_text(json.dumps(tele, indent=2))
except Exception as e:
logbus.emit(f"[diag.warn] falha ao atualizar telemetria: {e}")
logbus.debug(f"[diag.warn] falha ao atualizar telemetria: {e}")

View File

@ -1,3 +1,5 @@
# neurotron/cortex.py
import json
import time
from pathlib import Path
@ -5,8 +7,10 @@ from collections import defaultdict, deque
from time import sleep
from neurotron.logbus import logbus
from neurotron.disk_agent import DiskAgent
from neurotron.echo_agent import EchoAgent
from neurotron.vitalsigns_agent import VitalSigns
from .neuron import Neuron
from .hippocampus import Hippocampus
from .perception import Perception
from .motor import Motor
@ -17,47 +21,12 @@ from .neurotron_config import (
NEUROTRON_TICK_MIN, NEUROTRON_TICK_MAX, NEUROTRON_TICK_STEP,
NEUROTRON_DIAG_EVERY_TICKS,
NEUROTRON_DATASET_PATH,
HEARTBEAT_ENABLED, HEARTBEAT_STYLE,
HEARTBEAT_ENABLED,
NEUROTRON_THRESHOLDS,
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
# =====================================================================
# Neurónios básicos
# =====================================================================
class VitalSigns(Neuron):
name = "VitalSigns"
def observe(self):
snap = self.ctx.perception.snapshot()
self.publish("vitals", snap)
self.ctx.memory.remember("observe.vitals", snap)
class EchoAgent(Neuron):
name = "EchoAgent"
def think(self):
msg = self.consume("vitals")
if msg:
self.publish("actions", {
"action": "echo",
"text": f"CPU {msg.get('cpu_percent', '?')}%"
})
# =====================================================================
# C O R T E X — versão V6 (Pure LogBus Edition)
# =====================================================================
class Cortex:
"""
Orquestrador cognitivo do Neurotron.
Totalmente silencioso toda saída vai para o logbus.
"""
def __init__(self, runtime_dir, log_dir, tick_seconds=NEUROTRON_TICK):
self.runtime_dir = Path(runtime_dir)
self.log_dir = Path(log_dir)
@ -67,52 +36,50 @@ class Cortex:
self._tick_count = 0
self.diagnostic = AutoDiagnostic(runtime_dir, log_dir)
self.memory = Hippocampus(log_dir=log_dir)
self.memory = Hippocampus(log_dir=self.log_dir)
self.perception = Perception()
self.motor = Motor()
# Message bus interno
self.bus = defaultdict(lambda: deque(maxlen=32))
# Telemetria curta em RAM
self.telemetry = deque(maxlen=TELEMETRY_MAXLEN)
# Neurónios ativos
# ordem é importante
self.neurons = [
DiskAgent(self),
VitalSigns(self),
EchoAgent(self),
]
self._booted = False
# Caminho telemetria longa
self.telemetry_path = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
self.telemetry_path.parent.mkdir(parents=True, exist_ok=True)
# =================================================================
# BOOT / SHUTDOWN
# =================================================================
# ----------------------------------------
# Boot
# ----------------------------------------
def boot(self):
if self._booted:
return
logbus.emit("🧠 boot() — inicializando Neurotron")
self.memory.remember("boot", {"version": "0.1", "tick": self.tick})
logbus.info(f"Neurotron boot() — mode={self.mode}")
self.memory.remember("boot", {"tick": self.tick, "mode": self.mode})
self._booted = True
# ----------------------------------------
# Shutdown + Fatal
# ----------------------------------------
def shutdown(self, reason=""):
logbus.emit(f"⚠️ shutdown — {reason}")
logbus.warn(f"Shutdown pedido{reason}")
self.memory.remember("shutdown", {"reason": reason})
def fatal(self, e: Exception):
logbus.emit(f"🔥 fatal error: {repr(e)}")
logbus.error(f"Fatal: {repr(e)}")
self.memory.remember("fatal", {"error": repr(e)})
raise e
# =================================================================
# CICLO COGNITIVO
# =================================================================
# ----------------------------------------
# ciclo cognitivo
# ----------------------------------------
def observe(self):
for n in self.neurons:
n.observe()
@ -126,41 +93,53 @@ class Cortex:
if not action:
return
# echo (debug)
if action.get("action") == "echo":
res = self.motor.run("echo", [action.get("text", "")])
self.memory.remember("act.echo", res)
# Redireciona stdout para logbus
if res.get("stdout"):
logbus.emit(f"[motor.echo] {res['stdout'].strip()}")
if res.get("stderr"):
logbus.emit(f"[motor.echo.err] {res['stderr'].strip()}")
logbus.info(f"[echo] {res['stdout'].strip()}")
def rest(self):
# Heartbeat + microalertas
if HEARTBEAT_ENABLED:
self._heartbeat_and_telemetry()
self._heartbeat()
sleep(self.tick)
self._tick_count += 1
# Diagnóstico periódico
if self._tick_count % NEUROTRON_DIAG_EVERY_TICKS == 0:
state, _ = self.diagnostic.run_exam()
self._apply_homeostasis(state)
self._run_diag()
# Flush periódico telemetria
if self._tick_count % TELEMETRY_FLUSH_EVERY_TICKS == 0:
self._flush_telemetry()
# =================================================================
# HOMEOSTASE
# =================================================================
# ----------------------------------------
# heartbeat
# ----------------------------------------
def _heartbeat(self):
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent")
mem = snap.get("mem_percent")
load = snap.get("loadavg")
load1 = load[0] if load else None
self.telemetry.append({
"ts": time.time(),
"cpu": cpu,
"mem": mem,
"load1": load1,
"tick": self.tick,
})
logbus.heart(f"cpu={cpu}% mem={mem}% tick={self.tick:.2f}s")
# ----------------------------------------
# diag / homeostase
# ----------------------------------------
def _run_diag(self):
state, snap = self.diagnostic.run_exam()
logbus.diag(f"estado={state} cpu={snap['cpu']} mem={snap['mem']} load1={snap['load1']}")
def _apply_homeostasis(self, state):
old = self.tick
if state == "CRITICAL":
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
elif state == "ALERT":
@ -168,84 +147,27 @@ class Cortex:
elif state == "STABLE":
self.tick = max(NEUROTRON_TICK_MIN, self.tick - NEUROTRON_TICK_STEP / 2)
if self.tick != old:
logbus.emit(f"[homeostasis] tick ajustado {old:.2f}s → {self.tick:.2f}s (state={state})")
# =================================================================
# HEARTBEAT + MICROALERTAS
# =================================================================
def _heartbeat_and_telemetry(self):
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent")
mem = snap.get("mem_percent")
load = snap.get("loadavg")
load1 = load[0] if isinstance(load, list) and load else None
# Salva telemetria curta
self.telemetry.append({
"ts": time.time(),
"cpu": cpu,
"mem": mem,
"load": load,
"tick": self.tick,
})
self._evaluate_microalerts(cpu, mem, load1)
# Heartbeat → logbus
logbus.emit(f"💓 cpu={cpu}% mem={mem}% tick={self.tick:.2f}s")
def _evaluate_microalerts(self, cpu, mem, load1):
alerts = []
try:
if isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]:
alerts.append(("cpu", cpu))
if isinstance(mem, (int, float)) and mem >= NEUROTRON_THRESHOLDS["mem_high"]:
alerts.append(("mem", mem))
if isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"]:
alerts.append(("load1", load1))
except Exception:
return
if not alerts:
return
for metric, value in alerts:
logbus.emit(f"⚠️ microalerta: {metric}={value} — ajustando tick")
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
self.memory.remember("microalert", {
"ts": time.time(),
"alerts": alerts,
"new_tick": self.tick,
})
# =================================================================
# TELEMETRIA LONGA
# =================================================================
if old != self.tick:
logbus.info(f"tick ajustado {old:.2f}s → {self.tick:.2f}s")
# ----------------------------------------
# telemetria
# ----------------------------------------
def _flush_telemetry(self):
try:
data = list(self.telemetry)
self.telemetry_path.write_text(json.dumps(data))
self.memory.remember("telemetry.flush", {
"count": len(data),
"path": str(self.telemetry_path)
})
self.telemetry_path.write_text(json.dumps(list(self.telemetry)))
self.memory.remember("telemetry.flush", {})
except Exception as e:
logbus.emit(f"[telemetry.error] {e}")
logbus.error(f"telemetry error: {e}")
# =================================================================
# BUS
# =================================================================
# ----------------------------------------
# bus interno
# ----------------------------------------
def bus_publish(self, ch, payload):
self.bus[ch].append(payload)
def bus_publish(self, channel, payload):
self.bus[channel].append(payload)
def bus_consume(self, channel):
q = self.bus[channel]
def bus_consume(self, ch):
q = self.bus[ch]
return q.popleft() if q else None

View File

@ -0,0 +1,106 @@
# neurotron/disk_agent.py
import os
import subprocess
from pathlib import Path
from neurotron.neuron import Neuron
from neurotron.logbus import logbus
DISK_CANDIDATES = ["/dev/vda", "/dev/sda", "/dev/vdb"]
MOUNT_POINT = "/mnt/nfdos"
class DiskAgent(Neuron):
name = "DiskAgent"
def __init__(self, ctx):
super().__init__(ctx)
self.state = "unknown" # unknown → no_disk → found → mounted
self.last_msg = None
self.cooldown = 0
# --------------------------------
# HELPER
# --------------------------------
def _emit_once(self, msg):
if msg != self.last_msg:
logbus.disk(msg)
self.last_msg = msg
def _run(self, cmd):
try:
subprocess.run(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True
)
return True
except Exception:
return False
# --------------------------------
# OBSERVE
# --------------------------------
def observe(self):
# corre só 1x por 10 ticks
if self.cooldown > 0:
self.cooldown -= 1
return
self.cooldown = 10
# 1) Procurar disco
dev = None
for d in DISK_CANDIDATES:
if Path(d).exists():
dev = d
break
if not dev:
self._emit_once("Nenhum disco encontrado — modo VOLATILE")
self.state = "no_disk"
return
# 2) Novo disco encontrado
if self.state == "unknown":
self._emit_once(f"Disco detectado: {dev}")
self.state = "found"
# 3) Tentar identificar filesystem
try:
blkid = subprocess.run(
["blkid", dev],
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).stdout.strip()
except FileNotFoundError:
# blkid não existe → fallback gentle
logbus.debug("[disk] blkid não encontrado — fallback ativo")
blkid = ""
if blkid:
self._emit_once("Sistema de ficheiros válido encontrado")
else:
self._emit_once("Disco virgem detectado — não formatado")
# 4) Montar
Path(MOUNT_POINT).mkdir(parents=True, exist_ok=True)
if self._run(["mount", dev, MOUNT_POINT]):
if self.state != "mounted":
self._emit_once(f"Montado em {MOUNT_POINT}")
self.state = "mounted"
# 5) Trocar Neurotron → modo persistente
if self.ctx.mode != "persistent":
self.ctx.mode = "persistent"
logbus.info("Modo alterado → PERSISTENT")
# 6) Estrutura básica
for d in ["data", "logs", "dna"]:
Path(MOUNT_POINT, d).mkdir(exist_ok=True)
else:
self._emit_once("Falha ao montar — mantendo VOLATILE")

View File

@ -1,254 +0,0 @@
#!/usr/bin/env python3
"""
💾 Módulo de Inicialização de Disco Neurotron V0.1 (atualizado)
Detecta, avalia, prepara e monta o disco persistente do NFDOS.
- Não formata discos que contenham um filesystem conhecido, a menos que forçado.
- Forçar formatação:
* EXPORT: export NFDOS_FORCE_FORMAT=1 (no ambiente do initramfs, se aplicável)
* Kernel cmdline: adicionar `nfdos_force_format=1` ao -append do QEMU
"""
import os
import subprocess
from pathlib import Path
from rich.console import Console
from neurotron.neurotron_config import (
MOUNT_POINT, DISK_CANDIDATES
)
console = Console()
def run(cmd: list[str]) -> bool:
"""Executa comando silenciosamente (retorna True se OK)."""
try:
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def detect_disk() -> str | None:
"""Procura por um dispositivo de disco válido (por ordem em DISK_CANDIDATES)."""
for dev in DISK_CANDIDATES:
p = Path(dev)
if p.exists():
console.print(f"[cyan]🔍 Detetado disco:[/] {dev}")
return dev
console.print("[yellow]⚠️ Nenhum disco detectado.[/yellow]")
return None
def blkid_check(device: str) -> str | None:
"""Tenta obter tipo com blkid (se disponível)."""
try:
out = subprocess.run(["blkid", device], stdout=subprocess.PIPE, text=True, check=False)
return out.stdout.strip() if out.stdout else None
except FileNotFoundError:
return None
def read_sig(device: str, size: int = 2048) -> bytes | None:
"""Lê os primeiros `size` bytes do device (se possível)."""
try:
with open(device, "rb") as f:
return f.read(size)
except Exception:
return None
def detect_fs_by_magic(device: str) -> str | None:
"""
Detecta assinaturas simples:
- ext4 superblock magic (0xEF53) @ offset 1024 + 56 = 1080
- NTFS -> 'NTFS ' @ offset 3
- FAT32 -> 'FAT32' nos offsets típicos do boot sector
- MBR partition table signature 0x55AA @ offset 510-511
Retorna string com o sistema ou None.
"""
buf = read_sig(device, size=4096)
if not buf:
return None
# MBR signature
if len(buf) >= 512 and buf[510:512] == b'\x55\xAA':
# detecta tabela de partições existente (MBR)
return "mbr-partition-table"
# ext magic at 1024+56 = 1080
if len(buf) >= 1082 and buf[1080:1082] == b'\x53\xEF':
return "ext (superblock)"
# NTFS signature at offset 3 (ASCII "NTFS ")
if len(buf) >= 11 and buf[3:11] == b'NTFS ':
return "ntfs"
# FAT32 signature at offset 82 or boot sector strings containing FAT
if b"FAT32" in buf or b"FAT16" in buf or b"FAT12" in buf:
return "fat"
return None
def parse_cmdline_flag() -> bool:
"""Lê /proc/cmdline para a flag nfdos_force_format=1"""
try:
with open("/proc/cmdline", "r") as f:
cmd = f.read()
return "nfdos_force_format=1" in cmd.split()
except Exception:
return False
def which(prog: str) -> str | None:
for p in os.environ.get("PATH", "/sbin:/bin:/usr/sbin:/usr/bin").split(":"):
cand = Path(p) / prog
if cand.exists() and os.access(cand, os.X_OK):
return str(cand)
return None
def format_ext4(device: str, label: str = "NFDOS_DATA") -> bool:
"""Formata o dispositivo com ext4, recolhendo logs de erro detalhados (BusyBox-safe)."""
mke2fs = which("mke2fs")
mkfs_ext4 = which("mkfs.ext4")
mkfs = which("mkfs")
candidates = []
if mkfs_ext4:
candidates.append(([mkfs_ext4, "-F", "-L", label, device], "mkfs.ext4"))
if mke2fs:
# o BusyBox mke2fs não aceita '-t', por isso ajustaremos dentro do loop
candidates.append(([mke2fs, "-F", "-t", "ext4", "-L", label, device], "mke2fs"))
if mkfs:
candidates.append(([mkfs, "-t", "ext4", "-F", "-L", label, device], "mkfs"))
if not candidates:
console.print("[red]❌ Nenhum utilitário mkfs disponível no initramfs![/red]")
return False
for cmd, name in candidates:
console.print(f"[yellow]⚙️ Formatando {device} com {name}...[/yellow]")
# 👉 se for o BusyBox mke2fs, removemos o argumento -t
if name == "mke2fs":
cmd = [c for c in cmd if c != "-t" and c != "ext4"]
try:
result = subprocess.run(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
if result.stdout:
console.print(result.stdout.strip())
console.print(f"[green]✔ Formatação concluída com {name}.[/green]")
return True
except subprocess.CalledProcessError as e:
console.print(f"[red]❌ {name} falhou (código {e.returncode}).[/red]")
if e.stdout:
console.print(f"[cyan]📜 STDOUT:[/cyan]\n{e.stdout.strip()}")
if e.stderr:
console.print(f"[magenta]⚠️ STDERR:[/magenta]\n{e.stderr.strip()}")
console.print("[red]❌ Nenhum método de formatação teve sucesso.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel suporta EXT4 e se o BusyBox inclui mke2fs.")
return False
def ensure_fs(device: str) -> bool:
"""
Verifica se existe sistema de ficheiros.
Se não existir e houver confirmação/flag, formata ext4 (ou fallback via mke2fs).
"""
# 1⃣ tentativa rápida com blkid
info = blkid_check(device)
if info:
console.print(f"[green]🧠 Disco já formatado (blkid):[/] {info}")
return True
# 2⃣ fallback por leituras de assinatura
sig = detect_fs_by_magic(device)
if sig:
console.print(f"[yellow]⚠ Assinatura detectada no disco:[/] {sig}")
console.print("[red]❗ O disco contém dados ou partições existentes. Abortando formatação.[/red]")
return False
# 3⃣ se nada detectado — disco virgem
forced_env = os.environ.get("NFDOS_FORCE_FORMAT") == "1"
forced_cmd = parse_cmdline_flag()
if not (forced_env or forced_cmd):
console.print("[yellow]⚠ Disco parece virgem, mas não há confirmação para formatar.[/yellow]")
console.print("Use `nfdos_force_format=1` no kernel cmdline ou export NFDOS_FORCE_FORMAT=1")
console.print("para permitir formatação automática.")
return False
# 4⃣ tentar formatação
console.print(f"[yellow]⚙️ Forçando formatação de {device} como ext4 (FLAG DETETADA)...[/yellow]")
ok = format_ext4(device)
if ok:
console.print("[green]✔ Formatação concluída com sucesso.[/green]")
return True
# 5⃣ se nada funcionou
console.print("[red]❌ Falha na formatação.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel inclui suporte para EXT4 ou se o mkfs/mke2fs está embutido no BusyBox.")
return False
def mount_disk(device: str) -> bool:
"""Monta o disco no ponto esperado (retorna True se OK)."""
os.makedirs(MOUNT_POINT, exist_ok=True)
return run(["mount", device, MOUNT_POINT])
def debug_env():
"""Mostra informações úteis quando nenhum disco é detectado (ou para debug)."""
console.print("[yellow]🩻 DEBUG: listando /dev/* e últimas mensagens do kernel[/yellow]")
devs = sorted(Path("/dev").glob("*"))
console.print("📂 Dispositivos disponíveis:", ", ".join([d.name for d in devs if d.is_char_device() or d.is_block_device()]))
os.system("dmesg | tail -n 20 || echo '(dmesg não disponível)'")
console.print("[yellow]───────────────────────────────[/yellow]")
os.system("echo '--- /proc/partitions ---'; cat /proc/partitions || true")
os.system("echo '--- dmesg | grep -i virtio ---'; dmesg | grep -i virtio || true")
console.print("[yellow]───────────────────────────────[/yellow]")
def initialize_persistence():
"""Fluxo completo de inicialização do hipocampo físico."""
device = detect_disk()
if not device:
debug_env()
console.print("[red]❌ Nenhum disco físico encontrado — usando modo RAM.[/red]")
return False
if not ensure_fs(device):
console.print("[red]❌ Preparação do sistema de ficheiros foi interrompida.[/red]")
return False
if not mount_disk(device):
console.print("[red]❌ Falha ao montar disco.[/red]")
return False
console.print(f"[green]✔ Disco montado em:[/] {MOUNT_POINT}")
telemetry_file = Path("/opt/kernel/neurotron/data/telemetry.json")
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
if not telemetry_file.exists():
telemetry_file.write_text("[]")
for d in ["data", "logs", "dna"]:
Path(MOUNT_POINT, d).mkdir(parents=True, exist_ok=True)
Path(MOUNT_POINT, "DNA_ID").write_text("NEUROTRON_HIPOCAMPUS_V1\n")
console.print("[cyan]👉 Hipocampo físico inicializado com sucesso.[/cyan]")
return True
if __name__ == "__main__":
initialize_persistence()

View File

@ -0,0 +1,27 @@
# neurotron/echo_agent.py
from neurotron.neuron import Neuron
from neurotron.logbus import logbus
class EchoAgent(Neuron):
name = "EchoAgent"
def __init__(self, ctx):
super().__init__(ctx)
self.last_cpu = None
def think(self):
msg = self.consume("vitals")
if not msg:
return
cpu = msg.get("cpu_percent")
if cpu is None:
return
# só fala se variar > 5%
if self.last_cpu is None or abs(cpu - self.last_cpu) >= 5:
self.publish("actions", {
"action": "echo",
"text": f"CPU {cpu:.1f}%"
})
self.last_cpu = cpu

View File

@ -1,83 +1,29 @@
"""
Neurotron LogBus Sistema Centralizado de Logging
==================================================
Objetivo:
- Centralizar *toda* a saída de logs do Neurotron
- Funcionar tanto no dashboard curses quanto em modo headless
- Manter logs persistentes quando o hipocampo físico está montado
- Fornecer uma API simples:
from neurotron.logbus import log
log("mensagem")
Componentes:
- LogBus: roteador de mensagens (fila stdout ficheiro)
- log(): função global simplificada
"""
from __future__ import annotations
import json
import time
from queue import Queue
from pathlib import Path
# neurotron/logbus.py
from datetime import datetime
import sys
import threading
class LogBus:
def __init__(self):
self.queue: Queue | None = None # usado pelo dashboard
self.stdout_enabled = True # imprime em modo headless
self.persist_enabled = False # grava em disco
self.persist_file: Path | None = None
self.lock = threading.Lock()
# ---------------------------------------------------------
# Inicialização de destinos
# ---------------------------------------------------------
def enable_dashboard(self, q: Queue):
"""Liga o LogBus ao dashboard (curses)."""
self.queue = q
def emit(self, level, msg):
"""Escreve logs com timestamp e nível padronizado."""
ts = datetime.utcnow().strftime("%H:%M:%S")
line = f"[{ts}] [{level}] {msg}\n"
def enable_persist(self, path: Path):
"""Ativa salvamento permanente da stream de logs."""
self.persist_file = path
self.persist_enabled = True
path.parent.mkdir(parents=True, exist_ok=True)
if not path.exists():
path.write_text("") # cria ficheiro vazio
with self.lock:
sys.stdout.write(line)
sys.stdout.flush()
# ---------------------------------------------------------
# Emissão centralizada
# ---------------------------------------------------------
def log(self, message: str):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] {message}"
# atalhos
def info(self, msg): self.emit("info", msg)
def warn(self, msg): self.emit("warn", msg)
def error(self, msg): self.emit("error", msg)
def disk(self, msg): self.emit("disk", msg)
def diag(self, msg): self.emit("diag", msg)
def heart(self, msg): self.emit("heart", msg)
def debug(self, msg): self.emit("debug", msg)
# 1) Dashboard (queue)
if self.queue:
try:
self.queue.put(line)
except Exception:
pass
# 2) STDOUT
if self.stdout_enabled:
try:
print(line)
except Exception:
pass
# 3) Persistência
if self.persist_enabled and self.persist_file:
try:
with open(self.persist_file, "a") as f:
f.write(json.dumps({"ts": ts, "msg": message}) + "\n")
except Exception:
pass
# Instância global — usada em todo o Neurotron
logbus = LogBus()
def log(msg: str):
"""Função simples: apenas delega ao LogBus global."""
logbus.log(msg)

View File

@ -1,114 +0,0 @@
#!/usr/bin/env python3
"""
Neurotron ponto de entrada do cérebro do NFDOS.
Boot flow (novo): init (BusyBox) /usr/bin/neurotron este ficheiro.
"""
import os
import sys
from pathlib import Path
from datetime import datetime
from rich.console import Console
# -----------------------------------------------------------------------------
# Ajuste de caminho: tornar "src/" o root dos módulos Neurotron
# -----------------------------------------------------------------------------
THIS_DIR = Path(__file__).resolve().parent # .../neurotron/src
if str(THIS_DIR) not in sys.path:
sys.path.insert(0, str(THIS_DIR))
# Agora os imports ficam locais ao diretório src/
from neurotron_config import ( # noqa: E402
NEUROTRON_TICK,
NEUROTRON_MODE,
NEUROTRON_HOMEOSTASIS,
CORTEX_LOOP_DELAY,
MOTOR_OUTPUT_DEVICE,
)
from autodiagnostic import AutoDiagnostic # noqa: E402
from perception import Perception # noqa: E402
from cortex import Cortex # noqa: E402
console = Console()
def detect_persistent_mount() -> bool:
"""Verifica se o hipocampo físico está montado em /var/neurotron"""
mount_point = Path("/var/neurotron")
try:
if mount_point.exists() and os.path.ismount(mount_point):
console.print(f"[green]💾 Hipocampo físico montado:[/] {mount_point}")
return True
else:
# fallback: check via /proc/mounts em early boot
with open("/proc/mounts") as f:
for line in f:
if " /var/neurotron " in line:
console.print(f"[green]💾 Hipocampo físico montado (via /proc):[/] {mount_point}")
return True
except Exception as e:
console.print(f"[yellow]⚠ Falha ao verificar montagem persistente:[/] {e}")
return False
def main():
# -------------------------------------------------------------------------
# Seleção de modo: persistente vs volátil
# -------------------------------------------------------------------------
persistent_mode = detect_persistent_mount()
if persistent_mode:
os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG"] = "/var/neurotron/logs"
else:
os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG"] = "/tmp/neurotron_logs"
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG"])
runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
mode = os.environ["NEUROTRON_MODE"]
console.print(f"[cyan]🌍 Modo atual do Neurotron:[/] [bold]{mode.upper()}[/]")
# -------------------------------------------------------------------------
# Inicializa o Córtex
# -------------------------------------------------------------------------
cortex = Cortex(
runtime_dir=runtime_dir,
log_dir=log_dir,
tick_seconds=NEUROTRON_TICK,
)
try:
cortex.boot()
state = cortex.diagnostic._load_previous().get("state", "?")
console.print(f"[cyan]🩺 Estado inicial:[/] {state}\n")
console.print("[green]👉 Neurotron inicializado com sucesso.[/]\n")
console.print("[green]✔ [Mensagem Simbolica] Boot OK[/]\n")
console.print("[green]✔ Iniciando ciclo cognitivo.[/]\n")
while True:
cortex.observe()
cortex.think()
cortex.act()
cortex.rest()
except KeyboardInterrupt:
console.print("[yellow]⚠ Interrompido pelo utilizador (SIGINT)[/]")
cortex.shutdown(reason="SIGINT")
except SystemExit:
cortex.shutdown(reason="SystemExit")
except Exception as e:
console.print(f"[red]💥 Exceção não tratada:[/] {e}")
cortex.fatal(e)
if __name__ == "__main__":
main()

View File

@ -35,7 +35,7 @@ class Motor:
"""
if cmd not in self.SAFE_CMDS:
logbus.emit(f"[motor.warn] comando bloqueado: '{cmd}'")
logbus.debug(f"[motor.warn] comando bloqueado: '{cmd}'")
return {
"ok": False,
"code": None,
@ -55,7 +55,7 @@ class Motor:
ok = (res.returncode == 0)
if not ok:
logbus.emit(
logbus.debug(
f"[motor.warn] '{cmd}' retornou código {res.returncode}"
)
@ -67,7 +67,7 @@ class Motor:
}
except Exception as e:
logbus.emit(f"[motor.error] exceção ao executar '{cmd}': {e}")
logbus.debug(f"[motor.error] exceção ao executar '{cmd}': {e}")
return {
"ok": False,
"code": None,

View File

@ -47,12 +47,12 @@ class Neuron:
try:
self.ctx.bus_publish(channel, payload)
except Exception as e:
logbus.emit(f"[warn] {self.name}.publish falhou: {e}")
logbus.debug(f"[warn] {self.name}.publish falhou: {e}")
def consume(self, channel: str) -> Optional[Dict[str, Any]]:
"""Lê (e remove) última mensagem disponível num canal."""
try:
return self.ctx.bus_consume(channel)
except Exception as e:
logbus.emit(f"[warn] {self.name}.consume falhou: {e}")
logbus.debug(f"[warn] {self.name}.consume falhou: {e}")
return None

View File

@ -138,7 +138,7 @@ class Perception:
except Exception as e:
# fallback extremo — nunca quebrar o Neurotron
logbus.emit(f"[warn] Perception.snapshot falhou: {e}")
logbus.debug(f"[warn] Perception.snapshot falhou: {e}")
return {
"env_user": "unknown",
"env_term": "unknown",

View File

@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""
📊 Painel de Telemetria do Neurotron V0.1
o ficheiro telemetry.json e mostra um mini-ECG digital:
Execução:
python3 /opt/kernel/neurotron/neurotron_core/telemetry_tail.py
"""
import json
import time
import os
from pathlib import Path
from statistics import mean
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
DATASET = "/opt/kernel/neurotron/data/telemetry.json"
BAR_CHARS = "▁▂▃▄▅▆▇█"
SAMPLES = 24 # quantas amostras recentes mostrar
REFRESH = 2.0 # segundos entre atualizações
def mini_graph(values, width=24):
"""Desenha barras simples tipo sparkline"""
if not values:
return "·" * width
vals = [v for v in values if isinstance(v, (int, float))]
if not vals:
return "·" * width
lo, hi = min(vals), max(vals)
span = (hi - lo) or 1.0
bars = []
for v in vals[-width:]:
if not isinstance(v, (int, float)):
bars.append("·")
continue
i = int(round((v - lo) / span * (len(BAR_CHARS) - 1)))
bars.append(BAR_CHARS[i])
return "".join(bars)
def read_telemetry(path: str):
try:
data = json.loads(Path(path).read_text() or "[]")
return data[-SAMPLES:]
except Exception:
return []
def render_panel(console, data):
if not data:
console.print("[yellow]Nenhum dado de telemetria disponível.[/yellow]")
return
cpu = [d.get("cpu") for d in data if isinstance(d.get("cpu"), (int, float))]
mem = [d.get("mem") for d in data if isinstance(d.get("mem"), (int, float))]
load = [d.get("load")[0] for d in data if isinstance(d.get("load"), (list, tuple)) and isinstance(d.get("load")[0], (int, float))]
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Sinal Vital", justify="left")
table.add_column("Tendência", justify="left")
table.add_column("Média", justify="right")
table.add_row("CPU (%)", mini_graph(cpu), f"{mean(cpu):.1f}%" if cpu else "?")
table.add_row("Memória (%)", mini_graph(mem), f"{mean(mem):.1f}%" if mem else "?")
table.add_row("Carga (1min)", mini_graph(load), f"{mean(load):.2f}" if load else "?")
panel = Panel(table, title="🩺 TELEMETRIA RECENTE", border_style="green")
console.clear()
console.print(panel)
def main():
console = Console()
console.print("[bold cyan]Neurotron Telemetry Tail — Iniciar Monitorização[/bold cyan]\n")
while True:
if not Path(DATASET).exists():
console.print(f"[yellow]A aguardar dados em {DATASET}...[/yellow]")
time.sleep(REFRESH)
continue
data = read_telemetry(DATASET)
if not data:
console.print("[yellow]Nenhum dado de telemetria disponível.[/yellow]")
else:
render_panel(console, data)
time.sleep(REFRESH)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,15 @@
# neurotron/vitalsigns_agent.py
from neurotron.neuron import Neuron
class VitalSigns(Neuron):
"""
Observa sinais vitais (CPU, MEM, LOAD)
e publica em "vitals".
"""
name = "VitalSigns"
def observe(self):
snap = self.ctx.perception.snapshot()
self.publish("vitals", snap)
self.ctx.memory.remember("observe.vitals", snap)

View File

@ -1,466 +1,117 @@
#!/usr/bin/env python3
"""
Neurotron Dashboard em modo texto (estilo BIOS)
Ponto de entrada quando executado como:
python3 -m neurotron
ou via wrapper /usr/bin/neurotron gerado pelo autotools.
Layout:
+------------------------------------------------------------------------------+
| [HEAD] CPU / MEM / LOAD / UPTIME / MODO / VERSÃO / DIAG |
+------------------------------------------------------------------------------+
| [LOG] Últimos eventos do ciclo cognitivo (observe/think/act/rest, etc.) |
| ... |
+------------------------------------------------------------------------------+
| [FOOT] Futuro: input do utilizador (placeholder) | teclas: q = sair |
+------------------------------------------------------------------------------+
Neurotron entrypoint oficial.
Arranque do dashboard + thread cognitiva.
Totalmente silencioso fora do LogBus.
"""
import curses
import sys
import threading
import time
import os
import sys
import traceback
from pathlib import Path
from queue import Queue, Empty
from datetime import datetime
# Imports internos do Neurotron (já reorganizados em package)
from .neurotron_config import (
NEUROTRON_TICK,
NEUROTRON_MODE,
NEUROTRON_HOMEOSTASIS,
NEUROTRON_THRESHOLDS,
)
from .cortex import Cortex
from .autodiagnostic import AutoDiagnostic # se for usado dentro do Cortex, ok mesmo assim
from neurotron.logbus import logbus
from neurotron.cortex import Cortex
# =======================================================================================
# Utilitários de ambiente
# =======================================================================================
# =============================================================================
# DASHBOARD (modo texto minimalista)
# =============================================================================
def detect_persistent_mode():
def dashboard_loop(ctx: Cortex):
"""
Verifica se o hipocampo físico está montado em /var/neurotron.
Define NEUROTRON_MODE, NEUROTRON_RUNTIME e NEUROTRON_LOG via os.environ.
Renderização minimalista do estado atual.
Apenas mostra informações de alto nível.
Tudo mais pertence ao logbus (à direita do ecrã).
"""
mount_point = Path("/var/neurotron")
start = time.time()
def _mounted(mp: Path) -> bool:
try:
if mp.exists() and os.path.ismount(mp):
return True
with open("/proc/mounts") as f:
for line in f:
if f" {mp} " in line:
return True
except Exception:
return False
return False
while True:
uptime = int(time.time() - start)
h = uptime // 3600
m = (uptime % 3600) // 60
s = uptime % 60
if _mounted(mount_point):
os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG"] = "/var/neurotron/logs"
else:
os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG"] = "/tmp/neurotron_logs"
mode = ctx.mode.upper()
tick = ctx.tick
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG"])
runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
return runtime_dir, log_dir
# linha de estado (esquerda)
line = (
f"UP: {h:02}:{m:02}:{s:02} "
f"TICK: {tick:0.2f}s "
f"MODO: {mode:10}"
)
# escreve sempre na coluna fixa
sys.stdout.write("\033[1;1H" + line + "\033[K")
sys.stdout.flush()
time.sleep(0.2)
def read_system_metrics():
"""
CPU, memória e loadavg a partir de /proc.
Retorna dicionário:
{
"cpu": float,
"mem": float,
"load1": float,
"load5": float,
"load15": float,
}
Em caso de falha, devolve valores -1.
"""
cpu = -1.0
mem = -1.0
load1 = load5 = load15 = -1.0
# =============================================================================
# CICLO COGNITIVO
# =============================================================================
# CPU (uso aproximado entre duas leituras de /proc/stat)
def cognitive_loop(ctx: Cortex):
"""Loop cognitivo principal (observe → think → act → rest)."""
try:
with open("/proc/stat") as f:
line = f.readline()
parts = line.strip().split()
if parts[0] == "cpu" and len(parts) >= 5:
user, nice, system_, idle = map(int, parts[1:5])
total1 = user + nice + system_ + idle
idle1 = idle
time.sleep(0.05) # pequena janela
with open("/proc/stat") as f:
line = f.readline()
parts = line.strip().split()
user2, nice2, system2, idle2 = map(int, parts[1:5])
total2 = user2 + nice2 + system2 + idle2
idle2 = idle2
total_delta = total2 - total1
idle_delta = idle2 - idle1
if total_delta > 0:
cpu = 100.0 * (1.0 - (idle_delta / total_delta))
except Exception:
pass
ctx.boot()
logbus.info("Ciclo cognitivo iniciado (observe → think → act → rest)")
# Memória
try:
meminfo = {}
with open("/proc/meminfo") as f:
for line in f:
k, v = line.split(":", 1)
meminfo[k.strip()] = v.strip()
total_kb = float(meminfo.get("MemTotal", "0 kB").split()[0])
free_kb = float(meminfo.get("MemAvailable", "0 kB").split()[0])
if total_kb > 0:
used_kb = total_kb - free_kb
mem = 100.0 * (used_kb / total_kb)
except Exception:
pass
while True:
ctx.observe() # sensores, discos, vitals
ctx.think() # neurónios decidem
ctx.act() # motor executa
ctx.rest() # heartbeat + diag + sleep
# Loadavg
try:
with open("/proc/loadavg") as f:
l1, l5, l15, *_ = f.read().split()
load1 = float(l1)
load5 = float(l5)
load15 = float(l15)
except Exception:
pass
return {
"cpu": cpu,
"mem": mem,
"load1": load1,
"load5": load5,
"load15": load15,
}
except Exception as e:
logbus.error(f"Fatal no ciclo cognitivo: {repr(e)}")
tb = traceback.format_exc()
for line in tb.splitlines():
logbus.error(line)
ctx.shutdown("fatal exception")
raise
# =======================================================================================
# Dashboard em curses
# =======================================================================================
class NeurotronDashboard:
"""
UI fixa em curses:
- header: métricas de sistema + estado do Neurotron
- middle: log rolling
- footer: placeholder + ajuda/teclas
"""
def __init__(self, stdscr, log_queue: Queue, cortex: Cortex, start_time: float):
self.stdscr = stdscr
self.log_queue = log_queue
self.cortex = cortex
self.start_time = start_time
self.log_lines = [] # mantém histórico para a janela central
self.max_log_lines = 1000
self.stop_event = threading.Event()
self.last_diag_state = "?"
self.last_diag_delta = "?"
# tenta obter diagnóstico inicial, se existir
try:
diag = self.cortex.diagnostic._load_previous()
self.last_diag_state = diag.get("state", "?")
self.last_diag_delta = diag.get("delta", "?")
except Exception:
pass
# ------------------------------------------------------------------
# Helpers de desenho
# ------------------------------------------------------------------
def _draw_header(self, height, width):
"""Desenha a barra superior com CPU/MEM/LOAD/UPTIME/MODO/DIAG."""
metrics = read_system_metrics()
now = time.time()
uptime_sec = int(now - self.start_time)
hours = uptime_sec // 3600
mins = (uptime_sec % 3600) // 60
secs = uptime_sec % 60
mode = os.environ.get("NEUROTRON_MODE", NEUROTRON_MODE)
version = getattr(self.cortex, "version", "0.1")
cpu = metrics["cpu"]
mem = metrics["mem"]
load1 = metrics["load1"]
# Avalia homeostase
warn_cpu = NEUROTRON_THRESHOLDS.get("cpu_high", 85.0)
warn_mem = NEUROTRON_THRESHOLDS.get("mem_high", 90.0)
warn_load = NEUROTRON_THRESHOLDS.get("load1_high", 2.0)
status_parts = []
if cpu >= 0:
status_parts.append(f"CPU: {cpu:5.1f}%")
else:
status_parts.append("CPU: N/A ")
if mem >= 0:
status_parts.append(f"MEM: {mem:5.1f}%")
else:
status_parts.append("MEM: N/A ")
if load1 >= 0:
status_parts.append(f"LOAD1: {load1:4.2f}")
else:
status_parts.append("LOAD1: N/A ")
status_parts.append(f"UP: {hours:02d}:{mins:02d}:{secs:02d}")
status_parts.append(f"MODO: {mode.upper()}")
status_parts.append(f"VER: {version}")
status_parts.append(f"DIAG: {self.last_diag_state}/{self.last_diag_delta}")
line = " ".join(status_parts)
# barra horizontal
self.stdscr.attron(curses.A_REVERSE)
self.stdscr.addnstr(0, 0, line.ljust(width), width)
self.stdscr.attroff(curses.A_REVERSE)
# segunda linha: homeostase textual
homeo_msg = "HOMEOSTASE: OK"
if (cpu >= warn_cpu and cpu >= 0) or (mem >= warn_mem and mem >= 0) or (
load1 >= warn_load and load1 >= 0
):
homeo_msg = "HOMEOSTASE: STRESS"
self.stdscr.addnstr(1, 0, homeo_msg.ljust(width), width)
def _drain_log_queue(self):
"""Move mensagens da queue para o buffer de linhas."""
try:
while True:
msg = self.log_queue.get_nowait()
timestamp = datetime.now().strftime("%H:%M:%S")
self.log_lines.append(f"[{timestamp}] {msg}")
if len(self.log_lines) > self.max_log_lines:
self.log_lines = self.log_lines[-self.max_log_lines:]
except Empty:
pass
def _draw_log_window(self, header_height, footer_height, width, height):
"""
Área central de logs: rolagem automática, sempre mostrando as últimas N linhas
que cabem na janela.
"""
top = header_height
bottom = height - footer_height
rows = max(0, bottom - top)
self._drain_log_queue()
# seleciona as últimas 'rows' linhas
visible = self.log_lines[-rows:] if rows > 0 else []
for i in range(rows):
y = top + i
if i < len(visible):
line = visible[i]
self.stdscr.addnstr(y, 0, line.ljust(width), width)
else:
self.stdscr.addnstr(y, 0, " ".ljust(width), width)
def _draw_footer(self, width, height):
"""
Rodapé com placeholder para input futuro e legenda de teclas.
Protegido contra terminais muito pequenos.
"""
footer_text = "[ Futuro: comandos do utilizador aparecerão aqui ]"
keys_text = "[q] sair | dashboard Neurotron"
lines = [footer_text, keys_text]
footer_lines = len(lines)
# Se o terminal for demasiado pequeno, encolhe ou só mostra o essencial
start_row = max(0, height - footer_lines)
for i, text in enumerate(lines):
y = start_row + i
if 0 <= y < height:
try:
self.stdscr.addnstr(y, 0, text.ljust(width), width)
except curses.error:
# Em última instância, ignoramos erros de desenho
pass
# ------------------------------------------------------------------
# Loop principal da UI
# ------------------------------------------------------------------
def run(self):
"""
Loop principal do curses.
Actualiza o ecrã, teclas, e encerra quando stop_event é setado ou 'q' é pressionado.
"""
curses.curs_set(0)
self.stdscr.nodelay(True)
self.stdscr.keypad(True)
while not self.stop_event.is_set():
height, width = self.stdscr.getmaxyx()
self.stdscr.erase()
header_height = 2
footer_height = 2
try:
self._draw_header(height, width)
self._draw_log_window(header_height, footer_height, width, height)
self._draw_footer(width, height)
except curses.error:
# Em terminais muito pequenos ou estados estranhos, evitamos crash
pass
self.stdscr.refresh()
try:
ch = self.stdscr.getch()
if ch in (ord("q"), ord("Q")):
self.stop_event.set()
break
except Exception:
pass
time.sleep(0.1) # ~10 FPS
# tentativa graciosa de shutdown do Cortex
try:
self.log_queue.put("Encerrando Neurotron (dashboard pediu saída)…")
self.cortex.shutdown(reason="Dashboard exit")
except Exception:
pass
# =======================================================================================
# Ciclo Cognitivo em thread separada
# =======================================================================================
def cognitive_loop(cortex: Cortex, ui: NeurotronDashboard):
"""
Loop cognitivo clássico (observe think act rest),
a correr numa thread separada, reportando eventos para o dashboard via log_queue.
"""
log = ui.log_queue.put
try:
log("Neurotron: boot()…")
cortex.boot()
try:
state = cortex.diagnostic._load_previous().get("state", "?")
log(f"Diagnóstico inicial: estado='{state}'")
ui.last_diag_state = state
except Exception:
log("Diagnóstico inicial: indisponível")
log("Ciclo cognitivo iniciado (observe → think → act → rest)…")
while not ui.stop_event.is_set():
try:
log("cortex.observe()")
cortex.observe()
log("cortex.think()")
cortex.think()
log("cortex.act()")
cortex.act()
log("cortex.rest()")
cortex.rest()
except KeyboardInterrupt:
log("Interrompido pelo utilizador (SIGINT)")
cortex.shutdown(reason="SIGINT")
break
except SystemExit:
log("SystemExit recebido, encerrando…")
cortex.shutdown(reason="SystemExit")
break
except Exception as e:
log(f"💥 Exceção não tratada no loop cognitivo: {e}")
try:
cortex.fatal(e)
finally:
ui.stop_event.set()
finally:
ui.stop_event.set()
log("Loop cognitivo terminado.")
# =======================================================================================
# Entry point
# =======================================================================================
def _main_curses(stdscr):
# 1) Detecta modo e diretórios
runtime_dir, log_dir = detect_persistent_mode()
# 2) Inicializa Cortex com os mesmos parâmetros do main_waiting
cortex = Cortex(
runtime_dir=runtime_dir,
log_dir=log_dir,
tick_seconds=NEUROTRON_TICK,
)
# 3) Queue de logs e dashboard
log_queue: Queue = Queue()
start_time = time.time()
ui = NeurotronDashboard(stdscr, log_queue, cortex, start_time)
# 4) Thread do ciclo cognitivo
worker = threading.Thread(
target=cognitive_loop,
args=(cortex, ui),
daemon=True,
)
worker.start()
# 5) Loop da UI (bloqueia até terminar)
ui.run()
# 6) Aguarda thread terminar
worker.join(timeout=2.0)
# =============================================================================
# MAIN
# =============================================================================
def main():
"""
Ponto de entrada Python. Usado tanto por:
python3 -m neurotron
como pelo wrapper /usr/bin/neurotron, se este fizer:
from neurotron import main
main()
Entrada principal do Neurotron.
Inicia:
- cortex
- dashboard (thread)
- ciclo cognitivo (thread)
"""
curses.wrapper(_main_curses)
runtime_dir = "/opt/kernel/neurotron/runtime"
log_dir = "/opt/kernel/neurotron/logs"
Path(runtime_dir).mkdir(parents=True, exist_ok=True)
Path(log_dir).mkdir(parents=True, exist_ok=True)
ctx = Cortex(runtime_dir=runtime_dir, log_dir=log_dir)
# threads
t_dash = threading.Thread(target=dashboard_loop, args=(ctx,), daemon=True)
t_cog = threading.Thread(target=cognitive_loop, args=(ctx,), daemon=True)
t_dash.start()
t_cog.start()
# thread principal fica apenas a dormir
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logbus.warn("Interrompido pelo utilizador")
ctx.shutdown("CTRL+C")
if __name__ == "__main__":
main()

View File

@ -1,185 +1,154 @@
from __future__ import annotations
import json, os
import json
from datetime import datetime, timezone
from rich.console import Console
from rich.table import Table
from pathlib import Path
from neurotron.logbus import logbus
from .neurotron_config import (
NEUROTRON_DATASET_PATH, NEUROTRON_HISTORY_KEEP, NEUROTRON_DIAG_SCHEMA,
HOMEOSTASIS_CPU_WARN, HOMEOSTASIS_CPU_ALERT,
HOMEOSTASIS_MEM_WARN, HOMEOSTASIS_MEM_ALERT,
HOMEOSTASIS_LOAD_WARN, HOMEOSTASIS_LOAD_ALERT,
NEUROTRON_DATASET_PATH,
NEUROTRON_HISTORY_KEEP,
NEUROTRON_DIAG_SCHEMA,
HOMEOSTASIS_CPU_WARN,
HOMEOSTASIS_CPU_ALERT,
HOMEOSTASIS_MEM_WARN,
HOMEOSTASIS_MEM_ALERT,
HOMEOSTASIS_LOAD_WARN,
HOMEOSTASIS_LOAD_ALERT,
)
from .perception import Perception
console = Console()
def _now_iso():
return datetime.now(timezone.utc).isoformat()
class AutoDiagnostic:
"""
Diagnóstico interno do Neurotron.
Sempre devolve chaves padronizadas:
cpu, mem, load1
Nunca lança exceções segurança máxima.
"""
def __init__(self, runtime_dir: str, log_dir: str):
self.runtime_dir = runtime_dir
self.log_dir = log_dir
self.runtime_dir = Path(runtime_dir)
self.log_dir = Path(log_dir)
self.data_dir = Path(NEUROTRON_DATASET_PATH)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.last_file = self.data_dir / "last_diagnostic.json"
self.telemetry_file = self.data_dir / "telemetry.json"
self.perception = Perception()
self.current = None
self.previous = None
# ----------------------------------------------------------------------
def _load_previous(self):
if not self.last_file.exists():
return None
try:
with open(self.last_file, "r") as f:
return json.load(f)
except Exception:
return None
if self.last_file.exists():
return json.loads(self.last_file.read_text())
except Exception as e:
logbus.debug(f"[diag.warn] falha ao ler último diagnóstico: {e}")
return None
def _save_current(self, payload: dict):
# ----------------------------------------------------------------------
def _save_current(self, payload):
prev = self._load_previous()
history = []
if self.last_file.exists():
try:
with open(self.last_file, "r") as f:
prev = json.load(f)
history = prev.get("history", [])
history.append({
"timestamp": prev.get("timestamp"),
"cpu_percent": prev.get("cpu_percent"),
"mem_percent": prev.get("mem_percent"),
"loadavg": prev.get("loadavg"),
"state": prev.get("state", "UNKNOWN"),
})
history = history[-NEUROTRON_HISTORY_KEEP:]
except Exception:
history = []
payload["history"] = history
with open(self.last_file, "w") as f:
json.dump(payload, f, indent=2)
if prev:
history = prev.get("history", [])
history.append({
"timestamp": prev.get("timestamp"),
"cpu": prev.get("cpu"),
"mem": prev.get("mem"),
"load1": prev.get("load1"),
"state": prev.get("state", "UNKNOWN"),
})
history = history[-NEUROTRON_HISTORY_KEEP:]
payload["history"] = history
try:
self.last_file.write_text(json.dumps(payload, indent=2))
except Exception as e:
logbus.debug(f"[diag.error] falha ao gravar diagnóstico: {e}")
# ----------------------------------------------------------------------
def _classify_state(self, cpu, mem, l1):
# valores podem ser "?"
try:
cpu = float(cpu)
mem = float(mem)
l1 = float(l1)
l1 = float(l1)
except Exception:
return "UNKNOWN"
# ALERT/CRITICAL
if cpu >= HOMEOSTASIS_CPU_ALERT or mem >= HOMEOSTASIS_MEM_ALERT or l1 >= HOMEOSTASIS_LOAD_ALERT:
return "CRITICAL"
if cpu >= HOMEOSTASIS_CPU_WARN or mem >= HOMEOSTASIS_MEM_WARN or l1 >= HOMEOSTASIS_LOAD_WARN:
return "ALERT"
# OKs
return "STABLE"
def _delta(self, a, b):
try:
if isinstance(a, list) and isinstance(b, list) and len(a) == len(b):
return [round(float(x) - float(y), 2) for x, y in zip(a, b)]
return round(float(a) - float(b), 2)
except Exception:
return "?"
def _render_mini_trend(self, values, width=24, charset="▁▂▃▄▅▆▇█"):
if not values:
return ""
lo = min(values); hi = max(values)
if not isinstance(lo, (int, float)) or not isinstance(hi, (int, float)):
return ""
span = (hi - lo) or 1.0
levels = len(charset) - 1
bars = []
for v in values[-width:]:
if not isinstance(v, (int, float)):
bars.append("·")
continue
i = int(round((v - lo) / span * levels))
bars.append(charset[i])
return "".join(bars)
# ----------------------------------------------------------------------
def run_exam(self):
console.print("\n[bold]🤖 Iniciando rotina de Auto-Diagnóstico Evolutivo...[/bold]\n")
"""
Produz um snapshot canónico com chaves:
cpu, mem, load1
"""
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = snap.get("mem_percent", "?")
cpu = snap.get("cpu_percent", "?")
mem = snap.get("mem_percent", "?")
load = snap.get("loadavg", ["?", "?", "?"])
prev = self._load_previous()
self.previous = prev
# deltas
cpu_prev = prev.get("cpu_percent") if prev else "?"
mem_prev = prev.get("mem_percent") if prev else "?"
load_prev = prev.get("loadavg") if prev else ["?", "?", "?"]
d_cpu = self._delta(cpu, cpu_prev)
d_mem = self._delta(mem, mem_prev)
d_load = self._delta(load, load_prev)
# estado
l1 = load[0] if isinstance(load, list) and load else "?"
state = self._classify_state(cpu, mem, l1)
# tabela
table = Table(title="🩺 Exame Clínico Evolutivo", show_lines=True)
table.add_column("Sinal Vital")
table.add_column("Atual", justify="right")
table.add_column("Δ", justify="center")
table.add_column("Anterior", justify="right")
def fmt(v):
if isinstance(v, list):
return str(v)
return str(v)
table.add_row("CPU (%)", fmt(cpu), fmt(d_cpu), fmt(cpu_prev))
table.add_row("Memória (%)", fmt(mem), fmt(d_mem), fmt(mem_prev))
table.add_row("Carga média (1/5/15)", fmt(load), "" if d_load == "?" else fmt(d_load), fmt(load_prev))
console.print(table)
if isinstance(load, list) and load:
l1 = load[0]
else:
l1 = "?"
# chaves normalizadas
payload = {
"schema": NEUROTRON_DIAG_SCHEMA,
"timestamp": _now_iso(),
"cpu_percent": cpu,
"mem_percent": mem,
"cpu": cpu,
"mem": mem,
"load1": l1,
"loadavg": load,
"state": state,
"state": self._classify_state(cpu, mem, l1),
"env": {
"user": snap.get("env_user"),
"term": snap.get("env_term"),
},
}
self._save_current(payload)
console.print(f"[green]✔ Histórico evolutivo atualizado em:[/green] \n{self.last_file}")
self._update_telemetry(payload)
# Atualiza telemetria contínua
logbus.debug(
f"[diag] estado={payload['state']} cpu={cpu} mem={mem} load1={l1}"
)
return payload["state"], payload
# ----------------------------------------------------------------------
def _update_telemetry(self, payload):
try:
telemetry_file = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
tele = []
if self.telemetry_file.exists():
tele = json.loads(self.telemetry_file.read_text() or "[]")
telemetry = []
if telemetry_file.exists():
telemetry = json.loads(telemetry_file.read_text() or "[]")
telemetry.append({
tele.append({
"timestamp": payload["timestamp"],
"cpu": payload.get("cpu_percent"),
"mem": payload.get("mem_percent"),
"load": payload.get("loadavg"),
"state": payload.get("state"),
"cpu": payload["cpu"],
"mem": payload["mem"],
"load1": payload["load1"],
"state": payload["state"],
})
telemetry = telemetry[-128:] # manter últimas 128 amostras
telemetry_file.write_text(json.dumps(telemetry, indent=2))
tele = tele[-128:]
self.telemetry_file.write_text(json.dumps(tele, indent=2))
except Exception as e:
console.print(f"[yellow]⚠️ Falha ao atualizar telemetria:[/] {e}")
return state, payload
logbus.debug(f"[diag.warn] falha ao atualizar telemetria: {e}")

View File

@ -1,231 +1,173 @@
# neurotron/cortex.py
import json
import time
from collections import defaultdict, deque
from pathlib import Path
from collections import defaultdict, deque
from time import sleep
from rich.console import Console
from .neuron import Neuron
from neurotron.logbus import logbus
from neurotron.disk_agent import DiskAgent
from neurotron.echo_agent import EchoAgent
from neurotron.vitalsigns_agent import VitalSigns
from .hippocampus import Hippocampus
from .perception import Perception
from .motor import Motor
from .neurotron_config import (
NEUROTRON_MODE, NEUROTRON_TICK, NEUROTRON_TICK_MIN, NEUROTRON_TICK_MAX, NEUROTRON_TICK_STEP,
NEUROTRON_DIAG_EVERY_TICKS, NEUROTRON_DATASET_PATH,
HEARTBEAT_ENABLED, HEARTBEAT_STYLE, NEUROTRON_THRESHOLDS,
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
from .autodiagnostic import AutoDiagnostic
class VitalSigns(Neuron):
name = "VitalSigns"
def observe(self) -> None:
snap = self.ctx.perception.snapshot()
self.publish("vitals", snap)
self.ctx.memory.remember("observe.vitals", snap)
class EchoAgent(Neuron):
name = "EchoAgent"
def think(self) -> None:
msg = self.consume("vitals")
if msg:
self.publish("actions", {"action": "echo", "text": f"CPU {msg.get('cpu_percent', '?')}%"})
from .neurotron_config import (
NEUROTRON_MODE, NEUROTRON_TICK,
NEUROTRON_TICK_MIN, NEUROTRON_TICK_MAX, NEUROTRON_TICK_STEP,
NEUROTRON_DIAG_EVERY_TICKS,
NEUROTRON_DATASET_PATH,
HEARTBEAT_ENABLED,
NEUROTRON_THRESHOLDS,
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
class Cortex:
"""
Orquestrador: liga neurónios, bus de mensagens, memória, IO e ciclo cognitivo.
Agora com Telemetria Contínua (V5): heartbeat, microalertas e flush periódico.
"""
def __init__(self, runtime_dir, log_dir, tick_seconds=NEUROTRON_TICK):
self.runtime_dir = runtime_dir
self.log_dir = log_dir
self.runtime_dir = Path(runtime_dir)
self.log_dir = Path(log_dir)
self.tick = float(tick_seconds)
self.mode = NEUROTRON_MODE
self._tick_count = 0
self.diagnostic = AutoDiagnostic(runtime_dir, log_dir)
self.console = Console()
self.memory = Hippocampus(log_dir=log_dir)
self.diagnostic = AutoDiagnostic(runtime_dir, log_dir)
self.memory = Hippocampus(log_dir=self.log_dir)
self.perception = Perception()
self.motor = Motor()
# Message bus simples: channels → deque
self.bus = defaultdict(lambda: deque(maxlen=32))
# Telemetria em memória (curto prazo)
self.telemetry = deque(maxlen=TELEMETRY_MAXLEN)
# Regista neurónios (podes adicionar mais à medida)
self.neurons: list[Neuron] = [
# ordem é importante
self.neurons = [
DiskAgent(self),
VitalSigns(self),
EchoAgent(self),
]
self._booted = False
# Caminho para gravar a telemetria
self.telemetry_path = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
self.telemetry_path.parent.mkdir(parents=True, exist_ok=True)
# ——— ciclo de vida ———
def boot(self) -> None:
# ----------------------------------------
# Boot
# ----------------------------------------
def boot(self):
if self._booted:
return
self.console.print("[bold cyan]🧠 Neurotron[/] — boot")
self.memory.remember("boot", {"version": "0.1", "tick": self.tick})
logbus.info(f"Neurotron boot() — mode={self.mode}")
self.memory.remember("boot", {"tick": self.tick, "mode": self.mode})
self._booted = True
#state, _ = self.diagnostic.run_exam()
#self._apply_homeostasis(state)
def _apply_homeostasis(self, state):
# ----------------------------------------
# Shutdown + Fatal
# ----------------------------------------
def shutdown(self, reason=""):
logbus.warn(f"Shutdown pedido — {reason}")
self.memory.remember("shutdown", {"reason": reason})
def fatal(self, e: Exception):
logbus.error(f"Fatal: {repr(e)}")
self.memory.remember("fatal", {"error": repr(e)})
raise e
# ----------------------------------------
# ciclo cognitivo
# ----------------------------------------
def observe(self):
for n in self.neurons:
n.observe()
def think(self):
for n in self.neurons:
n.think()
def act(self):
action = self.bus_consume("actions")
if not action:
return
# echo (debug)
if action.get("action") == "echo":
res = self.motor.run("echo", [action.get("text", "")])
if res.get("stdout"):
logbus.info(f"[echo] {res['stdout'].strip()}")
def rest(self):
if HEARTBEAT_ENABLED:
self._heartbeat()
sleep(self.tick)
self._tick_count += 1
if self._tick_count % NEUROTRON_DIAG_EVERY_TICKS == 0:
self._run_diag()
if self._tick_count % TELEMETRY_FLUSH_EVERY_TICKS == 0:
self._flush_telemetry()
# ----------------------------------------
# heartbeat
# ----------------------------------------
def _heartbeat(self):
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent")
mem = snap.get("mem_percent")
load = snap.get("loadavg")
load1 = load[0] if load else None
self.telemetry.append({
"ts": time.time(),
"cpu": cpu,
"mem": mem,
"load1": load1,
"tick": self.tick,
})
logbus.heart(f"cpu={cpu}% mem={mem}% tick={self.tick:.2f}s")
# ----------------------------------------
# diag / homeostase
# ----------------------------------------
def _run_diag(self):
state, snap = self.diagnostic.run_exam()
logbus.diag(f"estado={state} cpu={snap['cpu']} mem={snap['mem']} load1={snap['load1']}")
old = self.tick
if state == "CRITICAL":
self.mode = "diagnostic"
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
elif state == "ALERT":
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP / 2)
elif state == "STABLE":
self.tick = max(NEUROTRON_TICK_MIN, self.tick - NEUROTRON_TICK_STEP / 2)
# UNKNOWN → não mexe
def shutdown(self, reason: str = ""):
self.console.print(f"[yellow]shutdown:[/] {reason}")
self.memory.remember("shutdown", {"reason": reason})
def fatal(self, e: Exception):
self.console.print(f"[red]fatal:[/] {e!r}")
self.memory.remember("fatal", {"error": repr(e)})
print(f"fatal: {repr(e)}")
raise
# ——— loop ———
def observe(self) -> None:
for n in self.neurons:
n.observe()
def think(self) -> None:
for n in self.neurons:
n.think()
def act(self) -> None:
# Consumir ações agregadas e executar
action = self.bus_consume("actions")
if action and action.get("action") == "echo":
res = self.motor.run("echo", [action.get("text", "")])
self.memory.remember("act.echo", res)
if res.get("stdout"):
self.console.print(f"[green]{res['stdout'].strip()}[/]")
def rest(self):
# Heartbeat e microalertas antes de dormir
if HEARTBEAT_ENABLED:
self._heartbeat_and_telemetry()
# Pausa regulada
sleep(self.tick)
# Contador e rotinas periódicas
self._tick_count += 1
if self._tick_count % NEUROTRON_DIAG_EVERY_TICKS == 0:
#state, _ = self.diagnostic.run_exam()
#self._apply_homeostasis(state)
pass
if self._tick_count % TELEMETRY_FLUSH_EVERY_TICKS == 0:
self._flush_telemetry()
# ——— telemetria/alertas ———
def _heartbeat_and_telemetry(self):
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = (snap.get("mem") or {}).get("percent", "?")
load = snap.get("loadavg") or []
# Adiciona ao buffer de telemetria
self.telemetry.append({
"ts": time.time(),
"cpu": cpu,
"mem": mem,
"load": load,
"tick": self.tick,
})
# Microalertas com base nos limiares
self._evaluate_microalerts(cpu, mem, load)
# Heartbeat visual
color = self._color_for_levels(cpu, mem, load)
if HEARTBEAT_STYLE == "compact":
self.console.print(f"[bold {color}]💓[/] CPU: {cpu}% | MEM: {mem}% | TICK: {self.tick:.2f}s")
else:
self.console.print(
f"[bold {color}]💓 [Heartbeat][/bold {color}] "
f"CPU: {cpu}% | MEM: {mem}% | LOAD: {load} | TICK: {self.tick:.2f}s | MODE: {self.mode}"
)
def _evaluate_microalerts(self, cpu, mem, load):
alerts = []
# Normaliza
load1 = load[0] if (isinstance(load, (list, tuple)) and load) else None
try:
if isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]:
alerts.append(("cpu", cpu))
if isinstance(mem, (int, float)) and mem >= NEUROTRON_THRESHOLDS["mem_high"]:
alerts.append(("mem", mem))
if isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"]:
alerts.append(("load1", load1))
except KeyError:
pass # thresholds incompletos → sem microalertas
if not alerts:
return
for (metric, value) in alerts:
self.console.print(f"[yellow]⚠️ Microalerta:[/] {metric.upper()} {value} — ajustando homeostase (tick +{NEUROTRON_TICK_STEP:.2f}s)")
# Ajuste simples de segurança
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
self.memory.remember("microalert", {
"ts": time.time(),
"alerts": alerts,
"new_tick": self.tick,
})
def _color_for_levels(self, cpu, mem, load):
# Heurística simples de cor
try:
load1 = load[0] if (isinstance(load, (list, tuple)) and load) else 0.0
high = (
(isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]) or
(isinstance(mem, (int, float)) and mem >= NEUROTRON_THRESHOLDS["mem_high"]) or
(isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"])
)
if high:
return "yellow"
except Exception:
pass
return "green"
if old != self.tick:
logbus.info(f"tick ajustado {old:.2f}s → {self.tick:.2f}s")
# ----------------------------------------
# telemetria
# ----------------------------------------
def _flush_telemetry(self):
# Grava o buffer de telemetria em JSON (mantendo histórico curto)
try:
data = list(self.telemetry)
with self.telemetry_path.open("w") as f:
json.dump(data, f)
self.memory.remember("telemetry.flush", {"count": len(data), "path": str(self.telemetry_path)})
self.telemetry_path.write_text(json.dumps(list(self.telemetry)))
self.memory.remember("telemetry.flush", {})
except Exception as e:
self.console.print(f"[red]✖ Falha ao gravar telemetria:[/] {e!r}")
self.memory.remember("telemetry.error", {"error": repr(e)})
logbus.error(f"telemetry error: {e}")
# ----------------------------------------
# bus interno
# ----------------------------------------
def bus_publish(self, ch, payload):
self.bus[ch].append(payload)
def bus_consume(self, ch):
q = self.bus[ch]
return q.popleft() if q else None
# ——— bus ———
def bus_publish(self, channel: str, payload: dict) -> None:
self.bus[channel].append(payload)
def bus_consume(self, channel: str) -> dict | None:
q = self.bus[channel]
return q.popleft() if q else None

View File

@ -0,0 +1,106 @@
# neurotron/disk_agent.py
import os
import subprocess
from pathlib import Path
from neurotron.neuron import Neuron
from neurotron.logbus import logbus
DISK_CANDIDATES = ["/dev/vda", "/dev/sda", "/dev/vdb"]
MOUNT_POINT = "/mnt/nfdos"
class DiskAgent(Neuron):
name = "DiskAgent"
def __init__(self, ctx):
super().__init__(ctx)
self.state = "unknown" # unknown → no_disk → found → mounted
self.last_msg = None
self.cooldown = 0
# --------------------------------
# HELPER
# --------------------------------
def _emit_once(self, msg):
if msg != self.last_msg:
logbus.disk(msg)
self.last_msg = msg
def _run(self, cmd):
try:
subprocess.run(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True
)
return True
except Exception:
return False
# --------------------------------
# OBSERVE
# --------------------------------
def observe(self):
# corre só 1x por 10 ticks
if self.cooldown > 0:
self.cooldown -= 1
return
self.cooldown = 10
# 1) Procurar disco
dev = None
for d in DISK_CANDIDATES:
if Path(d).exists():
dev = d
break
if not dev:
self._emit_once("Nenhum disco encontrado — modo VOLATILE")
self.state = "no_disk"
return
# 2) Novo disco encontrado
if self.state == "unknown":
self._emit_once(f"Disco detectado: {dev}")
self.state = "found"
# 3) Tentar identificar filesystem
try:
blkid = subprocess.run(
["blkid", dev],
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).stdout.strip()
except FileNotFoundError:
# blkid não existe → fallback gentle
logbus.debug("[disk] blkid não encontrado — fallback ativo")
blkid = ""
if blkid:
self._emit_once("Sistema de ficheiros válido encontrado")
else:
self._emit_once("Disco virgem detectado — não formatado")
# 4) Montar
Path(MOUNT_POINT).mkdir(parents=True, exist_ok=True)
if self._run(["mount", dev, MOUNT_POINT]):
if self.state != "mounted":
self._emit_once(f"Montado em {MOUNT_POINT}")
self.state = "mounted"
# 5) Trocar Neurotron → modo persistente
if self.ctx.mode != "persistent":
self.ctx.mode = "persistent"
logbus.info("Modo alterado → PERSISTENT")
# 6) Estrutura básica
for d in ["data", "logs", "dna"]:
Path(MOUNT_POINT, d).mkdir(exist_ok=True)
else:
self._emit_once("Falha ao montar — mantendo VOLATILE")

View File

@ -1,254 +0,0 @@
#!/usr/bin/env python3
"""
💾 Módulo de Inicialização de Disco Neurotron V0.1 (atualizado)
Detecta, avalia, prepara e monta o disco persistente do NFDOS.
- Não formata discos que contenham um filesystem conhecido, a menos que forçado.
- Forçar formatação:
* EXPORT: export NFDOS_FORCE_FORMAT=1 (no ambiente do initramfs, se aplicável)
* Kernel cmdline: adicionar `nfdos_force_format=1` ao -append do QEMU
"""
import os
import subprocess
from pathlib import Path
from rich.console import Console
from neurotron.neurotron_config import (
MOUNT_POINT, DISK_CANDIDATES
)
console = Console()
def run(cmd: list[str]) -> bool:
"""Executa comando silenciosamente (retorna True se OK)."""
try:
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def detect_disk() -> str | None:
"""Procura por um dispositivo de disco válido (por ordem em DISK_CANDIDATES)."""
for dev in DISK_CANDIDATES:
p = Path(dev)
if p.exists():
console.print(f"[cyan]🔍 Detetado disco:[/] {dev}")
return dev
console.print("[yellow]⚠️ Nenhum disco detectado.[/yellow]")
return None
def blkid_check(device: str) -> str | None:
"""Tenta obter tipo com blkid (se disponível)."""
try:
out = subprocess.run(["blkid", device], stdout=subprocess.PIPE, text=True, check=False)
return out.stdout.strip() if out.stdout else None
except FileNotFoundError:
return None
def read_sig(device: str, size: int = 2048) -> bytes | None:
"""Lê os primeiros `size` bytes do device (se possível)."""
try:
with open(device, "rb") as f:
return f.read(size)
except Exception:
return None
def detect_fs_by_magic(device: str) -> str | None:
"""
Detecta assinaturas simples:
- ext4 superblock magic (0xEF53) @ offset 1024 + 56 = 1080
- NTFS -> 'NTFS ' @ offset 3
- FAT32 -> 'FAT32' nos offsets típicos do boot sector
- MBR partition table signature 0x55AA @ offset 510-511
Retorna string com o sistema ou None.
"""
buf = read_sig(device, size=4096)
if not buf:
return None
# MBR signature
if len(buf) >= 512 and buf[510:512] == b'\x55\xAA':
# detecta tabela de partições existente (MBR)
return "mbr-partition-table"
# ext magic at 1024+56 = 1080
if len(buf) >= 1082 and buf[1080:1082] == b'\x53\xEF':
return "ext (superblock)"
# NTFS signature at offset 3 (ASCII "NTFS ")
if len(buf) >= 11 and buf[3:11] == b'NTFS ':
return "ntfs"
# FAT32 signature at offset 82 or boot sector strings containing FAT
if b"FAT32" in buf or b"FAT16" in buf or b"FAT12" in buf:
return "fat"
return None
def parse_cmdline_flag() -> bool:
"""Lê /proc/cmdline para a flag nfdos_force_format=1"""
try:
with open("/proc/cmdline", "r") as f:
cmd = f.read()
return "nfdos_force_format=1" in cmd.split()
except Exception:
return False
def which(prog: str) -> str | None:
for p in os.environ.get("PATH", "/sbin:/bin:/usr/sbin:/usr/bin").split(":"):
cand = Path(p) / prog
if cand.exists() and os.access(cand, os.X_OK):
return str(cand)
return None
def format_ext4(device: str, label: str = "NFDOS_DATA") -> bool:
"""Formata o dispositivo com ext4, recolhendo logs de erro detalhados (BusyBox-safe)."""
mke2fs = which("mke2fs")
mkfs_ext4 = which("mkfs.ext4")
mkfs = which("mkfs")
candidates = []
if mkfs_ext4:
candidates.append(([mkfs_ext4, "-F", "-L", label, device], "mkfs.ext4"))
if mke2fs:
# o BusyBox mke2fs não aceita '-t', por isso ajustaremos dentro do loop
candidates.append(([mke2fs, "-F", "-t", "ext4", "-L", label, device], "mke2fs"))
if mkfs:
candidates.append(([mkfs, "-t", "ext4", "-F", "-L", label, device], "mkfs"))
if not candidates:
console.print("[red]❌ Nenhum utilitário mkfs disponível no initramfs![/red]")
return False
for cmd, name in candidates:
console.print(f"[yellow]⚙️ Formatando {device} com {name}...[/yellow]")
# 👉 se for o BusyBox mke2fs, removemos o argumento -t
if name == "mke2fs":
cmd = [c for c in cmd if c != "-t" and c != "ext4"]
try:
result = subprocess.run(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
if result.stdout:
console.print(result.stdout.strip())
console.print(f"[green]✔ Formatação concluída com {name}.[/green]")
return True
except subprocess.CalledProcessError as e:
console.print(f"[red]❌ {name} falhou (código {e.returncode}).[/red]")
if e.stdout:
console.print(f"[cyan]📜 STDOUT:[/cyan]\n{e.stdout.strip()}")
if e.stderr:
console.print(f"[magenta]⚠️ STDERR:[/magenta]\n{e.stderr.strip()}")
console.print("[red]❌ Nenhum método de formatação teve sucesso.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel suporta EXT4 e se o BusyBox inclui mke2fs.")
return False
def ensure_fs(device: str) -> bool:
"""
Verifica se existe sistema de ficheiros.
Se não existir e houver confirmação/flag, formata ext4 (ou fallback via mke2fs).
"""
# 1⃣ tentativa rápida com blkid
info = blkid_check(device)
if info:
console.print(f"[green]🧠 Disco já formatado (blkid):[/] {info}")
return True
# 2⃣ fallback por leituras de assinatura
sig = detect_fs_by_magic(device)
if sig:
console.print(f"[yellow]⚠ Assinatura detectada no disco:[/] {sig}")
console.print("[red]❗ O disco contém dados ou partições existentes. Abortando formatação.[/red]")
return False
# 3⃣ se nada detectado — disco virgem
forced_env = os.environ.get("NFDOS_FORCE_FORMAT") == "1"
forced_cmd = parse_cmdline_flag()
if not (forced_env or forced_cmd):
console.print("[yellow]⚠ Disco parece virgem, mas não há confirmação para formatar.[/yellow]")
console.print("Use `nfdos_force_format=1` no kernel cmdline ou export NFDOS_FORCE_FORMAT=1")
console.print("para permitir formatação automática.")
return False
# 4⃣ tentar formatação
console.print(f"[yellow]⚙️ Forçando formatação de {device} como ext4 (FLAG DETETADA)...[/yellow]")
ok = format_ext4(device)
if ok:
console.print("[green]✔ Formatação concluída com sucesso.[/green]")
return True
# 5⃣ se nada funcionou
console.print("[red]❌ Falha na formatação.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel inclui suporte para EXT4 ou se o mkfs/mke2fs está embutido no BusyBox.")
return False
def mount_disk(device: str) -> bool:
"""Monta o disco no ponto esperado (retorna True se OK)."""
os.makedirs(MOUNT_POINT, exist_ok=True)
return run(["mount", device, MOUNT_POINT])
def debug_env():
"""Mostra informações úteis quando nenhum disco é detectado (ou para debug)."""
console.print("[yellow]🩻 DEBUG: listando /dev/* e últimas mensagens do kernel[/yellow]")
devs = sorted(Path("/dev").glob("*"))
console.print("📂 Dispositivos disponíveis:", ", ".join([d.name for d in devs if d.is_char_device() or d.is_block_device()]))
os.system("dmesg | tail -n 20 || echo '(dmesg não disponível)'")
console.print("[yellow]───────────────────────────────[/yellow]")
os.system("echo '--- /proc/partitions ---'; cat /proc/partitions || true")
os.system("echo '--- dmesg | grep -i virtio ---'; dmesg | grep -i virtio || true")
console.print("[yellow]───────────────────────────────[/yellow]")
def initialize_persistence():
"""Fluxo completo de inicialização do hipocampo físico."""
device = detect_disk()
if not device:
debug_env()
console.print("[red]❌ Nenhum disco físico encontrado — usando modo RAM.[/red]")
return False
if not ensure_fs(device):
console.print("[red]❌ Preparação do sistema de ficheiros foi interrompida.[/red]")
return False
if not mount_disk(device):
console.print("[red]❌ Falha ao montar disco.[/red]")
return False
console.print(f"[green]✔ Disco montado em:[/] {MOUNT_POINT}")
telemetry_file = Path("/opt/kernel/neurotron/data/telemetry.json")
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
if not telemetry_file.exists():
telemetry_file.write_text("[]")
for d in ["data", "logs", "dna"]:
Path(MOUNT_POINT, d).mkdir(parents=True, exist_ok=True)
Path(MOUNT_POINT, "DNA_ID").write_text("NEUROTRON_HIPOCAMPUS_V1\n")
console.print("[cyan]👉 Hipocampo físico inicializado com sucesso.[/cyan]")
return True
if __name__ == "__main__":
initialize_persistence()

View File

@ -0,0 +1,27 @@
# neurotron/echo_agent.py
from neurotron.neuron import Neuron
from neurotron.logbus import logbus
class EchoAgent(Neuron):
name = "EchoAgent"
def __init__(self, ctx):
super().__init__(ctx)
self.last_cpu = None
def think(self):
msg = self.consume("vitals")
if not msg:
return
cpu = msg.get("cpu_percent")
if cpu is None:
return
# só fala se variar > 5%
if self.last_cpu is None or abs(cpu - self.last_cpu) >= 5:
self.publish("actions", {
"action": "echo",
"text": f"CPU {cpu:.1f}%"
})
self.last_cpu = cpu

View File

@ -3,7 +3,7 @@ from datetime import datetime
try:
import orjson as json
except Exception: # fallback leve
except Exception:
import json # type: ignore
class Hippocampus:
@ -11,9 +11,11 @@ class Hippocampus:
Memória contextual simples (JSON Lines): append-only.
Guarda perceções, decisões e ações para replays futuros.
"""
def __init__(self, log_dir: Path):
self.log_dir = log_dir
self.events_file = log_dir / "events.jsonl"
self.log_dir.mkdir(parents=True, exist_ok=True)
self.events_file = self.log_dir / "events.jsonl"
def remember(self, kind: str, data: dict) -> None:
rec = {
@ -21,14 +23,17 @@ class Hippocampus:
"kind": kind,
"data": data,
}
try:
if "orjson" in json.__name__:
blob = json.dumps(rec)
else:
blob = json.dumps(rec) # type: ignore
# Compatível com orjson e stdlib json
blob = json.dumps(rec)
if isinstance(blob, str):
blob = blob.encode("utf-8")
with self.events_file.open("ab") as f:
f.write(blob if isinstance(blob, bytes) else blob.encode("utf-8"))
f.write(blob)
f.write(b"\n")
except Exception:
# evitar crash por IO em early boot
# Evitar crash em early boot ou IO quebrado
pass

View File

@ -0,0 +1,29 @@
# neurotron/logbus.py
from datetime import datetime
import sys
import threading
class LogBus:
def __init__(self):
self.lock = threading.Lock()
def emit(self, level, msg):
"""Escreve logs com timestamp e nível padronizado."""
ts = datetime.utcnow().strftime("%H:%M:%S")
line = f"[{ts}] [{level}] {msg}\n"
with self.lock:
sys.stdout.write(line)
sys.stdout.flush()
# atalhos
def info(self, msg): self.emit("info", msg)
def warn(self, msg): self.emit("warn", msg)
def error(self, msg): self.emit("error", msg)
def disk(self, msg): self.emit("disk", msg)
def diag(self, msg): self.emit("diag", msg)
def heart(self, msg): self.emit("heart", msg)
def debug(self, msg): self.emit("debug", msg)
logbus = LogBus()

View File

@ -1,114 +0,0 @@
#!/usr/bin/env python3
"""
Neurotron ponto de entrada do cérebro do NFDOS.
Boot flow (novo): init (BusyBox) /usr/bin/neurotron este ficheiro.
"""
import os
import sys
from pathlib import Path
from datetime import datetime
from rich.console import Console
# -----------------------------------------------------------------------------
# Ajuste de caminho: tornar "src/" o root dos módulos Neurotron
# -----------------------------------------------------------------------------
THIS_DIR = Path(__file__).resolve().parent # .../neurotron/src
if str(THIS_DIR) not in sys.path:
sys.path.insert(0, str(THIS_DIR))
# Agora os imports ficam locais ao diretório src/
from neurotron_config import ( # noqa: E402
NEUROTRON_TICK,
NEUROTRON_MODE,
NEUROTRON_HOMEOSTASIS,
CORTEX_LOOP_DELAY,
MOTOR_OUTPUT_DEVICE,
)
from autodiagnostic import AutoDiagnostic # noqa: E402
from perception import Perception # noqa: E402
from cortex import Cortex # noqa: E402
console = Console()
def detect_persistent_mount() -> bool:
"""Verifica se o hipocampo físico está montado em /var/neurotron"""
mount_point = Path("/var/neurotron")
try:
if mount_point.exists() and os.path.ismount(mount_point):
console.print(f"[green]💾 Hipocampo físico montado:[/] {mount_point}")
return True
else:
# fallback: check via /proc/mounts em early boot
with open("/proc/mounts") as f:
for line in f:
if " /var/neurotron " in line:
console.print(f"[green]💾 Hipocampo físico montado (via /proc):[/] {mount_point}")
return True
except Exception as e:
console.print(f"[yellow]⚠ Falha ao verificar montagem persistente:[/] {e}")
return False
def main():
# -------------------------------------------------------------------------
# Seleção de modo: persistente vs volátil
# -------------------------------------------------------------------------
persistent_mode = detect_persistent_mount()
if persistent_mode:
os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG"] = "/var/neurotron/logs"
else:
os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG"] = "/tmp/neurotron_logs"
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG"])
runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
mode = os.environ["NEUROTRON_MODE"]
console.print(f"[cyan]🌍 Modo atual do Neurotron:[/] [bold]{mode.upper()}[/]")
# -------------------------------------------------------------------------
# Inicializa o Córtex
# -------------------------------------------------------------------------
cortex = Cortex(
runtime_dir=runtime_dir,
log_dir=log_dir,
tick_seconds=NEUROTRON_TICK,
)
try:
cortex.boot()
state = cortex.diagnostic._load_previous().get("state", "?")
console.print(f"[cyan]🩺 Estado inicial:[/] {state}\n")
console.print("[green]👉 Neurotron inicializado com sucesso.[/]\n")
console.print("[green]✔ [Mensagem Simbolica] Boot OK[/]\n")
console.print("[green]✔ Iniciando ciclo cognitivo.[/]\n")
while True:
cortex.observe()
cortex.think()
cortex.act()
cortex.rest()
except KeyboardInterrupt:
console.print("[yellow]⚠ Interrompido pelo utilizador (SIGINT)[/]")
cortex.shutdown(reason="SIGINT")
except SystemExit:
cortex.shutdown(reason="SystemExit")
except Exception as e:
console.print(f"[red]💥 Exceção não tratada:[/] {e}")
cortex.fatal(e)
if __name__ == "__main__":
main()

View File

@ -1,27 +1,77 @@
import subprocess
from neurotron.logbus import logbus
class Motor:
"""
Ator do sistema: executa comandos controlados (whitelist).
Mantém-se minimal até termos política de segurança mais rica.
Subsistema 'ator' do Neurotron.
Executa comandos controlados (whitelist rígida),
sem side-effects e com logging centralizado.
FUTURO:
- níveis de permissão
- sandboxes
- ações abstratas (ex: mover, sinalizar, emitir evento)
"""
SAFE_CMDS = {
"echo": ["echo"],
"sh": ["/bin/sh"], # shell interativo (init)
"sh": ["/bin/sh"], # shell básico — útil para debug interno
}
def __init__(self):
pass
def run(self, cmd: str, args: list[str] | None = None) -> dict:
prog = self.SAFE_CMDS.get(cmd)
if not prog:
return {"ok": False, "error": f"cmd '{cmd}' não permitido"}
try:
full = prog + (args or [])
res = subprocess.run(full, capture_output=True, text=True)
return {
"ok": res.returncode == 0,
"code": res.returncode,
"stdout": res.stdout,
"stderr": res.stderr,
"""
Executa um comando permitido e retorna:
{
"ok": bool,
"code": int | None,
"stdout": str,
"stderr": str,
}
"""
if cmd not in self.SAFE_CMDS:
logbus.debug(f"[motor.warn] comando bloqueado: '{cmd}'")
return {
"ok": False,
"code": None,
"stdout": "",
"stderr": f"cmd '{cmd}' não permitido",
}
try:
full = self.SAFE_CMDS[cmd] + (args or [])
res = subprocess.run(
full,
capture_output=True,
text=True,
env={}, # ambiente neutro → evita leaks
)
ok = (res.returncode == 0)
if not ok:
logbus.debug(
f"[motor.warn] '{cmd}' retornou código {res.returncode}"
)
return {
"ok": ok,
"code": res.returncode,
"stdout": res.stdout or "",
"stderr": res.stderr or "",
}
except Exception as e:
return {"ok": False, "error": str(e)}
logbus.debug(f"[motor.error] exceção ao executar '{cmd}': {e}")
return {
"ok": False,
"code": None,
"stdout": "",
"stderr": str(e),
}

View File

@ -1,30 +1,58 @@
from typing import Any, Dict
from typing import Any, Dict, Optional
from neurotron.logbus import logbus
class Neuron:
"""
Classe-base de um neurónio-agente.
Cada neurónio pode observar/agir e trocar mensagens via o bus do Cortex.
Classe-base de um neurónio-agente.
Cada neurónio participa no ciclo cognitivo do Cortex:
observe() think() act()
Pode comunicar via:
- publish(channel, payload)
- consume(channel)
Implementações devem respeitar esta interface sem causar efeitos colaterais
externos (sem prints, sem IO não controlado).
"""
name = "Neuron"
name: str = "Neuron"
def __init__(self, ctx: "Cortex"):
self.ctx = ctx
# ----------------------------------------------------------------------
# Ciclo cognitivo — para override por neurónios concretos
# ----------------------------------------------------------------------
def observe(self) -> None:
"""Ler estado do mundo (sensores, /proc, eventos)."""
"""Ler estado do mundo (sensores, perceções, eventos internos)."""
return
def think(self) -> None:
"""Processar/planejar usando o estado atual."""
"""Processar informação, planear, atualizar estado interno."""
return
def act(self) -> None:
"""Executar uma ação (opcional)."""
"""Executar uma ação, enviar comandos, publicar eventos."""
return
# Utilitários
def publish(self, channel: str, payload: Dict[str, Any]) -> None:
self.ctx.bus_publish(channel, payload)
# ----------------------------------------------------------------------
# Comunicação inter-neurónios via Cortex Bus
# ----------------------------------------------------------------------
def consume(self, channel: str) -> Dict[str, Any] | None:
return self.ctx.bus_consume(channel)
def publish(self, channel: str, payload: Dict[str, Any]) -> None:
"""Publica mensagem num canal do bus cognitivo."""
try:
self.ctx.bus_publish(channel, payload)
except Exception as e:
logbus.debug(f"[warn] {self.name}.publish falhou: {e}")
def consume(self, channel: str) -> Optional[Dict[str, Any]]:
"""Lê (e remove) última mensagem disponível num canal."""
try:
return self.ctx.bus_consume(channel)
except Exception as e:
logbus.debug(f"[warn] {self.name}.consume falhou: {e}")
return None

View File

@ -1,42 +1,37 @@
"""
🧠 neurotron_config.py
NFDOS Núcleo de parâmetros vitais do Neurotron
------------------------------------------------
Nova versão para o layout:
.../neurotron/
src/
data/
NFDOS Núcleo de parâmetros vitais estáticos do Neurotron
Este módulo define apenas:
- parâmetros cognitivos estáticos
- thresholds
- limites
- caminhos locais relativos ao package (ex: DATA_DIR interno)
Ele NÃO define:
- diretórios de runtime (/var/neurotron, /tmp)
- diretórios de logs dinâmicos
- configurações voláteis
Essas pertencem ao runtime e são avaliadas no __main__.
"""
from pathlib import Path
# ======================================
# 🌐 Diretórios e Caminhos
# ======================================
# ============================================================================
# 📁 Diretórios internos do package (não confundir com runtime!)
# ============================================================================
# Diretório deste ficheiro → .../neurotron/src/neurotron_config.py
THIS_FILE = Path(__file__).resolve()
SRC_DIR = THIS_FILE.parent # .../neurotron/src
BASE_DIR = SRC_DIR.parent # .../neurotron/
SRC_DIR = THIS_FILE.parent # .../neurotron/src/neurotron
BASE_DIR = SRC_DIR.parent # .../neurotron/src
# Onde vivem as configs/logs da “instalação”
# Diretórios que fazem parte da instalação do package
DATA_DIR = BASE_DIR / "data"
CONFIG_DIR = DATA_DIR / "configs"
LOG_DIR = DATA_DIR / "logs"
PACKAGE_LOG_DIR = DATA_DIR / "logs" # logs do próprio package (não runtime!)
# Modo persistente do NFDOS (quando /var/neurotron está montado)
RUNTIME_DIR = Path("/var/run/neurotron")
MOUNT_POINT = "/var/neurotron"
# Candidatos para disco persistente do hipocampo
DISK_CANDIDATES = [
"/dev/vda", "/dev/vdb",
"/dev/sda", "/dev/hda"
]
# ======================================
# ⚙️ Parâmetros Cognitivos Principais
# ======================================
# ============================================================================
# ⚙️ Parâmetros Cognitivos
# ============================================================================
NEUROTRON_TICK = 1.0
NEUROTRON_VERBOSITY = 1
@ -59,9 +54,9 @@ NEUROTRON_TICK_STEP = 0.25
NEUROTRON_SEED = 42
NEUROTRON_MEMORY_SIZE = 256 # KB
# ======================================
# 🧩 Parâmetros de Subsistemas
# ======================================
# ============================================================================
# 🔌 Subsistemas
# ============================================================================
CORTEX_MAX_THREADS = 1
CORTEX_LOOP_DELAY = 0.1
@ -76,14 +71,13 @@ PERCEPTION_CPU_SOURCE = "/proc/stat"
PERCEPTION_MEM_SOURCE = "/proc/meminfo"
PERCEPTION_UPDATE_INTERVAL = 2.0
# ======================================
# 🧠 Parâmetros futuros
# ======================================
# ============================================================================
# 📊 Telemetria e diagnóstico
# ============================================================================
NEUROTRON_EXPANSION_MODE = "none"
NEUROTRON_DATASET_PATH = DATA_DIR
NEUROTRON_HISTORY_KEEP = 8
NEUROTRON_DIAG_SCHEMA = "v4"
HEARTBEAT_ENABLED = True
@ -97,22 +91,3 @@ NEUROTRON_THRESHOLDS = {
TELEMETRY_MAXLEN = 64
TELEMETRY_FLUSH_EVERY_TICKS = 5
# ======================================
# 🧭 Utilitário
# ======================================
def show_config():
"""Mostra a configuração atual do Neurotron (apenas NEUROTRON_*)"""
import json
cfg = {
k: v
for k, v in globals().items()
if k.startswith("NEUROTRON_")
}
print(json.dumps(cfg, indent=2, default=str))
if __name__ == "__main__":
show_config()

View File

@ -1,52 +1,75 @@
import os
from time import sleep
from neurotron.logbus import logbus
class Perception:
"""
Sensores internos via /proc:
- CPU % calculado por delta de /proc/stat
- CPU % por delta de /proc/stat
- Memória % via /proc/meminfo
- Carga média via /proc/loadavg
Sem dependências externas (psutil).
- Carga média /proc/loadavg ou os.getloadavg()
Totalmente silencioso (sem prints).
"""
# ----------------------------------------------------------------------
# CPU
# ----------------------------------------------------------------------
def _read_proc_stat(self):
"""Lê a linha 'cpu ' de /proc/stat. Retorna dict ou None."""
try:
with open("/proc/stat", "r") as f:
line = f.readline()
if not line.startswith("cpu "):
return None
parts = line.strip().split()[1:]
vals = list(map(int, parts[:10])) # user nice system idle iowait irq softirq steal guest guest_nice
vals = list(map(int, parts[:10]))
return {
"user": vals[0], "nice": vals[1], "system": vals[2], "idle": vals[3],
"iowait": vals[4], "irq": vals[5], "softirq": vals[6], "steal": vals[7],
"guest": vals[8], "guest_nice": vals[9],
}
except Exception:
return None
def _cpu_percent(self, interval=0.05):
"""Computa CPU% entre duas leituras."""
a = self._read_proc_stat()
if not a:
return "?"
sleep(interval) # micro-janelinha
sleep(interval)
b = self._read_proc_stat()
if not b:
return "?"
# Totais
idle_a = a["idle"] + a["iowait"]
idle_b = b["idle"] + b["iowait"]
non_a = sum(a.values()) - idle_a
non_b = sum(b.values()) - idle_b
total_a = idle_a + non_a
total_b = idle_b + non_b
totald = total_b - total_a
idled = idle_b - idle_a
if totald <= 0:
return "?"
usage = (totald - idled) * 100.0 / totald
return round(usage, 1)
# ----------------------------------------------------------------------
# Memória
# ----------------------------------------------------------------------
def _mem_percent(self):
try:
info = {}
@ -54,36 +77,73 @@ class Perception:
for line in f:
k, v = line.split(":", 1)
info[k.strip()] = v.strip()
def kB(key):
if key not in info: return None
return float(info[key].split()[0]) # kB
return float(info[key].split()[0]) if key in info else None
mem_total = kB("MemTotal")
mem_avail = kB("MemAvailable")
if not mem_total or mem_avail is None:
if mem_total is None or mem_avail is None:
return "?"
used = mem_total - mem_avail
return round(used * 100.0 / mem_total, 1)
except Exception:
return "?"
# ----------------------------------------------------------------------
# Loadavg
# ----------------------------------------------------------------------
def _loadavg(self):
try:
if hasattr(os, "getloadavg"):
l1, l5, l15 = os.getloadavg()
return [round(l1, 2), round(l5, 2), round(l15, 2)]
with open("/proc/loadavg", "r") as f:
parts = f.read().strip().split()
l1, l5, l15 = map(float, parts[:3])
return [round(l1, 2), round(l5, 2), round(l15, 2)]
except Exception:
return ["?", "?", "?"]
def snapshot(self) -> dict:
return {
"env_user": os.environ.get("USER") or "root",
"env_term": os.environ.get("TERM") or "unknown",
"cpu_percent": self._cpu_percent(),
"mem_percent": self._mem_percent(),
"loadavg": self._loadavg(),
}
# ----------------------------------------------------------------------
# Snapshot principal
# ----------------------------------------------------------------------
def snapshot(self) -> dict:
"""
Retorna snapshot interno:
{
"env_user": "...",
"env_term": "...",
"cpu_percent": n | "?",
"mem_percent": n | "?",
"loadavg": [l1, l5, l15]
}
Sempre seguro. Nunca lança exceção.
"""
try:
return {
"env_user": os.environ.get("USER") or "root",
"env_term": os.environ.get("TERM") or "unknown",
"cpu_percent": self._cpu_percent(),
"mem_percent": self._mem_percent(),
"loadavg": self._loadavg(),
}
except Exception as e:
# fallback extremo — nunca quebrar o Neurotron
logbus.debug(f"[warn] Perception.snapshot falhou: {e}")
return {
"env_user": "unknown",
"env_term": "unknown",
"cpu_percent": "?",
"mem_percent": "?",
"loadavg": ["?", "?", "?"],
}

View File

@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""
📊 Painel de Telemetria do Neurotron V0.1
o ficheiro telemetry.json e mostra um mini-ECG digital:
Execução:
python3 /opt/kernel/neurotron/neurotron_core/telemetry_tail.py
"""
import json
import time
import os
from pathlib import Path
from statistics import mean
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
DATASET = "/opt/kernel/neurotron/data/telemetry.json"
BAR_CHARS = "▁▂▃▄▅▆▇█"
SAMPLES = 24 # quantas amostras recentes mostrar
REFRESH = 2.0 # segundos entre atualizações
def mini_graph(values, width=24):
"""Desenha barras simples tipo sparkline"""
if not values:
return "·" * width
vals = [v for v in values if isinstance(v, (int, float))]
if not vals:
return "·" * width
lo, hi = min(vals), max(vals)
span = (hi - lo) or 1.0
bars = []
for v in vals[-width:]:
if not isinstance(v, (int, float)):
bars.append("·")
continue
i = int(round((v - lo) / span * (len(BAR_CHARS) - 1)))
bars.append(BAR_CHARS[i])
return "".join(bars)
def read_telemetry(path: str):
try:
data = json.loads(Path(path).read_text() or "[]")
return data[-SAMPLES:]
except Exception:
return []
def render_panel(console, data):
if not data:
console.print("[yellow]Nenhum dado de telemetria disponível.[/yellow]")
return
cpu = [d.get("cpu") for d in data if isinstance(d.get("cpu"), (int, float))]
mem = [d.get("mem") for d in data if isinstance(d.get("mem"), (int, float))]
load = [d.get("load")[0] for d in data if isinstance(d.get("load"), (list, tuple)) and isinstance(d.get("load")[0], (int, float))]
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Sinal Vital", justify="left")
table.add_column("Tendência", justify="left")
table.add_column("Média", justify="right")
table.add_row("CPU (%)", mini_graph(cpu), f"{mean(cpu):.1f}%" if cpu else "?")
table.add_row("Memória (%)", mini_graph(mem), f"{mean(mem):.1f}%" if mem else "?")
table.add_row("Carga (1min)", mini_graph(load), f"{mean(load):.2f}" if load else "?")
panel = Panel(table, title="🩺 TELEMETRIA RECENTE", border_style="green")
console.clear()
console.print(panel)
def main():
console = Console()
console.print("[bold cyan]Neurotron Telemetry Tail — Iniciar Monitorização[/bold cyan]\n")
while True:
if not Path(DATASET).exists():
console.print(f"[yellow]A aguardar dados em {DATASET}...[/yellow]")
time.sleep(REFRESH)
continue
data = read_telemetry(DATASET)
if not data:
console.print("[yellow]Nenhum dado de telemetria disponível.[/yellow]")
else:
render_panel(console, data)
time.sleep(REFRESH)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,15 @@
# neurotron/vitalsigns_agent.py
from neurotron.neuron import Neuron
class VitalSigns(Neuron):
"""
Observa sinais vitais (CPU, MEM, LOAD)
e publica em "vitals".
"""
name = "VitalSigns"
def observe(self):
snap = self.ctx.perception.snapshot()
self.publish("vitals", snap)
self.ctx.memory.remember("observe.vitals", snap)

View File

@ -6,9 +6,7 @@ SRC="$NEUROTRON_HOME/src"
export PYTHONHOME="/usr"
export PYTHONPATH="$SRC:/usr/lib/python3.13:/usr/lib/python3.13/site-packages"
# Inicializar hipocampo físico como módulo do package
"$PYTHON" -m neurotron.disk_init
export PATH="/usr/bin:/bin:/sbin:/usr/sbin:$NEUROTRON_HOME/bin:$PATH"
# Arrancar o cérebro principal como módulo do package
exec "$PYTHON" -m neurotron "$@"

Binary file not shown.