diff --git a/README.md b/README.md index f508e957..6e7e9787 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ https://github.com/user-attachments/assets/4c223e85-2916-494f-b7b1-766ce1bdc991 - **Requires a Wayland session** (GNOME, KDE Plasma Wayland, Sway, Hyprland) - **Waybar** (optional, for status bar) -- **gtk4** (optional, for visualizer) +- **gtk4 + PyCairo** (optional, for visualizer) - **NVIDIA GPU** (optional, for CUDA acceleration) - **AMD/Intel GPU / APU** (optional, for Vulkan acceleration) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index fc247c06..560ed275 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -638,13 +638,16 @@ Two modes available: { "transcription_backend": "realtime-ws", "websocket_provider": "openai", - "websocket_model": "gpt-realtime-mini-2025-12-15", + "websocket_model": "gpt-realtime-whisper", "realtime_mode": "transcribe", // "transcribe" or "converse" + "realtime_transcription_delay": "low", // "minimal", "low", "medium", "high", or "xhigh" "realtime_timeout": 30, // Advanced: seconds to wait after stop for final transcript "realtime_buffer_max_seconds": 5 // Advanced: max unsent audio backlog (seconds) before dropping old chunks } ``` +With OpenAI `gpt-realtime-whisper`, the mic OSD can show a live partial transcript while recording. Only the completed final transcript is pasted after you stop. + #### Google Gemini Realtime streaming transcription via Google's Gemini Live API. @@ -701,6 +704,8 @@ Visual feedback that will auto-match Omarchy themes. } ``` +When OpenAI `gpt-realtime-whisper` live preview is enabled, partial transcript text is written to a restrictive runtime IPC file under `$XDG_RUNTIME_DIR/hyprwhspr/` so the OSD daemon can render it. The file is `0600`, cleared on normal hide/shutdown and scrubbed on service startup; if the machine loses power or the service is killed, the last partial may remain until the next startup/runtime-directory cleanup. 
+ ### Audio feedback Optional sound notifications: diff --git a/lib/main.py b/lib/main.py index 8b1f9b78..0d6ee9ea 100644 --- a/lib/main.py +++ b/lib/main.py @@ -68,7 +68,7 @@ from paths import ( RECORDING_STATUS_FILE, RECORDING_CONTROL_FILE, AUDIO_LEVEL_FILE, RECOVERY_REQUESTED_FILE, RECOVERY_RESULT_FILE, MIC_ZERO_VOLUME_FILE, LOCK_FILE, LONGFORM_STATE_FILE, LONGFORM_SEGMENTS_DIR, - MODEL_UNLOADED_FILE, SOCKET_FILE + MODEL_UNLOADED_FILE, SOCKET_FILE, TRANSCRIPT_PREVIEW_FILE ) from backend_utils import normalize_backend from segment_manager import SegmentManager @@ -223,6 +223,9 @@ def __init__(self): import traceback traceback.print_exc() + if hasattr(self.whisper_manager, 'set_realtime_partial_callback'): + self.whisper_manager.set_realtime_partial_callback(self._set_mic_osd_preview_text) + # Set up global shortcuts (needed for headless operation) self._setup_global_shortcuts() @@ -1170,6 +1173,8 @@ def _start_recording(self, language_override=None): print("Recording started", flush=True) try: + self._clear_mic_osd_preview_text() + # Clear zero-volume signal file when starting a new recording # This allows waybar to recover immediately on successful start self._clear_zero_volume_signal() @@ -1366,6 +1371,10 @@ def _cleanup_recording_state(self): """Best-effort cleanup after any recording ends. 
Safe to call multiple times.""" self._notify_capture_subscriber("", final=True) + try: + self._clear_mic_osd_preview_text() + except Exception: + pass try: self._hide_mic_osd() except Exception: @@ -1436,6 +1445,7 @@ def _stop_recording(self): print("Recording stopped", flush=True) try: + self._clear_mic_osd_preview_text() # Set visualizer to processing state (keep it visible during transcription) self._set_visualizer_state('processing') @@ -1547,6 +1557,7 @@ def _process_audio(self, audio_data): print(f"[ERROR] Error processing audio: {e}", flush=True) finally: self._notify_capture_subscriber("", final=True) + self._clear_mic_osd_preview_text() self.is_processing = False # Show success/error state and hide OSD after delay self._show_result_and_hide(success) @@ -1683,6 +1694,7 @@ def _reset_stale_state(self): RECOVERY_REQUESTED_FILE, RECOVERY_RESULT_FILE, MODEL_UNLOADED_FILE, + TRANSCRIPT_PREVIEW_FILE, ] for f in stale_files: try: @@ -1708,6 +1720,7 @@ def _show_mic_osd(self): with self._cancel_pending_hide_lock: self._cancel_pending_hide = True if self._mic_osd_runner and self._mic_osd_runner.is_available(): + self._mic_osd_runner.clear_preview_text() self._mic_osd_runner.set_state('recording') self._mic_osd_runner.show() @@ -1718,6 +1731,24 @@ def _hide_mic_osd(self): try: runner.hide() runner.clear_state() + runner.clear_preview_text() + except Exception: + pass + + def _set_mic_osd_preview_text(self, text: str): + """Update live transcript preview text in the mic OSD.""" + runner = getattr(self, '_mic_osd_runner', None) + if runner: + try: + runner.set_preview_text(text) + except Exception: + pass + + def _clear_mic_osd_preview_text(self): + runner = getattr(self, '_mic_osd_runner', None) + if runner: + try: + runner.clear_preview_text() except Exception: pass diff --git a/lib/mic_osd/main.py b/lib/mic_osd/main.py index 219d7d77..766c2956 100644 --- a/lib/mic_osd/main.py +++ b/lib/mic_osd/main.py @@ -21,23 +21,37 @@ def is_gnome(): desktop = 
os.environ.get('XDG_CURRENT_DESKTOP', '').lower() return 'gnome' in desktop -from .window import OSDWindow, load_css -from .audio import AudioMonitor -from .visualizations import VISUALIZATIONS -from .theme import ThemeWatcher +_MIC_OSD_IMPORT_ERROR = None +try: + from .window import OSDWindow, load_css + from .audio import AudioMonitor + from .visualizations import VISUALIZATIONS + from .theme import ThemeWatcher +except ImportError as e: + _MIC_OSD_IMPORT_ERROR = e + OSDWindow = None + AudioMonitor = None + VISUALIZATIONS = {} + ThemeWatcher = None # Import paths with fallback for daemon context try: - from ..src.paths import RECORDING_STATUS_FILE, VISUALIZER_STATE_FILE + from ..src.paths import RECORDING_STATUS_FILE, VISUALIZER_STATE_FILE, TRANSCRIPT_PREVIEW_FILE except ImportError: try: - from src.paths import RECORDING_STATUS_FILE, VISUALIZER_STATE_FILE + from src.paths import RECORDING_STATUS_FILE, VISUALIZER_STATE_FILE, TRANSCRIPT_PREVIEW_FILE except ImportError: # Fallback: construct paths manually if imports fail home = Path.home() xdg_config = Path(os.environ.get('XDG_CONFIG_HOME', home / '.config')) + xdg_runtime = os.environ.get('XDG_RUNTIME_DIR') + if xdg_runtime: + runtime_dir = Path(xdg_runtime) / 'hyprwhspr' + else: + runtime_dir = Path(os.environ.get('TMPDIR', '/tmp')) / f"hyprwhspr-{os.getuid()}" RECORDING_STATUS_FILE = xdg_config / 'hyprwhspr' / 'recording_status' VISUALIZER_STATE_FILE = xdg_config / 'hyprwhspr' / 'visualizer_state' + TRANSCRIPT_PREVIEW_FILE = runtime_dir / 'transcript_preview' class MicOSD: @@ -54,6 +68,7 @@ def __init__(self, visualization="waveform", width=400, height=68, daemon=False) self._auto_hide_timeout_id = None self._state_poll_timer_id = None self._last_visualizer_state = None + self._last_preview_text = None self.daemon = daemon self.visible = False self.theme_watcher = None @@ -210,6 +225,8 @@ def _show(self): def _hide(self): """Hide the OSD and stop audio monitoring.""" + self._clear_preview_file() + if not 
self.visible: return @@ -221,6 +238,9 @@ def _hide(self): try: self.visible = False + if hasattr(self.window, 'set_preview_text'): + self.window.set_preview_text("") + self._last_preview_text = None self.window.set_visible(False) # Stop update timer @@ -277,6 +297,13 @@ def _hide(self): except Exception: pass self.audio_monitor = None + + def _clear_preview_file(self): + try: + if TRANSCRIPT_PREVIEW_FILE.exists(): + TRANSCRIPT_PREVIEW_FILE.unlink() + except Exception: + pass def _update(self): """Update visualization with current audio data.""" @@ -294,6 +321,8 @@ def _poll_state_file(self): state = f.read().strip() if state and state != self._last_visualizer_state: self._last_visualizer_state = state + if self.window and hasattr(self.window, 'set_visualizer_state'): + self.window.set_visualizer_state(state) # Update visualization state if it has the set_state method if hasattr(self.visualization, 'set_state'): self.visualization.set_state(state) @@ -301,8 +330,18 @@ def _poll_state_file(self): # No state file means default to recording state if self._last_visualizer_state != 'recording': self._last_visualizer_state = 'recording' + if self.window and hasattr(self.window, 'set_visualizer_state'): + self.window.set_visualizer_state('recording') if hasattr(self.visualization, 'set_state'): self.visualization.set_state('recording') + + preview = "" + if TRANSCRIPT_PREVIEW_FILE.exists(): + preview = TRANSCRIPT_PREVIEW_FILE.read_text(encoding='utf-8').rstrip('\r\n') + if preview != self._last_preview_text: + self._last_preview_text = preview + if self.window and hasattr(self.window, 'set_preview_text'): + self.window.set_preview_text(preview) except Exception: pass # Ignore file read errors return True # Continue polling @@ -358,6 +397,8 @@ def stop(self): def _cleanup(self): """Clean up resources.""" + self._clear_preview_file() + if self.update_timer_id: GLib.source_remove(self.update_timer_id) self.update_timer_id = None @@ -439,6 +480,10 @@ def main(): help="Run as 
daemon (start hidden, show on SIGUSR1, hide on SIGUSR2)" ) args = parser.parse_args() + + if _MIC_OSD_IMPORT_ERROR is not None: + print(f"[MIC-OSD] Unavailable: {_MIC_OSD_IMPORT_ERROR}", file=sys.stderr, flush=True) + return 1 # Set up signal handlers signal.signal(signal.SIGTERM, _signal_handler) diff --git a/lib/mic_osd/runner.py b/lib/mic_osd/runner.py index 5ad6b183..6f659294 100644 --- a/lib/mic_osd/runner.py +++ b/lib/mic_osd/runner.py @@ -10,14 +10,16 @@ import signal import sys import os +import threading +import time from pathlib import Path # Import paths try: - from ..src.paths import MIC_OSD_PID_FILE, VISUALIZER_STATE_FILE + from ..src.paths import MIC_OSD_PID_FILE, VISUALIZER_STATE_FILE, TRANSCRIPT_PREVIEW_FILE except ImportError: # Fallback for direct execution - from src.paths import MIC_OSD_PID_FILE, VISUALIZER_STATE_FILE + from src.paths import MIC_OSD_PID_FILE, VISUALIZER_STATE_FILE, TRANSCRIPT_PREVIEW_FILE class MicOSDRunner: @@ -26,16 +28,24 @@ class MicOSDRunner: Spawns mic-osd in daemon mode at init, then signals it to show/hide. 
""" + + PREVIEW_WRITE_INTERVAL_SECONDS = 0.05 def __init__(self): self._process = None self._mic_osd_dir = Path(__file__).parent self._orphaned_daemon_pid = None # Track PID when reusing orphaned daemon + self._preview_lock = threading.Lock() + self._last_preview_write_at = 0.0 + self._pending_preview_text = None + self._preview_flush_timer = None + self._preview_generation = 0 @staticmethod def is_available() -> bool: """Check if mic-osd can run.""" try: + import cairo # noqa: F401 import gi gi.require_version('Gtk', '4.0') gi.require_version('Gtk4LayerShell', '1.0') @@ -49,26 +59,32 @@ def _get_distro_packages() -> tuple: # Check for common distro indicators try: if Path('/etc/debian_version').exists(): - return ('python3-gi gir1.2-gtk-4.0', 'gir1.2-gtk4layershell-1.0') + return ('python3-gi python3-cairo gir1.2-gtk-4.0', 'gir1.2-gtk4layershell-1.0') elif Path('/etc/arch-release').exists(): - return ('python-gobject gtk4', 'gtk4-layer-shell') + return ('python-gobject python-cairo gtk4', 'gtk4-layer-shell') elif Path('/etc/fedora-release').exists(): - return ('python3-gobject gtk4', 'gtk4-layer-shell') + return ('python3-gobject python3-cairo gtk4', 'gtk4-layer-shell') elif Path('/etc/os-release').exists(): - content = Path('/etc/os-release').read_text().lower() + content = Path('/etc/os-release').read_text(encoding='utf-8').lower() if 'debian' in content or 'ubuntu' in content: - return ('python3-gi gir1.2-gtk-4.0', 'gir1.2-gtk4layershell-1.0') + return ('python3-gi python3-cairo gir1.2-gtk-4.0', 'gir1.2-gtk4layershell-1.0') elif 'fedora' in content or 'rhel' in content: - return ('python3-gobject gtk4', 'gtk4-layer-shell') + return ('python3-gobject python3-cairo gtk4', 'gtk4-layer-shell') + elif 'suse' in content: + return ('python3-gobject python3-pycairo typelib-1_0-Gtk-4_0', 'gtk4-layer-shell') except Exception: pass # Default to Arch-style names - return ('python-gobject gtk4', 'gtk4-layer-shell') + return ('python-gobject python-cairo gtk4', 
'gtk4-layer-shell') @staticmethod def get_unavailable_reason() -> str: """Get reason why mic-osd is unavailable.""" gtk_pkg, layer_pkg = MicOSDRunner._get_distro_packages() + try: + import cairo # noqa: F401 + except ImportError: + return f"PyCairo bindings not installed. Install: {gtk_pkg}" try: import gi gi.require_version('Gtk', '4.0') @@ -90,8 +106,8 @@ def _ensure_daemon(self): if MIC_OSD_PID_FILE.exists(): try: pid = int(MIC_OSD_PID_FILE.read_text().strip()) - # Check if process still exists (signal 0 = existence check) - os.kill(pid, 0) + if not self._is_mic_osd_daemon_pid(pid): + raise ProcessLookupError(f"PID {pid} is not a mic-osd daemon") print(f"[MIC-OSD] Found orphaned daemon (PID {pid}), reusing it") # Create dummy process reference (we can't use wait() on it) # The actual daemon PID is tracked in _orphaned_daemon_pid @@ -148,6 +164,7 @@ def _ensure_daemon(self): break if lib_path: env['LD_PRELOAD'] = lib_path + env['HYPRWHSPR_MIC_OSD_DAEMON'] = '1' try: python_cmd = sys.executable or 'python3' @@ -197,6 +214,62 @@ def _ensure_daemon(self): traceback.print_exc() self._process = None return False + + @staticmethod + def _is_mic_osd_daemon_pid(pid: int) -> bool: + """Return True only if pid appears to be this project's mic-osd daemon.""" + if pid <= 0: + return False + + try: + os.kill(pid, 0) + except (ProcessLookupError, PermissionError, OSError): + return False + + proc_path = Path('/proc') / str(pid) + environ_path = proc_path / 'environ' + try: + environ = environ_path.read_bytes().split(b'\x00') + if b'HYPRWHSPR_MIC_OSD_DAEMON=1' in environ: + return True + except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): + pass + + cmdline_path = proc_path / 'cmdline' + try: + raw_cmdline = cmdline_path.read_bytes() + except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): + return False + + cmdline = raw_cmdline.replace(b'\x00', b' ').decode('utf-8', errors='ignore') + if '--daemon' not in cmdline: + return False + + 
return ( + 'mic_osd.main' in cmdline + or 'mic-osd' in cmdline + or 'com.hyprwhspr.mic-osd' in cmdline + ) + + def _signal_daemon(self, sig: signal.Signals) -> bool: + """Signal the tracked daemon after validating orphaned PID-file reuse.""" + pid = self._orphaned_daemon_pid if self._orphaned_daemon_pid is not None else self._process.pid + if self._orphaned_daemon_pid is not None and not self._is_mic_osd_daemon_pid(pid): + print(f"[MIC-OSD] Refusing to signal non mic-osd PID {pid}", flush=True) + self._process = None + self._orphaned_daemon_pid = None + self._unlink_pid_file() + return False + + os.kill(pid, sig) + return True + + def _unlink_pid_file(self): + try: + if MIC_OSD_PID_FILE.exists(): + MIC_OSD_PID_FILE.unlink() + except Exception: + pass def show(self) -> bool: """Show the mic-osd overlay (instant via signal).""" @@ -207,10 +280,7 @@ def show(self) -> bool: return False try: - # For orphaned daemons, use the tracked PID - pid = self._orphaned_daemon_pid if self._orphaned_daemon_pid is not None else self._process.pid - os.kill(pid, signal.SIGUSR1) - return True + return self._signal_daemon(signal.SIGUSR1) except (ProcessLookupError, OSError): self._process = None self._orphaned_daemon_pid = None @@ -218,6 +288,8 @@ def show(self) -> bool: def hide(self): """Hide the mic-osd overlay (instant via signal).""" + self.clear_preview_text() + if self._process is None: return @@ -225,10 +297,8 @@ def hide(self): # (poll() returns exit code of dummy process, not the actual daemon) if self._orphaned_daemon_pid is not None: try: - # Verify the orphaned daemon PID is still alive - os.kill(self._orphaned_daemon_pid, 0) - # PID exists, send hide signal - os.kill(self._orphaned_daemon_pid, signal.SIGUSR2) + if not self._signal_daemon(signal.SIGUSR2): + return return except (ProcessLookupError, OSError) as e: # Orphaned daemon is dead, clean up and log warning @@ -236,11 +306,8 @@ def hide(self): self._process = None self._orphaned_daemon_pid = None # Clean up stale 
PID file - if MIC_OSD_PID_FILE.exists(): - try: - MIC_OSD_PID_FILE.unlink() - except Exception: - pass + self._unlink_pid_file() + self.clear_preview_text() return # For normal daemons, verify process is actually alive before signaling @@ -250,11 +317,8 @@ def hide(self): self._process = None self._orphaned_daemon_pid = None # Clean up stale PID file - if MIC_OSD_PID_FILE.exists(): - try: - MIC_OSD_PID_FILE.unlink() - except Exception: - pass + self._unlink_pid_file() + self.clear_preview_text() return # Verify process is actually alive before sending signal @@ -266,11 +330,8 @@ def hide(self): self._process = None self._orphaned_daemon_pid = None # Clean up stale PID file - if MIC_OSD_PID_FILE.exists(): - try: - MIC_OSD_PID_FILE.unlink() - except Exception: - pass + self._unlink_pid_file() + self.clear_preview_text() return # Process is alive, send hide signal @@ -280,6 +341,7 @@ def hide(self): print(f"[MIC-OSD] Failed to send SIGUSR2 to daemon (PID {self._process.pid}): {e}", flush=True) self._process = None self._orphaned_daemon_pid = None + self.clear_preview_text() def set_state(self, state: str): """ @@ -302,6 +364,101 @@ def clear_state(self): except Exception as e: print(f"[MIC-OSD] Failed to clear visualizer state: {e}", flush=True) + def set_preview_text(self, text: str): + """Set live transcript preview text.""" + text = (text or "").rstrip('\r\n') + + with self._preview_lock: + if not text: + self._cancel_pending_preview_flush() + self._preview_generation += 1 + self._write_preview_text_file("") + return + + now = time.monotonic() + elapsed = now - self._last_preview_write_at + if elapsed >= self.PREVIEW_WRITE_INTERVAL_SECONDS: + self._cancel_pending_preview_flush() + self._last_preview_write_at = now + self._write_preview_text_file(text) + return + + self._pending_preview_text = text + if self._preview_flush_timer is None: + generation = self._preview_generation + delay = self.PREVIEW_WRITE_INTERVAL_SECONDS - elapsed + self._preview_flush_timer = 
threading.Timer( + delay, + self._flush_pending_preview_text, + args=(generation,), + ) + self._preview_flush_timer.daemon = True + self._preview_flush_timer.start() + + def _cancel_pending_preview_flush(self): + self._pending_preview_text = None + if self._preview_flush_timer: + self._preview_flush_timer.cancel() + self._preview_flush_timer = None + + def _flush_pending_preview_text(self, generation: int): + with self._preview_lock: + if generation != self._preview_generation: + return + + text = self._pending_preview_text + self._pending_preview_text = None + self._preview_flush_timer = None + + if text is None: + return + + self._last_preview_write_at = time.monotonic() + self._write_preview_text_file(text) + + def _write_preview_text_file(self, text: str): + """Write live preview text to the runtime IPC file.""" + try: + TRANSCRIPT_PREVIEW_FILE.parent.mkdir(parents=True, exist_ok=True, mode=0o700) + try: + TRANSCRIPT_PREVIEW_FILE.parent.chmod(0o700) + except Exception: + pass + if text: + temp_path = TRANSCRIPT_PREVIEW_FILE.with_name( + f".{TRANSCRIPT_PREVIEW_FILE.name}.{os.getpid()}.{threading.get_ident()}.tmp" + ) + fd = os.open(temp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + try: + with os.fdopen(fd, 'w', encoding='utf-8') as f: + f.write(text) + os.replace(temp_path, TRANSCRIPT_PREVIEW_FILE) + finally: + try: + if temp_path.exists(): + temp_path.unlink() + except Exception: + pass + try: + TRANSCRIPT_PREVIEW_FILE.chmod(0o600) + except Exception: + pass + elif TRANSCRIPT_PREVIEW_FILE.exists(): + TRANSCRIPT_PREVIEW_FILE.unlink() + except Exception as e: + print(f"[MIC-OSD] Failed to write transcript preview: {e}", flush=True) + + def clear_preview_text(self): + """Clear live transcript preview text.""" + try: + with self._preview_lock: + self._cancel_pending_preview_flush() + self._preview_generation += 1 + if TRANSCRIPT_PREVIEW_FILE.exists(): + TRANSCRIPT_PREVIEW_FILE.unlink() + except Exception as e: + print(f"[MIC-OSD] Failed to clear transcript 
preview: {e}", flush=True) + def stop(self): """Stop the daemon completely.""" if self._process is None: @@ -309,8 +466,8 @@ def stop(self): try: # For orphaned daemons, use the tracked PID - pid = self._orphaned_daemon_pid if self._orphaned_daemon_pid is not None else self._process.pid - os.kill(pid, signal.SIGTERM) + if not self._signal_daemon(signal.SIGTERM): + return # Only wait if it's a normal process (not orphaned) if self._orphaned_daemon_pid is None: self._process.wait(timeout=1.0) @@ -320,15 +477,13 @@ def stop(self): time.sleep(0.5) except subprocess.TimeoutExpired: pid = self._orphaned_daemon_pid if self._orphaned_daemon_pid is not None else self._process.pid - os.kill(pid, signal.SIGKILL) + if self._orphaned_daemon_pid is None or self._is_mic_osd_daemon_pid(pid): + os.kill(pid, signal.SIGKILL) except (ProcessLookupError, OSError): pass finally: self._process = None self._orphaned_daemon_pid = None # Clean up PID file - if MIC_OSD_PID_FILE.exists(): - try: - MIC_OSD_PID_FILE.unlink() - except Exception: - pass + self._unlink_pid_file() + self.clear_preview_text() diff --git a/lib/mic_osd/window.py b/lib/mic_osd/window.py index 03953d9a..708883f9 100644 --- a/lib/mic_osd/window.py +++ b/lib/mic_osd/window.py @@ -5,9 +5,12 @@ for displaying audio visualizations. """ +from __future__ import annotations + import gi gi.require_version('Gtk', '4.0') gi.require_version('Gdk', '4.0') +import cairo try: gi.require_version('Gtk4LayerShell', '1.0') @@ -16,6 +19,7 @@ LAYER_SHELL_AVAILABLE = False from gi.repository import Gtk, Gdk, GLib +from .theme import theme if LAYER_SHELL_AVAILABLE: from gi.repository import Gtk4LayerShell @@ -28,6 +32,9 @@ class OSDWindow(Gtk.Window): Uses gtk4-layer-shell to create a Wayland layer surface that appears above all windows at the bottom of the screen. 
""" + + PREVIEW_WORD_LIMIT = 14 + PREVIEW_TIMER_RESERVE = 58 def __init__(self, visualization, width=300, height=60): """ @@ -43,6 +50,8 @@ def __init__(self, visualization, width=300, height=60): self.visualization = visualization self._width = width self._height = height + self._preview_text = "" + self._visualizer_state = "recording" # Layer shell MUST be initialized immediately after window creation # and BEFORE any other window configuration @@ -111,6 +120,8 @@ def _on_draw(self, area, cr, width, height): # Draw the visualization self.visualization.draw(cr, width, height) + + self._draw_preview_text(cr, width, height) def update(self, level: float, samples=None): """ @@ -122,11 +133,98 @@ def update(self, level: float, samples=None): """ self.visualization.update(level, samples) self.drawing_area.queue_draw() + + def set_preview_text(self, text: str): + """Set compact transcript preview text.""" + self._preview_text = (text or "").rstrip('\r\n') + self.drawing_area.queue_draw() + + def set_visualizer_state(self, state: str): + """Track visualizer state so partial previews only render while recording.""" + self._visualizer_state = (state or "recording").lower() + self.drawing_area.queue_draw() def set_visualization(self, visualization): """Change the visualization type.""" self.visualization = visualization self.drawing_area.queue_draw() + + def _draw_preview_text(self, cr: cairo.Context, width: int, height: int): + if not self._preview_text or self._visualizer_state != "recording": + return + + padding = 14 + max_width = max(0, width - padding * 2 - self.PREVIEW_TIMER_RESERVE) + + cr.select_font_face("sans-serif", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_NORMAL) + cr.set_font_size(12) + text = self._ellipsize(cr, self._recent_preview_text(self._preview_text), max_width) + if not text: + return + + extents = cr.text_extents(text) + text_width = self._text_extent(extents, 'width', 2) + text_height = self._text_extent(extents, 'height', 3) + + y = height - 10 + 
bg_padding_x = 7 + bg_padding_y = 4 + bg_x = padding - bg_padding_x + bg_y = y - text_height - bg_padding_y - 1 + bg_w = min(max_width + bg_padding_x * 2, text_width + bg_padding_x * 2) + bg_h = text_height + bg_padding_y * 2 + 2 + + bg = theme.background + cr.set_source_rgba(bg[0], bg[1], bg[2], 0.88) + cr.rectangle(bg_x, bg_y, bg_w, bg_h) + cr.fill() + + text_color = theme.text + cr.set_source_rgba(text_color[0], text_color[1], text_color[2], 0.96) + cr.move_to(padding, y) + cr.show_text(text) + + @staticmethod + def _text_extent(extents, field: str, index: int) -> float: + if hasattr(extents, field): + return getattr(extents, field) + return extents[index] + + def _text_width(self, cr: cairo.Context, text: str) -> float: + return self._text_extent(cr.text_extents(text), 'width', 2) + + def _text_height(self, cr: cairo.Context, text: str) -> float: + return self._text_extent(cr.text_extents(text), 'height', 3) + + def _recent_preview_text(self, text: str) -> str: + words = text.split() + if len(words) <= self.PREVIEW_WORD_LIMIT: + return " ".join(words) + return "... " + " ".join(words[-self.PREVIEW_WORD_LIMIT:]) + + def _ellipsize(self, cr: cairo.Context, text: str, max_width: float) -> str: + if self._text_width(cr, text) <= max_width: + return text + + prefix = "... 
" + if text.startswith(prefix): + text = text[len(prefix):] + + available = max_width - self._text_width(cr, prefix) + if available <= 0: + return "" + + low = 0 + high = len(text) + while low < high: + mid = (low + high + 1) // 2 + if self._text_width(cr, text[-mid:]) <= available: + low = mid + else: + high = mid - 1 + + truncated = text[-low:].lstrip() + return prefix + truncated if truncated else prefix def make_click_through(self): """ diff --git a/lib/src/cli_commands.py b/lib/src/cli_commands.py index 8ee43868..3d70c2ce 100644 --- a/lib/src/cli_commands.py +++ b/lib/src/cli_commands.py @@ -1655,7 +1655,7 @@ def setup_command(python_path: Optional[str] = None): print("Mic-OSD Visualization") print("="*60) print("\nShows a visual overlay during recording with animated bars") - print("and a pulsing indicator. Requires GTK4 and gtk4-layer-shell.") + print("and a pulsing indicator. Requires GTK4, PyCairo, and gtk4-layer-shell.") # Check if dependencies are available using service's Python mic_osd_available, mic_osd_reason = _check_mic_osd_availability() @@ -1667,13 +1667,13 @@ def setup_command(python_path: Optional[str] = None): else: # Provide distro-appropriate package names if Path('/etc/debian_version').exists(): - pkg_hint = "python3-gi gir1.2-gtk-4.0 gir1.2-gtk4layershell-1.0" + pkg_hint = "python3-gi python3-cairo gir1.2-gtk-4.0 gir1.2-gtk4layershell-1.0" elif Path('/etc/fedora-release').exists(): - pkg_hint = "python3-gobject gtk4 gtk4-layer-shell" - elif Path('/etc/os-release').exists() and 'suse' in Path('/etc/os-release').read_text().lower(): - pkg_hint = "python3-gobject typelib-1_0-Gtk-4_0 (gtk4-layer-shell from community repo)" + pkg_hint = "python3-gobject python3-cairo gtk4 gtk4-layer-shell" + elif Path('/etc/os-release').exists() and 'suse' in Path('/etc/os-release').read_text(encoding='utf-8').lower(): + pkg_hint = "python3-gobject python3-pycairo typelib-1_0-Gtk-4_0 (gtk4-layer-shell from community repo)" else: - pkg_hint = "python-gobject 
gtk4 gtk4-layer-shell (Arch naming)" + pkg_hint = "python-gobject python-cairo gtk4 gtk4-layer-shell (Arch naming)" print(f"\nDependencies not found. Install: {pkg_hint}") setup_mic_osd_choice = Confirm.ask("Enable mic-osd anyway (will work after installing deps)?", default=False) diff --git a/lib/src/config_manager.py b/lib/src/config_manager.py index 2d252381..fbc83da7 100644 --- a/lib/src/config_manager.py +++ b/lib/src/config_manager.py @@ -111,11 +111,12 @@ def __init__(self): 'rest_audio_format': 'wav', # Audio format for remote transcription # WebSocket realtime backend settings 'websocket_provider': None, # Provider identifier for credential lookup (e.g., 'openai', 'google', 'elevenlabs') - 'websocket_model': None, # Model identifier (e.g., 'gpt-realtime-mini-2025-12-15') + 'websocket_model': None, # Model identifier (e.g., 'gpt-realtime-whisper') 'websocket_url': None, # Optional: explicit WebSocket URL (auto-derived if None) 'realtime_timeout': 30, # Completion timeout (seconds) 'realtime_buffer_max_seconds': 5, # Max buffer before dropping chunks 'realtime_mode': 'transcribe', # 'transcribe' (speech-to-text) or 'converse' (voice-to-AI) + 'realtime_transcription_delay': 'low', # gpt-realtime-whisper delay: minimal|low|medium|high|xhigh # ONNX-ASR backend settings (CPU-optimized) 'onnx_asr_model': 'nemo-parakeet-tdt-0.6b-v3', # Best balance of speed and quality for CPU (includes punctuation) 'onnx_asr_quantization': 'int8', # INT8 quantization for CPU performance (or None for fp32) diff --git a/lib/src/paths.py b/lib/src/paths.py index 61d79127..ff830afa 100644 --- a/lib/src/paths.py +++ b/lib/src/paths.py @@ -1,16 +1,22 @@ """Centralized path constants for hyprwhspr with XDG Base Directory support""" from pathlib import Path import os +import tempfile # XDG Base Directory specification # https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html HOME = Path.home() XDG_CONFIG_HOME = Path(os.environ.get('XDG_CONFIG_HOME', HOME / 
'.config')) XDG_DATA_HOME = Path(os.environ.get('XDG_DATA_HOME', HOME / '.local' / 'share')) +XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR') # hyprwhspr directories CONFIG_DIR = XDG_CONFIG_HOME / 'hyprwhspr' DATA_DIR = XDG_DATA_HOME / 'hyprwhspr' +if XDG_RUNTIME_DIR: + RUNTIME_DIR = Path(XDG_RUNTIME_DIR) / 'hyprwhspr' +else: + RUNTIME_DIR = Path(tempfile.gettempdir()) / f"hyprwhspr-{os.getuid()}" # Configuration files CONFIG_FILE = CONFIG_DIR / 'config.json' @@ -27,6 +33,7 @@ SUSPEND_MARKER_FILE = CONFIG_DIR / '.suspend_marker' LOCK_FILE = CONFIG_DIR / 'hyprwhspr.lock' VISUALIZER_STATE_FILE = CONFIG_DIR / 'visualizer_state' # recording|paused|processing|error|success +TRANSCRIPT_PREVIEW_FILE = RUNTIME_DIR / 'transcript_preview' # Secure credential storage CREDENTIALS_DIR = DATA_DIR diff --git a/lib/src/provider_registry.py b/lib/src/provider_registry.py index 4257df4e..ce249ec6 100644 --- a/lib/src/provider_registry.py +++ b/lib/src/provider_registry.py @@ -7,6 +7,8 @@ # Provider registry with known cloud transcription providers +# Model entries marked hidden stay out of the REST setup model list while +# remaining selectable for provider-specific realtime setup flows. 
PROVIDERS: Dict[str, Dict] = { 'openai': { 'name': 'OpenAI', @@ -30,6 +32,13 @@ 'description': 'Updated version of the faster, lighter transcription model', 'body': {'model': 'gpt-4o-mini-transcribe-2025-12-15'} }, + 'gpt-realtime-whisper': { + 'name': 'GPT Realtime Whisper', + 'description': 'Recommended realtime streaming transcription model', + 'body': {'model': 'gpt-realtime-whisper'}, + 'realtime_model': True, + 'hidden': True + }, 'gpt-realtime-mini-2025-12-15': { 'name': 'GPT Realtime Mini (2025-12-15)', 'description': 'Low-latency realtime streaming', @@ -210,4 +219,3 @@ def validate_api_key(provider_id: str, api_key: str) -> Tuple[bool, Optional[str return False, "API key appears too short" return True, None - diff --git a/lib/src/realtime_client.py b/lib/src/realtime_client.py index 27d32b85..0aad0684 100644 --- a/lib/src/realtime_client.py +++ b/lib/src/realtime_client.py @@ -31,6 +31,8 @@ class RealtimeClient: """Generic WebSocket client for realtime transcription APIs""" + + VALID_TRANSCRIPTION_DELAYS = {'minimal', 'low', 'medium', 'high', 'xhigh'} def __init__(self, mode: str = 'transcribe'): """ @@ -46,6 +48,8 @@ def __init__(self, mode: str = 'transcribe'): self.instructions = None self.mode = mode self.language = None # Language code for transcription (None = auto-detect) + self.transcription_delay = 'low' + self.partial_transcript_callback = None # Threading self.lock = threading.Lock() @@ -65,6 +69,7 @@ def __init__(self, mode: str = 'transcribe'): # Transcription assembly (transcribe mode) self._transcript_generation = 0 self._committed_segments = [] + self._partial_transcript = "" # Track whether new audio has been queued since the last received transcript. # This helps avoid returning stale mid-stream text on stop. 
@@ -355,18 +360,30 @@ def _handle_event(self, event: dict): transcript = event.get('transcript', '') or '' transcript = transcript.strip() with self.lock: + if not transcript: + transcript = self._partial_transcript.strip() if transcript: self._committed_segments.append(transcript) self._transcript_generation += 1 self._last_transcript_audio_activity_id = self._audio_activity_id + self._partial_transcript = "" # Keep legacy fields coherent self.current_response_text = transcript self.response_complete = True + self._notify_partial_transcript("") self.response_event.set() print( f'[REALTIME] Transcription completed ({len(transcript)} chars)', flush=True, ) + + elif event_type == 'conversation.item.input_audio_transcription.delta': + delta = event.get('delta', '') or '' + if delta: + with self.lock: + self._partial_transcript += delta + partial = self._partial_transcript + self._notify_partial_transcript(partial) elif event_type == 'input_audio_buffer.committed': print(f'[REALTIME] Audio buffer committed', flush=True) @@ -378,6 +395,8 @@ def _handle_event(self, event: dict): # Reset commit tracking - new speech means we haven't committed THIS audio yet with self.lock: self._buffer_committed = False + self._partial_transcript = "" + self._notify_partial_transcript("") elif event_type == 'input_audio_buffer.speech_stopped': print(f'[REALTIME] Speech ended', flush=True) @@ -386,6 +405,9 @@ def _handle_event(self, event: dict): error = event.get('error', {}) error_message = error.get('message', 'Unknown error') print(f'[REALTIME] Server error: {error_message}', flush=True) + with self.lock: + self._partial_transcript = "" + self._notify_partial_transcript("") self.response_complete = True self.response_event.set() # Unblock waiting thread @@ -397,10 +419,15 @@ def _send_session_update(self): if self.mode == 'transcribe': # Transcription-only session # Build transcription config - omit language for auto-detect - transcription_config = {'model': 'gpt-4o-mini-transcribe'} + 
model = self.model or 'gpt-4o-mini-transcribe' + transcription_config = {'model': model} if self.language: transcription_config['language'] = self.language + is_realtime_whisper = model == 'gpt-realtime-whisper' + if is_realtime_whisper: + transcription_config['delay'] = self._validated_transcription_delay() + session_data = { 'type': 'transcription', 'audio': { @@ -410,7 +437,7 @@ def _send_session_update(self): 'rate': 24000 }, 'transcription': transcription_config, - 'turn_detection': { + 'turn_detection': None if is_realtime_whisper else { 'type': 'server_vad', 'threshold': 0.5, 'prefix_padding_ms': 300, @@ -456,6 +483,36 @@ def update_language(self, language: Optional[str]): self.language = language if self.connected: self._send_session_update() + + def set_transcription_delay(self, delay: str): + """Set gpt-realtime-whisper transcription delay.""" + self.transcription_delay = self._normalize_transcription_delay(delay) + if self.connected: + self._send_session_update() + + def set_partial_transcript_callback(self, callback): + """Register a callback for live transcription deltas.""" + self.partial_transcript_callback = callback + + def _normalize_transcription_delay(self, delay: str) -> str: + delay = (delay or 'low').strip().lower() + if delay not in self.VALID_TRANSCRIPTION_DELAYS: + print(f"[REALTIME] Invalid realtime_transcription_delay '{delay}', using 'low'", flush=True) + return 'low' + return delay + + def _validated_transcription_delay(self) -> str: + self.transcription_delay = self._normalize_transcription_delay(self.transcription_delay) + return self.transcription_delay + + def _notify_partial_transcript(self, text: str): + callback = self.partial_transcript_callback + if not callback: + return + try: + callback(text) + except Exception as e: + print(f'[REALTIME] Partial transcript callback failed: {e}', flush=True) def _attempt_reconnect(self): """Attempt to reconnect with exponential backoff""" @@ -503,12 +560,14 @@ def clear_audio_buffer(self): 
self.response_complete = False self._transcript_generation = 0 self._committed_segments = [] + self._partial_transcript = "" self._audio_activity_id = 0 self._last_transcript_audio_activity_id = 0 self._dropped_chunks = 0 self._last_drop_log_time = 0.0 self._queue_cond.notify_all() self.response_event.clear() + self._notify_partial_transcript("") except Exception as e: print(f'[REALTIME] Failed to clear buffer: {e}', flush=True) @@ -756,4 +815,3 @@ def close(self): def set_max_buffer_seconds(self, seconds: float): """Set maximum buffer size in seconds for backpressure handling""" self.max_buffer_seconds = max(1.0, seconds) # Minimum 1 second - diff --git a/lib/src/whisper_manager.py b/lib/src/whisper_manager.py index bfe0e979..e518675b 100644 --- a/lib/src/whisper_manager.py +++ b/lib/src/whisper_manager.py @@ -59,6 +59,7 @@ def __init__(self, config_manager: Optional[ConfigManager] = None): # Realtime WebSocket client self._realtime_client = None self._realtime_streaming_callback = None + self._realtime_partial_callback = None # Connection parameters used for reconnect-on-demand. # (Stored in-memory only; do not log API keys.) 
self._realtime_connect_params = None @@ -486,6 +487,9 @@ def _send_direct(audio_chunk: np.ndarray): # Initialize RealtimeClient with mode realtime_mode = self.config.get_setting('realtime_mode', 'transcribe') + if provider_id == 'openai' and model_id == 'gpt-realtime-whisper' and realtime_mode != 'transcribe': + print('ERROR: gpt-realtime-whisper is supported only with realtime_mode="transcribe"', flush=True) + return False self._realtime_client = RealtimeClient(mode=realtime_mode) # Get WebSocket URL @@ -517,6 +521,14 @@ def _send_direct(audio_chunk: np.ndarray): # Set language in realtime client (for session.update) self._realtime_client.language = language + + delay = self.config.get_setting('realtime_transcription_delay', 'low') + self._realtime_client.set_transcription_delay(delay) + if self._is_realtime_whisper_preview_enabled(provider_id, model_id, realtime_mode): + self._realtime_client.set_partial_transcript_callback(self._realtime_partial_callback) + else: + self._realtime_client.set_partial_transcript_callback(None) + self._clear_realtime_partial_preview() # Set buffer max seconds buffer_max = self.config.get_setting('realtime_buffer_max_seconds', 5) @@ -1555,6 +1567,36 @@ def get_realtime_streaming_callback(self) -> Optional[Callable]: return self._realtime_streaming_callback return None + def set_realtime_partial_callback(self, callback: Optional[Callable[[str], None]]) -> None: + """Set callback for realtime partial transcript previews.""" + self._realtime_partial_callback = callback + if self._realtime_client and hasattr(self._realtime_client, 'set_partial_transcript_callback'): + provider_id = self.config.get_setting('websocket_provider') + model_id = self.config.get_setting('websocket_model') + realtime_mode = self.config.get_setting('realtime_mode', 'transcribe') + if self._is_realtime_whisper_preview_enabled(provider_id, model_id, realtime_mode): + self._realtime_client.set_partial_transcript_callback(callback) + else: + 
self._realtime_client.set_partial_transcript_callback(None) + self._clear_realtime_partial_preview() + + def _is_realtime_whisper_preview_enabled(self, provider_id: str, model_id: str, realtime_mode: str) -> bool: + return ( + self.config.get_setting('mic_osd_enabled', True) + and provider_id == 'openai' + and model_id == 'gpt-realtime-whisper' + and realtime_mode == 'transcribe' + and self._realtime_partial_callback is not None + ) + + def _clear_realtime_partial_preview(self) -> None: + if not self._realtime_partial_callback: + return + try: + self._realtime_partial_callback("") + except Exception as e: + print(f'[REALTIME] Failed to clear partial transcript preview: {e}', flush=True) + def _reconnect_realtime_client(self) -> bool: """Reconnect realtime client using stored connect params.""" if not self._realtime_client: diff --git a/requirements.txt b/requirements.txt index 3160ab0b..8c68aaea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,8 +21,9 @@ dbus-python>=1.3.0 # CLI formatting and logging rich>=14.0.0 -# Mic-osd visualization (requires python-gobject system package) +# Mic-osd visualization (requires python-gobject/python-cairo system packages) PyGObject>=3.50.0 +pycairo>=1.25.0 # ElevenLabs integration elevenlabs>=2.40.0 diff --git a/scripts/install-deps.sh b/scripts/install-deps.sh index eac56e9d..692200fb 100755 --- a/scripts/install-deps.sh +++ b/scripts/install-deps.sh @@ -310,6 +310,7 @@ install_deps_apt() { python3-pyudev \ python3-dbus \ python3-gi \ + python3-cairo \ gir1.2-gtk-4.0 \ pipewire \ pipewire-pulse \ @@ -371,6 +372,7 @@ install_deps_dnf() { python3-pyudev \ python3-dbus \ python3-gobject \ + python3-cairo \ gtk4 \ gtk4-layer-shell \ pipewire \ @@ -404,6 +406,7 @@ install_deps_zypper() { python3-pulsectl \ python3-pyudev \ python3-gobject \ + python3-pycairo \ typelib-1_0-Gtk-4_0 \ pipewire \ pipewire-pulseaudio \ diff --git a/share/config.schema.json b/share/config.schema.json index f2af0dab..db9f6f83 100644 --- 
a/share/config.schema.json +++ b/share/config.schema.json @@ -219,7 +219,7 @@ "websocket_model": { "type": ["string", "null"], "default": null, - "description": "Model identifier for WebSocket backend (e.g., 'gpt-realtime-mini-2025-12-15', 'gemini-3.1-flash-live-preview')" + "description": "Model identifier for WebSocket backend (e.g., 'gpt-realtime-whisper', 'gemini-3.1-flash-live-preview')" }, "websocket_url": { "type": ["string", "null"], @@ -244,6 +244,12 @@ "default": "transcribe", "description": "'transcribe' for speech-to-text, 'converse' for voice-to-AI" }, + "realtime_transcription_delay": { + "type": "string", + "enum": ["minimal", "low", "medium", "high", "xhigh"], + "default": "low", + "description": "Latency/accuracy delay setting for OpenAI gpt-realtime-whisper transcription" + }, "onnx_asr_model": { "type": "string", "default": "nemo-parakeet-tdt-0.6b-v3", diff --git a/tests/test_main_startup_safety.py b/tests/test_main_startup_safety.py new file mode 100644 index 00000000..31248b84 --- /dev/null +++ b/tests/test_main_startup_safety.py @@ -0,0 +1,137 @@ +import ast +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] + + +class MainStartupSafetyTests(unittest.TestCase): + def _find_function(self, tree, name): + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == name: + return node + return None + + def test_realtime_partial_callback_registration_is_guarded(self): + tree = ast.parse((ROOT / "lib" / "main.py").read_text(encoding="utf-8")) + + guarded = False + for node in ast.walk(tree): + if not isinstance(node, ast.If): + continue + test = node.test + if ( + isinstance(test, ast.Call) + and isinstance(test.func, ast.Name) + and test.func.id == "hasattr" + and len(test.args) == 2 + and isinstance(test.args[1], ast.Constant) + and test.args[1].value == "set_realtime_partial_callback" + ): + for child in ast.walk(node): + if ( + isinstance(child, ast.Call) + and 
isinstance(child.func, ast.Attribute) + and child.func.attr == "set_realtime_partial_callback" + ): + guarded = True + + self.assertTrue(guarded) + + def test_show_mic_osd_clears_preview_before_showing(self): + tree = ast.parse((ROOT / "lib" / "main.py").read_text(encoding="utf-8")) + + show_func = self._find_function(tree, "_show_mic_osd") + self.assertIsNotNone(show_func) + + clear_line = None + show_line = None + for node in ast.walk(show_func): + if not isinstance(node, ast.Call) or not isinstance(node.func, ast.Attribute): + continue + if node.func.attr == "clear_preview_text": + clear_line = node.lineno + elif node.func.attr == "show": + show_line = node.lineno + + self.assertIsNotNone(clear_line) + self.assertIsNotNone(show_line) + self.assertLess(clear_line, show_line) + + def test_stop_recording_clears_preview_before_processing_state(self): + tree = ast.parse((ROOT / "lib" / "main.py").read_text(encoding="utf-8")) + + stop_func = self._find_function(tree, "_stop_recording") + self.assertIsNotNone(stop_func) + + clear_line = None + processing_line = None + for node in ast.walk(stop_func): + if not isinstance(node, ast.Call): + continue + if isinstance(node.func, ast.Attribute) and node.func.attr == "_clear_mic_osd_preview_text": + clear_line = node.lineno + elif ( + isinstance(node.func, ast.Attribute) + and node.func.attr == "_set_visualizer_state" + and node.args + and isinstance(node.args[0], ast.Constant) + and node.args[0].value == "processing" + ): + processing_line = node.lineno + + self.assertIsNotNone(clear_line) + self.assertIsNotNone(processing_line) + self.assertLess(clear_line, processing_line) + + def test_reset_stale_state_scrubs_transcript_preview(self): + tree = ast.parse((ROOT / "lib" / "main.py").read_text(encoding="utf-8")) + + reset_func = self._find_function(tree, "_reset_stale_state") + self.assertIsNotNone(reset_func) + + references_preview_file = any( + isinstance(node, ast.Name) and node.id == "TRANSCRIPT_PREVIEW_FILE" + for node 
in ast.walk(reset_func) + ) + self.assertTrue(references_preview_file) + + def test_cancel_cleanup_clears_transcript_preview(self): + tree = ast.parse((ROOT / "lib" / "main.py").read_text(encoding="utf-8")) + + cleanup_func = self._find_function(tree, "_cleanup_recording_state") + self.assertIsNotNone(cleanup_func) + + clears_preview = any( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr == "_clear_mic_osd_preview_text" + for node in ast.walk(cleanup_func) + ) + self.assertTrue(clears_preview) + + def test_process_audio_finally_clears_transcript_preview(self): + tree = ast.parse((ROOT / "lib" / "main.py").read_text(encoding="utf-8")) + + process_func = self._find_function(tree, "_process_audio") + self.assertIsNotNone(process_func) + + clears_in_finally = False + for node in ast.walk(process_func): + if not isinstance(node, ast.Try): + continue + for finalizer_node in node.finalbody: + for child in ast.walk(finalizer_node): + if ( + isinstance(child, ast.Call) + and isinstance(child.func, ast.Attribute) + and child.func.attr == "_clear_mic_osd_preview_text" + ): + clears_in_finally = True + + self.assertTrue(clears_in_finally) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_mic_osd_main_fallback.py b/tests/test_mic_osd_main_fallback.py new file mode 100644 index 00000000..8607acf9 --- /dev/null +++ b/tests/test_mic_osd_main_fallback.py @@ -0,0 +1,147 @@ +import ast +import builtins +import sys +import types +import unittest +from pathlib import Path +from unittest import mock + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "lib")) + + +class MicOSDMainFallbackTests(unittest.TestCase): + def _stub_gtk_modules(self): + gtk_module = types.SimpleNamespace(Application=object) + glib_module = types.SimpleNamespace() + gi_module = types.SimpleNamespace(require_version=lambda *args: None) + gi_repository = types.SimpleNamespace(Gtk=gtk_module, GLib=glib_module) + return { 
+ "gi": gi_module, + "gi.repository": gi_repository, + } + + def test_main_import_degrades_cleanly_when_cairo_missing(self): + for module_name in ( + "mic_osd.main", + "mic_osd.window", + "mic_osd.visualizations", + "mic_osd.visualizations.base", + "mic_osd.visualizations.waveform", + "mic_osd.visualizations.vu_meter", + ): + sys.modules.pop(module_name, None) + + original_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "cairo": + raise ImportError("no cairo") + return original_import(name, *args, **kwargs) + + with mock.patch.dict(sys.modules, self._stub_gtk_modules()), \ + mock.patch("builtins.__import__", side_effect=fake_import): + import mic_osd.main as main_module + + self.assertIsNotNone(main_module._MIC_OSD_IMPORT_ERROR) + with mock.patch.object(sys, "argv", ["mic-osd"]): + self.assertEqual(main_module.main(), 1) + + def test_transcript_preview_fallback_uses_runtime_dir(self): + tree = ast.parse((ROOT / "lib" / "mic_osd" / "main.py").read_text(encoding="utf-8")) + + uses_runtime = False + assigns_preview_from_runtime = False + for node in ast.walk(tree): + if ( + isinstance(node, ast.Assign) + and any(isinstance(target, ast.Name) and target.id == "runtime_dir" for target in node.targets) + ): + uses_runtime = True + if ( + isinstance(node, ast.Assign) + and any(isinstance(target, ast.Name) and target.id == "TRANSCRIPT_PREVIEW_FILE" for target in node.targets) + and isinstance(node.value, ast.BinOp) + and isinstance(node.value.left, ast.Name) + and node.value.left.id == "runtime_dir" + ): + assigns_preview_from_runtime = True + + self.assertTrue(uses_runtime) + self.assertTrue(assigns_preview_from_runtime) + + def test_hide_clears_preview_file_before_visibility_return(self): + tree = ast.parse((ROOT / "lib" / "mic_osd" / "main.py").read_text(encoding="utf-8")) + + hide_func = None + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == "_hide": + hide_func = node + break + + 
self.assertIsNotNone(hide_func) + + clear_line = None + visible_return_line = None + for node in ast.walk(hide_func): + if ( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr == "_clear_preview_file" + ): + clear_line = node.lineno + if ( + isinstance(node, ast.If) + and isinstance(node.test, ast.UnaryOp) + and isinstance(node.test.op, ast.Not) + and isinstance(node.test.operand, ast.Attribute) + and node.test.operand.attr == "visible" + ): + visible_return_line = node.lineno + + self.assertIsNotNone(clear_line) + self.assertIsNotNone(visible_return_line) + self.assertLess(clear_line, visible_return_line) + + def test_cleanup_uses_preview_file_helper(self): + tree = ast.parse((ROOT / "lib" / "mic_osd" / "main.py").read_text(encoding="utf-8")) + + cleanup_func = None + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == "_cleanup": + cleanup_func = node + break + + self.assertIsNotNone(cleanup_func) + + calls_helper = any( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr == "_clear_preview_file" + for node in ast.walk(cleanup_func) + ) + self.assertTrue(calls_helper) + + def test_state_poll_updates_window_visualizer_state(self): + tree = ast.parse((ROOT / "lib" / "mic_osd" / "main.py").read_text(encoding="utf-8")) + + poll_func = None + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == "_poll_state_file": + poll_func = node + break + + self.assertIsNotNone(poll_func) + + calls_window_state = any( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr == "set_visualizer_state" + for node in ast.walk(poll_func) + ) + self.assertTrue(calls_window_state) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_mic_osd_runner.py b/tests/test_mic_osd_runner.py new file mode 100644 index 00000000..8a8f7479 --- /dev/null +++ b/tests/test_mic_osd_runner.py @@ -0,0 +1,360 
@@ +import sys +import tempfile +import types +import unittest +import builtins +from pathlib import Path +from unittest import mock + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "lib")) + +from mic_osd.runner import MicOSDRunner +import mic_osd.runner as runner_module + + +class FakeTimer: + def __init__(self, delay, callback, args=()): + self.delay = delay + self.callback = callback + self.args = args + self.daemon = False + self.started = False + self.cancelled = False + + def start(self): + self.started = True + + def cancel(self): + self.cancelled = True + + def fire(self): + self.callback(*self.args) + + +class FakeCairoContext: + def __init__(self): + self.shown_text = [] + + def select_font_face(self, *args): + pass + + def set_font_size(self, *args): + pass + + def text_extents(self, text): + return (0, 0, len(text) * 5, 10, 0, 0) + + def set_source_rgba(self, *args): + pass + + def rectangle(self, *args): + pass + + def fill(self): + pass + + def move_to(self, *args): + pass + + def show_text(self, text): + self.shown_text.append(text) + + +class MicOSDRunnerTests(unittest.TestCase): + def _import_window_with_stubs(self): + for module_name in ("mic_osd.window",): + sys.modules.pop(module_name, None) + + cairo_module = types.SimpleNamespace( + FONT_SLANT_NORMAL=0, + FONT_WEIGHT_NORMAL=0, + Context=object, + ) + gtk_module = types.SimpleNamespace( + Window=object, + DrawingArea=type( + "DrawingArea", + (), + { + "set_content_width": lambda self, value: None, + "set_content_height": lambda self, value: None, + "set_draw_func": lambda self, value: None, + }, + ), + CssProvider=object, + StyleContext=types.SimpleNamespace(add_provider_for_display=lambda *args: None), + STYLE_PROVIDER_PRIORITY_APPLICATION=0, + ) + gdk_module = types.SimpleNamespace(Display=types.SimpleNamespace(get_default=lambda: None)) + glib_module = types.SimpleNamespace(Error=Exception) + layer_shell_module = types.SimpleNamespace( + init_for_window=lambda 
*args: None, + set_namespace=lambda *args: None, + set_layer=lambda *args: None, + set_anchor=lambda *args: None, + set_margin=lambda *args: None, + set_exclusive_zone=lambda *args: None, + set_keyboard_mode=lambda *args: None, + Layer=types.SimpleNamespace(OVERLAY=0), + Edge=types.SimpleNamespace(BOTTOM=0, LEFT=1, RIGHT=2, TOP=3), + KeyboardMode=types.SimpleNamespace(NONE=0), + ) + gi_module = types.SimpleNamespace(require_version=lambda *args: None) + gi_repository = types.SimpleNamespace( + Gtk=gtk_module, + Gdk=gdk_module, + GLib=glib_module, + Gtk4LayerShell=layer_shell_module, + ) + + patcher = mock.patch.dict( + sys.modules, + { + "cairo": cairo_module, + "gi": gi_module, + "gi.repository": gi_repository, + }, + ) + with patcher: + import mic_osd.window as window_module + return window_module, cairo_module + + def test_preview_text_is_written_as_utf8_with_restrictive_permissions(self): + with tempfile.TemporaryDirectory() as tmp: + preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview" + original = runner_module.TRANSCRIPT_PREVIEW_FILE + runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file + try: + text = "cafe 東京" + MicOSDRunner().set_preview_text(text) + + self.assertEqual(preview_file.read_bytes(), text.encode("utf-8")) + self.assertEqual(preview_file.read_text(encoding="utf-8"), text) + self.assertEqual(preview_file.parent.stat().st_mode & 0o777, 0o700) + self.assertEqual(preview_file.stat().st_mode & 0o777, 0o600) + finally: + runner_module.TRANSCRIPT_PREVIEW_FILE = original + + def test_preview_text_write_uses_atomic_replace(self): + with tempfile.TemporaryDirectory() as tmp: + preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview" + original_file = runner_module.TRANSCRIPT_PREVIEW_FILE + original_replace = runner_module.os.replace + replace_calls = [] + runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file + try: + def replace_spy(src, dst): + replace_calls.append((Path(src), Path(dst))) + original_replace(src, dst) + + with 
mock.patch.object(runner_module.os, "replace", side_effect=replace_spy): + MicOSDRunner().set_preview_text("atomic preview") + + self.assertEqual(preview_file.read_text(encoding="utf-8"), "atomic preview") + self.assertEqual(len(replace_calls), 1) + temp_path, final_path = replace_calls[0] + self.assertNotEqual(temp_path, final_path) + self.assertEqual(final_path, preview_file) + self.assertTrue(temp_path.name.startswith(".transcript_preview.")) + self.assertFalse(temp_path.exists()) + finally: + runner_module.TRANSCRIPT_PREVIEW_FILE = original_file + + def test_window_module_imports_with_cairo_available(self): + window_module, cairo_module = self._import_window_with_stubs() + + self.assertIs(window_module.cairo, cairo_module) + + def test_is_available_returns_false_when_cairo_missing(self): + original_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "cairo": + raise ImportError("no cairo") + return original_import(name, *args, **kwargs) + + with mock.patch("builtins.__import__", side_effect=fake_import): + self.assertFalse(MicOSDRunner.is_available()) + + def test_pid_validation_rejects_unrelated_process_cmdline(self): + with mock.patch.object(runner_module.os, "kill", return_value=None), \ + mock.patch.object(runner_module.Path, "read_bytes", return_value=b"python\0not-osd\0"): + self.assertFalse(MicOSDRunner._is_mic_osd_daemon_pid(12345)) + + def test_pid_validation_accepts_mic_osd_daemon_cmdline(self): + cmdline = b"python3\0-c\0from mic_osd.main import main\nsys.argv = ['mic-osd', '--daemon']\0" + with mock.patch.object(runner_module.os, "kill", return_value=None), \ + mock.patch.object(runner_module.Path, "read_bytes", return_value=cmdline): + self.assertTrue(MicOSDRunner._is_mic_osd_daemon_pid(12345)) + + def test_pid_validation_accepts_daemon_environment_marker(self): + with mock.patch.object(runner_module.os, "kill", return_value=None), \ + mock.patch.object(runner_module.Path, "read_bytes", 
return_value=b"HYPRWHSPR_MIC_OSD_DAEMON=1\0"): + self.assertTrue(MicOSDRunner._is_mic_osd_daemon_pid(12345)) + + def test_ensure_daemon_reuses_python_c_daemon_after_restart(self): + with tempfile.TemporaryDirectory() as tmp: + pid_file = Path(tmp) / "mic_osd.pid" + pid_file.write_text("12345", encoding="utf-8") + original_pid_file = runner_module.MIC_OSD_PID_FILE + runner_module.MIC_OSD_PID_FILE = pid_file + + cmdline = b"python3\0-c\0from mic_osd.main import main\nsys.argv = ['mic-osd', '--daemon']\0" + try: + with mock.patch.object(runner_module.os, "kill", return_value=None), \ + mock.patch.object(runner_module.Path, "read_bytes", side_effect=[b"", cmdline]), \ + mock.patch.object(runner_module.subprocess, "Popen", return_value=types.SimpleNamespace()) as popen: + runner = MicOSDRunner() + + self.assertTrue(runner._ensure_daemon()) + + popen.assert_called_once() + self.assertEqual(popen.call_args.args[0], ['true']) + self.assertEqual(runner._orphaned_daemon_pid, 12345) + self.assertTrue(pid_file.exists()) + finally: + runner_module.MIC_OSD_PID_FILE = original_pid_file + + def test_orphaned_pid_signal_revalidates_before_sending(self): + runner = MicOSDRunner() + runner._process = types.SimpleNamespace(pid=999, poll=lambda: None) + runner._orphaned_daemon_pid = 12345 + + with mock.patch.object(MicOSDRunner, "is_available", return_value=True), \ + mock.patch.object(runner, "_is_mic_osd_daemon_pid", return_value=False), \ + mock.patch.object(runner_module.os, "kill") as kill: + self.assertFalse(runner.show()) + + kill.assert_not_called() + self.assertIsNone(runner._process) + self.assertIsNone(runner._orphaned_daemon_pid) + + def test_text_extents_support_tuple_and_attribute_shapes(self): + window_module, _ = self._import_window_with_stubs() + window = object.__new__(window_module.OSDWindow) + + class TupleContext: + def text_extents(self, text): + return (0, 0, len(text) * 5, 10, 0, 0) + + class ObjectContext: + def text_extents(self, text): + return 
types.SimpleNamespace(width=len(text) * 5, height=10) + + self.assertEqual(window._text_width(TupleContext(), "abcd"), 20) + self.assertEqual(window._text_height(TupleContext(), "abcd"), 10) + self.assertEqual(window._text_width(ObjectContext(), "abcd"), 20) + self.assertEqual(window._text_height(ObjectContext(), "abcd"), 10) + + def test_preview_text_draws_only_while_recording(self): + window_module, _ = self._import_window_with_stubs() + window = object.__new__(window_module.OSDWindow) + window._preview_text = "live partial" + window._visualizer_state = "processing" + + processing_cr = FakeCairoContext() + window._draw_preview_text(processing_cr, 400, 68) + + window._visualizer_state = "recording" + recording_cr = FakeCairoContext() + window._draw_preview_text(recording_cr, 400, 68) + + self.assertEqual(processing_cr.shown_text, []) + self.assertEqual(recording_cr.shown_text, ["live partial"]) + + def test_preview_text_preserves_spaces_but_trims_newlines(self): + with tempfile.TemporaryDirectory() as tmp: + preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview" + original = runner_module.TRANSCRIPT_PREVIEW_FILE + runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file + try: + MicOSDRunner().set_preview_text(" cafe 東京 \n") + + self.assertEqual(preview_file.read_text(encoding="utf-8"), " cafe 東京 ") + finally: + runner_module.TRANSCRIPT_PREVIEW_FILE = original + + def test_clear_preview_text_removes_stale_runtime_file(self): + with tempfile.TemporaryDirectory() as tmp: + preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview" + original = runner_module.TRANSCRIPT_PREVIEW_FILE + runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file + try: + MicOSDRunner().set_preview_text("stale preview") + self.assertTrue(preview_file.exists()) + + MicOSDRunner().clear_preview_text() + + self.assertFalse(preview_file.exists()) + finally: + runner_module.TRANSCRIPT_PREVIEW_FILE = original + + def test_hide_cancels_pending_preview_flush(self): + with 
tempfile.TemporaryDirectory() as tmp: + preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview" + original_file = runner_module.TRANSCRIPT_PREVIEW_FILE + original_interval = MicOSDRunner.PREVIEW_WRITE_INTERVAL_SECONDS + timers = [] + runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file + MicOSDRunner.PREVIEW_WRITE_INTERVAL_SECONDS = 60.0 + runner = MicOSDRunner() + try: + def make_timer(*args, **kwargs): + timer = FakeTimer(*args, **kwargs) + timers.append(timer) + return timer + + with mock.patch.object(runner_module.threading, "Timer", side_effect=make_timer): + runner.set_preview_text("first") + runner.set_preview_text("stale pending") + + runner.hide() + timers[0].fire() + + self.assertTrue(timers[0].cancelled) + self.assertFalse(preview_file.exists()) + finally: + runner_module.TRANSCRIPT_PREVIEW_FILE = original_file + MicOSDRunner.PREVIEW_WRITE_INTERVAL_SECONDS = original_interval + + def test_high_frequency_preview_updates_are_coalesced(self): + with tempfile.TemporaryDirectory() as tmp: + preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview" + original_file = runner_module.TRANSCRIPT_PREVIEW_FILE + original_interval = MicOSDRunner.PREVIEW_WRITE_INTERVAL_SECONDS + timers = [] + runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file + MicOSDRunner.PREVIEW_WRITE_INTERVAL_SECONDS = 60.0 + runner = MicOSDRunner() + try: + def make_timer(*args, **kwargs): + timer = FakeTimer(*args, **kwargs) + timers.append(timer) + return timer + + with mock.patch.object(runner_module.threading, "Timer", side_effect=make_timer): + runner.set_preview_text("first") + runner.set_preview_text("second") + runner.set_preview_text("third") + + self.assertEqual(preview_file.read_text(encoding="utf-8"), "first") + self.assertEqual(len(timers), 1) + self.assertTrue(timers[0].started) + + timers[0].fire() + + self.assertEqual(preview_file.read_text(encoding="utf-8"), "third") + finally: + runner_module.TRANSCRIPT_PREVIEW_FILE = original_file + 
MicOSDRunner.PREVIEW_WRITE_INTERVAL_SECONDS = original_interval
+
+    def test_requirements_include_pycairo_for_service_environment(self):
+        requirements = (ROOT / "requirements.txt").read_text(encoding="utf-8")
+
+        self.assertIn("pycairo", requirements.lower())
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_realtime_client.py b/tests/test_realtime_client.py
new file mode 100644
index 00000000..ecca146d
--- /dev/null
+++ b/tests/test_realtime_client.py
@@ -0,0 +1,174 @@
+"""Unit tests for RealtimeClient session payloads and transcript events."""
+
+import json
+import sys
+import types
+import unittest
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "lib" / "src"))
+# Register a stand-in "websocket" module unless one is already loaded
+# (presumably realtime_client imports it -- confirm against lib/src).
+sys.modules.setdefault("websocket", types.SimpleNamespace(WebSocketApp=object))
+
+from realtime_client import RealtimeClient
+
+
+class FakeWebSocket:
+    """Socket stand-in that records each sent payload as decoded JSON."""
+
+    def __init__(self):
+        self.sent = []
+
+    def send(self, payload):
+        self.sent.append(json.loads(payload))
+
+
+class RealtimeClientTests(unittest.TestCase):
+    """Drive RealtimeClient through a FakeWebSocket and inspect the results."""
+
+    def _client_with_ws(self, model="gpt-realtime-whisper"):
+        """Build a transcribe-mode client marked connected, backed by a FakeWebSocket."""
+        client = RealtimeClient(mode="transcribe")
+        client.connected = True
+        client.ws = FakeWebSocket()
+        client.model = model
+        return client
+
+    def test_gpt_realtime_whisper_session_payload(self):
+        """The whisper session.update sets audio/pcm input, null turn detection, a delay."""
+        client = self._client_with_ws()
+        client.language = "en"
+        client.set_transcription_delay("minimal")
+        client.ws.sent.clear()
+
+        client._send_session_update()
+
+        payload = client.ws.sent[-1]
+        session = payload["session"]
+        audio_input = session["audio"]["input"]
+
+        self.assertEqual(payload["type"], "session.update")
+        self.assertEqual(session["type"], "transcription")
+        self.assertEqual(audio_input["format"], {"type": "audio/pcm", "rate": 24000})
+        self.assertIsNone(audio_input["turn_detection"])
+        self.assertEqual(
+            audio_input["transcription"],
+            {
+                "model": "gpt-realtime-whisper",
+                "language": "en",
+                "delay": "minimal",
+            },
+        )
+
+    def test_non_whisper_transcription_session_keeps_vad_and_configured_model(self):
+        """A non-whisper model keeps server_vad and omits the "delay" field."""
+        client = self._client_with_ws("gpt-4o-mini-transcribe")
+        client.language = "fr"
+
+        client._send_session_update()
+
+        audio_input = client.ws.sent[-1]["session"]["audio"]["input"]
+        self.assertEqual(audio_input["transcription"], {"model": "gpt-4o-mini-transcribe", "language": "fr"})
+        self.assertEqual(audio_input["turn_detection"]["type"], "server_vad")
+        self.assertNotIn("delay", audio_input["transcription"])
+
+    def test_invalid_delay_falls_back_to_low(self):
+        """An unrecognised delay value is replaced by "low" in the payload."""
+        client = self._client_with_ws()
+        client.set_transcription_delay("fastest")
+        client.ws.sent.clear()
+
+        client._send_session_update()
+
+        transcription = client.ws.sent[-1]["session"]["audio"]["input"]["transcription"]
+        self.assertEqual(transcription["delay"], "low")
+
+    def test_delta_updates_preview_and_completed_is_final_text(self):
+        """Deltas extend the preview; completion resets it and yields the transcript."""
+        previews = []
+        client = self._client_with_ws()
+        client.set_partial_transcript_callback(previews.append)
+
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "hello"})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": " wor"})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.completed", "transcript": "hello world"})
+
+        self.assertEqual(previews, ["hello", "hello wor", ""])
+        self.assertEqual(client.commit_and_get_text(timeout=0.1), "hello world")
+
+    def test_completed_without_transcript_uses_accumulated_delta_text(self):
+        """A completed event with no transcript falls back to the joined deltas."""
+        previews = []
+        client = self._client_with_ws()
+        client.set_partial_transcript_callback(previews.append)
+
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "delta"})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": " only"})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.completed"})
+
+        self.assertEqual(previews, ["delta", "delta only", ""])
+        self.assertEqual(client.commit_and_get_text(timeout=0.1), "delta only")
+
+    def test_unicode_delta_text_is_preserved(self):
+        """Non-ASCII delta text reaches both the preview and the final text intact."""
+        previews = []
+        client = self._client_with_ws()
+        client.set_partial_transcript_callback(previews.append)
+
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "cafe "})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "東京"})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.completed"})
+
+        self.assertEqual(previews, ["cafe ", "cafe 東京", ""])
+        self.assertEqual(client.commit_and_get_text(timeout=0.1), "cafe 東京")
+
+    def test_partial_preview_preserves_trailing_spaces(self):
+        """The preview callback receives the delta with its trailing space kept."""
+        previews = []
+        client = self._client_with_ws()
+        client.set_partial_transcript_callback(previews.append)
+
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "hello "})
+
+        self.assertEqual(previews, ["hello "])
+
+    def test_speech_started_clears_stale_partial(self):
+        """A speech_started event empties the partial before later deltas arrive."""
+        previews = []
+        client = self._client_with_ws()
+        client.set_partial_transcript_callback(previews.append)
+
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "first segment"})
+        client._handle_event({"type": "input_audio_buffer.speech_started"})
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "next"})
+
+        self.assertEqual(client._partial_transcript, "next")
+        self.assertEqual(previews, ["first segment", "", "next"])
+
+    def test_clear_audio_buffer_clears_stale_partial(self):
+        """clear_audio_buffer() resets the partial and sends input_audio_buffer.clear."""
+        previews = []
+        client = self._client_with_ws()
+        client.set_partial_transcript_callback(previews.append)
+        client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "stale"})
+
+        client.clear_audio_buffer()
+
+        self.assertEqual(client._partial_transcript, "")
+        self.assertEqual(previews[-1], "")
+        self.assertEqual(client.ws.sent[-1]["type"], "input_audio_buffer.clear")
+
+    def test_schema_declares_realtime_transcription_delay_values(self):
+        """config.schema.json lists the five delay values with "low" as default."""
+        schema = json.loads((ROOT / "share" / "config.schema.json").read_text(encoding="utf-8"))
+        delay_schema = schema["properties"]["realtime_transcription_delay"]
+
+        self.assertEqual(delay_schema["default"], "low")
+        self.assertEqual(delay_schema["enum"], ["minimal", "low", "medium", "high", "xhigh"])
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_realtime_preview_integration.py b/tests/test_realtime_preview_integration.py
new file mode 100644
index 00000000..39a0d953
--- /dev/null
+++ b/tests/test_realtime_preview_integration.py
@@ -0,0 +1,107 @@
+"""Integration tests wiring RealtimeClient, WhisperManager and MicOSDRunner."""
+
+import json
+import sys
+import tempfile
+import types
+import unittest
+from pathlib import Path
+from unittest import mock
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "lib" / "src"))
+sys.path.insert(0, str(ROOT / "lib"))
+# Register a stand-in "websocket" module unless one is already loaded
+# (presumably realtime_client imports it -- confirm against lib/src).
+sys.modules.setdefault("websocket", types.SimpleNamespace(WebSocketApp=object))
+
+from realtime_client import RealtimeClient
+from whisper_manager import WhisperManager
+import mic_osd.runner as runner_module
+
+
+class FakeConfig:
+    """Config stand-in that answers get_setting() from a plain dict."""
+
+    def __init__(self, values):
+        self.values = values
+
+    def get_setting(self, key, default=None):
+        return self.values.get(key, default)
+
+
+class FakeWebSocket:
+    """Socket stand-in that records each sent payload as decoded JSON."""
+
+    def __init__(self):
+        self.sent = []
+
+    def send(self, payload):
+        self.sent.append(json.loads(payload))
+
+
+class RealtimePreviewIntegrationTests(unittest.TestCase):
+    """Check how WhisperManager routes the realtime partial-transcript callback."""
+
+    def test_openai_realtime_whisper_delta_updates_preview_file(self):
+        """Three deltas sent to the client end up concatenated in the preview file."""
+        config = FakeConfig(
+            {
+                "transcription_backend": "realtime-ws",
+                "websocket_provider": "openai",
+                "websocket_model": "gpt-realtime-whisper",
+                "realtime_mode": "transcribe",
+                "mic_osd_enabled": True,
+            }
+        )
+        manager = WhisperManager(config_manager=config)
+        client = RealtimeClient(mode="transcribe")
+        client.connected = True
+        client.ws = FakeWebSocket()
+        client.model = "gpt-realtime-whisper"
+        manager._realtime_client = client
+
+        with tempfile.TemporaryDirectory() as tmp:
+            preview_file = Path(tmp) / "hyprwhspr" / "transcript_preview"
+            original = runner_module.TRANSCRIPT_PREVIEW_FILE
+            runner_module.TRANSCRIPT_PREVIEW_FILE = preview_file
+            try:
+                runner = runner_module.MicOSDRunner()
+                manager.set_realtime_partial_callback(runner.set_preview_text)
+
+                client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "hello "})
+                client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "world"})
+                # Move the recorded last-write time back by one full interval; presumably
+                # this lets the next update be written at once instead of being held back.
+                runner._last_preview_write_at -= runner.PREVIEW_WRITE_INTERVAL_SECONDS
+                client._handle_event({"type": "conversation.item.input_audio_transcription.delta", "delta": "!"})
+
+                self.assertEqual(preview_file.read_text(encoding="utf-8"), "hello world!")
+            finally:
+                runner_module.TRANSCRIPT_PREVIEW_FILE = original
+
+    def test_non_matching_realtime_config_clears_preview_callback(self):
+        """With gpt-4o-mini-transcribe the client gets no callback and the preview is blanked."""
+        config = FakeConfig(
+            {
+                "transcription_backend": "realtime-ws",
+                "websocket_provider": "openai",
+                "websocket_model": "gpt-4o-mini-transcribe",
+                "realtime_mode": "transcribe",
+                "mic_osd_enabled": True,
+            }
+        )
+        manager = WhisperManager(config_manager=config)
+        client = RealtimeClient(mode="transcribe")
+        manager._realtime_client = client
+
+        callback = mock.Mock()
+        manager.set_realtime_partial_callback(callback)
+
+        self.assertIsNone(client.partial_transcript_callback)
+        callback.assert_called_once_with("")
+
+
+if __name__ == "__main__":
+    unittest.main()