diff --git a/.gitignore b/.gitignore index 0b19152..f64dea2 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,6 @@ result # Claude Code local session state .claude/ + +# Generated by scripts/stage-mobile.sh (the desktop GUI staged for the mobile bundle) +mobile-pair/app/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c45a21..7020318 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Optional cross-platform CPU **streaming ASR** backend (sherpa-onnx) behind the + `[streaming]` extra, with model keys `sherpa-stream-en` (zipformer-20M, ultra-fast) and + `sherpa-nemotron-en` (NeMo FastConformer-RNNT 0.6B, accurate). Fully additive — nothing + changes when the extra is absent. See `docs/streaming-asr.md` and + `docs/streaming-asr-benchmark.md`. +- **Unified Android app**: the phone now runs the *same* web GUI as the desktop, with a native + on-device engine instead of the Python backend. A `LocalBackend` (`gui/static/backend-local.js`) + drives the `tauri-plugin-voxasr` plugin, which records then transcribes the clip at stop with + **offline Whisper** (`whisper-base.en` default, `VOXASR_MODEL=whisper-small.en` for accuracy) — + full punctuation, fully offline (the APK strips the `INTERNET` permission; `RECORD_AUDIO` only). + Python-only features (diarization, AI summarize, system audio) are hidden on-device. See + `tauri-plugin-voxasr/README.md`. + ## [0.3.0] - 2026-06-03 ### Added diff --git a/README.md b/README.md index df35587..fcab1fb 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ git clone https://github.com/dmarzzz/VoxTerm.git cd voxterm python3 -m venv .venv source .venv/bin/activate -pip install -r requirements.txt +pip install -e . 
# add ".[streaming]" for the optional streaming ASR backend python3 -m tui.app ``` @@ -129,6 +129,24 @@ Press `P` to manage your speaker profile library (rename, delete, wipe all data) Models download automatically on first use. +### Optional: streaming ASR (cross-platform, CPU) + +`pip install "voxterm[streaming]"` adds a sherpa-onnx **streaming** backend (word-by-word, +CPU-only, Linux/macOS-arm64/Windows) with two model keys — `sherpa-stream-en` (zipformer-20M, +ultra-fast) and `sherpa-nemotron-en` (NeMo 0.6B, accurate). Fully opt-in: absent, nothing +changes. See [docs/streaming-asr.md](docs/streaming-asr.md) and the +[benchmark](docs/streaming-asr-benchmark.md). + +### Android: the same GUI, on-device (offline) + +The Android app runs the **same web GUI as the desktop**, but with a native on-device engine instead +of the Python backend: it records, then transcribes **entirely on the phone** with offline Whisper +via [`tauri-plugin-voxasr/`](tauri-plugin-voxasr/) — no pairing, no relay, **no network** (the APK +strips the `INTERNET` permission). Build it with `scripts/android-dev.sh --debug` (it fetches the +bundled model on first run). `whisper-base.en` ships by default; set +`VOXASR_MODEL=whisper-small.en` for higher accuracy. See +[tauri-plugin-voxasr/README.md](tauri-plugin-voxasr/README.md). + ## Project Structure ``` diff --git a/audio/capture.py b/audio/capture.py index 39b0062..cbef640 100644 --- a/audio/capture.py +++ b/audio/capture.py @@ -1,5 +1,6 @@ import math import queue +import sys import numpy as np import sounddevice as sd from scipy.signal import resample_poly @@ -81,9 +82,23 @@ def start(self): # Reset filter state to avoid a click on resume. if self._noise_filter is not None: self._noise_filter.reset() - dev_info = sd.query_devices(kind='input') + try: + dev_info = sd.query_devices(kind='input') + except Exception as e: + if sys.platform == "darwin": + raise RuntimeError( + "No microphone available. 
On macOS, grant Microphone permission to this app " + "in System Settings > Privacy & Security > Microphone, then retry." + ) from e + raise self._device_name = dev_info['name'] native_channels = dev_info['max_input_channels'] + if not native_channels and sys.platform == "darwin": + raise RuntimeError( + "The input device reports 0 channels — on macOS this usually means Microphone " + "permission hasn't been granted. Allow it in System Settings > Privacy & " + "Security > Microphone, then retry." + ) self._native_rate = int(dev_info['default_samplerate']) # Compute resample ratio as integer up/down factors if self._native_rate != SAMPLE_RATE: diff --git a/audio/mix.py b/audio/mix.py new file mode 100644 index 0000000..92bef7f --- /dev/null +++ b/audio/mix.py @@ -0,0 +1,24 @@ +"""Time-aligned mixing of two equal-rate mono float32 chunk streams (mic + system audio). + +The single home for this operation — both the TUI (`tui/app.py`) and the GUI engine +(`gui/engine.py`) drive recording from a mic stream plus an optional system-audio stream. +(Distinct from `audio/merger.py`, which is the P2P energy-weighted multi-peer mixer.) +""" + +from __future__ import annotations + +import numpy as np + + +def mix_chunks(mic: list, sysaud: list) -> list: + """Sum the overlapping chunks (clipped to [-1, 1]), then append each stream's tail. + + The streams arrive as lists of equal-length float32 chunks; we sum index-for-index + for the first ``min(len)`` chunks and keep whichever stream has the longer tail, so + no audio is dropped when one side is briefly ahead. 
+ """ + n = min(len(mic), len(sysaud)) + mixed = [np.clip(mic[i] + sysaud[i], -1.0, 1.0) for i in range(n)] + mixed.extend(mic[n:]) + mixed.extend(sysaud[n:]) + return mixed diff --git a/audio/transcriber.py b/audio/transcriber.py index 482e116..c183080 100644 --- a/audio/transcriber.py +++ b/audio/transcriber.py @@ -382,11 +382,39 @@ def __init__(self, model: str = "small", language: str | None = "en"): self._init_dedup() def load(self): - """Pre-load the model (downloads on first run).""" + """Pre-load the model (downloads on first run) and warm the decoder. + + Tuning (all env-overridable, measured on CPU): explicit compute_type (CT2 'auto' + already resolves to int8 on CPU, but be explicit so a CUDA box doesn't silently pick + float16), an explicit cpu_threads count (CT2's default oversubscribes hybrid P+E CPUs), + and greedy decoding (beam_size=1) on CPU — ~1.3-2x faster than beam 5 with no measurable + accuracy loss on the short pre-VAD'd clips this feeds. A final dummy decode JITs CT2's + kernels so the user's first real transcription isn't the cold path. 
+ """ + import os from faster_whisper import WhisperModel - self._model = WhisperModel( - self.model_size, device="auto", compute_type="auto", - ) + try: + import torch + cuda = torch.cuda.is_available() + except Exception: + cuda = False + device = os.environ.get("VOXTERM_FW_DEVICE") or ("cuda" if cuda else "cpu") + compute = os.environ.get("VOXTERM_FW_COMPUTE") or ("float16" if device == "cuda" else "int8") + self._beam = int(os.environ.get("VOXTERM_FW_BEAM") or (5 if device == "cuda" else 1)) + kw = {"device": device, "compute_type": compute} + if device == "cpu": + try: + default_threads = max(1, min(6, (os.cpu_count() or 4) // 2)) + except Exception: + default_threads = 4 + kw["cpu_threads"] = int(os.environ.get("VOXTERM_FW_CPU_THREADS") or default_threads) + self._model = WhisperModel(self.model_size, **kw) + try: # warm the decoder off the user's hot path + warm = np.zeros(16000, dtype=np.float32) + for _ in self._model.transcribe(warm, language=self._language, beam_size=1, vad_filter=False)[0]: + pass + except Exception: + pass self._loaded = True def transcribe(self, audio: np.ndarray, **kwargs) -> dict: @@ -397,7 +425,7 @@ def transcribe(self, audio: np.ndarray, **kwargs) -> dict: segments, _info = self._model.transcribe( audio, language=self._language, - beam_size=5, + beam_size=getattr(self, "_beam", 1), vad_filter=False, # we already run Silero VAD upstream ) text = " ".join(seg.text.strip() for seg in segments).strip() @@ -415,6 +443,152 @@ def is_loaded(self) -> bool: return self._loaded +# --- optional cross-platform streaming backend (sherpa-onnx) ----------------- + +_SHERPA_RELEASE = "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models" +# repo dir name -> download tarball. All are transducer models (encoder/decoder/joiner/tokens). 
+_SHERPA_MODEL_URLS = { + "sherpa-onnx-streaming-zipformer-en-20M-2023-02-17": + f"{_SHERPA_RELEASE}/sherpa-onnx-streaming-zipformer-en-20M-2023-02-17.tar.bz2", + "sherpa-onnx-nemotron-speech-streaming-en-0.6b-560ms-int8-2026-04-25": + f"{_SHERPA_RELEASE}/sherpa-onnx-nemotron-speech-streaming-en-0.6b-560ms-int8-2026-04-25.tar.bz2", +} + + +def _model_complete(d: "Path") -> bool: + """True only when ALL four artifacts are present (so a half-extracted dir isn't trusted).""" + return (d.is_dir() and any(d.glob("*encoder*.onnx")) and any(d.glob("*decoder*.onnx")) + and any(d.glob("*joiner*.onnx")) and (d / "tokens.txt").exists()) + + +def _ensure_sherpa_model(repo: str) -> "Path": + """Return a local dir holding the streaming-zipformer ONNX files, downloading + extracting + the published tarball on first use. Cached under ~/.cache/voxterm/sherpa/<repo>/. Both the + download and the extraction are atomic, and a partial/corrupt cache self-heals.""" + from pathlib import Path + import shutil + import tarfile + import urllib.request + + cache = Path.home() / ".cache" / "voxterm" / "sherpa" + target = cache / repo + if _model_complete(target): + return target + shutil.rmtree(target, ignore_errors=True) # wipe any partial/corrupt extraction + cache.mkdir(parents=True, exist_ok=True) + tarball = cache / (repo + ".tar.bz2") + if not tarball.exists(): + tmp = tarball.with_suffix(".part") + try: + urllib.request.urlretrieve(_SHERPA_MODEL_URLS[repo], tmp) # noqa: S310 (pinned github release URL) + except Exception: + tmp.unlink(missing_ok=True) # don't leak a partial download + raise + tmp.rename(tarball) + # extract into a sibling staging dir, then atomically move into place — an interrupted + # extraction never leaves a half-populated dir that _model_complete would accept. 
+ staging = cache / (repo + ".extracting") + shutil.rmtree(staging, ignore_errors=True) + with tarfile.open(tarball, "r:bz2") as tf: + tf.extractall(staging, filter="data") # produces staging/<repo>/ (safe filter) + extracted = staging / repo + if not _model_complete(extracted): + shutil.rmtree(staging, ignore_errors=True) + raise RuntimeError(f"sherpa model tarball for {repo} is incomplete after extraction") + extracted.rename(target) + shutil.rmtree(staging, ignore_errors=True) + return target + + +# Public alias so external callers (the GUI live loop) don't import the private name. +is_hallucination = _is_hallucination + + +class SherpaStreamingTranscriber(_DeduplicatorMixin): + """Cross-platform CPU streaming ASR via sherpa-onnx (k2-fsa). Optional backend — only + reachable when sherpa-onnx is installed (config gates the model key). Per-call + create_stream makes it a drop-in for the existing chunked callers; the live loop can also + drive it as a true streaming recognizer.""" + + def __init__(self, model: str = "sherpa-onnx-streaming-zipformer-en-20M-2023-02-17", + language: str | None = "en"): + self.model_id = model + self._language = language + self._rec = None + self._loaded = False + self._init_dedup() + + # Public surface for the GUI live loop, so it never reaches into underscore-privates. + @property + def recognizer(self): + """The underlying sherpa OnlineRecognizer (valid after load()).""" + return self._rec + + def reset_dedup(self): + """Clear consecutive-duplicate state — call when (re)starting a live stream.""" + self._init_dedup() + + def is_duplicate(self, text: str) -> bool: + """True if `text` repeats the immediately-preceding finalized text.""" + return self._is_duplicate(text) + + def load(self): + try: + import sherpa_onnx + except ImportError as e: + raise RuntimeError( + 'sherpa-onnx is not installed; install it to use streaming models ' + '(pip install "voxterm[streaming]" or pip install sherpa-onnx).' 
+ ) from e + d = _ensure_sherpa_model(self.model_id) + + def _pick(*globs): + for g in globs: + hits = sorted(d.glob(g)) + if hits: + return str(hits[0]) + raise RuntimeError( + f"sherpa model dir {d} is missing a required file (looked for {globs!r}); " + "delete it and re-run to re-download." + ) + + enc = _pick("*encoder*.int8.onnx", "*encoder*.onnx") + dec = _pick("*decoder*.int8.onnx", "*decoder*.onnx") + joi = _pick("*joiner*.int8.onnx", "*joiner*.onnx") + tokens = _pick("tokens.txt") + self._rec = sherpa_onnx.OnlineRecognizer.from_transducer( + tokens=tokens, encoder=enc, decoder=dec, joiner=joi, + num_threads=2, provider="cpu", enable_endpoint_detection=True, + rule1_min_trailing_silence=2.4, rule2_min_trailing_silence=1.2, + rule3_min_utterance_length=20.0, + ) + self._loaded = True + + def transcribe(self, audio: np.ndarray, **kwargs) -> dict: + rms = float(np.sqrt(np.mean(audio ** 2))) + if rms < 0.005: + return {"text": "", "speaker": "", "speaker_id": 0} + s = self._rec.create_stream() + s.accept_waveform(16000, np.ascontiguousarray(audio, dtype=np.float32)) + while self._rec.is_ready(s): + self._rec.decode_stream(s) + s.input_finished() + while self._rec.is_ready(s): + self._rec.decode_stream(s) + text = (self._rec.get_result(s) or "").strip() + if text and text.isupper(): + # only sentence-case models that emit ALL-CAPS (zipformer); leave models with native + # casing + punctuation (nemotron) untouched. + text = text.capitalize() + if not text or _is_hallucination(text, self._language) or self._is_duplicate(text): + return {"text": "", "speaker": "", "speaker_id": 0} + return {"text": text, "speaker": "", "speaker_id": 0} + + @property + def is_loaded(self) -> bool: # @property to match every other backend's contract + return self._loaded + + def get_transcriber(model_name: str, *, language: str | None = "en"): """Construct the transcriber backend for a model key. 
@@ -429,6 +603,7 @@ def get_transcriber(model_name: str, *, language: str | None = "en"): FASTER_WHISPER_MODELS, PARAKEET_MODELS, QWEN3_MODELS, + SHERPA_MODELS, ) model_repo = AVAILABLE_MODELS[model_name] @@ -436,6 +611,8 @@ def get_transcriber(model_name: str, *, language: str | None = "en"): return Qwen3Transcriber(model=model_repo, language=language) if model_name in PARAKEET_MODELS: return ParakeetTranscriber(model=model_repo, language=language) + if model_name in SHERPA_MODELS: + return SherpaStreamingTranscriber(model=model_repo, language=language) if model_name in FASTER_WHISPER_MODELS: return FasterWhisperTranscriber(model=model_repo, language=language) return WhisperTranscriber(model=model_repo) diff --git a/config.py b/config.py index c94bf5b..3fd8d14 100644 --- a/config.py +++ b/config.py @@ -2,6 +2,7 @@ VERSION = "0.3.0" +import importlib.util import sys import platform @@ -90,6 +91,21 @@ else: raise RuntimeError(f"Unsupported platform: {sys.platform}") +# Optional cross-platform streaming backend (sherpa-onnx). Surfaced ONLY when the package is +# installed AND a wheel exists for this platform (there is no Intel-macOS wheel). 100% additive: +# if absent, SHERPA_MODELS stays empty, AVAILABLE_MODELS/DEFAULT_MODEL are byte-for-byte unchanged, +# and the transcriber's sherpa dispatch branch is unreachable. sherpa statically links its own +# ONNX Runtime, so it cannot collide with VoxTerm's pinned onnxruntime (Silero VAD / 3D-Speaker). 
+_HAS_SHERPA = ( + importlib.util.find_spec("sherpa_onnx") is not None + and not (sys.platform == "darwin" and platform.machine() != "arm64") +) +if _HAS_SHERPA: + AVAILABLE_MODELS["sherpa-stream-en"] = "sherpa-onnx-streaming-zipformer-en-20M-2023-02-17" + # nemotron-EN streaming (NeMo FastConformer-RNNT, 0.6B, exported for sherpa-onnx) + AVAILABLE_MODELS["sherpa-nemotron-en"] = "sherpa-onnx-nemotron-speech-streaming-en-0.6b-560ms-int8-2026-04-25" +SHERPA_MODELS = {"sherpa-stream-en", "sherpa-nemotron-en"} if _HAS_SHERPA else set() + # Language forcing for Qwen3-ASR (None = auto-detect) DEFAULT_LANGUAGE = "en" AVAILABLE_LANGUAGES = { diff --git a/dev/requirements-dev.txt b/dev/requirements-dev.txt index 22b4dcd..1dd7cc9 100644 --- a/dev/requirements-dev.txt +++ b/dev/requirements-dev.txt @@ -1,2 +1,4 @@ pytest>=8.0 pytest-timeout>=2.0 +websocket-client>=1.0 # scripts/gui_e2e.py (CDP) +soundfile>=0.12 # scripts/bench_asr.py (WAV loading) diff --git a/docs/ios-thinclient.md b/docs/ios-thinclient.md new file mode 100644 index 0000000..7db153f --- /dev/null +++ b/docs/ios-thinclient.md @@ -0,0 +1,34 @@ +# VoxTerm iOS thin-client + +The iOS app is the same Tauri v2 thin-client as Android: a WebView that pairs to the +VoxTerm backend running on your **desktop** over your LAN. The phone does **no transcription +and requests no microphone** — your computer owns the mic and the models. + +## Privacy posture +- No `RECORD_AUDIO` / camera / location. The only network egress is token-gated HTTP to the + desktop you pair with, on your own subnet. No cloud. +- ATS is relaxed **only** for local networking (`NSAllowsLocalNetworking`, *not* + `NSAllowsArbitraryLoads`) so the app can reach a plain-HTTP LAN desktop — see + `src-tauri/Info.ios.plist`. iOS 14+ shows a one-time Local Network permission prompt. 
+ +## Build (requires a Mac + Xcode — cannot be built off a Mac) +```bash +xcode-select --install # Xcode command-line tools +sudo gem install cocoapods # cargo tauri ios init needs CocoaPods +scripts/ios-dev.sh --dev # simulator (shares the mac's localhost → pair to 127.0.0.1:8740) +scripts/ios-dev.sh --build # device build +``` +`scripts/ios-dev.sh` adds the iOS rust targets, runs `cargo tauri ios init` (XcodeGen + +CocoaPods → `src-tauri/gen/apple/`) on first run, then `cargo tauri ios dev|build`. On a +non-Mac it exits cleanly (no-op). + +## Signing +- **Simulator / personal device:** Xcode automatic signing with a free Apple ID (7-day + provisioning) is enough to run on your own iPhone. +- **CLI / TestFlight:** set `APPLE_DEVELOPMENT_TEAM` (paid Apple Developer Program, $99/yr). + `developmentTeam` is intentionally **not** committed to `tauri.conf.json`. + +## Using it +On a real iPhone: same Wi-Fi as the desktop → run `VOXTERM_GUI_LAN=1 python -m gui.server` +on the desktop, enter its LAN IP + the printed token in the pairing screen, tap **Allow** on +the Local Network prompt. (The simulator can use `127.0.0.1:8740` directly.) diff --git a/docs/streaming-asr-benchmark.md b/docs/streaming-asr-benchmark.md new file mode 100644 index 0000000..5e50317 --- /dev/null +++ b/docs/streaming-asr-benchmark.md @@ -0,0 +1,54 @@ +# Streaming ASR benchmark — sherpa-onnx backends vs faster-whisper + +VoxTerm's optional `[streaming]` extra adds a cross-platform, CPU-only **streaming** ASR +backend (sherpa-onnx). This compares it against the existing faster-whisper models on +accuracy (WER) and CPU speed (RTF), to justify the addition. + +## Results + +Host: Linux x86_64, CPU only (no GPU). 3 labeled clips (clean LibriSpeech-style read +speech, 28.2 s total) bundled with the zipformer model. Reproduce with +`python scripts/bench_asr.py`. 
+ +| backend (model key) | model | WER ↓ | RTF ↓ (CPU) | streaming | load | +|---|---|---|---|---|---| +| `fw-small` *(og default)* | faster-whisper small | **2.1%** | 0.642 | no (batch) | 2.4 s | +| `fw-base` | faster-whisper base | 5.1% | 0.176 | no (batch) | 8.4 s | +| `sherpa-nemotron-en` | NeMo FastConformer-RNNT 0.6B (int8) | 4.4% | 0.248 | **yes** | 4.4 s | +| `sherpa-stream-en` | zipformer-20M (int8) | 20.9% | **0.064** | **yes** | 1.0 s | + +*WER is normalized (uppercase, alphanumerics only) so case/punctuation differences between +backends don't skew it. RTF = wall-clock ÷ audio-duration; lower is faster, <1.0 = faster than +real time.* + +## Reading it + +- **`fw-small` is the most accurate** (2.1%) and stays the default for the record→stop→ + transcribe (batch) path. But it's batch-only and the slowest here (RTF 0.64). +- **`sherpa-nemotron-en` is the streaming sweet spot:** near-`fw-base` accuracy (4.4%) with + a healthy ~4× real-time CPU speed (RTF 0.25) **and** native word-by-word streaming — which + is exactly what the live view wants and which faster-whisper can't do. +- **`sherpa-stream-en` (zipformer-20M) trades accuracy for raw speed:** ~16× real-time + (RTF 0.064), but 20.9% WER — it's a 20M-param model. Good for ultra-low-latency / weak + hardware where a rough live caption is fine. +- All three sherpa numbers come from the SAME optional backend; nothing changes for users + who don't install the extra. + +## Honest caveats + +- **Tiny labeled set (3 clips / 28 s).** WER differences are within noise — treat WER as + indicative, not a leaderboard. RTF is the reliable signal here. A rigorous WER pass would + use full LibriSpeech test-clean (2620 utts), which is too slow to run 4× on CPU for this. +- Clean read speech only; no overlapping speakers, noise, or accents — real-room WER will be + higher for every backend. 
+- RTF is single-clip per-call (`tr.transcribe`); the true *streaming* live path feeds frames + incrementally, so perceived latency is lower than these batch-style RTF numbers suggest. +- nemotron-EN here is the **English sibling**; the multilingual nemotron-3.5 (`.nemo`) needs a + custom ONNX export before it can be benchmarked (the same backend would carry it once exported). + +## Methodology + +`scripts/bench_asr.py`: for each installed backend, load via `get_transcriber(key)`, transcribe +each clip with `tr.transcribe()`, compute word-level edit-distance WER vs the bundled +`trans.txt` references, and time the transcribe calls for RTF. Backends absent (e.g. sherpa not +installed) are skipped. diff --git a/docs/streaming-asr.md b/docs/streaming-asr.md new file mode 100644 index 0000000..8c86ec2 --- /dev/null +++ b/docs/streaming-asr.md @@ -0,0 +1,57 @@ +# Streaming ASR (optional backend) + +VoxTerm's optional `[streaming]` extra adds a **cross-platform, CPU-only, streaming** ASR +backend via [sherpa-onnx](https://github.com/k2-fsa/sherpa-onnx). Unlike the default +faster-whisper (which transcribes in batches after you stop), these models decode +**word-by-word as you speak** — ideal for the live view. + +It is 100% **opt-in and additive**: without the extra installed, nothing about VoxTerm +changes (the models simply don't appear). + +## Install + +```bash +pip install "voxterm[streaming]" +``` + +The wheel is CPU-only and ships for Linux, macOS (Apple Silicon), and Windows. *(There is no +Intel-macOS wheel, so the key is hidden there.)* sherpa-onnx statically links its own ONNX +Runtime, so it cannot conflict with VoxTerm's `onnxruntime` (used by the diarizer / VAD). 
+ +## Models + +Two keys appear once the extra is installed (the model downloads to +`~/.cache/voxterm/sherpa/` on first use): + +| key | model | character | +|---|---|---| +| `sherpa-stream-en` | streaming zipformer-20M (int8) | ~16× real-time on CPU, rough (small model) | +| `sherpa-nemotron-en` | NeMo FastConformer-RNNT 0.6B (int8) | near-`fw-base` accuracy, ~4× real-time, **streaming** | + +See the measured [benchmark](./streaming-asr-benchmark.md). + +## Use + +- **GUI:** pick the model in the dropdown, then record/transcribe as usual. The **live + transcript** view automatically prefers `sherpa-stream-en` (when installed) and streams the + text in word-by-word, finalizing a line on a pause. +- **TUI:** pass the key like any model: `python -m tui.app -m sherpa-nemotron-en`. + +The default model is unchanged (`fw-small` on Linux/Intel, MLX on Apple Silicon) — streaming +is something you opt into per use. + +## How it works + +`audio/transcriber.py:SherpaStreamingTranscriber` wraps sherpa-onnx's `OnlineRecognizer` +(transducer: encoder/decoder/joiner). Per-call `create_stream()` makes it a drop-in for the +existing chunked callers; `gui/engine.py:_live_stream_loop` drives a single persistent stream +for true streaming in the live view (endpoint detection finalizes each line). New model keys +are added to the registry in `audio/transcriber.py` (`_SHERPA_MODEL_URLS`) + gated in +`config.py` (surfaced only when `sherpa_onnx` is importable on a supported platform). + +## Notes + +- The bundled `sherpa-nemotron-en` is the **English** sibling of NVIDIA's + `nemotron-3.5-asr-streaming` family; the multilingual `.nemo` checkpoint needs a custom + ONNX export before it can be wired (the same backend would carry it). +- Reproduce the benchmark: `python scripts/bench_asr.py`. Browser e2e: `python scripts/gui_e2e.py`. 
diff --git a/gui/.gitignore b/gui/.gitignore new file mode 100644 index 0000000..7a60b85 --- /dev/null +++ b/gui/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.pyc diff --git a/gui/README.md b/gui/README.md new file mode 100644 index 0000000..c8db840 --- /dev/null +++ b/gui/README.md @@ -0,0 +1,185 @@ +# VoxTerm GUI + +A small web control app over VoxTerm's own engine. Hit a button to record, stop, +transcribe, and diarize; review the result; and export an AI-ready transcript — all +from a browser tab (including your phone, on your own network). + +It is a thin control surface, not a reimplementation. Recording uses VoxTerm's +`audio.capture.AudioCapture`; transcription drives the same transcriber + Silero VAD + +diarizer + `EventLogger` the TUI uses; the AI export is a pure function of the same +`events.jsonl` stream that the TUI emits. Nothing about the speech pipeline is +duplicated here. + +## What it does (v1) + +A single linear flow: + +1. **Record** — pick a model + language + **audio source** (microphone, system/loopback + audio i.e. "what's playing", or both mixed), hit the button, talk. +2. **Stop** — captured audio is written to a WAV. +3. **Transcribe + diarize** — runs in the background; a progress bar tracks it. +4. **Export** — automatically produces an AI-ready `-agent.md` + `-agent.json`, plus + `.srt` / `.vtt` subtitles. +5. **History** — every past session is listed in the sidebar; click to reopen. +6. **Rename** — relabel a diarized speaker; the rename flows into your copy/download. 
+ +Review extras: per-turn timestamps and markers, **inline audio playback** of the recording +(click a timestamp to seek, or download the WAV), and exports rendered +**server-side by `export.py`** (the single formatter) with your speaker renames applied — +**Copy for AI**, **Summarize for AI** (transcript prefixed with a ready-to-paste LLM +summarization task), **Summarize with local LLM** (runs the TUI's own summarizer in-app — +MLX on Apple Silicon, or an `ollama:` backend set in settings — and shows the result +inline; surfaces a clear message when no backend is present), and `.md` / `.json` / `.srt` / +`.vtt` downloads. Because the server renders the exports, a download byte-matches the on-disk +artifact (only your renames differ). + +## Scope & parity with the TUI + +The GUI drives the **same engine** as the TUI — `get_transcriber` (incl. the faster-whisper +and sherpa backends plus the dedup / hallucination filters), Silero VAD (identical windowing), +the diarizer, and the `EventLogger` — so the saved transcript is produced by the same code, and +the model list + CPU-aware default match the TUI. Some TUI features are intentionally **out of +scope** for a personal record→review web app: + +- **P2P party / hivemind** — live-collaboration / aggregation concerns that need raw UDP + sockets + mDNS a browser sandbox can't open. (The GUI still *renders* imported peer + transcripts; it just can't host/join a live mesh.) +- **Dictation mode** (global hotkey + system-wide keystroke injection) — an OS-native + capability (Quartz/xdotool/wtype) that can't run from a sandboxed page. +- **Cross-session speaker recognition** (the SQLite `SpeakerStore` biometric identity layer) — + the GUI offers per-session manual **rename** instead, not persisted across sessions, and + honestly labeled "diarization clusters / your renames, not verified identities" in exports. 
+ (Because that identity layer is absent, the `[~]` *uncertain-attribution* marker it would + drive does not appear in GUI-produced transcripts.) +- **Mid-session language switching** — language is chosen up front. +- **Live word-by-word preview** — by design, recording shows a level meter + indicator and the + accurate, diarized transcript appears when you **stop**. One model, no mid-stream guesses to + reconcile against the final result. (The streaming-monitor code path remains for the optional + `[streaming]` backend but is off in the default flow.) + +System/loopback audio capture reuses the engine's existing backends (macOS ScreenCaptureKit, +Linux `parec`); it's unavailable on Windows (no engine support there) and degrades with a clear +error if the platform tool is missing. + +The GUI also *adds* value the TUI lacks: the rich `.md`/`.json`/`.srt`/`.vtt` export, inline +audio playback + timestamp-seek of the recording, and a clean keyboard-driven review UI. + +It's also a **PWA** — install it to your phone/desktop home screen for an app-like, +offline-capable shell. Your model + language picks are remembered (localStorage), and +keyboard shortcuts work (**Space** or **R** to record, **Esc** to close the sidebar), +with focus rings and aria-live status for accessibility. + +## How to run + +```bash +python -m gui.server +# -> http://127.0.0.1:8740 (loopback only) +``` + +By default it binds `127.0.0.1` — reachable only from this machine. + +Optional env: + +| Var | Default | Effect | +|-----|---------|--------| +| `VOXTERM_GUI_PORT` | `8740` | listen port | +| `VOXTERM_GUI_LAN` | unset | `=1` binds `0.0.0.0` and requires a token (see below) | +| `VOXTERM_GUI_TOKEN` | auto | set your own LAN token; otherwise one is generated | + +### Phone / LAN access + +The app records a real room, so exposing it to the network is gated behind a token +that must be present on **every** `/api/*` call. 
+ +```bash +VOXTERM_GUI_LAN=1 python -m gui.server +``` + +On start it prints the exact URL to open from your phone: + +``` +http://<lan-ip>:8740/?token=<token> +``` + +Open that URL on a device on the same network. The page reads `?token=…` from the URL +and attaches it to every API request and the status stream automatically. Without a +valid token, every `/api/*` call returns `401`. + +## Privacy and security model + +- **Loopback by default.** No token, no network exposure — only this machine can reach it. +- **Token-gated LAN.** With `VOXTERM_GUI_LAN=1`, every `/api/*` request must carry the + token (header `X-VoxTerm-Token`, `Authorization: Bearer …`, or `?token=…`), checked with + a constant-time compare. This guards both starting a recording of the room and reading + past transcripts. +- **Transcription is fully local.** Models run on this machine via VoxTerm's engine. + Nothing audio-related leaves the host. +- **No audio in any network payload.** The API moves JSON status, option lists, and text + artifacts only — never audio. WAVs stay on disk under `~/voxterm-live/`. +- **Strict CSP.** Same-origin only; no external scripts, fonts, images, or connections. + (`style-src` allows `'unsafe-inline'` for a few computed styles — the level ring, the + progress bar, speaker color dots — all from local, escaped data.) Plus + `X-Content-Type-Options: nosniff` and `Referrer-Policy: no-referrer`. +- **No path traversal.** Static files resolve within `static/` only; session lookups + reject non-bare stems and restrict any `dir` to a known session directory. + +## Files + +| File | Role | +|------|------| +| `server.py` | stdlib `http.server` — serves the UI, a tiny JSON API, and an SSE status stream; handles the loopback/LAN + token gate and CSP. No transcription logic. | +| `engine.py` | Control layer over VoxTerm's engine: start/stop recording (via `AudioCapture`), the background transcribe+export job, live level/status, and session-history listing/reads. 
| +| `transcribe.py` | Headless transcription: a WAV (or in-memory buffer) → a faithful `events.jsonl` + `-transcript.md`, reusing VoxTerm's transcriber, Silero VAD, diarizer, and `EventLogger`. Also a CLI: `python -m gui.transcribe ROOM.wav`. | +| `export.py` | Pure, replayable export of an `events.jsonl` → `-agent.md` / `.json` / `.srt` / `.vtt`. No audio, no live state. CLI: `python -m gui.export [events.jsonl] [--format md\|json\|srt\|vtt\|all]`. | +| `static/index.html`, `static/app.js`, `static/style.css`, `static/sw.js`, `static/manifest.webmanifest`, `static/icon*` | The self-hosted single-page UI + the PWA service worker, manifest, and icons. | + +### Outputs + +Recordings and their artifacts land in `~/voxterm-live/`. The history sidebar also reads +VoxTerm's own session and live dirs. Per session: + +| Artifact | What it is | +|----------|------------| +| `-gui.wav` | the captured audio (local only) | +| `-events.jsonl` | the canonical VoxTerm event stream (the same one the TUI emits / glass tails) | +| `-transcript.md` | human-readable transcript with timestamps + speaker labels | +| `-agent.md` | AI-ready transcript: YAML front-matter, marker legend, one speaker-attributed, timestamped turn per line | +| `-agent.json` | typed, lossless companion the `-agent.md` is rendered from (each turn carries `t_offset`/`t_offset_end`) | +| `-agent.srt` / `.vtt` | subtitles (SubRip / WebVTT) rendered from the per-turn timestamps | + +`events.jsonl` is the source of truth: each line is one JSON object +(`{"t", "kind", …}`). The exporter is a pure reduction of that stream — `text` events +carry an `audio_offset`/`audio_end` so timestamps are true offsets into the recording. + +### API surface + +`GET /api/options` · `GET /api/status` · `GET /api/sessions` · `GET /api/session` · +`GET /api/events` (SSE) · `POST /api/record/start` · `POST /api/record/stop` · +`POST /api/transcribe` (transcribe an existing WAV). 
+ +## Models and languages + +Models offered are VoxTerm's faster-whisper keys (`fw-tiny`, `fw-base`, `fw-small`, +`fw-medium`, `fw-large-v3`, `fw-distil-large-v3`); `fw-small` is the default. Languages +come from VoxTerm's `AVAILABLE_LANGUAGES` (default `en`). On CPU, the smaller `fw-*` +models are the practical choices. + +**Optional streaming backend.** Installing the extra (`pip install "voxterm[streaming]"`) +adds two cross-platform CPU streaming models to the dropdown — `sherpa-stream-en` +(zipformer-20M, ultra-fast/rough) and `sherpa-nemotron-en` (NeMo 0.6B, accurate). The live +view prefers them for true word-by-word streaming. Absent, nothing changes. See +[`docs/streaming-asr.md`](../docs/streaming-asr.md) and the +[benchmark](../docs/streaming-asr-benchmark.md). + +## Scope: what this is not (yet) + +v1 is deliberately the linear flow above (record → stop → transcribe → export → +history → rename). Planned fast-follows, not built here: + +- **Live word-by-word streaming** during recording (v1 transcribes after stop). +- **Party / P2P** multi-device sessions (the export already understands `peer` turns). +- **Hivemind** shared/aggregated sessions. +- **Merged view** across multiple sessions. +- **Speaker profiles** (persistent cross-session identities; v1 renames are per-view). +- **Tauri native desktop + iOS/Android** app (the PWA already covers home-screen install; + Tauri is the native / app-store step, wrapping this same web UI). diff --git a/gui/__init__.py b/gui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gui/__main__.py b/gui/__main__.py new file mode 100644 index 0000000..7062b1d --- /dev/null +++ b/gui/__main__.py @@ -0,0 +1,8 @@ +"""`python -m gui` — convenience alias for the GUI launcher (opens the browser GUI). + +For the raw server use `python -m gui.server`; for the desktop app run the Tauri build. 
+""" +from gui.launcher import main + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/gui/_timefmt.py b/gui/_timefmt.py new file mode 100644 index 0000000..f18cd72 --- /dev/null +++ b/gui/_timefmt.py @@ -0,0 +1,12 @@ +"""The single hh:mm:ss formatter for the GUI — shared by the live transcriber, the +exporter, and the engine so live and exported timestamps round identically (no ±1s drift).""" + +from __future__ import annotations + + +def fmt_hms(seconds: float) -> str: + """Seconds → ``MM:SS`` (or ``H:MM:SS`` past an hour), rounded to the nearest second.""" + s = int(round(seconds)) + h, rem = divmod(s, 3600) + m, sec = divmod(rem, 60) + return f"{h}:{m:02d}:{sec:02d}" if h else f"{m:02d}:{sec:02d}" diff --git a/gui/engine.py b/gui/engine.py new file mode 100644 index 0000000..be0a2fa --- /dev/null +++ b/gui/engine.py @@ -0,0 +1,720 @@ +"""GUI control layer over VoxTerm's engine. + +Exposes the operations the GUI drives — start/stop recording (via VoxTerm's own +``AudioCapture``), the background transcribe+export job, and session history — as a +small thread-safe object the HTTP server calls. No transcription/diarization logic +lives here; recording reuses ``audio.capture.AudioCapture`` and the heavy lifting is +``gui.transcribe`` + ``gui.export`` (the reviewed, tested pipeline). + +Core flow: record -> stop -> transcribe (robust, reuses the tested pipeline). The live +monitor (_live_loop) tails the in-progress WAV: VAD-chunked for batch backends +(_live_chunk_loop), or true word-by-word streaming via _live_stream_loop when the optional +sherpa-onnx backend is installed. 
+""" +from __future__ import annotations + +import os +import struct +import sys +import threading +import time +from datetime import datetime +from pathlib import Path + +_ROOT = str(Path(__file__).resolve().parent.parent) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +import numpy as np # noqa: E402 + +import config # noqa: E402 +from audio.mix import mix_chunks # noqa: E402 +from gui._timefmt import fmt_hms # noqa: E402 +from gui import transcribe, export # noqa: E402 + +OUT_DIR = Path.home() / "voxterm-live" +SR = config.SAMPLE_RATE + + +def _wav_header(data_len: int, sr: int = SR) -> bytes: + """The canonical 44-byte PCM WAV header (mono, s16). ``data_len`` may be 0 as a + placeholder while the file is still growing — it's patched on close. Tailers read raw + PCM past byte 44 regardless, so a placeholder size never breaks live monitoring.""" + return (b"RIFF" + struct.pack(" bytes: + """float32 [-1,1] → little-endian s16 bytes (the recording's PCM encoding).""" + return (np.clip(chunk, -1.0, 1.0) * 32767.0).astype(" list[str]: + # Offer exactly what the TUI offers: every model the engine supports on this host. + # config.py's platform branches + sherpa gate register them all in AVAILABLE_MODELS + # (fw-* on Linux/Intel; qwen3 on Linux/Win when qwen-asr is installed; MLX qwen3/parakeet + # on Apple Silicon; sherpa-* when the [streaming] extra is installed). Earlier this read + # FASTER_WHISPER_MODELS and silently hid installed qwen3 on Linux — fixed to match the TUI. + return sorted(config.AVAILABLE_MODELS) + + def default_model(self) -> str: + # CPU-friendly default (fw-base on Linux/Intel, fw-small fallback; MLX on Apple Silicon) — not the raw + # config.DEFAULT_MODEL, which is qwen3-0.6b when qwen-asr is installed (slow on CPU). + return transcribe.gui_default_model() + + def languages(self) -> dict: + return dict(config.AVAILABLE_LANGUAGES) + + def input_devices(self) -> list[dict]: + """Microphones the user can pick from. 
Skips ALSA resampler/mixer plugins (noise) and + de-dupes by name; index -1 means 'system default'.""" + out = [{"index": -1, "name": "System default"}] + try: + import sounddevice as sd + skip = ("lavrate", "samplerate", "speex", "upmix", "vdownmix", "dmix", "surround", "jack", "null") + seen = set() + for i, d in enumerate(sd.query_devices()): + if d.get("max_input_channels", 0) <= 0: + continue + name = (d.get("name") or "").strip() + low = name.lower() + if not name or name in seen or any(s in low for s in skip): + continue + seen.add(name) + out.append({"index": i, "name": name}) + except Exception: + pass + return out + + def warm(self) -> None: + """Preload the default model + VAD + diarizer in the background so the first recording + doesn't pay cold-start latency. Best-effort; called once at server startup.""" + threading.Thread(target=lambda: transcribe.preload(language="en"), + daemon=True, name="gui-warm").start() + + # ---- recording ---- + def start_recording(self, device: int | None = None, source: str = "mic") -> dict: + with self._lock: + if self.recording: + return {"ok": True, "already": True} + # source: "mic" (default), "system" (loopback / what's playing), or "both" (mixed). + self._source = source if source in ("mic", "system", "both") else "mic" + self._cap = None + self._sys = None + if self._source in ("mic", "both"): + from audio.capture import AudioCapture + try: # tolerate a malformed device value from the client + dev = int(device) if device is not None else -1 + except (ValueError, TypeError): + dev = -1 + # steer AudioCapture's input to the chosen mic; -1 = system default. We mutate the + # global sd.default.device, so remember the OS default once and restore it when the + # user re-selects "System default" (otherwise a prior explicit choice stays pinned). 
+ try: + import sounddevice as sd + if not self._sd_captured: + cur0 = sd.default.device + self._sd_default_in = list(cur0)[0] if isinstance(cur0, (list, tuple)) else cur0 + self._sd_captured = True + cur = sd.default.device + pair = list(cur) if isinstance(cur, (list, tuple)) else [cur, cur] + pair[0] = dev if dev >= 0 else self._sd_default_in + sd.default.device = tuple(pair) + except Exception: + pass + try: + self._cap = AudioCapture() + self._cap.start() + except Exception as e: # no input device / busy / permission + self._cap = None + self.recording = False + return {"ok": False, "error": f"could not open the microphone: {e}"} + if self._source in ("system", "both"): + try: + from audio.system_capture import SystemCapture + self._sys = SystemCapture() + self._sys.start() + if not self._sys.is_active: # parec missing, no monitor source, SCK denied… + raise RuntimeError(self._sys.status_message or "system audio capture unavailable") + except Exception as e: + try: + if self._cap: + self._cap.stop() + except Exception: + pass + self._cap = None + self._sys = None + self.recording = False + return {"ok": False, "error": f"could not capture system audio: {e}"} + # open the growing WAV now (placeholder header, patched on stop) so the live + # monitor can tail this very recording and click-Live follows what you're saying. 
+ ts = datetime.now().strftime("%Y%m%d-%H%M%S") + self._rec_wav_path = self.out_dir / f"{ts}-gui.wav" + try: + self._rec_file = open(self._rec_wav_path, "wb") + self._rec_file.write(_wav_header(0)) + self._rec_file.flush() + except OSError as e: + try: + self._cap.stop() + except Exception: + pass + self._cap = None + self.recording = False + return {"ok": False, "error": f"could not open the recording file: {e}"} + self._rec_bytes = 0 + self._stop.clear() + self.recording = True + self.started_at = time.time() + self.level = 0.0 + self._poll_thread = threading.Thread(target=self._poll, daemon=True, name="gui-rec-poll") + self._poll_thread.start() + return {"ok": True, "wav": str(self._rec_wav_path)} + + @staticmethod + def _drain(cap) -> list: + """Drain a capture's queued chunks as float32 arrays ([] if absent/errored).""" + if cap is None: + return [] + try: + chunks = cap.drain() + except Exception: + return [] + return [np.asarray(c, dtype=np.float32) for c in chunks if c is not None and len(c)] + + def _poll(self): + while not self._stop.is_set(): + mic = self._drain(self._cap) + sysa = self._drain(self._sys) + fresh = mix_chunks(mic, sysa) if (mic and sysa) else (mic or sysa) + if fresh: + with self._lock: # serialize with stop's finalize + if self._rec_file: + for c in fresh: + b = _pcm_bytes(c) + self._rec_file.write(b) + self._rec_bytes += len(b) + self._rec_file.flush() # make new audio visible to the live tailer + last = fresh[-1] + if len(last): + self.level = float(np.sqrt(np.mean(np.square(last)))) + time.sleep(0.066) # ~15 Hz + + def stop_recording(self, model: str | None = None, language: str = "en", diarize: bool = True) -> dict: + model = model or transcribe.gui_default_model() + if not self.recording: + return {"ok": False, "error": "not recording"} + # Signal + join the poll thread WITHOUT holding self._lock (the poll thread takes + # the lock to append, so holding it here would deadlock). 
Once joined, no more + # appends can race the final drain/concat/clear. + self._stop.set() + # The live monitor is bound to this recording's lifetime: stop it before we finalize + # the file, so its reader is gone and it can't re-decode the finalized WAV forever or + # run inference concurrently with the post-stop transcribe job. Best-effort (idempotent). + self.live_stop() + if self._poll_thread: + self._poll_thread.join(timeout=5) + with self._lock: + self.recording = False + try: + mic = self._drain(self._cap) # any frames still queued in either stream + sysa = self._drain(self._sys) + for c in (mix_chunks(mic, sysa) if (mic and sysa) else (mic or sysa)): + b = _pcm_bytes(c) + self._rec_file.write(b) + self._rec_bytes += len(b) + except Exception: + pass + for cap in (self._cap, self._sys): + try: + if cap: + cap.stop() + except Exception: + pass + wav = self._rec_wav_path + n_bytes = self._rec_bytes + patched = False + try: # patch the header with the real size → valid WAV + self._rec_file.flush() + self._rec_file.seek(0) + self._rec_file.write(_wav_header(n_bytes)) + self._rec_file.flush() + self._rec_file.close() + patched = True + except Exception as e: # surface I/O failure — don't transcribe a broken file + self.job = {"state": "error", "error": f"could not finalize recording: {e}"} + self._rec_file = None + if not patched: + return {"ok": False, "error": "could not finalize the recording file"} + if n_bytes < SR: # < 0.5s of s16 mono (SR*2 bytes/s → 0.5s = SR bytes) + try: + wav.unlink() + except OSError: + pass + self.job = {"state": "error", "error": "recording too short"} + return {"ok": False, "error": "recording too short"} + self.job = {"state": "transcribing", "frac": 0.0, "msg": "starting", "wav": str(wav)} + # load + transcribe off the request thread (matches transcribe_existing) + threading.Thread( + target=lambda: self._do_transcribe(transcribe.load_wav_16k_mono(wav), model, language, str(wav), diarize), + daemon=True, 
name="gui-transcribe").start() + return {"ok": True, "wav": str(wav), "seconds": round(n_bytes / (SR * 2), 1)} + + def _do_transcribe(self, audio, model, language, wav, diarize: bool = True): + try: + def prog(frac, msg): + self.job = {"state": "transcribing", "frac": round(frac, 3), "msg": msg, "wav": wav} + r = transcribe.transcribe_audio(audio, self.out_dir, model=model, language=language, progress=prog, diarize=diarize) + md_path, json_path, srt_path, vtt_path = export.export(Path(r["events_path"]), self.out_dir) + stem = Path(r["transcript_path"]).stem.replace("-transcript", "") + # The WAV is named at record time (-gui.wav) but the session stem is the + # transcribe time, so they differ — link the audio under the stem so the GUI can + # offer Download WAV / playback for this session (see audio_path). + self._link_audio(wav, stem) + self.job = {"state": "done", "wav": wav, **r, + "agent_md": str(md_path), "agent_json": str(json_path), + "agent_srt": str(srt_path), "agent_vtt": str(vtt_path), + "stem": stem} + except Exception as e: + self.job = {"state": "error", "error": f"{type(e).__name__}: {e}"} + + def transcribe_existing(self, wav_path: str, model: str | None = None, language: str = "en", diarize: bool = True) -> dict: + """Transcribe an already-recorded WAV (e.g. 
a prior capture) in the background.""" + model = model or transcribe.gui_default_model() + p = Path(wav_path) + if not p.exists(): + return {"ok": False, "error": "no such file"} + self.job = {"state": "transcribing", "frac": 0.0, "msg": "starting", "wav": str(p)} + threading.Thread(target=lambda: self._do_transcribe(transcribe.load_wav_16k_mono(p), model, language, str(p), diarize), + daemon=True, name="gui-transcribe").start() + return {"ok": True} + + def status(self) -> dict: + with self._lock: # consistent snapshot vs the live thread's writes + live = {"active": self._live["active"], "wav": self._live["wav"], + "lines": self._live["lines"][-120:], "partial": self._live.get("partial")} + return { + "recording": self.recording, + "level": round(self.level, 4), + "elapsed": round(time.time() - self.started_at, 1) if (self.recording and self.started_at) else 0, + "job": self.job, + "live": live, + } + + # ---- live (near-real-time) monitor: tail an in-progress recording's file ---- + def _newest_wav(self): + wavs = sorted(self.out_dir.glob("*.wav"), key=lambda p: p.stat().st_mtime, reverse=True) + return wavs[0] if wavs else None + + def live_start(self, wav: str | None = None) -> dict: + with self._lock: + if self._live_thread and self._live_thread.is_alive(): + return {"ok": False, "error": "live monitor is still stopping; try again"} + if self._live["active"]: + return {"ok": True, "already": True, "wav": self._live["wav"]} + target = Path(wav) if wav else self._newest_wav() + if not target or not target.exists(): + return {"ok": False, "error": "no recording to monitor"} + self._live = {"active": True, "wav": str(target), "lines": [], "partial": None} + from gui.stabilize import PartialStabilizer + self._stab = PartialStabilizer() + self._live_stop.clear() + self._live_thread = threading.Thread(target=self._live_loop, args=(target,), daemon=True, name="gui-live") + self._live_thread.start() + return {"ok": True, "wav": str(target)} + + def live_stop(self) -> 
dict: + self._live_stop.set() + # Capture the thread into a local: a concurrent live_stop() (e.g. stop_recording calls + # this while a separate /api/live/stop request races it) may null self._live_thread + # between our checks. Operating on the local keeps this idempotent and crash-free — + # joining the same Thread from two callers is safe. + t = self._live_thread + if t and t.is_alive(): + t.join(timeout=3) + if t.is_alive(): # still decoding — don't claim it stopped + return {"ok": False, "error": "live monitor still stopping"} + self._live_thread = None + if isinstance(self._live, dict): + self._live["active"] = False + self._live["partial"] = None + return {"ok": True} + + def _live_loop(self, wav: Path): + # tail raw PCM of the (still-growing) WAV; dispatch to streaming or chunked transcription + from gui.transcribe import _get_engines, gui_default_model + from gui.eot import is_incomplete + from audio.transcriber import SherpaStreamingTranscriber + try: + # Prefer the sherpa streaming backend for the live view when it's installed (opt-in) + # — it streams word-by-word. Else fw-base (light, where it exists), else the platform + # default (MLX on Apple Silicon). dedicated="live" → its OWN transcriber, never + # sharing decode state with the post-stop batch job. 
+ if "sherpa-stream-en" in config.AVAILABLE_MODELS: + live_model = "sherpa-stream-en" + elif "fw-base" in config.AVAILABLE_MODELS: + live_model = "fw-base" + else: + live_model = gui_default_model() # CPU-aware default, never the raw qwen3-0.6b that's unusable on CPU + tr, vad, _d = _get_engines(live_model, "en", dedicated="live") + except Exception as e: + with self._lock: + self._live["lines"].append({"t": "", "text": f"(live engine error: {e})"}) + self._live["active"] = False + return + f = open(wav, "rb") + f.seek(0, 2) # tail from the CURRENT end — only NEW speech (true live, no slow backlog replay) + abs_start = max(0, (f.tell() - 44) // 2) # samples already recorded before we started (for timestamps) + try: + if isinstance(tr, SherpaStreamingTranscriber): + self._live_stream_loop(tr, f, abs_start) + else: + self._live_chunk_loop(tr, vad, f, abs_start, is_incomplete) + finally: + f.close() + with self._lock: + self._live["active"] = False + + def _live_chunk_loop(self, tr, vad, f, abs_start, is_incomplete): + """VAD-windowed transcription for batch backends (fw-*/MLX/qwen3/parakeet). The + original live path, unchanged — finalize speech windows past a tail guard, merge + mid-clause fragments, publish a LocalAgreement-stabilized volatile partial.""" + buf = np.zeros(0, dtype=np.float32) + while not self._live_stop.is_set(): + self._live_stop.wait(8.0) + data = f.read() + if data: + n = len(data) - (len(data) % 2) + if n: + buf = np.concatenate([buf, np.frombuffer(data[:n], dtype="= SR * 2: + segs = vad.get_speech_segments(buf, min_speech_ms=500, min_silence_ms=300, max_speech_s=6.0) + guard = len(buf) - int(SR * 0.6) + consumed = 0 + for (s, e) in segs: + if e > guard: + break + txt = (tr.transcribe(buf[s:e]).get("text") or "").strip() + if txt: + # lock only the brief dict mutation, never the slow transcribe/VAD, so + # status()/SSE sees a consistent snapshot without stalling. 
+ with self._lock: + lines = self._live["lines"] + if lines and is_incomplete(lines[-1]["text"]): + lines[-1]["text"] = (lines[-1]["text"] + " " + txt).strip() + else: + lines.append({"t": fmt_hms((abs_start + s) / SR), "text": txt}) + self._live["lines"] = lines[-200:] + consumed = e + if consumed: + abs_start += consumed + buf = buf[consumed:] + if self._stab: + self._stab.reset() + if self._stab is not None and len(buf) >= int(SR * 0.4): + ptxt = (tr.transcribe(buf).get("text") or "").strip() + st = self._stab.push(ptxt) + partial = ({"t": fmt_hms(abs_start / SR), **st} + if (st["stable"] or st["volatile"]) else None) + with self._lock: + self._live["partial"] = partial + else: + with self._lock: + self._live["partial"] = None + + def _live_stream_loop(self, tr, f, abs_start): + """True word-by-word streaming for the sherpa-onnx backend. One persistent OnlineStream + is fed the freshly-tailed PCM; the running decode is the volatile partial; sherpa's own + endpoint detection finalizes a line. Tighter cadence than the chunked path for low + latency. 
Same lock discipline on the live-state writes.""" + from audio.transcriber import is_hallucination + tr.reset_dedup() # tr is cached + reused across live sessions; clear stale dedup state + rec = tr.recognizer + st = rec.create_stream() + fed = abs_start # total samples fed (for the current line's start timestamp) + line_start = abs_start + while not self._live_stop.is_set(): + self._live_stop.wait(1.0) + data = f.read() + if data: + n = len(data) - (len(data) % 2) + if n: + frame = np.frombuffer(data[:n], dtype=" list[Path]: + dirs = [self.out_dir] + try: + dirs.append(Path(config.SESSIONS_DIR)) + dirs.append(Path(config.LIVE_DIR)) + except Exception: + pass + seen, uniq = set(), [] + for d in dirs: + if d and d not in seen and d.exists(): + seen.add(d) + uniq.append(d) + return uniq + + def sessions(self) -> list[dict]: + """All sessions across the known dirs, newest first, with which artifacts exist.""" + out = {} + for d in self._session_dirs(): + for f in d.glob("*-transcript.md"): + stem = f.stem[: -len("-transcript")] + out.setdefault((d, stem), {"stem": stem, "dir": str(d), "mtime": f.stat().st_mtime}) + out[(d, stem)]["transcript"] = f.name + for f in d.glob("*-agent.md"): + stem = f.stem[: -len("-agent")] + e = out.setdefault((d, stem), {"stem": stem, "dir": str(d), "mtime": f.stat().st_mtime}) + e["agent_md"] = f.name + e["mtime"] = max(e.get("mtime", 0), f.stat().st_mtime) + for f in d.glob("*-agent.json"): + stem = f.stem[: -len("-agent")] + e = out.setdefault((d, stem), {"stem": stem, "dir": str(d), "mtime": f.stat().st_mtime}) + e["agent_json"] = f.name + items = sorted(out.values(), key=lambda x: x.get("mtime", 0), reverse=True) + for it in items: + it["title"] = self._session_title(it) + return items + + def _session_title(self, entry: dict) -> str: + """A clean, content-based title (the first spoken sentence) instead of a raw timestamp — + ChatGPT-style. 
Reads only the head of the transcript file; cached by (path, mtime).""" + import re + fname = entry.get("transcript") or entry.get("agent_md") + if not fname: + return "" + p = Path(entry["dir"]) / fname + try: + mt = p.stat().st_mtime + except OSError: + return "" + cache = self.__dict__.setdefault("_title_cache", {}) + key = (str(p), mt) + if key in cache: + return cache[key] + title = "" + try: + for s in p.read_text(encoding="utf-8")[:3000].splitlines(): + s = s.strip() + if not s or s[0] in "#>-" or s.lower().startswith("voxterm"): + continue + s = re.sub(r"^\*{0,2}\[[^\]]*\]\*{0,2}\s*", "", s) # drop [timestamp] + s = re.sub(r"^\*{0,2}[^*:]{1,30}\*{0,2}\s*(\(#\d+\))?\s*[::]\s*", "", s) # drop Speaker: + s = re.sub(r"[*_`]", "", s).strip() + if len(s) >= 2: # keep short first utterances ("Hello.", "Yes.") + s = re.sub(r"\s+", " ", s) + title = (s[:54].rstrip() + "…") if len(s) > 56 else s + break + except Exception: + title = "" + cache[key] = title + return title + + def _resolve(self, stem: str, suffix: str, only_dir: str | None = None) -> Path | None: + # prevent traversal: stem must be a bare name + if "/" in stem or ".." in stem: + return None + dirs = self._session_dirs() + if only_dir: # restrict to that dir IFF it's a known session dir (no traversal) + od = Path(only_dir) + dirs = [d for d in dirs if d == od] + for d in dirs: + p = d / f"{stem}{suffix}" + if p.exists(): + return p + return None + + # text artifacts a session owns (audio .wav is managed separately and never touched) + _ARTIFACT_SUFFIXES = ["-transcript.md", "-agent.md", "-agent.json", + "-agent.srt", "-agent.vtt", "-events.jsonl"] + + def delete_session(self, stem: str, dir: str | None = None) -> dict: + """Remove ONLY this session's text artifacts for ``stem``. + + Reuses _resolve's traversal guard (reject '/' or '..' in the stem) and resolves + strictly within _session_dirs() (honoring the optional ``dir`` like _resolve's + only_dir). 
Deletes only files that exist; never touches .wav audio or anything + outside a known session dir. Returns the list of deleted filenames. + """ + # SAME guard as _resolve: stem must be a bare name (no traversal) + if "/" in stem or ".." in stem: + return {"ok": False, "error": "bad stem", "deleted": []} + deleted: list[str] = [] + for suffix in self._ARTIFACT_SUFFIXES: + p = self._resolve(stem, suffix, only_dir=dir) # resolves within known dirs only + if p and p.is_file(): + try: + p.unlink() + deleted.append(p.name) + except OSError: + pass + return {"ok": True, "deleted": deleted} + + def read_artifact(self, stem: str, kind: str, dir: str | None = None) -> dict: + suffix = {"transcript": "-transcript.md", "agent_md": "-agent.md", "agent_json": "-agent.json", + "srt": "-agent.srt", "vtt": "-agent.vtt"}.get(kind) + if not suffix: + return {"ok": False, "error": "bad kind"} + p = self._resolve(stem, suffix, only_dir=dir) + if not p: + return {"ok": False, "error": "not found"} + return {"ok": True, "stem": stem, "kind": kind, "path": str(p), "text": p.read_text(encoding="utf-8")} + + def summarize_session(self, stem: str, dir: str | None = None, template_id: str = "tldr", + model: str = "", custom_prompt: str = "") -> dict: + """Summarize a saved session's transcript with the local LLM the TUI uses + (MLX on Apple Silicon, or an ``ollama:`` backend on any platform). + Returns {"ok", "summary"} or a clear {"ok": False, "error"} when no backend is + available — never raises to the handler. 
(read_artifact validates the stem.)""" + art = self.read_artifact(stem, "transcript", dir=dir) + if not art.get("ok"): + art = self.read_artifact(stem, "agent_md", dir=dir) + if not art.get("ok"): + return {"ok": False, "error": "transcript not found"} + body = (art.get("text") or "").strip() + if not body: + return {"ok": False, "error": "transcript is empty"} + try: + from summarizer import SummarizerError, get_summarizer, resolve_template + except Exception as e: + return {"ok": False, "error": f"summarizer unavailable: {e}"} + try: + summary = get_summarizer(model).summarize(body, resolve_template(template_id), custom_prompt) + return {"ok": True, "summary": summary, "template": template_id} + except SummarizerError as e: + return {"ok": False, "error": str(e)} # missing/unreachable backend — surfaced to the UI + except Exception as e: + return {"ok": False, "error": f"summarization failed: {e}"} + + def _link_audio(self, wav: str, stem: str) -> None: + """Hardlink the source WAV to '-gui.wav' so audio_path() can find a session's + audio by stem. The WAV is named at record time and the stem at transcribe time, so they + differ; a hardlink costs no extra disk, survives deletion of the original name, and is + never touched by delete_session (audio is intentionally kept). Best-effort.""" + try: + src = Path(wav) + dst = self.out_dir / f"{stem}-gui.wav" + if not src.is_file() or dst.exists() or src.resolve() == dst.resolve(): + return + try: + os.link(src, dst) + except OSError: # cross-device, or a filesystem without hardlinks + import shutil + shutil.copy2(src, dst) + except Exception: + pass + + def audio_path(self, stem: str, dir: str | None = None) -> Path | None: + """Locate the source WAV for a saved session, or None. 
New recordings are hardlinked to + '-gui.wav' at transcribe time (_link_audio); for legacy sessions we fall back to + the in-dir .wav whose mtime is closest to the transcript (recording + transcribe happen + within seconds), bounded to a 1-hour window so we never return an unrelated file.""" + if "/" in stem or ".." in stem: + return None + for suffix in ("-gui.wav", ".wav"): # direct link (the normal path) + p = self._resolve(stem, suffix, only_dir=dir) + if p and p.is_file(): + return p + ref = self._resolve(stem, "-transcript.md", only_dir=dir) or self._resolve(stem, "-agent.json", only_dir=dir) + if not ref: + return None + try: + ref_mt = ref.stat().st_mtime + except OSError: + return None + dirs = [d for d in self._session_dirs() if (not dir or d == Path(dir))] + best, best_dt = None, 3600.0 # accept only a match within 1 hour + for d in dirs: + for w in list(d.glob("*-gui.wav")) + list(d.glob("*.wav")): + try: + dt = abs(w.stat().st_mtime - ref_mt) + except OSError: + continue + if dt < best_dt: + best, best_dt = w, dt + return best + + def export_session(self, stem: str, kind: str, renames: dict | None = None, dir: str | None = None) -> dict: + """Render a saved session to md/json/srt/vtt with the client's speaker renames applied. + + Rebuilds the doc from the events log via export.build() — the SAME path that produced the + on-disk -agent.* artifacts — then renders with export.py's formatters. 
So a download + byte-matches the on-disk file except for the (intentional) renames, and there is ONE + formatter implementation (the client no longer reimplements it).""" + render = {"md": export.render_md, "json": export.render_json, + "srt": export.to_srt, "vtt": export.to_vtt}.get(kind) + ext = {"md": "-agent.md", "json": "-agent.json", "srt": ".srt", "vtt": ".vtt"}.get(kind) + if not render: + return {"ok": False, "error": "bad kind"} + ev = self._resolve(stem, "-events.jsonl", only_dir=dir) + if not ev: + return {"ok": False, "error": "no events log for this session"} + try: + doc = export.build(export.load_events(ev), session_id=stem, source_stream=ev.name) + except Exception as e: + return {"ok": False, "error": f"export build failed: {e}"} + renames = {str(k): str(v) for k, v in (renames or {}).items()} + if renames: # mirror the client view: rename local (non-peer) turns + speakers by id + for t in doc.get("turns", []): + if not t.get("peer") and str(t.get("speaker_id")) in renames: + t["speaker"] = renames[str(t["speaker_id"])] + for sp in doc.get("speakers", []): + if not sp.get("peer") and str(sp.get("id")) in renames: + sp["label"] = renames[str(sp["id"])] + return {"ok": True, "text": render(doc), "filename": f"{stem}{ext}"} diff --git a/gui/eot.py b/gui/eot.py new file mode 100644 index 0000000..f0e29ce --- /dev/null +++ b/gui/eot.py @@ -0,0 +1,59 @@ +"""Heuristic end-of-turn (EOT) signal from transcript text — zero model, zero latency. + +VoxTerm splits turns on silence alone, so a natural pause after "and…" or "the…" wrongly +ends a turn mid-sentence. This estimates P(the text is a grammatically complete turn) from +cheap string cues, so the live view can MERGE a fragment that clearly ends mid-clause into +the next one instead of emitting two choppy lines. + +Reimplemented (idea, not code) from elizaOS's HeuristicEotClassifier. Pure stdlib. 
+""" +from __future__ import annotations + +import re + +# A fragment that ENDS on one of these is almost certainly continued by the next one. +_CONJUNCTIONS = { + "and", "but", "or", "nor", "yet", "so", "because", "although", "though", "while", + "since", "unless", "until", "whereas", "plus", "that", "which", "who", "whom", + "whose", "if", "when", "where", "as", "than", +} +_ARTICLES_PREPS = { + "the", "a", "an", "to", "of", "in", "on", "at", "by", "for", "with", "from", "into", + "onto", "upon", "about", "over", "under", "between", "among", "through", "during", + "without", "within", "my", "your", "his", "her", "its", "our", "their", +} + +_WORD = re.compile(r"[A-Za-z']+") + + +def turn_complete_prob(text: str) -> float: + """Estimate P(this text is a complete turn) in [0,1] from grammar cues alone. + + Terminal sentence punctuation → 0.95; a trailing conjunction → 0.15; a trailing + article/preposition → 0.20; a very short fragment → 0.70; otherwise 0.50. + """ + t = (text or "").strip() + if not t: + return 0.5 + if t[-1] in ".!?": + return 0.95 + words = _WORD.findall(t.lower()) + if not words: + return 0.5 + last = words[-1] + if last in _CONJUNCTIONS: + return 0.15 + if last in _ARTICLES_PREPS: + return 0.20 + if len(words) < 3: + return 0.70 + return 0.50 + + +def is_incomplete(text: str, threshold: float = 0.4) -> bool: + """True when the text clearly ends mid-clause — i.e. the NEXT fragment should merge in. + + Only the high-precision cases (trailing conjunction/article/preposition) fall below the + default threshold, so this never merges across a genuinely-complete sentence. + """ + return turn_complete_prob(text) < threshold diff --git a/gui/export.py b/gui/export.py new file mode 100644 index 0000000..4dac898 --- /dev/null +++ b/gui/export.py @@ -0,0 +1,483 @@ +"""Export a VoxTerm event log to an LLM-agent-optimized transcript. + +A pure, replayable function of the ``*-events.jsonl`` stream (the same stream glass +tails) — no audio, no live state. 
Produces two files alongside the session: + + -agent.md human + LLM readable: YAML front-matter, an orientation line, + then one speaker-attributed, timestamped turn per paragraph. + -agent.json the typed, lossless companion the .md is rendered from. + +Run: + python -m gui.export [events.jsonl] [--out-dir DIR] + # with no path, exports the newest *-events.jsonl in VoxTerm's live dir. + +Design notes: +- ``confidence`` in the event stream is a TIER STRING ("", "high", "medium", "new"), + never a float — we keep it verbatim and derive a single ``confidence_uncertain`` + boolean, avoiding the "+confidence -> NaN" trap a numeric coercion would hit. +- Timestamps prefer a turn's ``audio_offset`` (true seconds into the recording, set + by the headless file transcriber) and fall back to wall-clock ``t - session_start`` + for live TUI sessions. +- Speaker labels from diarization are voice CLUSTERS, not verified identities; that + caveat is stated once in the front-matter ``notes`` rather than marked on every + turn. Per-turn markers flag only genuinely-more-uncertain turns. +""" +from __future__ import annotations + +import argparse +import json +import math +import re +import sys +from collections import Counter +from datetime import datetime +from pathlib import Path + +from gui._timefmt import fmt_hms + +EXPORT_VERSION = 1 +DOC_KIND = "voxterm-transcript" + +# Marker render order (bare tokens; rendered "[token]" in Markdown). 
+_MARKER_ORDER = ["~", "new-voice", "overlap", "peer"] +_MARKER_LEGEND = { + "[~]": "low/medium-confidence or unattributed speaker — treat the label as uncertain", + "[new-voice]": "first appearance of an unrecognized speaker", + "[overlap]": "overlapping speech in this segment", + "[peer]": "turn arrived from a remote P2P peer", +} +_CONFIDENCE_LEGEND = {"recognized": "high", "suggested": "medium", "new_voice": "new", "asserted": ""} + + +def load_events(path: Path) -> list[dict]: + """Read a JSONL event log; skip blank/garbled lines rather than failing.""" + events = [] + with path.open("r", encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(obj, dict) and "kind" in obj: + events.append(obj) + return events + + +def _num(v, default=0.0): + """Tolerant numeric parse from an untrusted event log: returns a FINITE float + (or ``default``) — never raises, never returns NaN/Infinity. Garbled or + non-numeric ``t``/``audio_offset`` values degrade to the default instead of + crashing the whole export.""" + if isinstance(v, bool) or v is None: + return default + try: + f = float(v) + except (TypeError, ValueError): + return default + return f if math.isfinite(f) else default + + +def _coerce_sid(raw) -> int: + """Speaker ids are ints in real VoxTerm but strings ("S0") in the synth harness; + normalize either to an int without crashing.""" + if isinstance(raw, bool) or raw is None: + return 0 + if isinstance(raw, int): + return raw + if isinstance(raw, float): + return int(raw) + if isinstance(raw, str): + try: + return int(raw) + except ValueError: + m = re.search(r"\d+", raw) + return int(m.group()) if m else 0 + return 0 + + +def _fmt_ts(seconds: float, sep: str) -> str: + """Subtitle timestamp "HH:MM:SSmmm" (sep="," for SRT, "." for WebVTT). 
+ Clamps a non-finite/negative value to 0 so a garbled offset can never produce + an invalid cue.""" + s = _num(seconds, 0.0) + if s < 0: + s = 0.0 + ms_total = int(round(s * 1000)) + h, rem = divmod(ms_total, 3_600_000) + m, rem = divmod(rem, 60_000) + sec, ms = divmod(rem, 1000) + return f"{h:02d}:{m:02d}:{sec:02d}{sep}{ms:03d}" + + +def _iso_local(unix_ts: float) -> str: + """Local, timezone-aware ISO 8601 (e.g. 2026-06-04T09:14:07-07:00).""" + return datetime.fromtimestamp(unix_ts).astimezone().isoformat(timespec="seconds") + + +def build(events: list[dict], *, session_id: str, source_stream: str) -> dict: + """Reduce the event stream to the typed export object (the JSON sidecar shape).""" + starts = [e for e in events if e.get("kind") == "session" and e.get("phase") == "start"] + ends = [e for e in events if e.get("kind") == "session" and e.get("phase") == "end"] + text_evs = [e for e in events if e.get("kind") in ("text", "peer_text")] + + has_anchor = bool(starts or text_evs) + t0 = _num(starts[0].get("t")) if starts else (_num(text_evs[0].get("t")) if text_evs else 0.0) + incomplete = not ends + t_end = _num(ends[-1].get("t"), t0) if ends else (_num(events[-1].get("t"), t0) if events else t0) + model = starts[0].get("model", "") if starts else "" + language = (starts[0].get("language", "") if starts else "") or "auto" + party = any(e.get("kind") == "party" and e.get("on") for e in events) + + turns = [] + has_audio_time = any("audio_offset" in e for e in text_evs) + audio_end_max = 0.0 + raw_audio_ends: list[float | None] = [] + for idx, e in enumerate(text_evs): + is_peer = e.get("kind") == "peer_text" + if "audio_offset" in e: + t_off = _num(e.get("audio_offset")) + audio_end_max = max(audio_end_max, _num(e.get("audio_end"), t_off)) + else: + t_off = max(0.0, _num(e.get("t"), t0) - t0) + # raw per-turn audio_end (None if absent) feeds the t_offset_end post-pass below. 
+ raw_audio_ends.append(_num(e.get("audio_end"), None) if "audio_end" in e else None) + conf = "" if is_peer else e.get("confidence", "") + if isinstance(conf, bool) or conf is None: + conf = "" + + # confidence is normally a TIER STRING ("", high, medium, new). Some producers + # (the synth harness, older logs) emit a float; handle both honestly. A + # non-finite float (NaN/Inf) is sanitized to "" so it can never reach the JSON + # sidecar as an invalid literal — and is treated as uncertain, not certain. + numeric = isinstance(conf, (int, float)) + non_finite = numeric and not math.isfinite(conf) + if non_finite: + conf, numeric = "", False + tier_uncertain = (conf < 0.5) if numeric else (conf in ("medium", "new") or non_finite) + + sid = 0 if is_peer else _coerce_sid(e.get("speaker_id", 0)) + overlap = bool(e.get("overlap", False)) and not is_peer + peer_name = e.get("peer") if is_peer else None + speaker = e.get("speaker", "") or ("(unattributed)" if (not is_peer and sid == 0) else "") + + markers = [] + if conf == "new": + markers.append("new-voice") + if tier_uncertain: + markers.append("~") + if not is_peer and sid == 0: + markers.append("~") + if overlap: + markers.append("overlap") + if is_peer: + markers.append("peer") + markers = [m for m in _MARKER_ORDER if m in markers] # dedup + canonical order + uncertain = tier_uncertain or (sid == 0 and not is_peer) + + turns.append({ + "index": idx, + "t_offset": round(t_off, 2), + "t_offset_hms": fmt_hms(t_off), + "t_unix": _num(e.get("t"), None), + "speaker_id": sid, + "speaker": speaker, + "text": (e.get("text", "") or "").strip(), + "confidence": conf, + "confidence_uncertain": uncertain, + "overlap": overlap, + "peer": is_peer, + "peer_name": peer_name, + "markers": markers, + }) + + # SHARED CONTRACT: every turn gains a finite t_offset_end (end seconds). Source + # priority: this turn's audio_end -> the NEXT turn's t_offset -> t_offset + 2.0. 
+ for i, t in enumerate(turns): + start = t["t_offset"] + ae = raw_audio_ends[i] + if ae is not None and math.isfinite(ae): + end = ae + elif i + 1 < len(turns): + end = turns[i + 1]["t_offset"] + else: + end = start + 2.0 + if not math.isfinite(end): + end = start + 2.0 + if end <= start: # out-of-order/zero-span: keep end > start in the JSON too + end = start + 0.5 + t["t_offset_end"] = round(end, 2) + + if has_audio_time: + # cover every turn, including any wall-clock-fallback turn, so a turn's + # offset can never exceed the reported duration. + duration_seconds = round(max([audio_end_max] + [t["t_offset"] for t in turns], default=0.0)) + else: + duration_seconds = round(max(0.0, t_end - t0)) + + # Distinct speakers (local by id; peers by name+label) with turn counts. + local_labels: dict[int, Counter] = {} + local_counts: Counter = Counter() + peer_counts: Counter = Counter() + for t in turns: + if t["peer"]: + peer_counts[(t["peer_name"], t["speaker"])] += 1 + else: + local_counts[t["speaker_id"]] += 1 + local_labels.setdefault(t["speaker_id"], Counter())[t["speaker"]] += 1 + speakers = [] + for sid in sorted(local_counts): + label = local_labels[sid].most_common(1)[0][0] if local_labels[sid] else "" + speakers.append({"id": sid, "label": label or "(unattributed)", + "peer": False, "peer_name": None, "turns": local_counts[sid]}) + for (pname, plabel), n in sorted(peer_counts.items(), key=lambda kv: (str(kv[0][0]), str(kv[0][1]))): + speakers.append({"id": 0, "label": plabel, "peer": True, "peer_name": pname, "turns": n}) + + # notes are computed once here (the single source) and stored on the doc; render_md + # consumes doc["_notes"] and render_json strips it. Ordering: caveat first. + notes = ["Speaker labels are diarization voice-clusters (e.g. 
\"Speaker 1\"), not verified " + "identities; treat attribution as approximate unless a turn is otherwise marked."] + if not has_anchor: + notes.append("No parsable session or transcript events were found — this log may be " + "empty or corrupt; timestamps are unavailable.") + if incomplete: + notes.append("Session end event missing — duration/ended_at are approximate.") + if has_audio_time: + notes.append("Timestamps are true offsets into the recorded audio.") + + return { + "voxterm_export_version": EXPORT_VERSION, + "kind": DOC_KIND, + "session": { + "id": session_id, + "started_at": (_iso_local(t0) if has_anchor else None), + "started_at_unix": (t0 if has_anchor else None), + "ended_at": (_iso_local(t_end) if (has_anchor and not incomplete) else None), + "duration_seconds": duration_seconds, + "duration_hms": fmt_hms(duration_seconds), + "source": "VoxTerm", + "source_stream": source_stream, + "model": model, + "language": language, + "party": party, + "incomplete": incomplete, + "audio_relative_time": has_audio_time, + }, + "speakers": speakers, + "turns": turns, + "_notes": notes, + } + + +def _yaml_scalar(v) -> str: + if isinstance(v, bool): + return "true" if v else "false" + if v is None: + return "null" + if isinstance(v, (int, float)): + return str(v) + return json.dumps(str(v), ensure_ascii=False) # JSON string == valid YAML double-quoted + + +def render_md(doc: dict) -> str: + s = doc["session"] + out = ["---"] + out.append(f"voxterm_export_version: {EXPORT_VERSION}") + out.append(f"kind: {DOC_KIND}") + out.append(f"session_id: {_yaml_scalar(s['id'])}") + out.append(f"date: {s['started_at'][:10] if s['started_at'] else 'null'}") + out.append(f"started_at: {_yaml_scalar(s['started_at'])}") + out.append(f"ended_at: {_yaml_scalar(s['ended_at'])}") + out.append(f"duration: {_yaml_scalar(s['duration_hms'])}") + out.append(f"duration_seconds: {s['duration_seconds']}") + out.append("source: VoxTerm") + out.append(f"source_stream: 
{_yaml_scalar(s['source_stream'])}") + out.append(f"model: {_yaml_scalar(s['model'])}") + out.append(f"language: {_yaml_scalar(s['language'])}") + out.append(f"party: {_yaml_scalar(s['party'])}") + out.append(f"audio_relative_time: {_yaml_scalar(s['audio_relative_time'])}") + out.append("speakers:") + for sp in doc["speakers"]: + bits = f"id: {sp['id']}, label: {_yaml_scalar(sp['label'])}, turns: {sp['turns']}, peer: {_yaml_scalar(sp['peer'])}" + if sp["peer"]: + bits += f", peer_name: {_yaml_scalar(sp['peer_name'])}" + out.append(f" - {{ {bits} }}") + out.append(f"turns: {len(doc['turns'])}") + out.append("confidence_legend: { " + ", ".join(f"{k}: {_yaml_scalar(v)}" for k, v in _CONFIDENCE_LEGEND.items()) + " }") + out.append("markers:") + for k, v in _MARKER_LEGEND.items(): + out.append(f" {_yaml_scalar(k)}: {_yaml_scalar(v)}") + out.append("notes:") + for n in doc.get("_notes", []): + out.append(f" - {_yaml_scalar(n)}") + out.append("---") + out.append("") + + n_sp = len(doc["speakers"]) + out.append(f"> VoxTerm session — {n_sp} speaker(s), {len(doc['turns'])} turns, " + f"{s['duration_hms']}. Timestamps are [mm:ss] " + f"{'into the recorded audio' if s['audio_relative_time'] else 'from session start'}. " + f"Markers: [~] uncertain attribution, [overlap] overlapping speech, " + f"[new-voice] first appearance, [peer] remote peer. 
" + f"Speaker labels are voice clusters, not verified identities.") + out.append("") + out.append("## Transcript") + out.append("") + for t in doc["turns"]: + if t["peer"]: + who = f"**{t['speaker']}** (peer: {t['peer_name']})" + elif t["speaker_id"]: + who = f"**{t['speaker']}** (#{t['speaker_id']})" + else: + who = "**(unattributed)**" + line = f"[{t['t_offset_hms']}] {who}: {t['text']}" + if t["markers"]: + line += " " + " ".join(f"[{m}]" for m in t["markers"]) + out.append(line) + out.append("") + return "\n".join(out).rstrip() + "\n" + + +def render_json(doc: dict) -> str: + d = {k: v for k, v in doc.items() if k != "_notes"} + # allow_nan=False: refuse to emit NaN/Infinity literals (invalid JSON for strict + # parsers). build() already sanitizes them, so this is a fail-loud backstop. + return json.dumps(d, ensure_ascii=False, indent=2, allow_nan=False) + "\n" + + +def _cue_text(s: str) -> str: + """Single-line, cue-safe text — collapse newlines/blank lines to spaces and + neutralize the "-->" timing marker, either of which inside a cue would corrupt + SRT/VTT cue boundaries (or inject a fake cue).""" + s = re.sub(r"\s*\n\s*", " ", (s or "").strip()) + return s.replace("-->", "->") + + +def _cue_label(t: dict) -> str: + """Cue speaker label (sanitized): peers render "name (peer)"; locals use the label.""" + name = t.get("speaker") or "(unattributed)" + return _cue_text(f"{name} (peer)" if t.get("peer") else name) + + +def _cue_times(t: dict) -> tuple[float, float]: + """(start, end) seconds for a cue, guaranteeing end > start (min 0.5s span).""" + start = _num(t.get("t_offset"), 0.0) + end = _num(t.get("t_offset_end"), start) + if end <= start: + end = start + 0.5 + return start, end + + +def to_srt(doc: dict) -> str: + """Render doc turns as SubRip (SRT): 1-indexed cues, "HH:MM:SS,mmm" times. 
+ Empty-text turns are skipped; cue label = speaker, cue text = turn text.""" + blocks = [] + idx = 0 + for t in doc.get("turns", []): + text = _cue_text(t.get("text")) + if not text: + continue + idx += 1 + start, end = _cue_times(t) + blocks.append( + f"{idx}\n" + f"{_fmt_ts(start, ',')} --> {_fmt_ts(end, ',')}\n" + f"{_cue_label(t)}: {text}\n" + ) + return "\n".join(blocks) + + +def to_vtt(doc: dict) -> str: + """Render doc turns as WebVTT: "WEBVTT" header, "HH:MM:SS.mmm" times. + Empty-text turns are skipped; cue label = speaker, cue text = turn text.""" + blocks = ["WEBVTT\n"] + for t in doc.get("turns", []): + text = _cue_text(t.get("text")) + if not text: + continue + start, end = _cue_times(t) + blocks.append( + f"{_fmt_ts(start, '.')} --> {_fmt_ts(end, '.')}\n" + f"{_cue_label(t)}: {text}\n" + ) + return "\n".join(blocks) + + +def export(events_path: Path, out_dir: Path | None = None) -> tuple[Path, Path, Path, Path]: + events = load_events(events_path) + stem = events_path.stem + if stem.endswith("-events"): + stem = stem[: -len("-events")] + doc = build(events, session_id=stem, source_stream=events_path.name) # notes set on doc + if not doc["turns"]: + print(f"warning: no transcript turns found in {events_path.name} " + f"({len(events)} events parsed) — writing an empty export", file=sys.stderr) + + out_dir = out_dir or events_path.parent + out_dir.mkdir(parents=True, exist_ok=True) + md_path = out_dir / f"{stem}-agent.md" + json_path = out_dir / f"{stem}-agent.json" + srt_path = out_dir / f"{stem}-agent.srt" + vtt_path = out_dir / f"{stem}-agent.vtt" + md_path.write_text(render_md(doc), encoding="utf-8") + json_path.write_text(render_json(doc), encoding="utf-8") + srt_path.write_text(to_srt(doc), encoding="utf-8") + vtt_path.write_text(to_vtt(doc), encoding="utf-8") + return md_path, json_path, srt_path, vtt_path + + +def main(argv=None) -> int: + ap = argparse.ArgumentParser(description="Export a VoxTerm event log to an LLM-agent transcript.") + 
ap.add_argument("events", nargs="?", help="path to a *-events.jsonl (default: newest in VoxTerm live dir)") + ap.add_argument("--out-dir", default=None, help="output dir (default: alongside the events file)") + ap.add_argument("--format", choices=["md", "json", "srt", "vtt", "all"], default="all", + help="which artifact(s) to write/print (default: all = md+json+srt+vtt)") + args = ap.parse_args(argv) + + if args.events: + events_path = Path(args.events) + else: + # Default: newest *-events.jsonl in VoxTerm's live dir (self-contained — no glass dep). + try: + import sys as _sys + _sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + from config import LIVE_DIR + live = Path(LIVE_DIR) + except Exception: + # mirror config.py's per-platform live dir if that import ever fails + _h = Path.home() + if sys.platform == "darwin": + live = _h / "Documents" / "voxterm-transcripts" / ".live" + elif sys.platform.startswith("linux"): + import os as _os + live = Path(_os.environ.get("XDG_DATA_HOME", _h / ".local" / "share")) / "voxterm" / ".live" + else: + live = _h / "Documents" / "voxterm" / ".live" + cands = sorted(live.glob("*-events.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True) if live.exists() else [] + if not cands: + print("error: no events file given and none found in the live dir", file=sys.stderr) + return 2 + events_path = cands[0] + if not events_path.exists(): + print(f"error: no such file: {events_path}", file=sys.stderr) + return 2 + + md_path, json_path, srt_path, vtt_path = export(events_path, Path(args.out_dir) if args.out_dir else None) + # export() always writes md+json+srt+vtt (the default behavior); --format only + # controls which of those written paths are printed. 
+ want = {"md", "json", "srt", "vtt"} if args.format == "all" else {args.format} + if "md" in want: + print(f"agent transcript: {md_path}") + if "json" in want: + print(f"json sidecar: {json_path}") + if "srt" in want: + print(f"srt subtitles: {srt_path}") + if "vtt" in want: + print(f"vtt subtitles: {vtt_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/gui/launcher.py b/gui/launcher.py new file mode 100644 index 0000000..ff76064 --- /dev/null +++ b/gui/launcher.py @@ -0,0 +1,71 @@ +"""Open the VoxTerm web GUI from the terminal. + +`voxterm-gui`, `voxterm gui`, and the TUI `g` key all land here. It starts the local +engine on a loopback port with a fresh per-run token and opens it in your browser — +nothing leaves your machine. The native Tauri desktop app is a separate, self-contained +entry point (it spawns its own engine); this launcher is the universal browser path. +""" +from __future__ import annotations + +import os +import secrets +import socket +import sys +import threading +import time +import urllib.request + + +def _free_loopback_port() -> int: + """Ask the OS for an unused loopback port, then release it for the server to claim.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return int(s.getsockname()[1]) + + +def _serving(port: int, token: str) -> bool: + req = urllib.request.Request( + f"http://127.0.0.1:{port}/api/options", + headers={"X-VoxTerm-Token": token}, + ) + try: + with urllib.request.urlopen(req, timeout=2) as r: + return r.status == 200 + except Exception: + return False + + +def _open_browser_when_ready(port: int, token: str, timeout: float = 60.0) -> None: + import webbrowser + url = f"http://127.0.0.1:{port}/?token={token}" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if _serving(port, token): + webbrowser.open(url) + return + time.sleep(0.25) + print(f"[voxterm-gui] engine slow to start — open {url} manually.", 
file=sys.stderr, flush=True) + + +def main(argv=None) -> int: + # Always loopback; mint a fresh token unless one was handed in (honored by + # gui.server's loopback branch, so the local API is token-gated, not open). + port = int(os.environ.get("VOXTERM_GUI_PORT") or _free_loopback_port()) + token = os.environ.get("VOXTERM_GUI_TOKEN") or secrets.token_urlsafe(24) + os.environ["VOXTERM_GUI_PORT"] = str(port) + os.environ["VOXTERM_GUI_TOKEN"] = token + os.environ.pop("VOXTERM_GUI_LAN", None) # the launcher is loopback-only by definition + + print(f"[voxterm-gui] starting the local engine — your browser will open at " + f"http://127.0.0.1:{port}", flush=True) + threading.Thread(target=_open_browser_when_ready, args=(port, token), daemon=True).start() + + from gui.server import main as server_main + try: + return server_main() + except KeyboardInterrupt: + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/gui/server.py b/gui/server.py new file mode 100644 index 0000000..0868138 --- /dev/null +++ b/gui/server.py @@ -0,0 +1,339 @@ +"""VoxTerm GUI control server — stdlib http.server, no extra deps. + +Serves the web UI (gui/static/) + a small JSON API + an SSE status stream, all +backed by gui.engine (which reuses VoxTerm's own engine). Loopback-only by default; +set VOXTERM_GUI_LAN=1 to expose on the LAN (e.g. to drive it from your phone). 
+ + python -m gui.server # http://127.0.0.1:8740 + VOXTERM_GUI_LAN=1 python -m gui.server +""" +from __future__ import annotations + +import json +import os +import secrets +import sys +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib.parse import urlparse, parse_qs + +_ROOT = str(Path(__file__).resolve().parent.parent) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from gui.engine import Engine # noqa: E402 + +STATIC = Path(__file__).resolve().parent / "static" +DEFAULT_PORT = 8740 +MAX_BODY = 64 * 1024 # API requests are tiny; bound them +MAX_SSE = 8 # cap concurrent status streams +_sse_lock = threading.Lock() +_sse_count = 0 + +# When LAN-exposed (VOXTERM_GUI_LAN=1) every /api/* call must carry this token — +# without it, anyone on the wifi could start a recording of the room or read past +# transcripts. None = loopback (no token required). Set in main(). +TOKEN = None + +# Host-header allowlist (loopback mode) to block DNS-rebinding: a malicious site can't +# point its DNS at 127.0.0.1 and drive the tokenless local API, because the browser still +# sends Host: evil.com. None = no host check (LAN mode, which is token-gated instead). Set in main(). +ALLOWED_HOSTS = None + +ENGINE = Engine() + +# Strict CSP: same-origin only, no external anything (the UI is fully self-hosted). +# style-src allows 'unsafe-inline' because the UI sets element.style (the live level +# ring, the progress bar) and per-speaker color dots; all interpolated values are +# escaped (app.js escapeHtml) and the data is local, so the exposure is minimal. 
+CSP = ("default-src 'none'; script-src 'self'; style-src 'self' 'unsafe-inline'; " + "img-src 'self' data:; media-src 'self'; connect-src 'self'; font-src 'self'; manifest-src 'self'; " + "worker-src 'self'; base-uri 'none'; form-action 'none'; frame-ancestors 'none'") +_CTYPES = {".html": "text/html; charset=utf-8", ".js": "text/javascript; charset=utf-8", + ".css": "text/css; charset=utf-8", ".svg": "image/svg+xml", ".json": "application/json", + ".png": "image/png", ".webmanifest": "application/manifest+json", ".wav": "audio/wav"} + + +class Handler(BaseHTTPRequestHandler): + server_version = "voxterm-gui" + + def _hdr(self, code=200, ctype="application/json", extra=None): + self.send_response(code) + self.send_header("Content-Type", ctype) + self.send_header("Content-Security-Policy", CSP) + self.send_header("X-Content-Type-Options", "nosniff") + self.send_header("X-Frame-Options", "DENY") + self.send_header("Referrer-Policy", "no-referrer") + for k, v in (extra or {}).items(): + self.send_header(k, v) + self.end_headers() + + def _json(self, obj, code=200): + body = json.dumps(obj, ensure_ascii=False).encode("utf-8") + self._hdr(code, "application/json", {"Content-Length": str(len(body))}) + if self.command != "HEAD": + self.wfile.write(body) + + def do_HEAD(self): + # Without this, BaseHTTPRequestHandler answers HEAD with a default 501 that bypasses + # the host/auth/security-header pipeline. Reject cleanly through _hdr instead. 
+ return self._json({"error": "method not allowed"}, 405) + + def _read_json(self) -> dict: + try: + n = int(self.headers.get("Content-Length") or 0) + except (ValueError, TypeError): # a malformed Content-Length must not crash the handler + return {} + if n <= 0 or n > MAX_BODY: + if n > MAX_BODY: + self.close_connection = True # don't leave an undrained oversized body on the socket + return {} + try: + return json.loads(self.rfile.read(n).decode("utf-8")) or {} + except Exception: + return {} + + def log_message(self, *a): # quiet unless VOXTERM_GUI_LOG=1 (request log for tests/debug) + if os.environ.get("VOXTERM_GUI_LOG") == "1": + super().log_message(*a) + + def _authed(self, q) -> bool: + """Token check for /api/* when LAN-exposed. Loopback (TOKEN is None) is open.""" + if TOKEN is None: + return True + given = (self.headers.get("X-VoxTerm-Token") + or (self.headers.get("Authorization") or "").removeprefix("Bearer ").strip() + or (q.get("token") or [""])[0]) + try: # compare on bytes so a non-ASCII token yields a clean False (401), not a TypeError + return bool(given) and secrets.compare_digest(given.encode("utf-8"), TOKEN.encode("utf-8")) + except Exception: + return False + + def _host_ok(self) -> bool: + """Reject DNS-rebinding: in loopback mode the Host header must be a known local name. + LAN mode skips this (the token is the gate; the LAN IP/hostname varies).""" + if ALLOWED_HOSTS is None: + return True + return (self.headers.get("Host") or "").lower() in ALLOWED_HOSTS + + def _same_origin(self) -> bool: + """Block cross-origin state-changing requests (CSRF). Modern browsers send + Sec-Fetch-Site (our own fetch() is 'same-origin'); when an Origin is present it must + match Host. 
Non-browser clients (curl) send neither and are allowed for local tooling.""" + sfs = self.headers.get("Sec-Fetch-Site") + if sfs is not None and sfs not in ("same-origin", "none"): + return False + origin = self.headers.get("Origin") + if origin: + netloc = urlparse(origin).netloc.lower() + if netloc != (self.headers.get("Host") or "").lower(): + return False + return True + + # ---- GET ---- + def do_GET(self): + if not self._host_ok(): + return self._json({"error": "bad host"}, 403) + u = urlparse(self.path) + p, q = u.path, parse_qs(u.query) + if p.startswith("/api/") and not self._same_origin(): + return self._json({"error": "cross-origin"}, 403) # CSRF/read-leak guard on reads too + if p.startswith("/api/") and not self._authed(q): + return self._json({"error": "unauthorized"}, 401) + if p == "/" or p == "/index.html": + return self._serve_static("index.html") + if p == "/sw.js": # served at root so its SW scope is "/" + return self._serve_static("sw.js") + if p == "/manifest.webmanifest": + return self._serve_static("manifest.webmanifest") + if p.startswith("/static/"): + return self._serve_static(p[len("/static/"):]) + if p == "/api/options": + return self._json({"models": ENGINE.models(), "languages": ENGINE.languages(), + "default_model": ENGINE.default_model(), + "input_devices": ENGINE.input_devices()}) + if p == "/api/status": + return self._json(ENGINE.status()) + if p == "/api/sessions": + return self._json({"sessions": ENGINE.sessions()}) + if p == "/api/session": + stem = (q.get("stem") or [""])[0] + kind = (q.get("kind") or ["transcript"])[0] + d = (q.get("dir") or [None])[0] + return self._json(ENGINE.read_artifact(stem, kind, dir=d)) + if p == "/api/audio": + return self._serve_audio(q) + if p == "/api/events": + return self._sse() + return self._json({"error": "not found"}, 404) + + # ---- POST ---- + def do_POST(self): + if not self._host_ok(): + return self._json({"error": "bad host"}, 403) + u = urlparse(self.path) + p, q = u.path, 
parse_qs(u.query) + if not self._same_origin(): + return self._json({"error": "cross-origin"}, 403) + if p.startswith("/api/") and not self._authed(q): + return self._json({"error": "unauthorized"}, 401) + if p == "/api/record/start": + b = self._read_json() + return self._json(ENGINE.start_recording(device=b.get("device"), source=b.get("source", "mic"))) + if p == "/api/record/stop": + b = self._read_json() + return self._json(ENGINE.stop_recording(model=b.get("model") or None, + language=b.get("language", "en"), + diarize=b.get("diarize", True) is not False)) + if p == "/api/transcribe": + b = self._read_json() + return self._json(ENGINE.transcribe_existing(b.get("wav", ""), model=b.get("model") or None, + language=b.get("language", "en"), + diarize=b.get("diarize", True) is not False)) + if p == "/api/live/start": + b = self._read_json() + return self._json(ENGINE.live_start(b.get("wav"))) + if p == "/api/live/stop": + return self._json(ENGINE.live_stop()) + if p == "/api/session/delete": + b = self._read_json() + return self._json(ENGINE.delete_session(b.get("stem", ""), dir=b.get("dir"))) + if p == "/api/export": + b = self._read_json() + return self._json(ENGINE.export_session(b.get("stem", ""), b.get("kind", "md"), + renames=b.get("renames") or {}, dir=b.get("dir"))) + if p == "/api/summarize": + b = self._read_json() + return self._json(ENGINE.summarize_session( + b.get("stem", ""), dir=b.get("dir"), template_id=b.get("template", "tldr"), + model=b.get("model", ""), custom_prompt=b.get("custom_prompt", ""))) + return self._json({"error": "not found"}, 404) + + def _serve_static(self, rel: str): + # resolve within STATIC only (no traversal) + target = (STATIC / rel).resolve() + try: + target.relative_to(STATIC.resolve()) + except ValueError: + return self._json({"error": "forbidden"}, 403) + if not target.is_file(): + return self._json({"error": "not found"}, 404) + ctype = _CTYPES.get(target.suffix, "application/octet-stream") + data = target.read_bytes() + 
self._hdr(200, ctype, {"Content-Length": str(len(data))}) + self.wfile.write(data) + + def _serve_audio(self, q): + """Stream a session's source WAV (Download/playback). Honors a Range header so the +