cli + web recorder ui

This commit is contained in:
MasterPhooey
2026-01-17 16:17:21 -06:00
parent b57fcd9b05
commit c52f92d3c9
8 changed files with 332 additions and 273 deletions

View File

@@ -4,9 +4,9 @@ import re
import subprocess
import threading
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
from typing import Dict, Any, List, Optional
from fastapi import FastAPI, UploadFile, File, Form, Query
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
@@ -24,14 +24,9 @@ PERSONAL_DIR = Path(os.environ.get("PERSONAL_DIR", str(DATA_DIR / "personal_samp
# CLI folder inside repo
CLI_DIR = Path(os.environ.get("CLI_DIR", str(ROOT_DIR / "cli"))).resolve()
# If you want cleanup defaults for auto dataset setup, set these env vars:
# REC_DATASET_CLEANUP_ARCHIVES=true/false
# REC_DATASET_CLEANUP_INTERMEDIATE_FILES=true/false
DATASET_CLEANUP_ARCHIVES = os.environ.get("REC_DATASET_CLEANUP_ARCHIVES", "false").lower() in ("1", "true", "yes", "y")
DATASET_CLEANUP_INTERMEDIATE = os.environ.get("REC_DATASET_CLEANUP_INTERMEDIATE_FILES", "false").lower() in ("1", "true", "yes", "y")
# We want "Start training" to trigger your CLI entrypoint, using the existing venv
# (train_wake_word should be in /data/.venv/bin via setup_python_venv)
TRAIN_CMD = os.environ.get(
"TRAIN_CMD",
f"source '{DATA_DIR}/.venv/bin/activate' && train_wake_word --data-dir '{DATA_DIR}'"
@@ -40,14 +35,13 @@ TRAIN_CMD = os.environ.get(
TAKES_PER_SPEAKER_DEFAULT = int(os.environ.get("REC_TAKES_PER_SPEAKER", "10"))
SPEAKERS_TOTAL_DEFAULT = int(os.environ.get("REC_SPEAKERS_TOTAL", "1"))
# How many lines to show in WebUI (tail)
# Tail lines shown to UI
TRAIN_LOG_TAIL_LINES = int(os.environ.get("REC_TRAIN_LOG_TAIL_LINES", "400"))
# If you prefer bytes-based tailing (fast), keep this non-zero.
# Safety cap for reads (bytes) to avoid giant file reads
TRAIN_LOG_MAX_BYTES = int(os.environ.get("REC_TRAIN_LOG_MAX_BYTES", str(512 * 1024))) # 512KB
app = FastAPI(title="microWakeWord Personal Recorder")
# Serve static UI
STATIC_DIR.mkdir(parents=True, exist_ok=True)
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
@@ -60,7 +54,6 @@ def safe_name(raw: str) -> str:
return s or "wakeword"
# -------------------- In-memory session state --------------------
STATE: Dict[str, Any] = {
"raw_phrase": None,
"safe_word": None,
@@ -74,12 +67,13 @@ STATE: Dict[str, Any] = {
"training": {
"running": False,
"exit_code": None,
"log_lines": [], # legacy in-memory tail (still maintained)
"log_path": None, # path to recorder_training.log
"log_lines": [], # legacy in-memory tail (kept, but not relied on)
"log_path": None, # path to recorder_training.log
"safe_word": None,
# NEW: byte offset for efficient log tailing
"log_offset": 0,
# NEW: prevent UI duplication when UI appends:
"last_sent_tail": [], # last tail snapshot (list of lines)
"last_log_size": 0, # detect truncation
},
}
@@ -95,6 +89,26 @@ def _reset_personal_samples_dir():
pass
def _clear_training_log():
"""
Truncate recorder_training.log for a fresh session.
"""
log_path = DATA_DIR / "recorder_training.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
with open(log_path, "w", encoding="utf-8") as lf:
lf.write("================================================================================\n")
lf.write("===== New recorder session started =====\n")
lf.write("================================================================================\n")
lf.flush()
with STATE_LOCK:
STATE["training"]["log_path"] = str(log_path)
STATE["training"]["log_lines"] = []
STATE["training"]["last_sent_tail"] = []
STATE["training"]["last_log_size"] = 0
def _append_train_log(line: str):
line = (line or "").rstrip("\n")
with STATE_LOCK:
@@ -105,7 +119,6 @@ def _append_train_log(line: str):
def _title_from_phrase(raw_phrase: str) -> str:
# Keep it human-friendly for the optional <wake_word_title> argument
s = re.sub(r"[^a-zA-Z0-9 ]+", " ", raw_phrase or "").strip()
s = re.sub(r"\s+", " ", s)
return s.title() if s else ""
@@ -118,12 +131,6 @@ def _run_streamed(
header: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
) -> int:
"""
Run a command streaming stdout/stderr to both:
- recorder_training.log (disk)
- STATE["training"]["log_lines"] (UI) [best-effort]
Returns process exit code.
"""
if header:
_append_train_log(header)
@@ -156,9 +163,6 @@ def _run_streamed(
def _ensure_training_venv(log_path: Path) -> None:
"""
Ensure /data/.venv exists by running cli/setup_python_venv if needed.
"""
activate = DATA_DIR / ".venv" / "bin" / "activate"
if activate.exists():
_append_train_log("✅ Training venv found (skipping setup_python_venv)")
@@ -183,10 +187,6 @@ def _ensure_training_venv(log_path: Path) -> None:
def _ensure_training_datasets(log_path: Path) -> None:
"""
Always run setup_training_datasets before training.
The underlying scripts should skip work when already done.
"""
setup = CLI_DIR / "setup_training_datasets"
if not setup.exists():
raise RuntimeError(f"Missing setup_training_datasets at: {setup}")
@@ -217,67 +217,45 @@ def _ensure_training_datasets(log_path: Path) -> None:
raise RuntimeError(f"setup_training_datasets failed (exit_code={rc})")
def _read_log_tail_by_bytes(log_path: Path, max_bytes: int) -> str:
def _read_tail_lines(log_path: Path, max_lines: int) -> List[str]:
"""
Read up to the last max_bytes from a file (UTF-8 best effort).
Read the last N lines, bounded by TRAIN_LOG_MAX_BYTES.
Returns list of lines (no trailing newlines).
"""
if not log_path.exists():
return ""
return []
try:
size = log_path.stat().st_size
start = max(0, size - max_bytes)
start = max(0, size - TRAIN_LOG_MAX_BYTES)
with open(log_path, "rb") as f:
f.seek(start)
data = f.read()
# If we started in the middle of a line, it's ok; UI will show partial.
return data.decode("utf-8", errors="replace")
except Exception:
return ""
def _read_log_tail_by_lines(log_path: Path, max_lines: int) -> str:
"""
Read last N lines of a file (simple, may be slower on huge files).
"""
if not log_path.exists():
return ""
try:
# Read by bytes limited first, then line-tail
raw = _read_log_tail_by_bytes(log_path, TRAIN_LOG_MAX_BYTES)
if not raw:
return ""
lines = raw.splitlines()
if len(lines) <= max_lines:
return "\n".join(lines)
return "\n".join(lines[-max_lines:])
except Exception:
return ""
def _read_log_since_offset(log_path: Path, offset: int, max_bytes: int = 256 * 1024) -> Tuple[str, int]:
"""
Read log file incrementally starting from `offset`.
Returns (new_text, new_offset). Caps bytes read per call.
"""
if not log_path.exists():
return ("", offset)
try:
size = log_path.stat().st_size
# If file rotated/truncated, reset offset
if offset > size:
offset = 0
with open(log_path, "rb") as f:
f.seek(offset)
data = f.read(max_bytes)
new_offset = offset + len(data)
text = data.decode("utf-8", errors="replace")
return (text, new_offset)
lines = text.splitlines()
if len(lines) <= max_lines:
return lines
return lines[-max_lines:]
except Exception:
return ("", offset)
return []
def _compute_new_lines(prev_tail: List[str], new_tail: List[str]) -> List[str]:
"""
Given previous and current tail snapshots, return only the newly-added lines.
Works even if the tail window shifts.
"""
if not prev_tail:
return new_tail
# Try to find the largest suffix of prev_tail that matches a prefix of new_tail
max_k = min(len(prev_tail), len(new_tail))
for k in range(max_k, 0, -1):
if prev_tail[-k:] == new_tail[:k]:
return new_tail[k:]
# If no overlap, just return full new_tail (probably truncation or big jump)
return new_tail
def _run_training_background(safe_word: str, allow_no_personal: bool):
@@ -291,16 +269,15 @@ def _run_training_background(safe_word: str, allow_no_personal: bool):
STATE["training"]["exit_code"] = None
STATE["training"]["log_lines"] = []
STATE["training"]["safe_word"] = safe_word
STATE["training"]["last_sent_tail"] = []
STATE["training"]["last_log_size"] = 0
log_path = Path(str(DATA_DIR / "recorder_training.log"))
STATE["training"]["log_path"] = str(log_path)
STATE["training"]["log_offset"] = 0
# fresh header at the start of a run
_append_train_log("================================================================================")
_append_train_log("===== Recorder Training Run =====")
_append_train_log("================================================================================")
# Ensure the log exists and starts cleanly with a header separator for this run
try:
with open(log_path, "a", encoding="utf-8") as lf:
lf.write("\n" + ("=" * 80) + "\n")
@@ -311,13 +288,9 @@ def _run_training_background(safe_word: str, allow_no_personal: bool):
pass
try:
# 1) Ensure venv (auto-installs)
_ensure_training_venv(log_path)
# 2) Ensure datasets (auto-installs / skips if already present)
_ensure_training_datasets(log_path)
# 3) Run training
if wake_word_title:
cmd_str = f"{TRAIN_CMD} '{safe_word}' '{wake_word_title}'"
else:
@@ -361,7 +334,6 @@ def _run_training_background(safe_word: str, allow_no_personal: bool):
STATE["training"]["running"] = False
# -------------------- Routes --------------------
@app.get("/", response_class=HTMLResponse)
def index():
html_path = STATIC_DIR / "index.html"
@@ -394,10 +366,12 @@ def start_session(payload: Dict[str, Any]):
STATE["takes_per_speaker"] = takes_per_speaker
STATE["takes_received"] = 0
STATE["takes"] = []
# do not interrupt training if running
_reset_personal_samples_dir()
# Always wipe log on start_session (even if same wakeword)
_clear_training_log()
return {
"ok": True,
"raw_phrase": raw,
@@ -523,64 +497,42 @@ def train_now(payload: Dict[str, Any] = None):
@app.get("/api/train_status")
def train_status(
offset: int = Query(0, ge=0),
max_bytes: int = Query(65536, ge=1024, le=262144),
last_size: int = Query(0, ge=0),
last_mtime: float = Query(0.0, ge=0.0),
):
def train_status():
"""
Stream training output from the log file on disk.
Robust to log overwrite/truncation:
- UI passes offset + last_size + last_mtime
- If file shrinks or mtime goes backwards/changes weirdly, reset offset to 0
Return only NEW lines since last poll (prevents UI duplication spam even if UI appends).
"""
with STATE_LOCK:
tr = dict(STATE["training"])
log_path_str = tr.get("log_path")
prev_tail = list(STATE["training"].get("last_sent_tail") or [])
prev_size = int(STATE["training"].get("last_log_size") or 0)
log_text = ""
next_offset = offset
log_size = 0
log_mtime = 0.0
new_lines: List[str] = []
full_tail: List[str] = []
size_now = 0
if log_path_str:
p = Path(log_path_str)
if p.exists():
try:
st = p.stat()
log_size = int(st.st_size)
log_mtime = float(st.st_mtime)
size_now = int(p.stat().st_size)
except Exception:
size_now = 0
# Detect overwrite/truncate/reset:
# - file shrank
# - file mtime moved "backwards" (rare) or changed while size reset
# If anything indicates a reset, restart from beginning.
if (log_size < last_size) or (last_mtime and log_mtime < last_mtime):
offset = 0
# If file was truncated/cleared, reset history
if size_now < prev_size:
prev_tail = []
# Clamp offset to current file size
if offset > log_size:
offset = log_size
full_tail = _read_tail_lines(p, TRAIN_LOG_TAIL_LINES)
new_lines = _compute_new_lines(prev_tail, full_tail)
# Read incrementally from the file
with p.open("rb") as f:
f.seek(offset)
chunk = f.read(max_bytes)
log_text = chunk.decode("utf-8", errors="replace")
next_offset = offset + len(chunk)
except Exception as e:
log_text = f"\n[log read error: {e!r}]\n"
next_offset = offset
tr["log_text"] = log_text
tr["next_offset"] = next_offset
tr["log_size"] = log_size
tr["log_mtime"] = log_mtime
# Save snapshot for next poll
with STATE_LOCK:
STATE["training"]["last_sent_tail"] = full_tail
STATE["training"]["last_log_size"] = size_now
tr["log_text"] = "\n".join(new_lines) # ONLY new lines
tr["log_tail_preview"] = "\n".join(full_tail) # optional: handy for debugging
return {"ok": True, "training": tr}