first commit

This commit is contained in:
neoricalex 2025-11-15 04:20:00 +01:00
commit 33d6455d6b
16 changed files with 1665 additions and 0 deletions

17
Makefile.am Normal file
View File

@ -0,0 +1,17 @@
SUBDIRS = src
# Diretórios de destino
bindir = $(prefix)/bin
neurotrondir = $(prefix)/lib/neurotron
datadir = $(prefix)/share/neurotron
# Script instalável
bin_SCRIPTS = neurotron
EXTRA_DIST = neurotron.in
neurotron: neurotron.in
sed \
-e 's,[@]PYTHON[@],$(PYTHON),g' \
-e 's,[@]NEUROTRON_DIR[@],$(neurotrondir),g' \
< $(srcdir)/neurotron.in > neurotron
chmod +x neurotron

0
README.md Normal file
View File

11
configure.ac Normal file
View File

@ -0,0 +1,11 @@
AC_INIT([Neurotron], [1.0], [https://gitea.neoricalex.com/neo/neurotron])
AM_INIT_AUTOMAKE([foreign])
AM_PATH_PYTHON([3.0])
# Caminhos
AC_SUBST([NEUROTRON_DIR], [$PWD])
AC_SUBST([NEUROTRON_LIB], [${prefix}/lib/neurotron])
AC_SUBST([NEUROTRON_BIN], [${prefix}/bin])
AC_CONFIG_FILES([Makefile src/Makefile])
AC_OUTPUT

7
neurotron.in Normal file
View File

@ -0,0 +1,7 @@
#!/bin/sh
# Neurotron launcher - autogerado pelo autotools
NEUROTRON_HOME="@NEUROTRON_DIR@"
PYTHON="@PYTHON@"
exec "$PYTHON" "$NEUROTRON_HOME/src/__main__.py" "$@"

0
src/__init__.py Normal file
View File

449
src/__main__.py Normal file
View File

@ -0,0 +1,449 @@
#!/usr/bin/env python3
"""
Neurotron Dashboard em modo texto (estilo BIOS)
Ponto de entrada quando executado como:
python3 -m neurotron
ou via wrapper /usr/bin/neurotron gerado pelo autotools.
Layout:
+------------------------------------------------------------------------------+
| [HEAD] CPU / MEM / LOAD / UPTIME / MODO / VERSÃO / DIAG |
+------------------------------------------------------------------------------+
| [LOG] Últimos eventos do ciclo cognitivo (observe/think/act/rest, etc.) |
| ... |
+------------------------------------------------------------------------------+
| [FOOT] Futuro: input do utilizador (placeholder) | teclas: q = sair |
+------------------------------------------------------------------------------+
"""
import curses
import threading
import time
import os
import sys
from pathlib import Path
from queue import Queue, Empty
from datetime import datetime
# Imports internos do Neurotron (já reorganizados em package)
from .neurotron_config import (
NEUROTRON_TICK,
NEUROTRON_MODE,
NEUROTRON_HOMEOSTASIS,
NEUROTRON_THRESHOLDS,
)
from .cortex import Cortex
from .autodiagnostic import AutoDiagnostic # se for usado dentro do Cortex, ok mesmo assim
# =======================================================================================
# Utilitários de ambiente
# =======================================================================================
def detect_persistent_mode():
"""
Verifica se o hipocampo físico está montado em /var/neurotron.
Define NEUROTRON_MODE, NEUROTRON_RUNTIME e NEUROTRON_LOG via os.environ.
"""
mount_point = Path("/var/neurotron")
def _mounted(mp: Path) -> bool:
try:
if mp.exists() and os.path.ismount(mp):
return True
with open("/proc/mounts") as f:
for line in f:
if f" {mp} " in line:
return True
except Exception:
return False
return False
if _mounted(mount_point):
os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG"] = "/var/neurotron/logs"
else:
os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG"] = "/tmp/neurotron_logs"
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG"])
runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
return runtime_dir, log_dir
def read_system_metrics():
"""
CPU, memória e loadavg a partir de /proc.
Retorna dicionário:
{
"cpu": float,
"mem": float,
"load1": float,
"load5": float,
"load15": float,
}
Em caso de falha, devolve valores -1.
"""
cpu = -1.0
mem = -1.0
load1 = load5 = load15 = -1.0
# CPU (uso aproximado entre duas leituras de /proc/stat)
try:
with open("/proc/stat") as f:
line = f.readline()
parts = line.strip().split()
if parts[0] == "cpu" and len(parts) >= 5:
user, nice, system_, idle = map(int, parts[1:5])
total1 = user + nice + system_ + idle
idle1 = idle
time.sleep(0.05) # pequena janela
with open("/proc/stat") as f:
line = f.readline()
parts = line.strip().split()
user2, nice2, system2, idle2 = map(int, parts[1:5])
total2 = user2 + nice2 + system2 + idle2
idle2 = idle2
total_delta = total2 - total1
idle_delta = idle2 - idle1
if total_delta > 0:
cpu = 100.0 * (1.0 - (idle_delta / total_delta))
except Exception:
pass
# Memória
try:
meminfo = {}
with open("/proc/meminfo") as f:
for line in f:
k, v = line.split(":", 1)
meminfo[k.strip()] = v.strip()
total_kb = float(meminfo.get("MemTotal", "0 kB").split()[0])
free_kb = float(meminfo.get("MemAvailable", "0 kB").split()[0])
if total_kb > 0:
used_kb = total_kb - free_kb
mem = 100.0 * (used_kb / total_kb)
except Exception:
pass
# Loadavg
try:
with open("/proc/loadavg") as f:
l1, l5, l15, *_ = f.read().split()
load1 = float(l1)
load5 = float(l5)
load15 = float(l15)
except Exception:
pass
return {
"cpu": cpu,
"mem": mem,
"load1": load1,
"load5": load5,
"load15": load15,
}
# =======================================================================================
# Dashboard em curses
# =======================================================================================
class NeurotronDashboard:
"""
UI fixa em curses:
- header: métricas de sistema + estado do Neurotron
- middle: log rolling
- footer: placeholder + ajuda/teclas
"""
def __init__(self, stdscr, log_queue: Queue, cortex: Cortex, start_time: float):
self.stdscr = stdscr
self.log_queue = log_queue
self.cortex = cortex
self.start_time = start_time
self.log_lines = [] # mantém histórico para a janela central
self.max_log_lines = 1000
self.stop_event = threading.Event()
self.last_diag_state = "?"
self.last_diag_delta = "?"
# tenta obter diagnóstico inicial, se existir
try:
diag = self.cortex.diagnostic._load_previous()
self.last_diag_state = diag.get("state", "?")
self.last_diag_delta = diag.get("delta", "?")
except Exception:
pass
# ------------------------------------------------------------------
# Helpers de desenho
# ------------------------------------------------------------------
def _draw_header(self, height, width):
"""Desenha a barra superior com CPU/MEM/LOAD/UPTIME/MODO/DIAG."""
metrics = read_system_metrics()
now = time.time()
uptime_sec = int(now - self.start_time)
hours = uptime_sec // 3600
mins = (uptime_sec % 3600) // 60
secs = uptime_sec % 60
mode = os.environ.get("NEUROTRON_MODE", NEUROTRON_MODE)
version = getattr(self.cortex, "version", "0.1")
cpu = metrics["cpu"]
mem = metrics["mem"]
load1 = metrics["load1"]
# Avalia homeostase
warn_cpu = NEUROTRON_THRESHOLDS.get("cpu_high", 85.0)
warn_mem = NEUROTRON_THRESHOLDS.get("mem_high", 90.0)
warn_load = NEUROTRON_THRESHOLDS.get("load1_high", 2.0)
status_parts = []
if cpu >= 0:
status_parts.append(f"CPU: {cpu:5.1f}%")
else:
status_parts.append("CPU: N/A ")
if mem >= 0:
status_parts.append(f"MEM: {mem:5.1f}%")
else:
status_parts.append("MEM: N/A ")
if load1 >= 0:
status_parts.append(f"LOAD1: {load1:4.2f}")
else:
status_parts.append("LOAD1: N/A ")
status_parts.append(f"UP: {hours:02d}:{mins:02d}:{secs:02d}")
status_parts.append(f"MODO: {mode.upper()}")
status_parts.append(f"VER: {version}")
status_parts.append(f"DIAG: {self.last_diag_state}/{self.last_diag_delta}")
line = " ".join(status_parts)
# barra horizontal
self.stdscr.attron(curses.A_REVERSE)
self.stdscr.addnstr(0, 0, line.ljust(width), width)
self.stdscr.attroff(curses.A_REVERSE)
# segunda linha: homeostase textual
homeo_msg = "HOMEOSTASE: OK"
if (cpu >= warn_cpu and cpu >= 0) or (mem >= warn_mem and mem >= 0) or (
load1 >= warn_load and load1 >= 0
):
homeo_msg = "HOMEOSTASE: STRESS"
self.stdscr.addnstr(1, 0, homeo_msg.ljust(width), width)
def _drain_log_queue(self):
"""Move mensagens da queue para o buffer de linhas."""
try:
while True:
msg = self.log_queue.get_nowait()
timestamp = datetime.now().strftime("%H:%M:%S")
self.log_lines.append(f"[{timestamp}] {msg}")
if len(self.log_lines) > self.max_log_lines:
self.log_lines = self.log_lines[-self.max_log_lines:]
except Empty:
pass
def _draw_log_window(self, header_height, footer_height, width, height):
"""
Área central de logs: rolagem automática, sempre mostrando as últimas N linhas
que cabem na janela.
"""
top = header_height
bottom = height - footer_height
rows = max(0, bottom - top)
self._drain_log_queue()
# seleciona as últimas 'rows' linhas
visible = self.log_lines[-rows:] if rows > 0 else []
for i in range(rows):
y = top + i
if i < len(visible):
line = visible[i]
self.stdscr.addnstr(y, 0, line.ljust(width), width)
else:
self.stdscr.addnstr(y, 0, " ".ljust(width), width)
def _draw_footer(self, width, height):
"""
Rodapé com placeholder para input futuro e legenda de teclas.
"""
footer_text = "[ Futuro: comandos do utilizador aparecerão aqui ]"
keys_text = "[q] sair | dashboard Neurotron"
y_footer = height - 2
self.stdscr.addnstr(y_footer, 0, footer_text.ljust(width), width)
self.stdscr.addnstr(y_footer + 1, 0, keys_text.ljust(width), width)
# ------------------------------------------------------------------
# Loop principal da UI
# ------------------------------------------------------------------
def run(self):
"""
Loop principal do curses.
Actualiza o ecrã, teclas, e encerra quando stop_event é setado ou 'q' é pressionado.
"""
curses.curs_set(0)
self.stdscr.nodelay(True)
self.stdscr.keypad(True)
while not self.stop_event.is_set():
height, width = self.stdscr.getmaxyx()
self.stdscr.erase()
header_height = 2
footer_height = 2
self._draw_header(height, width)
self._draw_log_window(header_height, footer_height, width, height)
self._draw_footer(width, height)
self.stdscr.refresh()
try:
ch = self.stdscr.getch()
if ch in (ord("q"), ord("Q")):
self.stop_event.set()
break
except Exception:
pass
time.sleep(0.1) # ~10 FPS
# tentativa graciosa de shutdown do Cortex
try:
self.log_queue.put("Encerrando Neurotron (dashboard pediu saída)…")
self.cortex.shutdown(reason="Dashboard exit")
except Exception:
pass
# =======================================================================================
# Ciclo Cognitivo em thread separada
# =======================================================================================
def cognitive_loop(cortex: Cortex, ui: NeurotronDashboard):
"""
Loop cognitivo clássico (observe think act rest),
a correr numa thread separada, reportando eventos para o dashboard via log_queue.
"""
log = ui.log_queue.put
try:
log("Neurotron: boot()…")
cortex.boot()
try:
state = cortex.diagnostic._load_previous().get("state", "?")
log(f"Diagnóstico inicial: estado='{state}'")
ui.last_diag_state = state
except Exception:
log("Diagnóstico inicial: indisponível")
log("Ciclo cognitivo iniciado (observe → think → act → rest)…")
while not ui.stop_event.is_set():
try:
log("cortex.observe()")
cortex.observe()
log("cortex.think()")
cortex.think()
log("cortex.act()")
cortex.act()
log("cortex.rest()")
cortex.rest()
except KeyboardInterrupt:
log("Interrompido pelo utilizador (SIGINT)")
cortex.shutdown(reason="SIGINT")
break
except SystemExit:
log("SystemExit recebido, encerrando…")
cortex.shutdown(reason="SystemExit")
break
except Exception as e:
log(f"💥 Exceção não tratada no loop cognitivo: {e}")
try:
cortex.fatal(e)
finally:
break
finally:
ui.stop_event.set()
log("Loop cognitivo terminado.")
# =======================================================================================
# Entry point
# =======================================================================================
def _main_curses(stdscr):
# 1) Detecta modo e diretórios
runtime_dir, log_dir = detect_persistent_mode()
# 2) Inicializa Cortex com os mesmos parâmetros do main_waiting
cortex = Cortex(
runtime_dir=runtime_dir,
log_dir=log_dir,
tick_seconds=NEUROTRON_TICK,
)
# 3) Queue de logs e dashboard
log_queue: Queue = Queue()
start_time = time.time()
ui = NeurotronDashboard(stdscr, log_queue, cortex, start_time)
# 4) Thread do ciclo cognitivo
worker = threading.Thread(
target=cognitive_loop,
args=(cortex, ui),
daemon=True,
)
worker.start()
# 5) Loop da UI (bloqueia até terminar)
ui.run()
# 6) Aguarda thread terminar
worker.join(timeout=2.0)
def main():
"""
Ponto de entrada Python. Usado tanto por:
python3 -m neurotron
como pelo wrapper /usr/bin/neurotron, se este fizer:
from neurotron import main
main()
"""
curses.wrapper(_main_curses)
if __name__ == "__main__":
main()

185
src/autodiagnostic.py Normal file
View File

@ -0,0 +1,185 @@
from __future__ import annotations
import json, os
from datetime import datetime, timezone
from rich.console import Console
from rich.table import Table
from pathlib import Path
from .neurotron_config import (
NEUROTRON_DATASET_PATH, NEUROTRON_HISTORY_KEEP, NEUROTRON_DIAG_SCHEMA,
HOMEOSTASIS_CPU_WARN, HOMEOSTASIS_CPU_ALERT,
HOMEOSTASIS_MEM_WARN, HOMEOSTASIS_MEM_ALERT,
HOMEOSTASIS_LOAD_WARN, HOMEOSTASIS_LOAD_ALERT,
)
from .perception import Perception
console = Console()
def _now_iso():
return datetime.now(timezone.utc).isoformat()
class AutoDiagnostic:
def __init__(self, runtime_dir: str, log_dir: str):
self.runtime_dir = runtime_dir
self.log_dir = log_dir
self.data_dir = Path(NEUROTRON_DATASET_PATH)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.last_file = self.data_dir / "last_diagnostic.json"
self.perception = Perception()
self.current = None
self.previous = None
def _load_previous(self):
if not self.last_file.exists():
return None
try:
with open(self.last_file, "r") as f:
return json.load(f)
except Exception:
return None
def _save_current(self, payload: dict):
history = []
if self.last_file.exists():
try:
with open(self.last_file, "r") as f:
prev = json.load(f)
history = prev.get("history", [])
history.append({
"timestamp": prev.get("timestamp"),
"cpu_percent": prev.get("cpu_percent"),
"mem_percent": prev.get("mem_percent"),
"loadavg": prev.get("loadavg"),
"state": prev.get("state", "UNKNOWN"),
})
history = history[-NEUROTRON_HISTORY_KEEP:]
except Exception:
history = []
payload["history"] = history
with open(self.last_file, "w") as f:
json.dump(payload, f, indent=2)
def _classify_state(self, cpu, mem, l1):
# valores podem ser "?"
try:
cpu = float(cpu)
mem = float(mem)
l1 = float(l1)
except Exception:
return "UNKNOWN"
# ALERT/CRITICAL
if cpu >= HOMEOSTASIS_CPU_ALERT or mem >= HOMEOSTASIS_MEM_ALERT or l1 >= HOMEOSTASIS_LOAD_ALERT:
return "CRITICAL"
if cpu >= HOMEOSTASIS_CPU_WARN or mem >= HOMEOSTASIS_MEM_WARN or l1 >= HOMEOSTASIS_LOAD_WARN:
return "ALERT"
# OKs
return "STABLE"
def _delta(self, a, b):
try:
if isinstance(a, list) and isinstance(b, list) and len(a) == len(b):
return [round(float(x) - float(y), 2) for x, y in zip(a, b)]
return round(float(a) - float(b), 2)
except Exception:
return "?"
def _render_mini_trend(self, values, width=24, charset="▁▂▃▄▅▆▇█"):
if not values:
return ""
lo = min(values); hi = max(values)
if not isinstance(lo, (int, float)) or not isinstance(hi, (int, float)):
return ""
span = (hi - lo) or 1.0
levels = len(charset) - 1
bars = []
for v in values[-width:]:
if not isinstance(v, (int, float)):
bars.append("·")
continue
i = int(round((v - lo) / span * levels))
bars.append(charset[i])
return "".join(bars)
def run_exam(self):
console.print("\n[bold]🤖 Iniciando rotina de Auto-Diagnóstico Evolutivo...[/bold]\n")
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = snap.get("mem_percent", "?")
load = snap.get("loadavg", ["?", "?", "?"])
prev = self._load_previous()
self.previous = prev
# deltas
cpu_prev = prev.get("cpu_percent") if prev else "?"
mem_prev = prev.get("mem_percent") if prev else "?"
load_prev = prev.get("loadavg") if prev else ["?", "?", "?"]
d_cpu = self._delta(cpu, cpu_prev)
d_mem = self._delta(mem, mem_prev)
d_load = self._delta(load, load_prev)
# estado
l1 = load[0] if isinstance(load, list) and load else "?"
state = self._classify_state(cpu, mem, l1)
# tabela
table = Table(title="🩺 Exame Clínico Evolutivo", show_lines=True)
table.add_column("Sinal Vital")
table.add_column("Atual", justify="right")
table.add_column("Δ", justify="center")
table.add_column("Anterior", justify="right")
def fmt(v):
if isinstance(v, list):
return str(v)
return str(v)
table.add_row("CPU (%)", fmt(cpu), fmt(d_cpu), fmt(cpu_prev))
table.add_row("Memória (%)", fmt(mem), fmt(d_mem), fmt(mem_prev))
table.add_row("Carga média (1/5/15)", fmt(load), "" if d_load == "?" else fmt(d_load), fmt(load_prev))
console.print(table)
payload = {
"schema": NEUROTRON_DIAG_SCHEMA,
"timestamp": _now_iso(),
"cpu_percent": cpu,
"mem_percent": mem,
"loadavg": load,
"state": state,
"env": {
"user": snap.get("env_user"),
"term": snap.get("env_term"),
},
}
self._save_current(payload)
console.print(f"[green]✔ Histórico evolutivo atualizado em:[/green] \n{self.last_file}")
# Atualiza telemetria contínua
try:
telemetry_file = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
telemetry = []
if telemetry_file.exists():
telemetry = json.loads(telemetry_file.read_text() or "[]")
telemetry.append({
"timestamp": payload["timestamp"],
"cpu": payload.get("cpu_percent"),
"mem": payload.get("mem_percent"),
"load": payload.get("loadavg"),
"state": payload.get("state"),
})
telemetry = telemetry[-128:] # manter últimas 128 amostras
telemetry_file.write_text(json.dumps(telemetry, indent=2))
except Exception as e:
console.print(f"[yellow]⚠️ Falha ao atualizar telemetria:[/] {e}")
return state, payload

230
src/cortex.py Normal file
View File

@ -0,0 +1,230 @@
import json
import time
from collections import defaultdict, deque
from pathlib import Path
from time import sleep
from rich.console import Console
from neuron import Neuron
from hippocampus import Hippocampus
from perception import Perception
from motor import Motor
from .neurotron_config import (
NEUROTRON_MODE, NEUROTRON_TICK, NEUROTRON_TICK_MIN, NEUROTRON_TICK_MAX, NEUROTRON_TICK_STEP,
NEUROTRON_DIAG_EVERY_TICKS, NEUROTRON_DATASET_PATH,
HEARTBEAT_ENABLED, HEARTBEAT_STYLE, NEUROTRON_THRESHOLDS,
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
from .autodiagnostic import AutoDiagnostic
class VitalSigns(Neuron):
name = "VitalSigns"
def observe(self) -> None:
snap = self.ctx.perception.snapshot()
self.publish("vitals", snap)
self.ctx.memory.remember("observe.vitals", snap)
class EchoAgent(Neuron):
name = "EchoAgent"
def think(self) -> None:
msg = self.consume("vitals")
if msg:
self.publish("actions", {"action": "echo", "text": f"CPU {msg.get('cpu_percent', '?')}%"})
class Cortex:
"""
Orquestrador: liga neurónios, bus de mensagens, memória, IO e ciclo cognitivo.
Agora com Telemetria Contínua (V5): heartbeat, microalertas e flush periódico.
"""
def __init__(self, runtime_dir, log_dir, tick_seconds=NEUROTRON_TICK):
self.runtime_dir = runtime_dir
self.log_dir = log_dir
self.tick = float(tick_seconds)
self.mode = NEUROTRON_MODE
self._tick_count = 0
self.diagnostic = AutoDiagnostic(runtime_dir, log_dir)
self.console = Console()
self.memory = Hippocampus(log_dir=log_dir)
self.perception = Perception()
self.motor = Motor()
# Message bus simples: channels → deque
self.bus = defaultdict(lambda: deque(maxlen=32))
# Telemetria em memória (curto prazo)
self.telemetry = deque(maxlen=TELEMETRY_MAXLEN)
# Regista neurónios (podes adicionar mais à medida)
self.neurons: list[Neuron] = [
VitalSigns(self),
EchoAgent(self),
]
self._booted = False
# Caminho para gravar a telemetria
self.telemetry_path = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
self.telemetry_path.parent.mkdir(parents=True, exist_ok=True)
# ——— ciclo de vida ———
def boot(self) -> None:
if self._booted:
return
self.console.print("[bold cyan]🧠 Neurotron[/] — boot")
self.memory.remember("boot", {"version": "0.1", "tick": self.tick})
self._booted = True
state, _ = self.diagnostic.run_exam()
self._apply_homeostasis(state)
def _apply_homeostasis(self, state):
if state == "CRITICAL":
self.mode = "diagnostic"
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
elif state == "ALERT":
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP / 2)
elif state == "STABLE":
self.tick = max(NEUROTRON_TICK_MIN, self.tick - NEUROTRON_TICK_STEP / 2)
# UNKNOWN → não mexe
def shutdown(self, reason: str = ""):
self.console.print(f"[yellow]shutdown:[/] {reason}")
self.memory.remember("shutdown", {"reason": reason})
def fatal(self, e: Exception):
self.console.print(f"[red]fatal:[/] {e!r}")
self.memory.remember("fatal", {"error": repr(e)})
print(f"fatal: {repr(e)}")
raise
# ——— loop ———
def observe(self) -> None:
for n in self.neurons:
n.observe()
def think(self) -> None:
for n in self.neurons:
n.think()
def act(self) -> None:
# Consumir ações agregadas e executar
action = self.bus_consume("actions")
if action and action.get("action") == "echo":
res = self.motor.run("echo", [action.get("text", "")])
self.memory.remember("act.echo", res)
if res.get("stdout"):
self.console.print(f"[green]{res['stdout'].strip()}[/]")
def rest(self):
# Heartbeat e microalertas antes de dormir
if HEARTBEAT_ENABLED:
self._heartbeat_and_telemetry()
# Pausa regulada
sleep(self.tick)
# Contador e rotinas periódicas
self._tick_count += 1
if self._tick_count % NEUROTRON_DIAG_EVERY_TICKS == 0:
state, _ = self.diagnostic.run_exam()
self._apply_homeostasis(state)
if self._tick_count % TELEMETRY_FLUSH_EVERY_TICKS == 0:
self._flush_telemetry()
# ——— telemetria/alertas ———
def _heartbeat_and_telemetry(self):
snap = self.perception.snapshot()
cpu = snap.get("cpu_percent", "?")
mem = (snap.get("mem") or {}).get("percent", "?")
load = snap.get("loadavg") or []
# Adiciona ao buffer de telemetria
self.telemetry.append({
"ts": time.time(),
"cpu": cpu,
"mem": mem,
"load": load,
"tick": self.tick,
})
# Microalertas com base nos limiares
self._evaluate_microalerts(cpu, mem, load)
# Heartbeat visual
color = self._color_for_levels(cpu, mem, load)
if HEARTBEAT_STYLE == "compact":
self.console.print(f"[bold {color}]💓[/] CPU: {cpu}% | MEM: {mem}% | TICK: {self.tick:.2f}s")
else:
self.console.print(
f"[bold {color}]💓 [Heartbeat][/bold {color}] "
f"CPU: {cpu}% | MEM: {mem}% | LOAD: {load} | TICK: {self.tick:.2f}s | MODE: {self.mode}"
)
def _evaluate_microalerts(self, cpu, mem, load):
alerts = []
# Normaliza
load1 = load[0] if (isinstance(load, (list, tuple)) and load) else None
try:
if isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]:
alerts.append(("cpu", cpu))
if isinstance(mem, (int, float)) and mem >= NEUROTRON_THRESHOLDS["mem_high"]:
alerts.append(("mem", mem))
if isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"]:
alerts.append(("load1", load1))
except KeyError:
pass # thresholds incompletos → sem microalertas
if not alerts:
return
for (metric, value) in alerts:
self.console.print(f"[yellow]⚠️ Microalerta:[/] {metric.upper()} {value} — ajustando homeostase (tick +{NEUROTRON_TICK_STEP:.2f}s)")
# Ajuste simples de segurança
self.tick = min(NEUROTRON_TICK_MAX, self.tick + NEUROTRON_TICK_STEP)
self.memory.remember("microalert", {
"ts": time.time(),
"alerts": alerts,
"new_tick": self.tick,
})
def _color_for_levels(self, cpu, mem, load):
# Heurística simples de cor
try:
load1 = load[0] if (isinstance(load, (list, tuple)) and load) else 0.0
high = (
(isinstance(cpu, (int, float)) and cpu >= NEUROTRON_THRESHOLDS["cpu_high"]) or
(isinstance(mem, (int, float)) and mem >= NEUROTRON_THRESHOLDS["mem_high"]) or
(isinstance(load1, (int, float)) and load1 >= NEUROTRON_THRESHOLDS["load1_high"])
)
if high:
return "yellow"
except Exception:
pass
return "green"
def _flush_telemetry(self):
# Grava o buffer de telemetria em JSON (mantendo histórico curto)
try:
data = list(self.telemetry)
with self.telemetry_path.open("w") as f:
json.dump(data, f)
self.memory.remember("telemetry.flush", {"count": len(data), "path": str(self.telemetry_path)})
except Exception as e:
self.console.print(f"[red]✖ Falha ao gravar telemetria:[/] {e!r}")
self.memory.remember("telemetry.error", {"error": repr(e)})
# ——— bus ———
def bus_publish(self, channel: str, payload: dict) -> None:
self.bus[channel].append(payload)
def bus_consume(self, channel: str) -> dict | None:
q = self.bus[channel]
return q.popleft() if q else None

259
src/disk_init.py Normal file
View File

@ -0,0 +1,259 @@
#!/usr/bin/env python3
"""
💾 Módulo de Inicialização de Disco Neurotron V0.1 (atualizado)
Detecta, avalia, prepara e monta o disco persistente do NFDOS.
- Não formata discos que contenham um filesystem conhecido, a menos que forçado.
- Forçar formatação:
* EXPORT: export NFDOS_FORCE_FORMAT=1 (no ambiente do initramfs, se aplicável)
* Kernel cmdline: adicionar `nfdos_force_format=1` ao -append do QEMU
"""
import os
import subprocess
from pathlib import Path
from rich.console import Console
if __name__ == "__main__" and __package__ is None:
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
__package__ = "neurotron_core"
from .neurotron_config import (
MOUNT_POINT, DISK_CANDIDATES
)
console = Console()
def run(cmd: list[str]) -> bool:
"""Executa comando silenciosamente (retorna True se OK)."""
try:
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def detect_disk() -> str | None:
"""Procura por um dispositivo de disco válido (por ordem em DISK_CANDIDATES)."""
for dev in DISK_CANDIDATES:
p = Path(dev)
if p.exists():
console.print(f"[cyan]🔍 Detetado disco:[/] {dev}")
return dev
console.print("[yellow]⚠️ Nenhum disco detectado.[/yellow]")
return None
def blkid_check(device: str) -> str | None:
"""Tenta obter tipo com blkid (se disponível)."""
try:
out = subprocess.run(["blkid", device], stdout=subprocess.PIPE, text=True, check=False)
return out.stdout.strip() if out.stdout else None
except FileNotFoundError:
return None
def read_sig(device: str, size: int = 2048) -> bytes | None:
"""Lê os primeiros `size` bytes do device (se possível)."""
try:
with open(device, "rb") as f:
return f.read(size)
except Exception:
return None
def detect_fs_by_magic(device: str) -> str | None:
"""
Detecta assinaturas simples:
- ext4 superblock magic (0xEF53) @ offset 1024 + 56 = 1080
- NTFS -> 'NTFS ' @ offset 3
- FAT32 -> 'FAT32' nos offsets típicos do boot sector
- MBR partition table signature 0x55AA @ offset 510-511
Retorna string com o sistema ou None.
"""
buf = read_sig(device, size=4096)
if not buf:
return None
# MBR signature
if len(buf) >= 512 and buf[510:512] == b'\x55\xAA':
# detecta tabela de partições existente (MBR)
return "mbr-partition-table"
# ext magic at 1024+56 = 1080
if len(buf) >= 1082 and buf[1080:1082] == b'\x53\xEF':
return "ext (superblock)"
# NTFS signature at offset 3 (ASCII "NTFS ")
if len(buf) >= 11 and buf[3:11] == b'NTFS ':
return "ntfs"
# FAT32 signature at offset 82 or boot sector strings containing FAT
if b"FAT32" in buf or b"FAT16" in buf or b"FAT12" in buf:
return "fat"
return None
def parse_cmdline_flag() -> bool:
"""Lê /proc/cmdline para a flag nfdos_force_format=1"""
try:
with open("/proc/cmdline", "r") as f:
cmd = f.read()
return "nfdos_force_format=1" in cmd.split()
except Exception:
return False
def which(prog: str) -> str | None:
for p in os.environ.get("PATH", "/sbin:/bin:/usr/sbin:/usr/bin").split(":"):
cand = Path(p) / prog
if cand.exists() and os.access(cand, os.X_OK):
return str(cand)
return None
def format_ext4(device: str, label: str = "NFDOS_DATA") -> bool:
"""Formata o dispositivo com ext4, recolhendo logs de erro detalhados (BusyBox-safe)."""
mke2fs = which("mke2fs")
mkfs_ext4 = which("mkfs.ext4")
mkfs = which("mkfs")
candidates = []
if mkfs_ext4:
candidates.append(([mkfs_ext4, "-F", "-L", label, device], "mkfs.ext4"))
if mke2fs:
# o BusyBox mke2fs não aceita '-t', por isso ajustaremos dentro do loop
candidates.append(([mke2fs, "-F", "-t", "ext4", "-L", label, device], "mke2fs"))
if mkfs:
candidates.append(([mkfs, "-t", "ext4", "-F", "-L", label, device], "mkfs"))
if not candidates:
console.print("[red]❌ Nenhum utilitário mkfs disponível no initramfs![/red]")
return False
for cmd, name in candidates:
console.print(f"[yellow]⚙️ Formatando {device} com {name}...[/yellow]")
# 👉 se for o BusyBox mke2fs, removemos o argumento -t
if name == "mke2fs":
cmd = [c for c in cmd if c != "-t" and c != "ext4"]
try:
result = subprocess.run(
cmd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
if result.stdout:
console.print(result.stdout.strip())
console.print(f"[green]✔ Formatação concluída com {name}.[/green]")
return True
except subprocess.CalledProcessError as e:
console.print(f"[red]❌ {name} falhou (código {e.returncode}).[/red]")
if e.stdout:
console.print(f"[cyan]📜 STDOUT:[/cyan]\n{e.stdout.strip()}")
if e.stderr:
console.print(f"[magenta]⚠️ STDERR:[/magenta]\n{e.stderr.strip()}")
console.print("[red]❌ Nenhum método de formatação teve sucesso.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel suporta EXT4 e se o BusyBox inclui mke2fs.")
return False
def ensure_fs(device: str) -> bool:
"""
Verifica se existe sistema de ficheiros.
Se não existir e houver confirmação/flag, formata ext4 (ou fallback via mke2fs).
"""
# 1⃣ tentativa rápida com blkid
info = blkid_check(device)
if info:
console.print(f"[green]🧠 Disco já formatado (blkid):[/] {info}")
return True
# 2⃣ fallback por leituras de assinatura
sig = detect_fs_by_magic(device)
if sig:
console.print(f"[yellow]⚠ Assinatura detectada no disco:[/] {sig}")
console.print("[red]❗ O disco contém dados ou partições existentes. Abortando formatação.[/red]")
return False
# 3⃣ se nada detectado — disco virgem
forced_env = os.environ.get("NFDOS_FORCE_FORMAT") == "1"
forced_cmd = parse_cmdline_flag()
if not (forced_env or forced_cmd):
console.print("[yellow]⚠ Disco parece virgem, mas não há confirmação para formatar.[/yellow]")
console.print("Use `nfdos_force_format=1` no kernel cmdline ou export NFDOS_FORCE_FORMAT=1")
console.print("para permitir formatação automática.")
return False
# 4⃣ tentar formatação
console.print(f"[yellow]⚙️ Forçando formatação de {device} como ext4 (FLAG DETETADA)...[/yellow]")
ok = format_ext4(device)
if ok:
console.print("[green]✔ Formatação concluída com sucesso.[/green]")
return True
# 5⃣ se nada funcionou
console.print("[red]❌ Falha na formatação.[/red]")
console.print("[cyan]🧠 Sugestão:[/] verifique se o kernel inclui suporte para EXT4 ou se o mkfs/mke2fs está embutido no BusyBox.")
return False
def mount_disk(device: str) -> bool:
"""Monta o disco no ponto esperado (retorna True se OK)."""
os.makedirs(MOUNT_POINT, exist_ok=True)
return run(["mount", device, MOUNT_POINT])
def debug_env():
"""Mostra informações úteis quando nenhum disco é detectado (ou para debug)."""
console.print("[yellow]🩻 DEBUG: listando /dev/* e últimas mensagens do kernel[/yellow]")
devs = sorted(Path("/dev").glob("*"))
console.print("📂 Dispositivos disponíveis:", ", ".join([d.name for d in devs if d.is_char_device() or d.is_block_device()]))
os.system("dmesg | tail -n 20 || echo '(dmesg não disponível)'")
console.print("[yellow]───────────────────────────────[/yellow]")
os.system("echo '--- /proc/partitions ---'; cat /proc/partitions || true")
os.system("echo '--- dmesg | grep -i virtio ---'; dmesg | grep -i virtio || true")
console.print("[yellow]───────────────────────────────[/yellow]")
def initialize_persistence():
"""Fluxo completo de inicialização do hipocampo físico."""
device = detect_disk()
if not device:
debug_env()
console.print("[red]❌ Nenhum disco físico encontrado — usando modo RAM.[/red]")
return False
if not ensure_fs(device):
console.print("[red]❌ Preparação do sistema de ficheiros foi interrompida.[/red]")
return False
if not mount_disk(device):
console.print("[red]❌ Falha ao montar disco.[/red]")
return False
console.print(f"[green]✔ Disco montado em:[/] {MOUNT_POINT}")
telemetry_file = Path("/opt/kernel/neurotron/data/telemetry.json")
telemetry_file.parent.mkdir(parents=True, exist_ok=True)
if not telemetry_file.exists():
telemetry_file.write_text("[]")
for d in ["data", "logs", "dna"]:
Path(MOUNT_POINT, d).mkdir(parents=True, exist_ok=True)
Path(MOUNT_POINT, "DNA_ID").write_text("NEUROTRON_HIPOCAMPUS_V1\n")
console.print("[cyan]👉 Hipocampo físico inicializado com sucesso.[/cyan]")
return True
if __name__ == "__main__":
initialize_persistence()

34
src/hippocampus.py Normal file
View File

@ -0,0 +1,34 @@
from pathlib import Path
from datetime import datetime
try:
import orjson as json
except Exception: # fallback leve
import json # type: ignore
class Hippocampus:
"""
Memória contextual simples (JSON Lines): append-only.
Guarda perceções, decisões e ações para replays futuros.
"""
def __init__(self, log_dir: Path):
self.log_dir = log_dir
self.events_file = log_dir / "events.jsonl"
def remember(self, kind: str, data: dict) -> None:
rec = {
"ts": datetime.utcnow().isoformat() + "Z",
"kind": kind,
"data": data,
}
try:
if "orjson" in json.__name__:
blob = json.dumps(rec)
else:
blob = json.dumps(rec) # type: ignore
with self.events_file.open("ab") as f:
f.write(blob if isinstance(blob, bytes) else blob.encode("utf-8"))
f.write(b"\n")
except Exception:
# evitar crash por IO em early boot
pass

114
src/main_waiting.py Normal file
View File

@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Neurotron ponto de entrada do cérebro do NFDOS.
Boot flow (novo): init (BusyBox) /usr/bin/neurotron este ficheiro.
"""
import os
import sys
from pathlib import Path
from datetime import datetime
from rich.console import Console
# -----------------------------------------------------------------------------
# Ajuste de caminho: tornar "src/" o root dos módulos Neurotron
# -----------------------------------------------------------------------------
THIS_DIR = Path(__file__).resolve().parent # .../neurotron/src
if str(THIS_DIR) not in sys.path:
sys.path.insert(0, str(THIS_DIR))
# Agora os imports ficam locais ao diretório src/
from neurotron_config import ( # noqa: E402
NEUROTRON_TICK,
NEUROTRON_MODE,
NEUROTRON_HOMEOSTASIS,
CORTEX_LOOP_DELAY,
MOTOR_OUTPUT_DEVICE,
)
from autodiagnostic import AutoDiagnostic # noqa: E402
from perception import Perception # noqa: E402
from cortex import Cortex # noqa: E402
console = Console()
def detect_persistent_mount() -> bool:
"""Verifica se o hipocampo físico está montado em /var/neurotron"""
mount_point = Path("/var/neurotron")
try:
if mount_point.exists() and os.path.ismount(mount_point):
console.print(f"[green]💾 Hipocampo físico montado:[/] {mount_point}")
return True
else:
# fallback: check via /proc/mounts em early boot
with open("/proc/mounts") as f:
for line in f:
if " /var/neurotron " in line:
console.print(f"[green]💾 Hipocampo físico montado (via /proc):[/] {mount_point}")
return True
except Exception as e:
console.print(f"[yellow]⚠ Falha ao verificar montagem persistente:[/] {e}")
return False
def main():
# -------------------------------------------------------------------------
# Seleção de modo: persistente vs volátil
# -------------------------------------------------------------------------
persistent_mode = detect_persistent_mount()
if persistent_mode:
os.environ["NEUROTRON_MODE"] = "persistent"
os.environ["NEUROTRON_RUNTIME"] = "/var/neurotron/data"
os.environ["NEUROTRON_LOG"] = "/var/neurotron/logs"
else:
os.environ["NEUROTRON_MODE"] = "volatile"
os.environ["NEUROTRON_RUNTIME"] = "/tmp/neurotron_data"
os.environ["NEUROTRON_LOG"] = "/tmp/neurotron_logs"
runtime_dir = Path(os.environ["NEUROTRON_RUNTIME"])
log_dir = Path(os.environ["NEUROTRON_LOG"])
runtime_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)
mode = os.environ["NEUROTRON_MODE"]
console.print(f"[cyan]🌍 Modo atual do Neurotron:[/] [bold]{mode.upper()}[/]")
# -------------------------------------------------------------------------
# Inicializa o Córtex
# -------------------------------------------------------------------------
cortex = Cortex(
runtime_dir=runtime_dir,
log_dir=log_dir,
tick_seconds=NEUROTRON_TICK,
)
try:
cortex.boot()
state = cortex.diagnostic._load_previous().get("state", "?")
console.print(f"[cyan]🩺 Estado inicial:[/] {state}\n")
console.print("[green]👉 Neurotron inicializado com sucesso.[/]\n")
console.print("[green]✔ [Mensagem Simbolica] Boot OK[/]\n")
console.print("[green]✔ Iniciando ciclo cognitivo.[/]\n")
while True:
cortex.observe()
cortex.think()
cortex.act()
cortex.rest()
except KeyboardInterrupt:
console.print("[yellow]⚠ Interrompido pelo utilizador (SIGINT)[/]")
cortex.shutdown(reason="SIGINT")
except SystemExit:
cortex.shutdown(reason="SystemExit")
except Exception as e:
console.print(f"[red]💥 Exceção não tratada:[/] {e}")
cortex.fatal(e)
if __name__ == "__main__":
main()

27
src/motor.py Normal file
View File

@ -0,0 +1,27 @@
import subprocess
class Motor:
"""
Ator do sistema: executa comandos controlados (whitelist).
Mantém-se minimal até termos política de segurança mais rica.
"""
SAFE_CMDS = {
"echo": ["echo"],
"sh": ["/bin/sh"], # shell interativo (init)
}
def run(self, cmd: str, args: list[str] | None = None) -> dict:
prog = self.SAFE_CMDS.get(cmd)
if not prog:
return {"ok": False, "error": f"cmd '{cmd}' não permitido"}
try:
full = prog + (args or [])
res = subprocess.run(full, capture_output=True, text=True)
return {
"ok": res.returncode == 0,
"code": res.returncode,
"stdout": res.stdout,
"stderr": res.stderr,
}
except Exception as e:
return {"ok": False, "error": str(e)}

30
src/neuron.py Normal file
View File

@ -0,0 +1,30 @@
from typing import Any, Dict
class Neuron:
"""
Classe-base de um neurónio-agente.
Cada neurónio pode observar/agir e trocar mensagens via o bus do Cortex.
"""
name = "Neuron"
def __init__(self, ctx: "Cortex"):
self.ctx = ctx
def observe(self) -> None:
"""Ler estado do mundo (sensores, /proc, eventos)."""
return
def think(self) -> None:
"""Processar/planejar usando o estado atual."""
return
def act(self) -> None:
"""Executar uma ação (opcional)."""
return
# Utilitários
def publish(self, channel: str, payload: Dict[str, Any]) -> None:
self.ctx.bus_publish(channel, payload)
def consume(self, channel: str) -> Dict[str, Any] | None:
return self.ctx.bus_consume(channel)

118
src/neurotron_config.py Normal file
View File

@ -0,0 +1,118 @@
"""
🧠 neurotron_config.py
NFDOS Núcleo de parâmetros vitais do Neurotron
------------------------------------------------
Nova versão para o layout:
.../neurotron/
src/
data/
"""
from pathlib import Path
# ======================================
# 🌐 Diretórios e Caminhos
# ======================================
# Diretório deste ficheiro → .../neurotron/src/neurotron_config.py
THIS_FILE = Path(__file__).resolve()
SRC_DIR = THIS_FILE.parent # .../neurotron/src
BASE_DIR = SRC_DIR.parent # .../neurotron/
# Onde vivem as configs/logs da “instalação”
DATA_DIR = BASE_DIR / "data"
CONFIG_DIR = DATA_DIR / "configs"
LOG_DIR = DATA_DIR / "logs"
# Modo persistente do NFDOS (quando /var/neurotron está montado)
RUNTIME_DIR = Path("/var/run/neurotron")
MOUNT_POINT = "/var/neurotron"
# Candidatos para disco persistente do hipocampo
DISK_CANDIDATES = [
"/dev/vda", "/dev/vdb",
"/dev/sda", "/dev/hda"
]
# ======================================
# ⚙️ Parâmetros Cognitivos Principais
# ======================================
NEUROTRON_TICK = 1.0
NEUROTRON_VERBOSITY = 1
NEUROTRON_MODE = "diagnostic"
NEUROTRON_HOMEOSTASIS = 85.0
HOMEOSTASIS_CPU_WARN = 70.0
HOMEOSTASIS_CPU_ALERT = 85.0
HOMEOSTASIS_MEM_WARN = 75.0
HOMEOSTASIS_MEM_ALERT = 90.0
HOMEOSTASIS_LOAD_WARN = 1.5
HOMEOSTASIS_LOAD_ALERT = 3.0
NEUROTRON_DIAG_EVERY_TICKS = 5
NEUROTRON_TICK_MIN = 0.5
NEUROTRON_TICK_MAX = 3.0
NEUROTRON_TICK_STEP = 0.25
NEUROTRON_SEED = 42
NEUROTRON_MEMORY_SIZE = 256 # KB
# ======================================
# 🧩 Parâmetros de Subsistemas
# ======================================
CORTEX_MAX_THREADS = 1
CORTEX_LOOP_DELAY = 0.1
HIPPOCAMPUS_LOG_RETENTION = 100
HIPPOCAMPUS_AUTOSAVE = True
MOTOR_OUTPUT_DEVICE = "console"
MOTOR_SHOW_SYMBOLS = True
PERCEPTION_CPU_SOURCE = "/proc/stat"
PERCEPTION_MEM_SOURCE = "/proc/meminfo"
PERCEPTION_UPDATE_INTERVAL = 2.0
# ======================================
# 🧠 Parâmetros futuros
# ======================================
NEUROTRON_EXPANSION_MODE = "none"
NEUROTRON_DATASET_PATH = DATA_DIR
NEUROTRON_HISTORY_KEEP = 8
NEUROTRON_DIAG_SCHEMA = "v4"
HEARTBEAT_ENABLED = True
HEARTBEAT_STYLE = "compact"
NEUROTRON_THRESHOLDS = {
"cpu_high": 85.0,
"mem_high": 90.0,
"load1_high": 2.0,
}
TELEMETRY_MAXLEN = 64
TELEMETRY_FLUSH_EVERY_TICKS = 5
# ======================================
# 🧭 Utilitário
# ======================================
def show_config():
"""Mostra a configuração atual do Neurotron (apenas NEUROTRON_*)"""
import json
cfg = {
k: v
for k, v in globals().items()
if k.startswith("NEUROTRON_")
}
print(json.dumps(cfg, indent=2, default=str))
if __name__ == "__main__":
show_config()

89
src/perception.py Normal file
View File

@ -0,0 +1,89 @@
import os
from time import sleep
class Perception:
"""
Sensores internos via /proc:
- CPU % calculado por delta de /proc/stat
- Memória % via /proc/meminfo
- Carga média via /proc/loadavg
Sem dependências externas (psutil).
"""
def _read_proc_stat(self):
try:
with open("/proc/stat", "r") as f:
line = f.readline()
if not line.startswith("cpu "):
return None
parts = line.strip().split()[1:]
vals = list(map(int, parts[:10])) # user nice system idle iowait irq softirq steal guest guest_nice
return {
"user": vals[0], "nice": vals[1], "system": vals[2], "idle": vals[3],
"iowait": vals[4], "irq": vals[5], "softirq": vals[6], "steal": vals[7],
"guest": vals[8], "guest_nice": vals[9],
}
except Exception:
return None
def _cpu_percent(self, interval=0.05):
a = self._read_proc_stat()
if not a:
return "?"
sleep(interval) # micro-janelinha
b = self._read_proc_stat()
if not b:
return "?"
idle_a = a["idle"] + a["iowait"]
idle_b = b["idle"] + b["iowait"]
non_a = sum(a.values()) - idle_a
non_b = sum(b.values()) - idle_b
total_a = idle_a + non_a
total_b = idle_b + non_b
totald = total_b - total_a
idled = idle_b - idle_a
if totald <= 0:
return "?"
usage = (totald - idled) * 100.0 / totald
return round(usage, 1)
def _mem_percent(self):
try:
info = {}
with open("/proc/meminfo", "r") as f:
for line in f:
k, v = line.split(":", 1)
info[k.strip()] = v.strip()
def kB(key):
if key not in info: return None
return float(info[key].split()[0]) # kB
mem_total = kB("MemTotal")
mem_avail = kB("MemAvailable")
if not mem_total or mem_avail is None:
return "?"
used = mem_total - mem_avail
return round(used * 100.0 / mem_total, 1)
except Exception:
return "?"
def _loadavg(self):
try:
if hasattr(os, "getloadavg"):
l1, l5, l15 = os.getloadavg()
return [round(l1, 2), round(l5, 2), round(l15, 2)]
with open("/proc/loadavg", "r") as f:
parts = f.read().strip().split()
l1, l5, l15 = map(float, parts[:3])
return [round(l1, 2), round(l5, 2), round(l15, 2)]
except Exception:
return ["?", "?", "?"]
def snapshot(self) -> dict:
return {
"env_user": os.environ.get("USER") or "root",
"env_term": os.environ.get("TERM") or "unknown",
"cpu_percent": self._cpu_percent(),
"mem_percent": self._mem_percent(),
"loadavg": self._loadavg(),
}

95
src/telemetry_tail.py Normal file
View File

@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
📊 Painel de Telemetria do Neurotron V0.1
o ficheiro telemetry.json e mostra um mini-ECG digital:
Execução:
python3 /opt/kernel/neurotron/neurotron_core/telemetry_tail.py
"""
import json
import time
import os
from pathlib import Path
from statistics import mean
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
DATASET = "/opt/kernel/neurotron/data/telemetry.json"
BAR_CHARS = "▁▂▃▄▅▆▇█"
SAMPLES = 24 # quantas amostras recentes mostrar
REFRESH = 2.0 # segundos entre atualizações
def mini_graph(values, width=24):
"""Desenha barras simples tipo sparkline"""
if not values:
return "·" * width
vals = [v for v in values if isinstance(v, (int, float))]
if not vals:
return "·" * width
lo, hi = min(vals), max(vals)
span = (hi - lo) or 1.0
bars = []
for v in vals[-width:]:
if not isinstance(v, (int, float)):
bars.append("·")
continue
i = int(round((v - lo) / span * (len(BAR_CHARS) - 1)))
bars.append(BAR_CHARS[i])
return "".join(bars)
def read_telemetry(path: str):
try:
data = json.loads(Path(path).read_text() or "[]")
return data[-SAMPLES:]
except Exception:
return []
def render_panel(console, data):
if not data:
console.print("[yellow]Nenhum dado de telemetria disponível.[/yellow]")
return
cpu = [d.get("cpu") for d in data if isinstance(d.get("cpu"), (int, float))]
mem = [d.get("mem") for d in data if isinstance(d.get("mem"), (int, float))]
load = [d.get("load")[0] for d in data if isinstance(d.get("load"), (list, tuple)) and isinstance(d.get("load")[0], (int, float))]
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Sinal Vital", justify="left")
table.add_column("Tendência", justify="left")
table.add_column("Média", justify="right")
table.add_row("CPU (%)", mini_graph(cpu), f"{mean(cpu):.1f}%" if cpu else "?")
table.add_row("Memória (%)", mini_graph(mem), f"{mean(mem):.1f}%" if mem else "?")
table.add_row("Carga (1min)", mini_graph(load), f"{mean(load):.2f}" if load else "?")
panel = Panel(table, title="🩺 TELEMETRIA RECENTE", border_style="green")
console.clear()
console.print(panel)
def main():
console = Console()
console.print("[bold cyan]Neurotron Telemetry Tail — Iniciar Monitorização[/bold cyan]\n")
while True:
if not Path(DATASET).exists():
console.print(f"[yellow]A aguardar dados em {DATASET}...[/yellow]")
time.sleep(REFRESH)
continue
data = read_telemetry(DATASET)
if not data:
console.print("[yellow]Nenhum dado de telemetria disponível.[/yellow]")
else:
render_panel(console, data)
time.sleep(REFRESH)
if __name__ == "__main__":
main()