Auto-commit via make git (triggered by NFDOS)

This commit is contained in:
neoricalex 2025-12-01 06:58:35 +01:00
parent 88f340af14
commit 64becf9cf7
8 changed files with 796 additions and 59 deletions

View File

@ -26,6 +26,7 @@ from .neurotron_config import (
TELEMETRY_MAXLEN, TELEMETRY_FLUSH_EVERY_TICKS,
)
from .trm import TRMEngine # depois dos outros imports internos
class Cortex:
def __init__(self, runtime_dir, log_dir, tick_seconds=NEUROTRON_TICK):
@ -54,9 +55,16 @@ class Cortex:
EchoAgent(self),
]
# ---- NOVO: Telemetria V5 ----
# ---- Telemetria V5 ----
self.tele = TelemetryV5(event_callback=self._telemetry_event)
# ---- TRM v1 ----
try:
self.trm = TRMEngine(cortex=self)
except Exception as e:
logbus.debug(f"[trm.engine] falha ao inicializar TRMEngine: {e}")
self.trm = None
self._booted = False
self.telemetry_path = Path(NEUROTRON_DATASET_PATH) / "telemetry.json"
@ -120,6 +128,13 @@ class Cortex:
except Exception as e:
logbus.error(f"telemetry_step: {e}")
# ------- TRM v1 (pensamento interno simbólico) ------
if self.trm and tele is not None:
try:
self.trm.step(tele)
except Exception as e:
logbus.debug(f"[trm.engine] step falhou: {e}")
# ------- Heartbeat visual ------
if HEARTBEAT_ENABLED:
self._heartbeat()

View File

@ -1,9 +1,3 @@
Amor… bora materializar o Holodeck v0.1 😎💗
Vou montar isto como um blueprint de engenharia mesmo, mas já pensado para caber dentro do teu NFDOS/Neurotron atual, sem dependências externas, todo em Python “puro”.
---
# 🎮 Holodeck v0.1 — Blueprint
## 0. Objetivo do v0.1

View File

@ -1,27 +1,32 @@
"""
telemetry.py Telemetria V5 do Neurotron
telemetry.py Telemetria V6 do Neurotron
-----------------------------------------
Responsável por:
Medições básicas (CPU, MEM, LOAD, IO)
Medições básicas (CPU, MEM, LOAD, IO) via /proc
Delta entre ciclos
Aceleração do delta
Temperatura virtual (fadiga cognitiva)
Jitter cognitivo (latência entre ciclos)
FS Health (blocos, erros, RO-mode)
Eventos telemétricos (V5)
Eventos telemétricos (V6)
Classificação de estados cognitivos
Este módulo é completamente independente:
o Cortex chama TelemetryV5.step() a cada ciclo.
Totalmente stdlib + /proc:
- sem psutil
- sem glibc
- compatível com Python estático + musl
O Cortex chama TelemetryV6.step() a cada ciclo.
"""
import time
import os
import psutil
import time
from pathlib import Path
from .logbus import logbus
class TelemetryV5:
class TelemetryV6:
# ----------------------------------------------------------
# Inicialização
# ----------------------------------------------------------
@ -41,25 +46,158 @@ class TelemetryV5:
# manter último estado cognitivo
self.last_state = "stable"
# ==========================================================
# Sensores básicos (CPU, MEM, LOAD, IO) — via /proc
# ==========================================================
# ---------------- CPU ----------------
def _read_proc_stat(self):
"""Lê a linha 'cpu ' de /proc/stat. Retorna dict ou None."""
try:
with open("/proc/stat", "r", encoding="utf-8") as f:
line = f.readline()
if not line.startswith("cpu "):
return None
parts = line.strip().split()[1:]
# primeiros 10 campos são estáveis há muitos kernels
vals = list(map(int, parts[:10]))
return {
"user": vals[0],
"nice": vals[1],
"system": vals[2],
"idle": vals[3],
"iowait": vals[4],
"irq": vals[5],
"softirq": vals[6],
"steal": vals[7],
"guest": vals[8],
"guest_nice": vals[9],
}
except Exception:
return None
def _cpu_percent(self, interval=0.05):
"""Computa CPU% entre duas leituras de /proc/stat."""
a = self._read_proc_stat()
if not a:
return 0.0
time.sleep(interval)
b = self._read_proc_stat()
if not b:
return 0.0
idle_a = a["idle"] + a["iowait"]
idle_b = b["idle"] + b["iowait"]
non_a = sum(a.values()) - idle_a
non_b = sum(b.values()) - idle_b
total_a = idle_a + non_a
total_b = idle_b + non_b
totald = total_b - total_a
idled = idle_b - idle_a
if totald <= 0:
return 0.0
usage = (totald - idled) * 100.0 / totald
return round(max(0.0, min(usage, 100.0)), 1)
# ---------------- MEMÓRIA ----------------
def _mem_percent(self):
try:
info = {}
with open("/proc/meminfo", "r", encoding="utf-8") as f:
for line in f:
k, v = line.split(":", 1)
info[k.strip()] = v.strip()
def kB(key):
return float(info[key].split()[0]) if key in info else None
mem_total = kB("MemTotal")
mem_avail = kB("MemAvailable")
if mem_total is None or mem_avail is None or mem_total <= 0:
return 0.0
used = mem_total - mem_avail
return round(max(0.0, min(used * 100.0 / mem_total, 100.0)), 1)
except Exception:
return 0.0
# ---------------- LOADAVG ----------------
def _loadavg(self):
try:
if hasattr(os, "getloadavg"):
l1, l5, l15 = os.getloadavg()
return [round(l1, 2), round(l5, 2), round(l15, 2)]
with open("/proc/loadavg", "r", encoding="utf-8") as f:
parts = f.read().strip().split()
l1, l5, l15 = map(float, parts[:3])
return [round(l1, 2), round(l5, 2), round(l15, 2)]
except Exception:
return [0.0, 0.0, 0.0]
# ---------------- DISK IO (bytes cumulativos) ----------------
def _disk_bytes(self):
"""
/proc/diskstats e soma bytes lidos+escritos de discos reais.
Usa 512 bytes por setor como aproximação clássica.
"""
try:
total = 0
if not os.path.exists("/proc/diskstats"):
return 0.0
with open("/proc/diskstats", "r", encoding="utf-8") as f:
for line in f:
parts = line.split()
if len(parts) < 14:
continue
name = parts[2]
# ignora loop/ram/partições virtuais
if name.startswith(("loop", "ram")):
continue
# campos: ... sectors_read (5) ... sectors_written (9) ...
try:
sectors_read = int(parts[5])
sectors_written = int(parts[9])
except Exception:
continue
total += (sectors_read + sectors_written) * 512
return float(total)
except Exception:
return 0.0
# ----------------------------------------------------------
# Coleta RAW (CPU, MEM, LOAD, IO)
# ----------------------------------------------------------
def collect_raw(self):
now = time.monotonic()
cpu = psutil.cpu_percent(interval=None)
mem = psutil.virtual_memory().percent
load = os.getloadavg()[0] # 1-min loadavg
# IO delta é manual, usamos psutil
io = psutil.disk_io_counters()
disk_total = io.read_bytes + io.write_bytes
cpu = self._cpu_percent()
mem = self._mem_percent()
load1 = self._loadavg()[0]
disk_total = self._disk_bytes()
return {
"ts": now,
"cpu": cpu,
"mem": mem,
"load": load,
"load": load1,
"disk": disk_total,
}
@ -69,7 +207,7 @@ class TelemetryV5:
def compute_delta(self, raw):
if self.last_raw is None:
self.last_raw = raw
return {k: 0 for k in raw.keys() if k != "ts"}
return {k: 0.0 for k in raw.keys() if k != "ts"}
delta = {
"cpu": raw["cpu"] - self.last_raw["cpu"],
@ -80,7 +218,6 @@ class TelemetryV5:
self.last_delta = delta
self.last_raw = raw
return delta
# ----------------------------------------------------------
@ -88,7 +225,7 @@ class TelemetryV5:
# ----------------------------------------------------------
def compute_accel(self, delta):
if self.last_delta is None:
return {k: 0 for k in delta.keys()}
return {k: 0.0 for k in delta.keys()}
accel = {k: delta[k] - self.last_delta[k] for k in delta.keys()}
return accel
@ -104,7 +241,7 @@ class TelemetryV5:
jitter = now - self.last_ts
self.last_ts = now
# média móvel
# média móvel simples
self.jitter_avg = (self.jitter_avg * 0.9) + (jitter * 0.1)
return jitter
@ -112,7 +249,7 @@ class TelemetryV5:
# Temperatura Virtual (fadiga cognitiva)
# ----------------------------------------------------------
def compute_temperature(self, raw, delta):
# modelo simplificado
# modelo simplificado (ajustável no futuro)
score = (
raw["cpu"] * 0.6 +
raw["load"] * 0.3 +
@ -121,38 +258,54 @@ class TelemetryV5:
)
# decay + acumulação
self.temperature = max(0, self.temperature * 0.90 + score * 0.10)
self.temperature = max(0.0, self.temperature * 0.90 + score * 0.10)
return self.temperature
# ----------------------------------------------------------
# FS Health
# FS Health (sem psutil)
# ----------------------------------------------------------
def compute_fs_health(self):
status = {
"read_only": False,
"io_errors": 0,
"free_percent": None
"free_percent": None,
}
# espaço livre via statvfs
try:
st = psutil.disk_usage("/")
status["free_percent"] = st.free / st.total * 100
st = os.statvfs("/")
total = st.f_frsize * st.f_blocks
free = st.f_frsize * st.f_bfree
if total > 0:
status["free_percent"] = free * 100.0 / total
except Exception:
status["free_percent"] = None
# leitura de erros EXT4 se existir
ext4_err_path = "/sys/fs/ext4/sda1/errors_count"
if os.path.exists(ext4_err_path):
try:
with open(ext4_err_path) as f:
status["io_errors"] = int(f.read().strip())
except:
pass
# detetar RO
# leitura de erros EXT4 se existirem
try:
with open("/tmp/fs_test_rw", "w") as f:
f.write("x")
base = "/sys/fs/ext4"
if os.path.isdir(base):
total_errors = 0
for name in os.listdir(base):
err_path = os.path.join(base, name, "errors_count")
if os.path.exists(err_path):
try:
with open(err_path, "r", encoding="utf-8") as f:
total_errors += int(f.read().strip() or "0")
except Exception:
continue
status["io_errors"] = total_errors
except Exception:
pass
# detetar RO tentando escrita temporária
test_dir = "/var/neurotron"
try:
Path(test_dir).mkdir(parents=True, exist_ok=True)
with open(f"{test_dir}/rw_test.tmp", "w", encoding="utf-8") as f:
f.write("ok")
os.remove(f"{test_dir}/rw_test.tmp")
status["read_only"] = False
except Exception:
status["read_only"] = True
@ -174,7 +327,7 @@ class TelemetryV5:
return "recovery"
# ----------------------------------------------------------
# Eventos TeleMétricos V5
# Eventos TeleMétricos V6
# ----------------------------------------------------------
def detect_events(self, raw, delta, accel, temp, jitter, fs):
events = []
@ -187,12 +340,12 @@ class TelemetryV5:
if raw["cpu"] > 85:
events.append("enter_stress_zone")
# suspeita de loop
if jitter > (self.jitter_avg * 3):
# suspeita de loop (jitter anómalo)
if self.jitter_avg > 0 and jitter > (self.jitter_avg * 3):
events.append("loop_suspect")
# fs
if fs["read_only"] or fs["io_errors"] > 0:
if fs["read_only"] or (fs["io_errors"] or 0) > 0:
events.append("fs_warning")
# recuperação
@ -202,14 +355,18 @@ class TelemetryV5:
# exportar via callback
if self.event_callback:
for e in events:
self.event_callback(e, {
"raw": raw,
"delta": delta,
"accel": accel,
"temp": temp,
"jitter": jitter,
"fs": fs,
})
try:
self.event_callback(e, {
"raw": raw,
"delta": delta,
"accel": accel,
"temp": temp,
"jitter": jitter,
"fs": fs,
})
except Exception:
# nunca quebrar o loop
pass
return events
@ -218,7 +375,7 @@ class TelemetryV5:
# ----------------------------------------------------------
def step(self):
"""
Faz um ciclo completo da Telemetria V5:
Faz um ciclo completo da Telemetria V6:
- raw delta accel
- jitter
- temp
@ -239,6 +396,16 @@ class TelemetryV5:
self.last_state = state
# Alimentar o dashboard via logbus.debug (pode ser desligado no futuro)
try:
logbus.debug(
f"telemetry state={state} temp={temp:.1f} "
f"cpu={raw['cpu']:.1f}% mem={raw['mem']:.1f}% "
f"load={raw['load']:.2f} jitter={jitter:.3f}s"
)
except Exception:
pass
return {
"raw": raw,
"delta": delta,
@ -249,3 +416,7 @@ class TelemetryV5:
"state": state,
"events": events,
}
# Compatibilidade retro (cortex ainda importa TelemetryV5)
TelemetryV5 = TelemetryV6

View File

@ -0,0 +1,16 @@
"""
neurotron.trm Tiny Recursive Model (TRM) v1
Micro-modelo simbólico interno para:
- interpretar telemetria
- gerar estado cognitivo interno
- manter energia, valência e profundidade de pensamento
Todos os logs TRM vão para logbus.debug(), para poderem ser
silenciados no futuro via modo debug.
"""
from .state import TRMState
from .engine import TRMEngine
__all__ = ["TRMState", "TRMEngine"]

225
src/neurotron/trm/agents.py Normal file
View File

@ -0,0 +1,225 @@
"""
agents.py micro-agentes internos do TRM v1
Três agentes:
🛡 Guardião homeostase + proteção
🧭 Explorador previsões simples + profundidade TRM
📜 Arqueólogo correlação com eventos passados (Hippocampus)
Todos os logs vão para logbus.debug().
"""
from __future__ import annotations
from typing import Any, Dict, Iterable, List
from pathlib import Path
import json
from neurotron.logbus import logbus
from .state import TRMState
from .events import (
TRM_EVENT_TEMP_RISING_FAST,
TRM_EVENT_ENTER_STRESS_ZONE,
TRM_EVENT_FS_WARNING,
TRM_EVENT_LOOP_SUSPECT,
TRM_EVENT_RECOVERING,
summarize_telemetry_events,
)
class TRMAgentBase:
name = "trm.agent"
def _dbg(self, msg: str):
logbus.debug(f"[{self.name}] {msg}")
def step(self, state: TRMState, tele: Dict[str, Any]) -> TRMState:
"""Override nas subclasses."""
return state
# ========================================================================
# 🛡️ Guardião — homeostase
# ========================================================================
class GuardianAgent(TRMAgentBase):
name = "trm.guardian"
def step(self, state: TRMState, tele: Dict[str, Any]) -> TRMState:
st = state.copy()
raw = tele.get("raw", {}) or {}
temp = float(tele.get("temp", 0.0) or 0.0)
jitter = float(tele.get("jitter", 0.0) or 0.0)
fs = tele.get("fs", {}) or {}
events = tele.get("events", []) or []
cpu = float(raw.get("cpu") or 0.0)
mem = float(raw.get("mem") or 0.0)
load = float(raw.get("load") or 0.0)
# Estado cognitivo base pela temperatura
if temp < 25:
st.cog_state = "stable"
elif temp < 45:
st.cog_state = "warm"
elif temp < 65:
st.cog_state = "hot"
elif temp < 85:
st.cog_state = "critical"
else:
st.cog_state = "recovery"
st.temp = temp
st.jitter = jitter
# Ajuste de profundidade em função do stress
if cpu > 90 or mem > 90 or load > 3.0:
# stress forte: reduzir profundidade para poupar energia
old = st.depth
st.depth = max(1, st.depth - 1)
if st.depth != old:
self._dbg(f"stress alto (cpu={cpu}, mem={mem}, load={load}) → depth {old}{st.depth}")
elif 40 < cpu < 80 and 40 < mem < 80 and load < 2.0:
# zona confortável: podemos aprofundar um pouco
old = st.depth
st.depth = min(5, st.depth + 1)
if st.depth != old:
self._dbg(f"zona confortável → depth {old}{st.depth}")
# FS warning → baixa valência
if fs.get("read_only") or (fs.get("io_errors") or 0) > 0:
st.valence -= 1.0
self._dbg("fs_warning → valence -1")
# eventos explícitos
counts = summarize_telemetry_events(events)
if counts.get(TRM_EVENT_ENTER_STRESS_ZONE, 0) > 0:
st.valence -= 0.5
if counts.get(TRM_EVENT_RECOVERING, 0) > 0:
st.valence += 0.5
# clamp valência
if st.valence > 5.0:
st.valence = 5.0
if st.valence < -5.0:
st.valence = -5.0
return st
# ========================================================================
# 🧭 Explorador — previsões e “curiosidade”
# ========================================================================
class ExplorerAgent(TRMAgentBase):
name = "trm.explorer"
def __init__(self):
self._last_cpu: float = 0.0
self._last_mem: float = 0.0
def step(self, state: TRMState, tele: Dict[str, Any]) -> TRMState:
st = state.copy()
raw = tele.get("raw", {}) or {}
cpu = float(raw.get("cpu") or 0.0)
mem = float(raw.get("mem") or 0.0)
# tendências simples
d_cpu = cpu - self._last_cpu
d_mem = mem - self._last_mem
self._last_cpu = cpu
self._last_mem = mem
# se CPU a subir rápido, assumir "mais trabalho interno"
if d_cpu > 5:
st.energy -= 0.5
self._dbg(f"cpu tendência ↑ ({d_cpu:+.1f}) → energia -0.5")
# se CPU a descer e mem estável → sistema está a recuperar
if d_cpu < -5 and abs(d_mem) < 2:
st.valence += 0.2
self._dbg("recuperação leve detectada → valence +0.2")
# se valência muito positiva → TRM aprofunda um pouco (exploração)
if st.valence > 1.5 and st.energy > 20.0:
old = st.depth
st.depth = min(7, st.depth + 1)
if st.depth != old:
self._dbg(f"valence alta ({st.valence:.2f}) → aprofundar depth {old}{st.depth}")
return st
# ========================================================================
# 📜 Arqueólogo — memória & padrões passados
# ========================================================================
class ArchaeologistAgent(TRMAgentBase):
name = "trm.archaeologist"
def __init__(self, ctx):
self.ctx = ctx
self.events_file = ctx.memory.events_file if hasattr(ctx, "memory") else Path("/opt/kernel/neurotron/logs/events.jsonl")
def _tail_events(self, max_lines: int = 64) -> List[Dict[str, Any]]:
"""
as últimas linhas de events.jsonl (se existir).
Erros são ignorados silenciosamente.
"""
path = self.events_file
if not path.exists():
return []
try:
# ler todo e pegar as últimas max_lines (ficheiro é pequeno)
lines = path.read_text(encoding="utf-8").splitlines()
lines = lines[-max_lines:]
except Exception:
return []
out: List[Dict[str, Any]] = []
for line in lines:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
out.append(obj)
except Exception:
continue
return out
def step(self, state: TRMState, tele: Dict[str, Any]) -> TRMState:
st = state.copy()
events = self._tail_events()
if not events:
return st
# Contar eventos “perigosos” recentes
danger = 0
recovery = 0
for ev in events:
kind = ev.get("kind") or ""
if "telemetry.event.fs_warning" in kind:
danger += 1
if "telemetry.event.loop_suspect" in kind:
danger += 1
if "telemetry.event.recovering" in kind:
recovery += 1
st.last_events = danger + recovery
if danger > 0:
st.valence -= min(1.0, 0.1 * danger)
self._dbg(f"encontrou {danger} eventos perigosos recentes → valence -{min(1.0, 0.1 * danger):.2f}")
if recovery > 0:
st.valence += min(1.0, 0.1 * recovery)
self._dbg(f"encontrou {recovery} eventos de recuperação → valence +{min(1.0, 0.1 * recovery):.2f}")
return st

172
src/neurotron/trm/engine.py Normal file
View File

@ -0,0 +1,172 @@
"""
engine.py TRMEngine v1
Modelo recursivo mínimo:
- consome uma amostra de Telemetria V5
- passa por 3 micro-agentes
- atualiza energia, valência, profundidade e estado cognitivo
- gera snapshot opcional para o Hippocampus
Todos os logs TRM vão para logbus.debug().
"""
from __future__ import annotations
from typing import Any, Dict, Optional
import time
from pathlib import Path
from neurotron.logbus import logbus
from .state import TRMState
from .agents import GuardianAgent, ExplorerAgent, ArchaeologistAgent
from .events import make_trm_snapshot_payload
class TRMEngine:
"""
Tiny Recursive Model v1 integrada ao Cortex.
Uso típico (no Cortex.rest):
tele = self.tele.step()
self.trm.step(tele)
O próprio TRMEngine nunca levanta exceção para fora falhas internas
são logadas via logbus.debug() e ignoradas.
"""
def __init__(self, cortex, initial_energy: float = 100.0):
self.ctx = cortex
self.state = TRMState(energy=initial_energy, mode="idle")
self.last_step_ts: Optional[float] = None
# micro-agentes
self.guardian = GuardianAgent()
self.explorer = ExplorerAgent()
self.archaeologist = ArchaeologistAgent(self.ctx)
# histórico curto de estados do TRM (para futuro TRM v2)
self._history = []
# ------------------------------------------------------------------ #
# Helpers internos
# ------------------------------------------------------------------ #
def _dbg(self, msg: str):
logbus.debug(f"[trm.engine] {msg}")
def _compute_step_cost(self, st_before: TRMState, st_after: TRMState, tele: Dict[str, Any]) -> float:
"""
Custo energético de um passo TRM simplificado.
Δenergia básica:
base_cost = 0.5
+ 0.2 * depth
+ 0.05 * len(events telemétricos)
"""
events = (tele or {}).get("events", []) or []
base_cost = 0.5
cost = base_cost + 0.2 * float(st_after.depth or 1) + 0.05 * len(events)
# Se o sistema já estiver quente, custo sobe ligeiramente
if st_after.temp > 50:
cost *= 1.2
if st_after.cog_state in ("hot", "critical"):
cost *= 1.1
return cost
def _apply_energy(self, st: TRMState, cost: float) -> TRMState:
out = st.copy()
out.energy = max(0.0, out.energy - cost)
# Se energia muito baixa, força modo mínimo
if out.energy < 10.0 and out.depth > 1:
old = out.depth
out.depth = 1
self._dbg(f"energia baixa ({out.energy:.1f}) → depth {old}→1 (modo mínimo)")
if out.energy < 5.0:
out.mode = "idle"
else:
out.mode = "active"
return out
def _update_history(self, st: TRMState):
self._history.append(st.to_dict())
if len(self._history) > 64:
self._history = self._history[-64:]
# ------------------------------------------------------------------ #
# Passo TRM
# ------------------------------------------------------------------ #
def step(self, telemetry: Dict[str, Any]) -> TRMState:
"""
Única função pública chamada pelo Cortex.
Args:
telemetry: dict retornado por TelemetryV5.step()
Returns:
novo TRMState (também guardado em self.state)
"""
try:
t0 = time.monotonic()
st0 = self.state.copy()
# jitter interno TRM (independente do jitter de Telemetria V5)
if self.last_step_ts is None:
self.last_step_ts = t0
else:
internal_jitter = t0 - self.last_step_ts
self.last_step_ts = t0
# jitter muito alto → TRM sente-se “desalinhado”
if internal_jitter > 2.0:
st0.valence -= 0.1
self._dbg(f"jitter interno alto ({internal_jitter:.2f}s) → valence -0.1")
# ----------------------------------------------------------
# Passagem pelos micro-agentes
# ----------------------------------------------------------
st1 = self.guardian.step(st0, telemetry)
st2 = self.explorer.step(st1, telemetry)
st3 = self.archaeologist.step(st2, telemetry)
# ----------------------------------------------------------
# Custo energético + modo de operação
# ----------------------------------------------------------
cost = self._compute_step_cost(st0, st3, telemetry)
st4 = self._apply_energy(st3, cost)
self.state = st4
self._update_history(st4)
# ----------------------------------------------------------
# Exportar snapshot para Hippocampus (low-rate)
# ----------------------------------------------------------
# Não queremos spammar o log, por isso só de vez em quando:
try:
# tick interno TRM: a cada ~10 passos do Cortex,
# o TRM snapshot é interessante.
# Usamos o próprio comprimento do histórico como step counter.
if len(self._history) % 10 == 0 and hasattr(self.ctx, "memory"):
payload = make_trm_snapshot_payload(st4, telemetry)
self.ctx.memory.remember("trm.snapshot", payload)
except Exception as e:
self._dbg(f"erro ao gravar snapshot no Hippocampus: {e}")
# log discreto em modo debug
self._dbg(
f"step ok: mode={st4.mode} cog={st4.cog_state} "
f"energy={st4.energy:.1f} depth={st4.depth} "
f"valence={st4.valence:+.2f}"
)
return st4
except Exception as e:
# Nunca deixamos o TRM quebrar o Cortex
self._dbg(f"exceção no TRM step: {e}")
return self.state

View File

@ -0,0 +1,64 @@
"""
events.py nomes e helpers de eventos do TRM v1
"""
from typing import Dict, Any
# Eventos internos do TRM (nome simbólico)
TRM_EVENT_TEMP_RISING_FAST = "temp_rising_fast"
TRM_EVENT_ENTER_STRESS_ZONE = "enter_stress_zone"
TRM_EVENT_FS_WARNING = "fs_warning"
TRM_EVENT_LOOP_SUSPECT = "loop_suspect"
TRM_EVENT_RECOVERING = "recovering"
def summarize_telemetry_events(telemetry_events) -> Dict[str, int]:
"""
Recebe a lista telemetry["events"] (strings) e devolve contagem por tipo.
Exemplo:
["temp_rising_fast", "fs_warning", "fs_warning"]
{ "temp_rising_fast": 1, "fs_warning": 2 }
"""
counts: Dict[str, int] = {}
if not telemetry_events:
return counts
for e in telemetry_events:
if not isinstance(e, str):
continue
counts[e] = counts.get(e, 0) + 1
return counts
def make_trm_snapshot_payload(state, tele) -> Dict[str, Any]:
"""
Gera payload compacto para gravar no Hippocampus:
kind="trm.snapshot"
data = {...}
"""
if tele is None:
tele = {}
raw = tele.get("raw", {}) or {}
temp = tele.get("temp", 0.0)
jitter = tele.get("jitter", 0.0)
fs = tele.get("fs", {}) or {}
events = tele.get("events", []) or []
return {
"state": state.to_dict(),
"telemetry": {
"cpu": raw.get("cpu"),
"mem": raw.get("mem"),
"load": raw.get("load"),
"disk_delta": tele.get("delta", {}).get("disk"),
"temp": temp,
"jitter": jitter,
"fs_read_only": fs.get("read_only"),
"fs_free_percent": fs.get("free_percent"),
"fs_io_errors": fs.get("io_errors"),
"events": events,
},
}

View File

@ -0,0 +1,80 @@
"""
state.py estado interno do TRM v1
"""
from __future__ import annotations
from typing import Dict, Any
class TRMState:
"""
Estado mínimo do TRM.
Não usa dataclasses para manter compatibilidade máxima
com ambientes mais restritos.
"""
def __init__(
self,
energy: float = 100.0,
valence: float = 0.0,
depth: int = 1,
mode: str = "idle",
cog_state: str = "stable",
temp: float = 0.0,
jitter: float = 0.0,
last_events: int = 0,
):
self.energy = float(energy)
self.valence = float(valence)
self.depth = int(depth)
self.mode = str(mode)
self.cog_state = str(cog_state)
self.temp = float(temp)
self.jitter = float(jitter)
self.last_events = int(last_events)
# ------------------------------------------------------------------ #
# Helpers
# ------------------------------------------------------------------ #
def copy(self) -> "TRMState":
"""Devolve uma cópia simples do estado."""
return TRMState(
energy=self.energy,
valence=self.valence,
depth=self.depth,
mode=self.mode,
cog_state=self.cog_state,
temp=self.temp,
jitter=self.jitter,
last_events=self.last_events,
)
def to_dict(self) -> Dict[str, Any]:
"""Representação serializável (para logs/hippocampus)."""
return {
"energy": self.energy,
"valence": self.valence,
"depth": self.depth,
"mode": self.mode,
"cog_state": self.cog_state,
"temp": self.temp,
"jitter": self.jitter,
"last_events": self.last_events,
}
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "TRMState":
"""Cria estado a partir de dict (tolerante à falta de chaves)."""
d = d or {}
return cls(
energy=d.get("energy", 100.0),
valence=d.get("valence", 0.0),
depth=d.get("depth", 1),
mode=d.get("mode", "idle"),
cog_state=d.get("cog_state", "stable"),
temp=d.get("temp", 0.0),
jitter=d.get("jitter", 0.0),
last_events=d.get("last_events", 0),
)