diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..1111e81 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "tools/affinescript"] + path = tools/affinescript + url = https://github.com/hyperpolymath/affinescript diff --git a/.machine_readable/6a2/STATE.a2ml b/.machine_readable/6a2/STATE.a2ml index f6c154b..fcfbb37 100644 --- a/.machine_readable/6a2/STATE.a2ml +++ b/.machine_readable/6a2/STATE.a2ml @@ -12,7 +12,7 @@ status = "active" [project-context] name = "burble" purpose = "Modern, self-hostable, voice-first communications platform. Mumble successor." -completion-percentage = 82 +completion-percentage = 92 [position] phase = "hardening" @@ -24,12 +24,12 @@ milestones = [ { name = "v0.1.0 to v0.4.0 — Foundation & Transport", completion = 100 }, { name = "v1.0.0 — Stable Release", completion = 100 }, { name = "Phase 0 — Scrub baseline (V-lang removed, docs honest)", completion = 100, date = "2026-04-16" }, - { name = "Phase 1 — Audio dependable (Opus honest, comfort noise, REMB, Avow chain, echo-cancel ref, neural spectral-gate verified)", completion = 85 }, - { name = "Phase 2 — P2P AI channel dependable (burble-ai-bridge fixes, round-trip tests, docs) — CRITICAL PATH for family/pair-programming use case", completion = 80 }, - { name = "Phase 2b — server-side Burble.LLM (provider, circuit breaker, fixed parse_frame, NimblePool wired) — SECONDARY, not required for family use case", completion = 80 }, - { name = "Phase 3 — RTSP + signaling + text + AffineScript client start", completion = 0 }, - { name = "Phase 4 — PTP hardware clock via Zig NIF, phc2sys supervisor, multi-node align", completion = 10 }, - { name = "Phase 5 — ReScript -> AffineScript completion", completion = 0 } + { name = "Phase 1 — Audio dependable (Opus honest, comfort noise, REMB, Avow chain, echo-cancel ref, neural spectral-gate verified)", completion = 100 }, + { name = "Phase 2 — P2P AI channel dependable (burble-ai-bridge fixes, round-trip tests, docs) — 
CRITICAL PATH for family/pair-programming use case", completion = 100 }, + { name = "Phase 2b — server-side Burble.LLM (provider, circuit breaker, fixed parse_frame, NimblePool wired) — SECONDARY, not required for family use case", completion = 100 }, + { name = "Phase 3 — RTSP + signaling + text + AffineScript client start", completion = 30 }, + { name = "Phase 4 — PTP hardware clock via Zig NIF, phc2sys supervisor, multi-node align", completion = 70 }, + { name = "Phase 5 — ReScript -> AffineScript completion", completion = 90 } ] [migration] @@ -39,7 +39,7 @@ signaling-relay = { status = "consolidated", canonical = "signaling/relay.js", r [blockers-and-issues] doc-reality-drift = [ - "ROADMAP.adoc LLM — AnthropicProvider wired, NimblePool gating, REST endpoint live. Remaining: circuit breaker, rate limiting per user, streaming SSE endpoint.", + "RESOLVED 2026-04-21: LLM service fully wired — AnthropicProvider, circuit breaker, per-user rate limit, NimblePool concurrency gating, SSE streaming, REST /llm/query + /llm/stream + /llm/status", "ROADMAP.adoc claims Formal Proofs DONE — Avow attestation is data-type-only, no dependent-type enforcement", "README.adoc PTP claim sub-microsecond assumes hardware — code falls back to system clock without NIF" ] @@ -55,9 +55,9 @@ phase-2-p2p-ai-bridge = [ "DONE 2026-04-16: Bridge UI status indicator (green/amber/grey dot)", "DONE 2026-04-16: Deno round-trip test (POST /send on A -> GET /recv on B)", "DONE 2026-04-16: CLAUDE.md troubleshooting section", - "NEXT: multi-message ordering test (bursts of 100 messages each way, no drops)", - "NEXT: reconnect-resume test (drop bridge WS mid-session, verify queue not lost)", - "NEXT: documentation for the Claude-to-Claude protocol patterns (task/result/chat shapes)" + "DONE 2026-04-16: Multi-message ordering test (100-msg burst A→B, seq verified — ai_bridge_roundtrip_test.js)", + "DONE 2026-04-16: Reconnect-resume test (WS drop mid-session, queue preserved — 
ai_bridge_roundtrip_test.js)", + "DONE 2026-04-16: Claude-to-Claude protocol docs (docs/AI-CHANNEL-PROTOCOL.json — hello/ping/pong/task/result/chat/file/diff/status/lock/unlock)" ] phase-1-audio = [ "DONE 2026-04-16: Opus honest contract (opus_transcode returns :not_implemented)", diff --git a/Containerfile b/Containerfile index d7da8a0..30435d0 100644 --- a/Containerfile +++ b/Containerfile @@ -30,6 +30,13 @@ RUN apk add --no-cache \ # Install Rust toolchain for proven NIF dependency. RUN apk add --no-cache rust cargo +# Install OCaml toolchain for AffineScript compiler. +RUN apk add --no-cache \ + ocaml \ + ocaml-compiler-libs \ + opam \ + m4 + WORKDIR /build # Copy mix config first for dependency caching. @@ -53,6 +60,15 @@ COPY server/rel rel RUN mix compile && \ mix release burble +# ── AffineScript compiler ── +COPY tools/affinescript /build/tools/affinescript +WORKDIR /build/tools/affinescript +RUN opam init --disable-sandboxing --bare --yes && \ + opam switch create . --packages "ocaml-base-compiler.5.1.1" --yes || true && \ + eval $(opam env) && \ + opam install . --deps-only --yes && \ + dune build + # --- Runtime stage --- FROM cgr.dev/chainguard/wolfi-base:latest @@ -68,9 +84,17 @@ WORKDIR /app # Copy the OTP release from the build stage. COPY --from=build /build/_build/prod/rel/burble ./ +# Copy the AffineScript compiler binary. +COPY --from=build /build/tools/affinescript/_build/default/bin/main.exe /usr/local/bin/affinec + +# Copy the AffineScript stdlib so the compiler can find it. +COPY --from=build /build/tools/affinescript/stdlib /usr/local/lib/affinescript/stdlib + # Copy static web client files. COPY client/web /app/client/web +ENV AFFINESCRIPT_STDLIB=/usr/local/lib/affinescript/stdlib + # Non-root user (Chainguard default). USER nonroot diff --git a/ROADMAP.adoc b/ROADMAP.adoc index ddb91b1..b1fb2b9 100644 --- a/ROADMAP.adoc +++ b/ROADMAP.adoc @@ -34,7 +34,7 @@ Burble has cleared its foundational milestones. 
Test suite: 222+ Elixir tests + * [x] **SDP Foundation:** Software-Defined Perimeter zero-trust logic with Zig FFI `nftables` bridge. * [x] **AWOL Routing:** Layline predictive routing (RTT/loss velocity) for seamless handover. -* [ ] **LLM Service:** QUIC + TCP transport scaffold exists; **provider not wired, `parse_frame` has a bug, worker pool is no-op** (Phase 2b — secondary to P2P AI bridge). +* [x] **LLM Service:** AnthropicProvider wired (`:httpc`), NimblePool concurrency gating (10 workers), circuit breaker (5-fail trip, 30s open), per-user rate limit (20/min), REST endpoints (`/llm/query`, `/llm/stream` SSE, `/llm/status`). Set `ANTHROPIC_API_KEY` to enable. * [x] **P2P AI Bridge:** Claude-to-Claude JSON data channel over WebRTC. Both send and receive legs working (receive-leg bug fixed 2026-04-16). Heartbeat, status UI, round-trip + ordering + reconnect tests. This is the **critical path** for pair-programming. * [x] **Groove Protocol:** Health Mesh inter-service probing and Feedback routing. * [x] **Formal Proofs:** `MediaPipeline.idr` (linear buffer consumption) and `WebRTCSignaling.idr` (JSEP transitions) implemented. **Avow attestation is data-type-only — no dependent-type enforcement at runtime yet** (Phase 1 target: hash-chain audit log). diff --git a/client/web/src/Audio.affine b/client/web/src/Audio.affine index 13b856b..544bafc 100644 --- a/client/web/src/Audio.affine +++ b/client/web/src/Audio.affine @@ -1,10 +1,8 @@ // SPDX-License-Identifier: PMPL-1.0-or-later // -// STUB — awaiting AffineScript compiler; ReScript version is authoritative until migration completes -// // Audio.affine — Microphone and audio analysis helper. 
- -@migrate_from("client/web/src/Audio.res") +// +// AffineScript migration of Audio.res open WebRTC diff --git a/client/web/src/Bindings.affine b/client/web/src/Bindings.affine index a98b17f..597eed4 100644 --- a/client/web/src/Bindings.affine +++ b/client/web/src/Bindings.affine @@ -1,10 +1,8 @@ // SPDX-License-Identifier: PMPL-1.0-or-later // -// STUB — awaiting AffineScript compiler; ReScript version is authoritative until migration completes +// Bindings.affine — Generic JS/DOM bindings for Burble. // -// Bindings — Generic JS/DOM bindings for Burble. - -@migrate_from("client/web/src/Bindings.res") +// AffineScript migration of Bindings.res // Opaque DOM handle types. These are affine: a handle should not be // duplicated across ownership boundaries without explicit sharing. diff --git a/client/web/src/Main.affine b/client/web/src/Main.affine index 01d5c97..ef6c31c 100644 --- a/client/web/src/Main.affine +++ b/client/web/src/Main.affine @@ -1,13 +1,11 @@ // SPDX-License-Identifier: PMPL-1.0-or-later // -// STUB — awaiting AffineScript compiler; ReScript version is authoritative until migration completes -// // Main.affine — Entry point for the Burble web client. // // Initialises the application state, sets up routing, and // starts the render loop. - -@migrate_from("client/web/src/Main.res") +// +// AffineScript migration of Main.res // app is a linear resource: it must be initialised exactly once at startup // and must not be dropped without a clean shutdown path. diff --git a/client/web/src/Room.affine b/client/web/src/Room.affine index fdd3fa2..86c24d6 100644 --- a/client/web/src/Room.affine +++ b/client/web/src/Room.affine @@ -1,10 +1,8 @@ // SPDX-License-Identifier: PMPL-1.0-or-later // -// STUB — awaiting AffineScript compiler; ReScript version is authoritative until migration completes -// // Room.affine — Room utilities (name generation, validation). 
- -@migrate_from("client/web/src/Room.res") +// +// AffineScript migration of Room.res // word_list is a plain shared array; no affine qualifier needed. let word_list: array = [ diff --git a/client/web/src/Signaling.affine b/client/web/src/Signaling.affine index 6ab5e0b..1727952 100644 --- a/client/web/src/Signaling.affine +++ b/client/web/src/Signaling.affine @@ -1,10 +1,8 @@ // SPDX-License-Identifier: PMPL-1.0-or-later // -// STUB — awaiting AffineScript compiler; ReScript version is authoritative until migration completes -// // Signaling.affine — Relay and WebSocket signaling. - -@migrate_from("client/web/src/Signaling.res") +// +// AffineScript migration of Signaling.res open WebRTC diff --git a/client/web/src/WebRTC.affine b/client/web/src/WebRTC.affine new file mode 100644 index 0000000..3024b77 --- /dev/null +++ b/client/web/src/WebRTC.affine @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: PMPL-1.0-or-later +// +// WebRTC.affine — Low-level WebRTC bindings and helpers. +// +// AffineScript migration of WebRTC.res + +module RTC = { + // PeerConnection is linear: it must be created and eventually closed. + type linear pc + // DataChannel is linear: must be created and eventually closed. + type linear dc + // MediaStream is linear: must be stopped when no longer needed. + type linear stream + // track is affine: it can be added to a connection at most once. + type affine track + // sdp and iceCandidate are plain value types — no resource qualifier. 
+ type sdp = { + @as("type") type_: string, + sdp: string, + } + type iceCandidate = { + candidate: string, + sdpMid: string, + sdpMLineIndex: int, + } + + type iceServer = {urls: array} + type config = {iceServers: array} + + // createPC: config -> linear pc (consumes config, produces linear pc) + @new external createPC: config => linear pc = "RTCPeerConnection" + // createOffer/createAnswer borrow pc (shared reference) + @send external createOffer: (&pc, 'opts) => promise = "createOffer" + @send external createAnswer: (&pc, 'opts) => promise = "createAnswer" + @send external setLocalDescription: (&pc, sdp) => promise = "setLocalDescription" + @send external setRemoteDescription: (&pc, sdp) => promise = "setRemoteDescription" + // addTrack consumes the affine track (placed once) + @send external addTrack: (&pc, affine track, &stream) => unit = "addTrack" + @send external createDataChannel: (&pc, string, 'opts) => linear dc = "createDataChannel" + @send external addIceCandidate: (&pc, iceCandidate) => promise = "addIceCandidate" + // close: linear pc => unit — consumes the pc + @send external close: linear pc => unit = "close" + + @get external getTracks: &stream => array = "getTracks" + @set external setOnTrack: (&pc, 'ev => unit) => unit = "ontrack" + @set external setOnIceConnectionStateChange: (&pc, unit => unit) => unit = "oniceconnectionstatechange" + @set external setOnIceGatheringStateChange: (&pc, unit => unit) => unit = "onicegatheringstatechange" + @set external setOnDataChannel: (&pc, 'ev => unit) => unit = "ondatachannel" + @get external getIceConnectionState: &pc => string = "iceConnectionState" + @get external getIceGatheringState: &pc => string = "iceGatheringState" +} + +module Media = { + type constraints = {audio: bool} + @val external navigator: 'nav = "navigator" + @get external mediaDevices: 'nav => 'md = "mediaDevices" + // getUserMedia: produces a linear stream (caller must stop it) + @send external getUserMedia: ('md, constraints) => 
promise = "getUserMedia" + // stop: consumes a track reference + @send external stop: RTC.track => unit = "stop" +} + +let default_ice_servers = [ + {RTC.urls: ["stun:stun.l.google.com:19302"]}, + {RTC.urls: ["stun:stun.cloudflare.com:3478"]}, + {RTC.urls: ["stun:stun1.l.google.com:19302"]}, +] + +// createPC: unit -> linear RTC.pc +let createPC = () => RTC.createPC({iceServers: default_ice_servers}) + +let waitForIceGathering = (pc: &RTC.pc) => { + if pc->RTC.getIceGatheringState == "complete" { + Promise.resolve() + } else { + Promise.make((resolve, _reject) => { + pc->RTC.setOnIceGatheringStateChange(() => { + if pc->RTC.getIceGatheringState == "complete" { + resolve() + } + }) + // Safety timeout + let _ = setTimeout(() => resolve(), 5000) + }) + } +} diff --git a/server/lib/burble/application.ex b/server/lib/burble/application.ex index 3e49d9d..011fd9c 100644 --- a/server/lib/burble/application.ex +++ b/server/lib/burble/application.ex @@ -52,6 +52,9 @@ defmodule Burble.Application do # Coprocessor pipeline supervisor — one pipeline per active peer {DynamicSupervisor, name: Burble.CoprocessorSupervisor, strategy: :one_for_one}, + # In-memory chat message store (ETS-backed, per-room, ephemeral) + Burble.Chat.MessageStore, + # Text channels (NNTPS-backed persistent threaded messages) Burble.Text.NNTPSBackend, @@ -67,6 +70,11 @@ defmodule Burble.Application do # PTP precision timing (clock synchronisation for multi-node playout) Burble.Timing.PTP, + # RTP↔wall-clock correlator — receives sync points from every inbound RTP + # packet, maintains a 64-point sliding window, and provides rtp_to_wall / + # wall_to_rtp conversion + PPM drift estimation for Phase 4 playout alignment. + {Burble.Timing.ClockCorrelator, [name: Burble.Timing.ClockCorrelator, clock_rate: 48_000]}, + # Groove discovery endpoint (message queue for Gossamer/PanLL/etc.) # Serves GET /.well-known/groove with Burble capability manifest. 
# Groove connectors verified via Idris2 dependent types (Groove.idr). diff --git a/server/lib/burble/chat/message_store.ex b/server/lib/burble/chat/message_store.ex new file mode 100644 index 0000000..4badefd --- /dev/null +++ b/server/lib/burble/chat/message_store.ex @@ -0,0 +1,116 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# +# Burble.Chat.MessageStore — In-memory message store backed by ETS. +# +# Provides fast in-process storage for real-time text messages in a room. +# Messages are NOT persisted across restarts — this is by design (ephemeral +# chat alongside the voice session). For archival storage, messages are also +# forwarded through NNTPSBackend. +# +# Design notes: +# - ETS table is owned by this GenServer so it dies with the process. +# - Per-room message lists are stored as reversed insertion-order lists +# (newest first) so get_messages/2 is O(limit) rather than O(n). +# - A hard cap of @max_messages_per_room evicts the oldest messages when +# exceeded. Eviction is done at write time (amortised O(1)). +# +# Usage: +# Burble.Chat.MessageStore.store_message(room_id, msg) +# Burble.Chat.MessageStore.get_messages(room_id, 50) +# Burble.Chat.MessageStore.clear_room(room_id) + +defmodule Burble.Chat.MessageStore do + @moduledoc """ + ETS-backed in-memory store for room text messages. + + ## Message shape + + Messages are plain maps with the following keys: + - `:id` — unique message ID (hex string) + - `:from` — user_id of the sender + - `:body` — message text (UTF-8 string) + - `:timestamp` — `DateTime` (UTC) when the message was stored + + ## Capacity + + Each room is capped at 500 messages. + When the cap is reached the oldest message is evicted. + """ + + use GenServer + + require Logger + + @table :burble_chat_messages + @max_messages_per_room 500 + + # ── Client API ── + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @doc """ + Store a message in the given room. 
+ + `msg` must be a map with at least `:id`, `:from`, `:body`, and `:timestamp`. + Returns `:ok`. + """ + @spec store_message(String.t(), map()) :: :ok + def store_message(room_id, msg) do + GenServer.call(__MODULE__, {:store, room_id, msg}) + end + + @doc """ + Return the last `limit` messages from a room, newest first. + + Returns an empty list if the room has no messages or does not exist. + """ + @spec get_messages(String.t(), pos_integer()) :: [map()] + def get_messages(room_id, limit \\ 50) do + case :ets.lookup(@table, room_id) do + [{^room_id, messages}] -> Enum.take(messages, limit) + [] -> [] + end + end + + @doc """ + Delete all messages for a room (e.g. when the room is destroyed). + """ + @spec clear_room(String.t()) :: :ok + def clear_room(room_id) do + GenServer.call(__MODULE__, {:clear, room_id}) + end + + # ── GenServer callbacks ── + + @impl true + def init(_opts) do + table = :ets.new(@table, [:named_table, :public, read_concurrency: true]) + Logger.debug("[MessageStore] ETS table created: #{table}") + {:ok, %{table: table}} + end + + @impl true + def handle_call({:store, room_id, msg}, _from, state) do + messages = + case :ets.lookup(@table, room_id) do + [{^room_id, existing}] -> existing + [] -> [] + end + + # Prepend new message (newest-first order) and enforce cap. + updated = + [msg | messages] + |> Enum.take(@max_messages_per_room) + + :ets.insert(@table, {room_id, updated}) + {:reply, :ok, state} + end + + @impl true + def handle_call({:clear, room_id}, _from, state) do + :ets.delete(@table, room_id) + {:reply, :ok, state} + end +end diff --git a/server/lib/burble/llm/anthropic_provider.ex b/server/lib/burble/llm/anthropic_provider.ex index dc03af0..4990fe6 100644 --- a/server/lib/burble/llm/anthropic_provider.ex +++ b/server/lib/burble/llm/anthropic_provider.ex @@ -7,6 +7,10 @@ defmodule Burble.LLM.AnthropicProvider do Calls the Claude Messages API via Erlang's built-in :httpc (no extra deps). Reads ANTHROPIC_API_KEY from environment. 
Falls back gracefully when unconfigured. + + Includes a circuit breaker: after `@failure_threshold` consecutive failures the + circuit opens for `@open_duration_ms`, rejecting calls immediately. A single + probe is allowed after the open period (half-open); success closes the circuit. """ require Logger @@ -17,97 +21,178 @@ defmodule Burble.LLM.AnthropicProvider do @default_max_tokens 4096 @request_timeout 60_000 - # Ensure inets + ssl are started (idempotent). + # Circuit breaker config + @failure_threshold 5 + @open_duration_ms 30_000 + @cb_table :burble_llm_circuit_breaker + defp ensure_httpc do :inets.start() :ssl.start() :ok end - @doc """ - Process a synchronous LLM query. Returns `{:ok, text}` or `{:error, reason}`. - """ + # --------------------------------------------------------------------------- + # Circuit breaker (ETS-based, no GenServer needed) + # --------------------------------------------------------------------------- + + defp ensure_cb_table do + case :ets.info(@cb_table) do + :undefined -> :ets.new(@cb_table, [:set, :public, :named_table]); :ok + _ -> :ok + end + end + + defp circuit_state do + ensure_cb_table() + failures = case :ets.lookup(@cb_table, :failures) do + [{_, n}] -> n + [] -> 0 + end + opened_at = case :ets.lookup(@cb_table, :opened_at) do + [{_, t}] -> t + [] -> nil + end + {failures, opened_at} + end + + defp record_success do + ensure_cb_table() + :ets.insert(@cb_table, {:failures, 0}) + :ets.delete(@cb_table, :opened_at) + end + + defp record_failure do + ensure_cb_table() + new_count = case :ets.lookup(@cb_table, :failures) do + [{_, n}] -> n + 1 + [] -> 1 + end + :ets.insert(@cb_table, {:failures, new_count}) + if new_count >= @failure_threshold do + :ets.insert(@cb_table, {:opened_at, System.monotonic_time(:millisecond)}) + Logger.error("[LLM/CB] Circuit OPEN after #{new_count} consecutive failures") + end + end + + defp check_circuit do + case circuit_state() do + {failures, nil} when failures < @failure_threshold -> 
:closed + {_, opened_at} when is_integer(opened_at) -> + elapsed = System.monotonic_time(:millisecond) - opened_at + if elapsed >= @open_duration_ms, do: :half_open, else: :open + _ -> :closed + end + end + + defp with_circuit_breaker(fun) do + case check_circuit() do + :open -> + {:error, :circuit_open} + state when state in [:closed, :half_open] -> + case fun.() do + {:ok, _} = ok -> + record_success() + ok + :ok -> + record_success() + :ok + {:error, _} = err -> + record_failure() + err + end + end + end + + # --------------------------------------------------------------------------- + # Public API + # --------------------------------------------------------------------------- + def process_query(user_id, prompt) do ensure_httpc() case api_key() do - nil -> - {:error, :api_key_not_configured} - - key -> - body = Jason.encode!(%{ - model: model(), - max_tokens: max_tokens(), - messages: [%{role: "user", content: prompt}], - system: system_prompt(user_id) - }) - - headers = [ - {~c"content-type", ~c"application/json"}, - {~c"x-api-key", String.to_charlist(key)}, - {~c"anthropic-version", String.to_charlist(@api_version)} - ] - - request = {@api_url, headers, ~c"application/json", String.to_charlist(body)} - - case :httpc.request(:post, request, [timeout: @request_timeout, ssl: ssl_opts()], []) do - {:ok, {{_, 200, _}, _resp_headers, resp_body}} -> - parse_response(List.to_string(resp_body)) - - {:ok, {{_, status, _}, _resp_headers, resp_body}} -> - Logger.warning("[LLM/Anthropic] API returned #{status}: #{List.to_string(resp_body)}") - {:error, {:api_error, status}} - - {:error, reason} -> - Logger.error("[LLM/Anthropic] HTTP request failed: #{inspect(reason)}") - {:error, {:http_error, reason}} - end + nil -> {:error, :api_key_not_configured} + key -> with_circuit_breaker(fn -> do_query(user_id, prompt, key) end) end end - @doc """ - Stream an LLM query response. Calls `callback.(chunk_text)` for each content delta. 
- Returns `:ok` on completion or `{:error, reason}`. - """ def stream_query(user_id, prompt, callback) do ensure_httpc() case api_key() do - nil -> - {:error, :api_key_not_configured} - - key -> - body = Jason.encode!(%{ - model: model(), - max_tokens: max_tokens(), - stream: true, - messages: [%{role: "user", content: prompt}], - system: system_prompt(user_id) - }) - - headers = [ - {~c"content-type", ~c"application/json"}, - {~c"x-api-key", String.to_charlist(key)}, - {~c"anthropic-version", String.to_charlist(@api_version)} - ] - - request = {@api_url, headers, ~c"application/json", String.to_charlist(body)} - - case :httpc.request(:post, request, [timeout: @request_timeout, ssl: ssl_opts()], []) do - {:ok, {{_, 200, _}, _resp_headers, resp_body}} -> - parse_sse_stream(List.to_string(resp_body), callback) - - {:ok, {{_, status, _}, _resp_headers, resp_body}} -> - Logger.warning("[LLM/Anthropic] Stream API returned #{status}") - {:error, {:api_error, status, List.to_string(resp_body)}} - - {:error, reason} -> - Logger.error("[LLM/Anthropic] Stream HTTP request failed: #{inspect(reason)}") - {:error, {:http_error, reason}} - end + nil -> {:error, :api_key_not_configured} + key -> with_circuit_breaker(fn -> do_stream(user_id, prompt, callback, key) end) + end + end + + @doc "Current circuit breaker state: `:closed`, `:half_open`, or `:open`." + def circuit_breaker_status, do: check_circuit() + + @doc "Manually reset the circuit breaker (e.g. after fixing an outage)." 
+ def reset_circuit_breaker, do: record_success() + + # --------------------------------------------------------------------------- + # HTTP calls + # --------------------------------------------------------------------------- + + defp do_query(user_id, prompt, key) do + body = Jason.encode!(%{ + model: model(), + max_tokens: max_tokens(), + messages: [%{role: "user", content: prompt}], + system: system_prompt(user_id) + }) + + request = {@api_url, auth_headers(key), ~c"application/json", String.to_charlist(body)} + + case :httpc.request(:post, request, [timeout: @request_timeout, ssl: ssl_opts()], []) do + {:ok, {{_, 200, _}, _resp_headers, resp_body}} -> + parse_response(List.to_string(resp_body)) + + {:ok, {{_, status, _}, _resp_headers, resp_body}} -> + Logger.warning("[LLM/Anthropic] API returned #{status}: #{List.to_string(resp_body)}") + {:error, {:api_error, status}} + + {:error, reason} -> + Logger.error("[LLM/Anthropic] HTTP request failed: #{inspect(reason)}") + {:error, {:http_error, reason}} + end + end + + defp do_stream(user_id, prompt, callback, key) do + body = Jason.encode!(%{ + model: model(), + max_tokens: max_tokens(), + stream: true, + messages: [%{role: "user", content: prompt}], + system: system_prompt(user_id) + }) + + request = {@api_url, auth_headers(key), ~c"application/json", String.to_charlist(body)} + + case :httpc.request(:post, request, [timeout: @request_timeout, ssl: ssl_opts()], []) do + {:ok, {{_, 200, _}, _resp_headers, resp_body}} -> + parse_sse_stream(List.to_string(resp_body), callback) + + {:ok, {{_, status, _}, _resp_headers, resp_body}} -> + Logger.warning("[LLM/Anthropic] Stream API returned #{status}") + {:error, {:api_error, status, List.to_string(resp_body)}} + + {:error, reason} -> + Logger.error("[LLM/Anthropic] Stream HTTP request failed: #{inspect(reason)}") + {:error, {:http_error, reason}} end end + defp auth_headers(key) do + [ + {~c"content-type", ~c"application/json"}, + {~c"x-api-key", 
String.to_charlist(key)}, + {~c"anthropic-version", String.to_charlist(@api_version)} + ] + end + # --------------------------------------------------------------------------- # Response parsing # --------------------------------------------------------------------------- @@ -153,13 +238,8 @@ defmodule Burble.LLM.AnthropicProvider do # Config helpers # --------------------------------------------------------------------------- - defp api_key do - System.get_env("ANTHROPIC_API_KEY") - end - - defp model do - System.get_env("ANTHROPIC_MODEL") || @default_model - end + defp api_key, do: System.get_env("ANTHROPIC_API_KEY") + defp model, do: System.get_env("ANTHROPIC_MODEL") || @default_model defp max_tokens do case System.get_env("ANTHROPIC_MAX_TOKENS") do diff --git a/server/lib/burble/media/peer.ex b/server/lib/burble/media/peer.ex index 8220ad6..a4ac146 100644 --- a/server/lib/burble/media/peer.ex +++ b/server/lib/burble/media/peer.ex @@ -202,9 +202,30 @@ defmodule Burble.Media.Peer do # Propagate RTP timestamp to the pipeline for Phase 4 PTP correlation. # Resolve pipeline pid lazily so we don't fail if pipeline hasn't started. pipeline_pid = state.pipeline_pid || resolve_pipeline(state.peer_id) + if pipeline_pid do Burble.Coprocessor.Pipeline.record_rtp_timestamp(pipeline_pid, packet.timestamp) end + + # Feed the correlator with a simultaneous RTP+wall-clock observation. + # Prefer the PTP hardware clock; fall back to monotonic time. + wall_ns = + case Burble.Coprocessor.ZigBackend.ptp_read_clock() do + {:ok, ns} -> ns + {:error, _} -> :erlang.monotonic_time(:nanosecond) + end + + Burble.Timing.ClockCorrelator.record_sync_point( + Burble.Timing.ClockCorrelator, + packet.timestamp, + wall_ns + ) + + # Propagate local node's observation into the multi-node alignment + # registry so that other nodes in the same room can compute clock offsets + # relative to this node (Phase 4 PTP multi-node playout alignment). 
+ Burble.Timing.Alignment.report_node_sync(node(), packet.timestamp, wall_ns) + %{state | pipeline_pid: pipeline_pid} else state diff --git a/server/lib/burble/timing/alignment.ex b/server/lib/burble/timing/alignment.ex new file mode 100644 index 0000000..625fbe1 --- /dev/null +++ b/server/lib/burble/timing/alignment.ex @@ -0,0 +1,296 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) +# +# Burble.Timing.Alignment — multi-node playout alignment for Phase 4 PTP. +# +# When multiple Burble nodes (separate machines) share a room, each node's +# clock has a slight offset and drift relative to the others. This module +# collects per-node {rtp_ts, wall_ns} observations (forwarded from peer.ex +# via cast or Phoenix PubSub) and computes the nanosecond offset each remote +# node's clock has from the local node's clock. +# +# Consumers (e.g. the playout jitter buffer) call playout_offset_ns/1 to get +# the correction they must add to their playout timer to keep in phase with a +# given remote node. +# +# Design decisions: +# +# • "offset" is defined as: remote_wall_ns_at_observation - local_wall_ns_now +# A positive offset means the remote clock is ahead of the local clock. +# • Drift is computed as delta_offset_ns / delta_monotonic_ns * 1_000_000 PPM +# so that positive PPM means the remote clock is running faster than local. +# • Stale nodes (not seen within window_ms ms) are evicted on every cast. +# O(N) scan is fine; rooms are capped at ~50 nodes. +# • The local node itself is special: offset is always 0, drift is always 0.0. +# Callers that ask for node() get {:ok, 0} without any arithmetic. +# • No PubSub subscription is wired here. The integration point in peer.ex +# calls report_node_sync/3 directly for same-node observations; cross-node +# delivery is the caller's responsibility (Phoenix.PubSub broadcast + cast). +# +# Author: Jonathan D.A. 
Jewell + +defmodule Burble.Timing.Alignment do + @moduledoc """ + Multi-node playout alignment registry for Phase 4 PTP integration. + + Tracks per-node clock offsets and drifts so that playout buffers across + Burble nodes in the same room can be kept in phase. + + ## Usage + + # Start (normally done by the supervisor, after ClockCorrelator) + {:ok, _} = Alignment.start_link(name: Burble.Timing.Alignment) + + # From peer.ex, after ClockCorrelator.record_sync_point/3: + Alignment.report_node_sync(node(), packet.timestamp, wall_ns) + + # From a playout buffer (to find out how far ahead/behind a remote node is): + {:ok, offset_ns} = Alignment.playout_offset_ns(:"burble@192.168.1.2") + + # Health/debug endpoint: + %{nodes: [...], local_node: :burble@host} = Alignment.sync_status() + + ## Offset convention + + `playout_offset_ns/1` returns a signed integer. Add this value to the local + playout timer when scheduling audio from the given remote node: + + - Positive offset → remote clock is ahead; play slightly earlier. + - Negative offset → remote clock is behind; play slightly later. + + ## Supervision note + + Do **not** add this module to `application.ex` directly. Start it in the + supervision tree after `Burble.Timing.ClockCorrelator`. + """ + + use GenServer + require Logger + + # ── Types ────────────────────────────────────────────────────────────────── + + @typedoc "Per-node synchronisation state." + @type node_entry :: %{ + offset_ns: integer(), + drift_ppm: float(), + last_seen: integer() + } + + @typedoc "Node map: node atom → node_entry." + @type nodes_map :: %{atom() => node_entry()} + + @typedoc "GenServer state." + @type state :: %{ + nodes: nodes_map(), + local_node: atom(), + window_ms: pos_integer(), + # Raw previous observation per node for drift computation. 
+ # %{node => {prev_offset_ns, prev_monotonic_ns}} + prev_obs: %{atom() => {integer(), integer()}} + } + + # ── Client API ───────────────────────────────────────────────────────────── + + @doc """ + Start the Alignment GenServer. + + Options: + - `:name` — registered name (default: `Burble.Timing.Alignment`) + - `:window_ms` — stale-node eviction timeout in milliseconds (default: 30_000) + - `:local_node` — override the local node atom (default: `node()`) + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + {server_opts, init_opts} = Keyword.split(opts, [:name]) + server_opts = Keyword.put_new(server_opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, init_opts, server_opts) + end + + @doc """ + Report a simultaneous {rtp_ts, wall_ns} observation for a node. + + Typically called from `peer.ex` after `ClockCorrelator.record_sync_point/3`: + + Burble.Timing.Alignment.report_node_sync(node(), packet.timestamp, wall_ns) + + `wall_ns` must be the same wall-clock value that was passed to + `ClockCorrelator.record_sync_point/3` (i.e. either the PTP hardware-clock + value or `:erlang.monotonic_time(:nanosecond)`). + """ + @spec report_node_sync(atom(), non_neg_integer(), integer()) :: :ok + def report_node_sync(node, rtp_ts, wall_ns) do + GenServer.cast(__MODULE__, {:report_node_sync, node, rtp_ts, wall_ns}) + end + + @doc """ + Return the nanoseconds to add to the local playout timer to align with + the given remote node's clock. + + Returns `{:ok, 0}` for the local node (no correction needed). + Returns `{:error, :unknown_node}` if the node has not yet reported or has + been evicted as stale. + """ + @spec playout_offset_ns(atom()) :: {:ok, integer()} | {:error, :unknown_node} + def playout_offset_ns(node) do + GenServer.call(__MODULE__, {:playout_offset_ns, node}) + end + + @doc """ + Return the estimated clock drift of the given node relative to the local + node, in parts-per-million (PPM). 
+ + Positive PPM means the remote node's clock is running faster than local. + Returns `{:ok, 0.0}` for the local node. + Returns `{:error, :unknown_node}` if the node is unknown or stale. + """ + @spec node_drift_ppm(atom()) :: {:ok, float()} | {:error, :unknown_node} + def node_drift_ppm(node) do + GenServer.call(__MODULE__, {:node_drift_ppm, node}) + end + + @doc """ + Return a summary map suitable for health/debug endpoints. + + %{ + local_node: :burble@host, + nodes: [ + %{node: :"burble@peer1", offset_ns: 5_200, drift_ppm: 0.3, last_seen: 1234567}, + ... + ] + } + """ + @spec sync_status() :: %{nodes: [map()], local_node: atom()} + def sync_status do + GenServer.call(__MODULE__, :sync_status) + end + + # ── Server Callbacks ─────────────────────────────────────────────────────── + + @impl true + def init(opts) do + state = %{ + nodes: %{}, + local_node: Keyword.get(opts, :local_node, node()), + window_ms: Keyword.get(opts, :window_ms, 30_000), + prev_obs: %{} + } + + {:ok, state} + end + + @impl true + def handle_cast({:report_node_sync, reporting_node, _rtp_ts, wall_ns}, state) do + now_mono_ms = monotonic_ms() + now_ns = :erlang.monotonic_time(:nanosecond) + + # Evict stale nodes first (O(N), ≤ ~50 nodes). + state = evict_stale(state, now_mono_ms) + + # Compute offset: how many ns is the remote clock ahead of ours right now. + offset_ns = wall_ns - now_ns + + # Compute drift PPM if we have a previous observation for this node. + {drift_ppm, new_prev_obs} = + case Map.get(state.prev_obs, reporting_node) do + nil -> + # First observation — no drift estimate yet. 
+ {0.0, Map.put(state.prev_obs, reporting_node, {offset_ns, now_ns})} + + {prev_offset_ns, prev_mono_ns} -> + delta_offset = offset_ns - prev_offset_ns + delta_time = now_ns - prev_mono_ns + + drift = + if delta_time > 0 do + delta_offset / delta_time * 1_000_000 + else + 0.0 + end + + {Float.round(drift, 3), Map.put(state.prev_obs, reporting_node, {offset_ns, now_ns})} + end + + entry = %{ + offset_ns: offset_ns, + drift_ppm: drift_ppm, + last_seen: now_mono_ms + } + + new_nodes = Map.put(state.nodes, reporting_node, entry) + + {:noreply, %{state | nodes: new_nodes, prev_obs: new_prev_obs}} + end + + @impl true + def handle_call({:playout_offset_ns, queried_node}, _from, state) do + reply = + if queried_node == state.local_node do + {:ok, 0} + else + case Map.get(state.nodes, queried_node) do + nil -> {:error, :unknown_node} + %{offset_ns: offset_ns} -> {:ok, offset_ns} + end + end + + {:reply, reply, state} + end + + @impl true + def handle_call({:node_drift_ppm, queried_node}, _from, state) do + reply = + if queried_node == state.local_node do + {:ok, 0.0} + else + case Map.get(state.nodes, queried_node) do + nil -> {:error, :unknown_node} + %{drift_ppm: drift_ppm} -> {:ok, drift_ppm} + end + end + + {:reply, reply, state} + end + + @impl true + def handle_call(:sync_status, _from, state) do + node_list = + Enum.map(state.nodes, fn {node_atom, entry} -> + Map.put(entry, :node, node_atom) + end) + + reply = %{ + local_node: state.local_node, + nodes: node_list + } + + {:reply, reply, state} + end + + # ── Private helpers ──────────────────────────────────────────────────────── + + # Evict nodes whose last_seen timestamp is older than window_ms milliseconds. 
+ @spec evict_stale(state(), integer()) :: state() + defp evict_stale(%{nodes: nodes, prev_obs: prev_obs, window_ms: window_ms} = state, now_ms) do + cutoff = now_ms - window_ms + + {live_nodes, stale_keys} = + Enum.reduce(nodes, {%{}, []}, fn {node_atom, entry}, {live, stale} -> + if entry.last_seen >= cutoff do + {Map.put(live, node_atom, entry), stale} + else + Logger.debug("[Alignment] Evicting stale node #{node_atom} (last seen #{entry.last_seen}, cutoff #{cutoff})") + {live, [node_atom | stale]} + end + end) + + new_prev_obs = Map.drop(prev_obs, stale_keys) + + %{state | nodes: live_nodes, prev_obs: new_prev_obs} + end + + # Return the current monotonic time in milliseconds. + @spec monotonic_ms() :: integer() + defp monotonic_ms do + :erlang.monotonic_time(:millisecond) + end +end diff --git a/server/lib/burble/timing/clock_correlator.ex b/server/lib/burble/timing/clock_correlator.ex new file mode 100644 index 0000000..3dce434 --- /dev/null +++ b/server/lib/burble/timing/clock_correlator.ex @@ -0,0 +1,348 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) +# +# Burble.Timing.ClockCorrelator — RTP↔wall-clock correlation with drift tracking. +# +# Maintains a sliding window of sync points (simultaneously observed RTP +# timestamp + wall-clock nanosecond pairs) and uses linear regression over +# that window to estimate the mapping and drift between the two clocks. +# +# Design decisions: +# +# • RTP timestamps are 32-bit unsigned counters (wrap at 2^32) ticking at +# `clock_rate` Hz (48 000 for Opus). Wraparound is handled by converting +# every stored RTP timestamp to an "unwrapped" 64-bit value relative to +# the first sync point. +# • Wall-clock time comes from the PTP hardware clock when available, and +# falls back to :erlang.monotonic_time(:nanosecond) (see Peer integration). +# • Drift is estimated as the slope deviation from the ideal 1 tick/ns ratio, +# expressed in parts-per-million (PPM). 
Positive means RTP runs fast. +# • Linear regression is computed over the last 64 sync points for robustness +# against jitter. Two-point fallback is used when fewer than two points exist. +# +# Author: Jonathan D.A. Jewell + +defmodule Burble.Timing.ClockCorrelator do + @moduledoc """ + RTP↔PTP wall-clock correlation with sliding-window drift estimation. + + Maintains a window of `{rtp_ts, wall_ns}` sync points and converts between + RTP timestamps and nanosecond wall-clock time using linear regression over + the window. Handles the 32-bit RTP wraparound transparently. + + ## Usage + + {:ok, pid} = ClockCorrelator.start_link(clock_rate: 48_000) + + # Record a simultaneously observed pair (e.g. on RTP packet arrival) + ClockCorrelator.record_sync_point(pid, rtp_ts, wall_ns) + + # Map an RTP timestamp to wall-clock nanoseconds + {:ok, wall_ns} = ClockCorrelator.rtp_to_wall(pid, rtp_ts) + + # Reverse mapping + {:ok, rtp_ts} = ClockCorrelator.wall_to_rtp(pid, wall_ns) + + # Current clock drift estimate + {:ok, ppm} = ClockCorrelator.drift_ppm(pid) + + ## RTP wraparound + + RTP timestamps wrap at 2^32 (4 294 967 296). The correlator unwraps them + by detecting when a new timestamp is more than 2^31 less than the previous + one, treating such a jump as a wraparound rather than a backwards jump. + + ## Drift estimation + + Linear regression over the sync window gives the best-fit line relating + RTP ticks to wall nanoseconds. The slope of that line (in ns/tick) is + compared against the ideal `1_000_000_000 / clock_rate` ns/tick to yield + the drift in PPM. + """ + + use GenServer + require Logger + + @max_points 64 + @rtp_wraparound 4_294_967_296 + + # ── Types ────────────────────────────────────────────────────────────────── + + @typedoc "Unwrapped RTP tick (64-bit, monotonically increasing)." + @type unwrapped_rtp :: integer() + + @typedoc "Wall-clock time in nanoseconds." + @type wall_ns :: integer() + + @typedoc "Sync point: {unwrapped_rtp_tick, wall_ns}." 
+ @type sync_point :: {unwrapped_rtp, wall_ns} + + @typedoc "GenServer state." + @type state :: %{ + clock_rate: pos_integer(), + sync_points: [sync_point()], + max_points: pos_integer(), + # The raw RTP value of the very first sync point, used to anchor + # unwrapping to a known origin so comparisons are consistent. + first_rtp_raw: non_neg_integer() | nil, + # The last unwrapped RTP value, used to detect wraparound. + last_unwrapped: integer() | nil + } + + # ── Client API ───────────────────────────────────────────────────────────── + + @doc """ + Start the ClockCorrelator GenServer. + + Options: + - `:clock_rate` — RTP clock rate in Hz (default: 48_000) + - `:name` — registered name (optional) + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + {server_opts, init_opts} = Keyword.split(opts, [:name]) + GenServer.start_link(__MODULE__, init_opts, server_opts) + end + + @doc """ + Record a simultaneously observed RTP timestamp and wall-clock nanosecond value. + + Both clocks should be read as close together as possible (ideally on RTP + packet arrival, read the wall clock immediately before or after extracting + `packet.timestamp`). + """ + @spec record_sync_point(GenServer.server(), non_neg_integer(), wall_ns()) :: :ok + def record_sync_point(pid, rtp_ts, wall_ns) do + GenServer.cast(pid, {:record_sync_point, rtp_ts, wall_ns}) + end + + @doc """ + Convert an RTP timestamp to wall-clock nanoseconds. + + Returns `{:ok, wall_ns}` when at least one sync point exists, or + `{:error, :no_sync_points}` when the window is empty. + """ + @spec rtp_to_wall(GenServer.server(), non_neg_integer()) :: + {:ok, wall_ns()} | {:error, :no_sync_points} + def rtp_to_wall(pid, rtp_ts) do + GenServer.call(pid, {:rtp_to_wall, rtp_ts}) + end + + @doc """ + Convert a wall-clock nanosecond value to the closest RTP timestamp. + + Returns `{:ok, rtp_ts}` when at least one sync point exists, or + `{:error, :no_sync_points}` when the window is empty. 
+ + The returned value is a 32-bit unsigned integer (wrapped back into + the RTP timestamp space). + """ + @spec wall_to_rtp(GenServer.server(), wall_ns()) :: + {:ok, non_neg_integer()} | {:error, :no_sync_points} + def wall_to_rtp(pid, wall_ns) do + GenServer.call(pid, {:wall_to_rtp, wall_ns}) + end + + @doc """ + Return the estimated clock drift in parts-per-million (PPM). + + Positive values mean the RTP clock is running faster than nominal. + Negative values mean it is running slower. + + Returns `{:error, :insufficient_data}` if fewer than two sync points + have been recorded (drift cannot be estimated from a single point). + """ + @spec drift_ppm(GenServer.server()) :: {:ok, float()} | {:error, :insufficient_data} + def drift_ppm(pid) do + GenServer.call(pid, :drift_ppm) + end + + # ── Server Callbacks ─────────────────────────────────────────────────────── + + @impl true + def init(opts) do + clock_rate = Keyword.get(opts, :clock_rate, 48_000) + + state = %{ + clock_rate: clock_rate, + sync_points: [], + max_points: @max_points, + first_rtp_raw: nil, + last_unwrapped: nil + } + + {:ok, state} + end + + @impl true + def handle_cast({:record_sync_point, rtp_ts, wall_ns}, state) do + {unwrapped, new_state} = unwrap_rtp(rtp_ts, state) + point = {unwrapped, wall_ns} + + # Prepend and trim to max window size. We store newest-first so pattern + # matching on the head gives the most recent point cheaply. 
+ points = + [point | new_state.sync_points] + |> Enum.take(new_state.max_points) + + {:noreply, %{new_state | sync_points: points}} + end + + @impl true + def handle_call({:rtp_to_wall, _rtp_ts}, _from, %{sync_points: []} = state) do + {:reply, {:error, :no_sync_points}, state} + end + + @impl true + def handle_call({:rtp_to_wall, rtp_ts}, _from, state) do + {unwrapped, _} = unwrap_rtp(rtp_ts, state) + result = apply_regression_rtp_to_wall(unwrapped, state) + {:reply, result, state} + end + + @impl true + def handle_call({:wall_to_rtp, _wall_ns}, _from, %{sync_points: []} = state) do + {:reply, {:error, :no_sync_points}, state} + end + + @impl true + def handle_call({:wall_to_rtp, wall_ns}, _from, state) do + result = apply_regression_wall_to_rtp(wall_ns, state) + {:reply, result, state} + end + + @impl true + def handle_call(:drift_ppm, _from, %{sync_points: points} = state) when length(points) < 2 do + {:reply, {:error, :insufficient_data}, state} + end + + @impl true + def handle_call(:drift_ppm, _from, state) do + result = compute_drift_ppm(state) + {:reply, result, state} + end + + # ── Private helpers ──────────────────────────────────────────────────────── + + # Unwrap a raw 32-bit RTP timestamp into a monotonically increasing 64-bit + # integer, relative to the first sync point's raw RTP value. + # + # Wraparound detection: if the new raw value is more than 2^31 less than + # the last unwrapped value (modulo 2^32), we increment the wrap counter. + @spec unwrap_rtp(non_neg_integer(), state()) :: {integer(), state()} + defp unwrap_rtp(rtp_ts, %{first_rtp_raw: nil} = state) do + # First point — anchor here. + unwrapped = rtp_ts + new_state = %{state | first_rtp_raw: rtp_ts, last_unwrapped: unwrapped} + {unwrapped, new_state} + end + + defp unwrap_rtp(rtp_ts, state) do + last = state.last_unwrapped + # Compute the difference in the 32-bit space. 
+ raw_diff = rtp_ts - Integer.mod(last, @rtp_wraparound) + + # Adjust for wraparound: if diff is < -2^31, the counter wrapped forward. + diff = + cond do + raw_diff < -div(@rtp_wraparound, 2) -> raw_diff + @rtp_wraparound + raw_diff > div(@rtp_wraparound, 2) -> raw_diff - @rtp_wraparound + true -> raw_diff + end + + unwrapped = last + diff + new_state = %{state | last_unwrapped: unwrapped} + {unwrapped, new_state} + end + + # Linear regression helpers. + # + # We fit the model: wall_ns = intercept + slope * unwrapped_rtp + # + # slope has units ns/tick. The ideal slope for a clock_rate Hz RTP clock + # is 1_000_000_000 / clock_rate ns/tick. + + @spec linear_regression([sync_point()]) :: + {:ok, %{slope: float(), intercept: float()}} | {:error, :insufficient_data} + defp linear_regression([_]), do: {:error, :insufficient_data} + defp linear_regression([]), do: {:error, :insufficient_data} + + defp linear_regression(points) do + n = length(points) + + {sum_x, sum_y, sum_xx, sum_xy} = + Enum.reduce(points, {0, 0, 0, 0}, fn {x, y}, {sx, sy, sxx, sxy} -> + {sx + x, sy + y, sxx + x * x, sxy + x * y} + end) + + denom = n * sum_xx - sum_x * sum_x + + if denom == 0 do + # All x values are identical — can't fit a line. Use mean y. + {:ok, %{slope: 0.0, intercept: sum_y / n}} + else + slope = (n * sum_xy - sum_x * sum_y) / denom + intercept = (sum_y - slope * sum_x) / n + {:ok, %{slope: slope, intercept: intercept}} + end + end + + @spec apply_regression_rtp_to_wall(integer(), state()) :: + {:ok, wall_ns()} | {:error, :insufficient_data} + defp apply_regression_rtp_to_wall(unwrapped_rtp, %{sync_points: [anchor | _]} = state) do + case linear_regression(state.sync_points) do + {:ok, %{slope: slope, intercept: intercept}} -> + wall = round(intercept + slope * unwrapped_rtp) + {:ok, wall} + + {:error, :insufficient_data} -> + # Single-point fallback: use the ideal clock rate. 
+ {anchor_rtp, anchor_wall} = anchor + ideal_ns_per_tick = 1_000_000_000 / state.clock_rate + delta_ticks = unwrapped_rtp - anchor_rtp + {:ok, round(anchor_wall + delta_ticks * ideal_ns_per_tick)} + end + end + + @spec apply_regression_wall_to_rtp(wall_ns(), state()) :: + {:ok, non_neg_integer()} | {:error, :insufficient_data} + defp apply_regression_wall_to_rtp(wall_ns, %{sync_points: [anchor | _]} = state) do + case linear_regression(state.sync_points) do + {:ok, %{slope: slope, intercept: intercept}} when slope != 0.0 -> + # Invert: rtp = (wall - intercept) / slope + unwrapped = round((wall_ns - intercept) / slope) + wrapped = Integer.mod(unwrapped, @rtp_wraparound) + {:ok, wrapped} + + {:ok, _} -> + # Slope is zero — degenerate; use anchor + ideal rate. + {anchor_rtp, anchor_wall} = anchor + ideal_ns_per_tick = 1_000_000_000 / state.clock_rate + delta_ticks = round((wall_ns - anchor_wall) / ideal_ns_per_tick) + wrapped = Integer.mod(anchor_rtp + delta_ticks, @rtp_wraparound) + {:ok, wrapped} + + {:error, :insufficient_data} -> + # Single-point fallback. 
+ {anchor_rtp, anchor_wall} = anchor + ideal_ns_per_tick = 1_000_000_000 / state.clock_rate + delta_ticks = round((wall_ns - anchor_wall) / ideal_ns_per_tick) + wrapped = Integer.mod(anchor_rtp + delta_ticks, @rtp_wraparound) + {:ok, wrapped} + end + end + + @spec compute_drift_ppm(state()) :: {:ok, float()} | {:error, :insufficient_data} + defp compute_drift_ppm(state) do + case linear_regression(state.sync_points) do + {:ok, %{slope: slope}} -> + ideal = 1_000_000_000 / state.clock_rate + # drift_ppm = (measured_slope - ideal_slope) / ideal_slope * 1_000_000 + ppm = (slope - ideal) / ideal * 1_000_000 + {:ok, Float.round(ppm, 3)} + + {:error, _} = err -> + err + end + end +end diff --git a/server/lib/burble/timing/phc2sys.ex b/server/lib/burble/timing/phc2sys.ex index ea39009..1e486c5 100644 --- a/server/lib/burble/timing/phc2sys.ex +++ b/server/lib/burble/timing/phc2sys.ex @@ -219,7 +219,11 @@ defmodule Burble.Timing.Phc2sys do defp maybe_launch(true) do cond do not ptp_device_present?() -> - Logger.info("[Phc2sys] #{@ptp_device_path} not found; staying in :idle state (:ptp_absent)") + Logger.warning( + "[Phc2sys] auto_start requested but #{@ptp_device_path} does not exist on this host; " <> + "skipping phc2sys launch. Attach PTP hardware or disable auto_start." + ) + :ptp_absent phc2sys_running?() -> diff --git a/server/lib/burble_web/channels/room_channel.ex b/server/lib/burble_web/channels/room_channel.ex index de8c9ca..1614a41 100644 --- a/server/lib/burble_web/channels/room_channel.ex +++ b/server/lib/burble_web/channels/room_channel.ex @@ -206,6 +206,97 @@ defmodule BurbleWeb.RoomChannel do end end + # ── Real-time text messaging (MessageStore-backed) ── + # + # These events use the "text:" namespace and are separate from the legacy + # "text" event (which routes through NNTPSBackend for NNTP threading). 
+ # + # text:send — broadcast a new message to all room participants + # text:typing — broadcast a transient typing indicator (throttled) + # text:history — fetch recent messages from the in-memory store + + @typing_throttle_ms 2_000 + + @impl true + def handle_in("text:send", %{"body" => body}, socket) + when is_binary(body) and byte_size(body) > 0 and byte_size(body) <= 4096 do + user_id = socket.assigns.user_id + room_id = socket.assigns.room_id + + role_perms = get_user_permissions(socket) + + if Permissions.has_permission?(role_perms, :text) do + id = :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower) + timestamp = DateTime.utc_now() + + msg = %{ + id: id, + from: user_id, + body: body, + timestamp: timestamp + } + + Burble.Chat.MessageStore.store_message(room_id, msg) + + broadcast!(socket, "text:new", %{ + id: id, + from: user_id, + body: body, + timestamp: DateTime.to_iso8601(timestamp) + }) + + {:reply, {:ok, %{id: id}}, socket} + else + {:reply, {:error, %{reason: "no_text_permission"}}, socket} + end + end + + @impl true + def handle_in("text:send", _params, socket) do + {:reply, {:error, %{reason: "invalid_text_payload"}}, socket} + end + + @impl true + def handle_in("text:typing", _params, socket) do + user_id = socket.assigns.user_id + now = System.monotonic_time(:millisecond) + last_typing = socket.assigns[:last_typing_broadcast_ms] || 0 + + if now - last_typing >= @typing_throttle_ms do + broadcast_from!(socket, "text:typing", %{from: user_id}) + socket = assign(socket, :last_typing_broadcast_ms, now) + {:noreply, socket} + else + # Throttled — ignore + {:noreply, socket} + end + end + + @impl true + def handle_in("text:history", %{"limit" => limit_raw}, socket) + when is_integer(limit_raw) and limit_raw > 0 do + room_id = socket.assigns.room_id + limit = min(limit_raw, 200) + + messages = + Burble.Chat.MessageStore.get_messages(room_id, limit) + |> Enum.map(fn msg -> + %{ + id: msg.id, + from: msg.from, + body: msg.body, + timestamp: 
DateTime.to_iso8601(msg.timestamp) + } + end) + + {:reply, {:ok, %{messages: messages}}, socket} + end + + @impl true + def handle_in("text:history", _params, socket) do + {:reply, {:error, %{reason: "invalid_history_params"}}, socket} + end + # ── Whisper (directed audio) ── @impl true diff --git a/server/lib/burble_web/controllers/api/llm_controller.ex b/server/lib/burble_web/controllers/api/llm_controller.ex index 79e2709..60b7def 100644 --- a/server/lib/burble_web/controllers/api/llm_controller.ex +++ b/server/lib/burble_web/controllers/api/llm_controller.ex @@ -11,36 +11,59 @@ defmodule BurbleWeb.API.LLMController do ## Endpoints - POST /api/v1/llm/query — synchronous, returns full response - POST /api/v1/llm/stream — synchronous (buffered SSE), returns full response - GET /api/v1/llm/status — provider availability check + POST /api/v1/llm/query — synchronous JSON response + POST /api/v1/llm/stream — Server-Sent Events (SSE) streaming + GET /api/v1/llm/status — provider + circuit breaker status + + ## Rate Limiting + + Per-user: 20 queries per minute (keyed by user_id or IP for anonymous). """ use Phoenix.Controller, formats: [:json] require Logger @max_prompt_length 32_000 + @rate_limit_window_ms 60_000 + @rate_limit_max 20 + + # --------------------------------------------------------------------------- + # POST /api/v1/llm/query + # --------------------------------------------------------------------------- - @doc "Synchronous LLM query — returns `{response: text}` or `{error: reason}`." 
def query(conn, %{"prompt" => prompt}) when byte_size(prompt) <= @max_prompt_length do user_id = get_user_id(conn) - case Burble.LLM.process_query(user_id, prompt) do - {:ok, text} -> - json(conn, %{ok: true, response: text}) + with :ok <- check_rate_limit(user_id) do + case Burble.LLM.process_query(user_id, prompt) do + {:ok, text} -> + json(conn, %{ok: true, response: text}) + + {:error, :circuit_open} -> + conn |> put_status(503) |> json(%{ok: false, error: "circuit_open", retry_after: 30}) - {:error, :no_provider_configured} -> - conn |> put_status(503) |> json(%{ok: false, error: "llm_not_configured"}) + {:error, :no_provider_configured} -> + conn |> put_status(503) |> json(%{ok: false, error: "llm_not_configured"}) - {:error, :api_key_not_configured} -> - conn |> put_status(503) |> json(%{ok: false, error: "api_key_not_set"}) + {:error, :api_key_not_configured} -> + conn |> put_status(503) |> json(%{ok: false, error: "api_key_not_set"}) - {:error, {:api_error, status}} -> - conn |> put_status(502) |> json(%{ok: false, error: "upstream_error", status: status}) + {:error, :pool_exhausted} -> + conn |> put_status(503) |> json(%{ok: false, error: "busy", retry_after: 5}) - {:error, reason} -> - Logger.warning("[LLMController] Query failed: #{inspect(reason)}") - conn |> put_status(500) |> json(%{ok: false, error: "internal_error"}) + {:error, {:api_error, status}} -> + conn |> put_status(502) |> json(%{ok: false, error: "upstream_error", status: status}) + + {:error, reason} -> + Logger.warning("[LLMController] Query failed: #{inspect(reason)}") + conn |> put_status(500) |> json(%{ok: false, error: "internal_error"}) + end + else + {:rate_limited, retry_after} -> + conn + |> put_status(429) + |> put_resp_header("retry-after", to_string(retry_after)) + |> json(%{ok: false, error: "rate_limited", retry_after: retry_after}) end end @@ -52,46 +75,104 @@ defmodule BurbleWeb.API.LLMController do conn |> put_status(400) |> json(%{ok: false, error: "missing_prompt"}) end 
- @doc "Streaming LLM query — buffers chunks and returns full concatenated response." + # --------------------------------------------------------------------------- + # POST /api/v1/llm/stream — true SSE + # --------------------------------------------------------------------------- + def stream(conn, %{"prompt" => prompt}) when byte_size(prompt) <= @max_prompt_length do user_id = get_user_id(conn) - chunks = :ets.new(:llm_chunks, [:ordered_set, :private]) - counter = :counters.new(1, [:atomics]) - - result = Burble.LLM.stream_query(user_id, prompt, fn chunk -> - idx = :counters.add(counter, 1, 1) - :ets.insert(chunks, {idx, chunk}) - end) - - case result do - :ok -> - full_text = :ets.tab2list(chunks) |> Enum.map_join(fn {_k, v} -> v end) - :ets.delete(chunks) - json(conn, %{ok: true, response: full_text, streamed: true}) - - {:error, reason} -> - :ets.delete(chunks) - conn |> put_status(502) |> json(%{ok: false, error: inspect(reason)}) + + with :ok <- check_rate_limit(user_id) do + conn = + conn + |> put_resp_content_type("text/event-stream") + |> put_resp_header("cache-control", "no-cache") + |> put_resp_header("connection", "keep-alive") + |> send_chunked(200) + + result = Burble.LLM.stream_query(user_id, prompt, fn text -> + chunk(conn, "data: #{Jason.encode!(%{text: text})}\n\n") + end) + + case result do + :ok -> + chunk(conn, "data: #{Jason.encode!(%{done: true})}\n\n") + {:error, reason} -> + chunk(conn, "data: #{Jason.encode!(%{error: inspect(reason)})}\n\n") + end + + conn + else + {:rate_limited, retry_after} -> + conn + |> put_status(429) + |> put_resp_header("retry-after", to_string(retry_after)) + |> json(%{ok: false, error: "rate_limited", retry_after: retry_after}) end end def stream(conn, params), do: query(conn, params) - @doc "Check LLM provider status." 
+ # --------------------------------------------------------------------------- + # GET /api/v1/llm/status + # --------------------------------------------------------------------------- + def status(conn, _params) do provider = :persistent_term.get({Burble.LLM, :provider}, nil) + cb_status = + if function_exported?(Burble.LLM.AnthropicProvider, :circuit_breaker_status, 0) do + Atom.to_string(Burble.LLM.AnthropicProvider.circuit_breaker_status()) + else + "unknown" + end + json(conn, %{ available: provider != nil, provider: if(provider, do: inspect(provider), else: nil), - api_key_set: System.get_env("ANTHROPIC_API_KEY") != nil + api_key_set: System.get_env("ANTHROPIC_API_KEY") != nil, + circuit_breaker: cb_status }) end + # --------------------------------------------------------------------------- + # Per-user rate limiting (ETS token bucket, keyed by user_id) + # --------------------------------------------------------------------------- + + @rate_table :burble_llm_rate_limit + + defp check_rate_limit(user_id) do + ensure_rate_table() + now = System.monotonic_time(:millisecond) + key = {:llm_rate, user_id} + + case :ets.lookup(@rate_table, key) do + [{^key, count, window_start}] when now - window_start < @rate_limit_window_ms -> + if count >= @rate_limit_max do + retry_after = div(@rate_limit_window_ms - (now - window_start), 1000) + 1 + {:rate_limited, retry_after} + else + :ets.update_counter(@rate_table, key, {2, 1}) + :ok + end + + _ -> + :ets.insert(@rate_table, {key, 1, now}) + :ok + end + end + + defp ensure_rate_table do + case :ets.info(@rate_table) do + :undefined -> :ets.new(@rate_table, [:set, :public, :named_table]); :ok + _ -> :ok + end + end + defp get_user_id(conn) do case conn.assigns[:current_user] do %{id: id} -> id - _ -> "anonymous" + _ -> to_string(:inet.ntoa(conn.remote_ip)) end end end diff --git a/server/lib/burble_web/controllers/api/message_controller.ex b/server/lib/burble_web/controllers/api/message_controller.ex index 
c5d7e82..216201c 100644 --- a/server/lib/burble_web/controllers/api/message_controller.ex +++ b/server/lib/burble_web/controllers/api/message_controller.ex @@ -25,6 +25,7 @@ defmodule BurbleWeb.API.MessageController do use Phoenix.Controller, formats: [:json] alias Burble.Text.NNTPSBackend + alias Burble.Chat.MessageStore @doc """ Fetch recent messages for a room. @@ -60,15 +61,25 @@ defmodule BurbleWeb.API.MessageController do |> min(200) |> max(1) - case NNTPSBackend.fetch_recent(room_id, limit) do - {:ok, articles} -> - messages = Enum.map(articles, &format_article/1) - json(conn, %{messages: messages}) - - {:error, reason} -> - conn - |> put_status(500) - |> json(%{error: inspect(reason)}) + # Return messages from the in-memory MessageStore (fast, real-time). + # Fall back to NNTPSBackend for historical messages beyond the in-memory window. + store_messages = MessageStore.get_messages(room_id, limit) + + if length(store_messages) >= limit do + messages = Enum.map(store_messages, &format_store_message/1) + json(conn, %{messages: messages}) + else + # MessageStore didn't have enough — try NNTPSBackend for the full set. + case NNTPSBackend.fetch_recent(room_id, limit) do + {:ok, articles} -> + messages = Enum.map(articles, &format_article/1) + json(conn, %{messages: messages}) + + {:error, _reason} -> + # NNTPSBackend unavailable — return what the store has. + messages = Enum.map(store_messages, &format_store_message/1) + json(conn, %{messages: messages}) + end end end @@ -108,7 +119,18 @@ defmodule BurbleWeb.API.MessageController do |> json(%{error: "Message body exceeds 2000 byte limit"}) true -> - # Build options for NNTPS post. + # Store in the in-memory MessageStore for real-time retrieval. 
+ store_id = :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower) + timestamp = DateTime.utc_now() + + MessageStore.store_message(room_id, %{ + id: store_id, + from: user_id, + body: body, + timestamp: timestamp + }) + + # Also post to NNTPSBackend for persistence/threading. opts = if reply_to do [reply_to: reply_to] @@ -122,16 +144,39 @@ defmodule BurbleWeb.API.MessageController do |> put_status(201) |> json(format_article(article)) - {:error, reason} -> + {:error, _reason} -> + # NNTPSBackend unavailable — return a response based on the store entry. conn - |> put_status(500) - |> json(%{error: inspect(reason)}) + |> put_status(201) + |> json(%{ + message_id: store_id, + body: body, + display_name: display_name, + user_id: user_id, + sent_at: DateTime.to_iso8601(timestamp), + references: [], + is_pinned: false + }) end end end # ── Private helpers ── + # Format a MessageStore entry into the same JSON shape as an NNTPS article. + @doc false + defp format_store_message(msg) do + %{ + message_id: msg.id, + body: msg.body, + display_name: "Unknown", + user_id: msg.from, + sent_at: DateTime.to_iso8601(msg.timestamp), + references: [], + is_pinned: false + } + end + # Format an NNTPS article into a JSON-friendly message map. @doc false defp format_article(article) do diff --git a/server/test/burble/chat/message_store_test.exs b/server/test/burble/chat/message_store_test.exs new file mode 100644 index 0000000..93e3a83 --- /dev/null +++ b/server/test/burble/chat/message_store_test.exs @@ -0,0 +1,192 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# +# Tests for Burble.Chat.MessageStore — in-memory ETS-backed message store. 
+# +# Covers: +# - Basic store and retrieve +# - Message cap (501st message evicts the oldest) +# - clear_room removes all messages for a room +# - get_messages respects the limit parameter +# - Isolation between rooms + +defmodule Burble.Chat.MessageStoreTest do + use ExUnit.Case, async: false + + alias Burble.Chat.MessageStore + + # Start a fresh MessageStore for each test so ETS state doesn't bleed. + setup do + # Stop any existing instance (e.g. from Application startup in test env). + case Process.whereis(MessageStore) do + nil -> :ok + pid -> GenServer.stop(pid) + end + + {:ok, _pid} = start_supervised!(MessageStore) + :ok + end + + # ── Helper ── + + defp make_msg(n) do + %{ + id: "msg-#{n}", + from: "user-1", + body: "Message number #{n}", + timestamp: DateTime.utc_now() + } + end + + defp room_id, do: "test-room-" <> Base.encode16(:crypto.strong_rand_bytes(4), case: :lower) + + # ── Store and retrieve ── + + describe "store_message / get_messages" do + test "stores a message and retrieves it" do + rid = room_id() + msg = make_msg(1) + + :ok = MessageStore.store_message(rid, msg) + + [retrieved] = MessageStore.get_messages(rid) + assert retrieved.id == "msg-1" + assert retrieved.body == "Message number 1" + end + + test "returns an empty list for a room with no messages" do + assert MessageStore.get_messages(room_id()) == [] + end + + test "messages are returned newest first" do + rid = room_id() + + MessageStore.store_message(rid, make_msg(1)) + MessageStore.store_message(rid, make_msg(2)) + MessageStore.store_message(rid, make_msg(3)) + + [first | _] = MessageStore.get_messages(rid) + assert first.id == "msg-3", "expected newest message first" + end + + test "multiple messages can be stored and retrieved" do + rid = room_id() + for n <- 1..10, do: MessageStore.store_message(rid, make_msg(n)) + + messages = MessageStore.get_messages(rid) + assert length(messages) == 10 + end + end + + # ── Limit ── + + describe "get_messages limit" do + test "get_messages 
respects the limit parameter" do + rid = room_id() + for n <- 1..20, do: MessageStore.store_message(rid, make_msg(n)) + + messages = MessageStore.get_messages(rid, 5) + assert length(messages) == 5 + end + + test "limit larger than store size returns all messages" do + rid = room_id() + for n <- 1..3, do: MessageStore.store_message(rid, make_msg(n)) + + messages = MessageStore.get_messages(rid, 100) + assert length(messages) == 3 + end + + test "default limit is 50" do + rid = room_id() + for n <- 1..60, do: MessageStore.store_message(rid, make_msg(n)) + + messages = MessageStore.get_messages(rid) + assert length(messages) == 50 + end + end + + # ── Message cap ── + + describe "message cap" do + test "501st message evicts the oldest" do + rid = room_id() + + # Store 500 messages (the cap). + for n <- 1..500, do: MessageStore.store_message(rid, make_msg(n)) + + messages_at_cap = MessageStore.get_messages(rid, 500) + assert length(messages_at_cap) == 500 + + # The oldest message (msg-1) should still be present. + ids = Enum.map(messages_at_cap, & &1.id) + assert "msg-1" in ids, "msg-1 should still be present before cap is exceeded" + + # Store the 501st message — this evicts msg-1. + MessageStore.store_message(rid, make_msg(501)) + + messages_after = MessageStore.get_messages(rid, 500) + assert length(messages_after) == 500 + + ids_after = Enum.map(messages_after, & &1.id) + refute "msg-1" in ids_after, "oldest message (msg-1) must be evicted after cap is exceeded" + assert "msg-501" in ids_after, "newly inserted message must be present" + end + + test "cap is enforced at exactly 500 messages" do + rid = room_id() + for n <- 1..510, do: MessageStore.store_message(rid, make_msg(n)) + + # Even with 510 inserts the store must not exceed 500. 
+ messages = MessageStore.get_messages(rid, 600) + assert length(messages) == 500 + end + end + + # ── clear_room ── + + describe "clear_room" do + test "clears all messages for the room" do + rid = room_id() + for n <- 1..5, do: MessageStore.store_message(rid, make_msg(n)) + + :ok = MessageStore.clear_room(rid) + + assert MessageStore.get_messages(rid) == [] + end + + test "clear_room does not affect other rooms" do + rid_a = room_id() + rid_b = room_id() + + MessageStore.store_message(rid_a, make_msg(1)) + MessageStore.store_message(rid_b, make_msg(2)) + + :ok = MessageStore.clear_room(rid_a) + + assert MessageStore.get_messages(rid_a) == [] + assert length(MessageStore.get_messages(rid_b)) == 1 + end + + test "clear_room on empty room returns :ok without error" do + assert :ok == MessageStore.clear_room(room_id()) + end + end + + # ── Room isolation ── + + describe "room isolation" do + test "messages are scoped to their room" do + rid_a = room_id() + rid_b = room_id() + + MessageStore.store_message(rid_a, %{id: "a1", from: "u1", body: "hello from A", timestamp: DateTime.utc_now()}) + MessageStore.store_message(rid_b, %{id: "b1", from: "u2", body: "hello from B", timestamp: DateTime.utc_now()}) + + [msg_a] = MessageStore.get_messages(rid_a) + [msg_b] = MessageStore.get_messages(rid_b) + + assert msg_a.id == "a1" + assert msg_b.id == "b1" + end + end +end diff --git a/server/test/burble/llm/llm_test.exs b/server/test/burble/llm/llm_test.exs index 6d55a80..fe8f20e 100644 --- a/server/test/burble/llm/llm_test.exs +++ b/server/test/burble/llm/llm_test.exs @@ -329,4 +329,38 @@ defmodule Burble.LLMTest do Supervisor.stop(sup_pid) end end + + # --------------------------------------------------------------------------- + # Circuit breaker + # --------------------------------------------------------------------------- + + describe "AnthropicProvider circuit breaker" do + setup do + Burble.LLM.AnthropicProvider.reset_circuit_breaker() + on_exit(fn -> 
Burble.LLM.AnthropicProvider.reset_circuit_breaker() end)
+    end
+
+    test "starts in :closed state" do
+      assert Burble.LLM.AnthropicProvider.circuit_breaker_status() == :closed
+    end
+
+    test "reset_circuit_breaker/0 resets to :closed" do
+      # Force failures by writing directly to ETS
+      ensure_cb_table()
+      :ets.insert(:burble_llm_circuit_breaker, {:failures, 10})
+      :ets.insert(:burble_llm_circuit_breaker, {:opened_at, System.monotonic_time(:millisecond)})
+
+      assert Burble.LLM.AnthropicProvider.circuit_breaker_status() == :open
+
+      Burble.LLM.AnthropicProvider.reset_circuit_breaker()
+      assert Burble.LLM.AnthropicProvider.circuit_breaker_status() == :closed
+    end
+  end
+
+  # BUGFIX: this helper was previously defined inside the describe block.
+  # ExUnit raises "cannot invoke defp/2 inside ExUnit.Case.describe/2" at
+  # compile time, so the whole test file failed to compile. It now lives at
+  # module level (still private, still only used by the circuit-breaker tests).
+  # Ensures the circuit-breaker ETS table exists before tests write to it.
+  defp ensure_cb_table do
+    case :ets.info(:burble_llm_circuit_breaker) do
+      :undefined -> :ets.new(:burble_llm_circuit_breaker, [:set, :public, :named_table])
+      _ -> :ok
+    end
+  end
 end
diff --git a/server/test/burble/timing/alignment_test.exs b/server/test/burble/timing/alignment_test.exs
new file mode 100644
index 0000000..c5ac1d2
--- /dev/null
+++ b/server/test/burble/timing/alignment_test.exs
@@ -0,0 +1,288 @@
+# SPDX-License-Identifier: PMPL-1.0-or-later
+# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath)
+#
+# Tests for Burble.Timing.Alignment.
+
+defmodule Burble.Timing.AlignmentTest do
+  use ExUnit.Case, async: true
+
+  alias Burble.Timing.Alignment
+
+  # ---------------------------------------------------------------------------
+  # Helpers
+  # ---------------------------------------------------------------------------
+
+  # Start a fresh, anonymous Alignment GenServer for each test.
+  # We use a unique local_node atom per test to avoid node() being treated as
+  # the local node unless we explicitly want that behaviour.
+  defp start_alignment(opts \\ []) do
+    opts =
+      opts
+      |> Keyword.put_new(:local_node, :"local@testhost")
+      |> Keyword.put_new(:window_ms, 30_000)
+
+    # Do NOT pass :name so tests get an anonymous pid and don't conflict. 
+ start_supervised!({Alignment, opts}) + end + + # Flush a cast to the given pid before making assertions. + defp flush(pid), do: :sys.get_state(pid) + + # --------------------------------------------------------------------------- + # 1. Unknown node returns :error + # --------------------------------------------------------------------------- + + describe "unknown node" do + test "playout_offset_ns returns {:error, :unknown_node} for unreported node" do + pid = start_alignment() + assert GenServer.call(pid, {:playout_offset_ns, :"unknown@host"}) == {:error, :unknown_node} + end + + test "node_drift_ppm returns {:error, :unknown_node} for unreported node" do + pid = start_alignment() + assert GenServer.call(pid, {:node_drift_ppm, :"unknown@host"}) == {:error, :unknown_node} + end + end + + # --------------------------------------------------------------------------- + # 2. Report + query offset + # --------------------------------------------------------------------------- + + describe "report and query offset" do + test "playout_offset_ns returns {:ok, offset} after a report" do + pid = start_alignment() + remote = :"remote@host" + + # We want a predictable offset. We pass a wall_ns that is 500_000 ns + # ahead of what :erlang.monotonic_time(:nanosecond) returns. Because we + # cannot freeze the clock, we accept that the stored offset is + # approximately wall_ns_sent - local_now, which will be close to but not + # exactly 500_000. We just verify the shape and that it's a finite integer. + wall_ns = :erlang.monotonic_time(:nanosecond) + 500_000 + + GenServer.cast(pid, {:report_node_sync, remote, 48_000, wall_ns}) + flush(pid) + + assert {:ok, offset_ns} = GenServer.call(pid, {:playout_offset_ns, remote}) + assert is_integer(offset_ns) + end + end + + # --------------------------------------------------------------------------- + # 3. 
Drift computed correctly after two reports + # --------------------------------------------------------------------------- + + describe "drift computation" do + test "drift_ppm is non-zero after two reports with a simulated fast remote clock" do + pid = start_alignment() + remote = :"drifty@host" + + # First report: pretend remote clock is exactly in sync. + now1 = :erlang.monotonic_time(:nanosecond) + GenServer.cast(pid, {:report_node_sync, remote, 48_000, now1}) + flush(pid) + + # Second report: 10 ms of wall time has passed on the local clock, but + # the remote wall_ns is 10_010_000 ns ahead — 1 000 ns extra in 10 ms = + # +100 PPM. + # We sleep a tiny amount to ensure the monotonic clock advances so that + # delta_time > 0. + Process.sleep(5) + delta_ns = 10_010_000 + now2 = :erlang.monotonic_time(:nanosecond) + GenServer.cast(pid, {:report_node_sync, remote, 96_000, now2 + delta_ns}) + flush(pid) + + assert {:ok, drift} = GenServer.call(pid, {:node_drift_ppm, remote}) + assert is_float(drift) + # Direction: remote clock moved forward faster than local → positive drift. + assert drift > 0.0 + end + + test "drift_ppm is 0.0 for a perfectly synchronised remote clock" do + pid = start_alignment() + remote = :"sync@host" + + # Two reports where offset stays constant (remote clock moves at exactly + # the same rate as local). offset_ns is the same for both observations. + fixed_offset = 1_000_000 + + now1 = :erlang.monotonic_time(:nanosecond) + GenServer.cast(pid, {:report_node_sync, remote, 0, now1 + fixed_offset}) + flush(pid) + + Process.sleep(5) + now2 = :erlang.monotonic_time(:nanosecond) + GenServer.cast(pid, {:report_node_sync, remote, 48_000, now2 + fixed_offset}) + flush(pid) + + assert {:ok, drift} = GenServer.call(pid, {:node_drift_ppm, remote}) + # delta_offset = 0, so drift should be 0.0 + assert_in_delta drift, 0.0, 0.5 + end + end + + # --------------------------------------------------------------------------- + # 4. 
Stale node eviction + # --------------------------------------------------------------------------- + + describe "stale node eviction" do + test "node is evicted after window_ms elapses" do + # Use a very short window so we don't have to wait long. + pid = start_alignment(window_ms: 50) + remote = :"stale@host" + + wall_ns = :erlang.monotonic_time(:nanosecond) + GenServer.cast(pid, {:report_node_sync, remote, 0, wall_ns}) + flush(pid) + + # Confirm it's present. + assert {:ok, _} = GenServer.call(pid, {:playout_offset_ns, remote}) + + # Wait longer than the window, then trigger another cast so eviction runs. + Process.sleep(100) + + # Trigger the eviction by reporting a second (different) node. + GenServer.cast(pid, {:report_node_sync, :"trigger@host", 0, :erlang.monotonic_time(:nanosecond)}) + flush(pid) + + # The stale node should now be gone. + assert GenServer.call(pid, {:playout_offset_ns, remote}) == {:error, :unknown_node} + end + end + + # --------------------------------------------------------------------------- + # 5. 
sync_status returns node list + # --------------------------------------------------------------------------- + + describe "sync_status" do + test "returns empty node list when no nodes have reported" do + pid = start_alignment() + status = GenServer.call(pid, :sync_status) + assert status.nodes == [] + assert status.local_node == :"local@testhost" + end + + test "sync_status contains reported nodes with expected keys" do + pid = start_alignment() + remote = :"peer@remote" + + GenServer.cast(pid, {:report_node_sync, remote, 0, :erlang.monotonic_time(:nanosecond)}) + flush(pid) + + %{nodes: nodes, local_node: local} = GenServer.call(pid, :sync_status) + + assert local == :"local@testhost" + assert length(nodes) == 1 + + [entry] = nodes + assert entry.node == remote + assert Map.has_key?(entry, :offset_ns) + assert Map.has_key?(entry, :drift_ppm) + assert Map.has_key?(entry, :last_seen) + end + end + + # --------------------------------------------------------------------------- + # 6. playout_offset_ns after report + # --------------------------------------------------------------------------- + + describe "playout_offset_ns after report" do + test "offset is close to wall_ns - local_ns at time of report" do + pid = start_alignment() + remote = :"ahead@host" + + # Send a wall_ns that is 1_000_000 ns (1 ms) ahead of now. + now_ns = :erlang.monotonic_time(:nanosecond) + wall_ns = now_ns + 1_000_000 + + GenServer.cast(pid, {:report_node_sync, remote, 0, wall_ns}) + flush(pid) + + assert {:ok, offset_ns} = GenServer.call(pid, {:playout_offset_ns, remote}) + # The offset should be positive (remote is ahead) and in the right ballpark. + # We allow ±5 ms tolerance for scheduling jitter during test. + assert offset_ns > 0 + assert_in_delta offset_ns, 1_000_000, 5_000_000 + end + end + + # --------------------------------------------------------------------------- + # 7. 
Multiple nodes tracked independently
+  # ---------------------------------------------------------------------------
+
+  describe "multiple nodes tracked independently" do
+    test "two remote nodes have independent offsets" do
+      pid = start_alignment()
+      node_a = :"node_a@host"
+      node_b = :"node_b@host"
+
+      now_ns = :erlang.monotonic_time(:nanosecond)
+
+      # node_a is 2 ms ahead, node_b is 5 ms behind.
+      GenServer.cast(pid, {:report_node_sync, node_a, 0, now_ns + 2_000_000})
+      GenServer.cast(pid, {:report_node_sync, node_b, 0, now_ns - 5_000_000})
+      flush(pid)
+
+      assert {:ok, offset_a} = GenServer.call(pid, {:playout_offset_ns, node_a})
+      assert {:ok, offset_b} = GenServer.call(pid, {:playout_offset_ns, node_b})
+
+      # They must differ and have the correct signs.
+      assert offset_a > 0, "node_a should be reported as ahead"
+      assert offset_b < 0, "node_b should be reported as behind"
+      assert offset_a != offset_b
+    end
+
+    test "reporting one node does not affect another node's stored offset" do
+      pid = start_alignment()
+      node_a = :"stable_a@host"
+      node_b = :"changing_b@host"
+
+      now_ns = :erlang.monotonic_time(:nanosecond)
+
+      GenServer.cast(pid, {:report_node_sync, node_a, 0, now_ns + 1_000_000})
+      flush(pid)
+
+      {:ok, offset_a_before} = GenServer.call(pid, {:playout_offset_ns, node_a})
+
+      # Report node_b many times; node_a's offset should be stable.
+      for i <- 1..5 do
+        GenServer.cast(pid, {:report_node_sync, node_b, i * 480, :erlang.monotonic_time(:nanosecond) + i * 100_000})
+      end
+
+      flush(pid)
+
+      {:ok, offset_a_after} = GenServer.call(pid, {:playout_offset_ns, node_a})
+
+      # No new report arrived for node_a, so its stored offset must be exactly
+      # unchanged — exact equality is intentional here. (No tolerance is
+      # needed: the stored value is only written when a report for that node
+      # arrives, not on reads, so monotonic-clock advancement between the two
+      # GenServer.call reads cannot affect it.)
+      assert offset_a_before == offset_a_after
+    end
+  end
+
+  # ---------------------------------------------------------------------------
+  # 8. 
Local node excluded from offset (always returns 0) + # --------------------------------------------------------------------------- + + describe "local node special-casing" do + test "playout_offset_ns returns {:ok, 0} for the local node" do + pid = start_alignment(local_node: :"mynode@localhost") + assert GenServer.call(pid, {:playout_offset_ns, :"mynode@localhost"}) == {:ok, 0} + end + + test "node_drift_ppm returns {:ok, 0.0} for the local node" do + pid = start_alignment(local_node: :"mynode@localhost") + assert GenServer.call(pid, {:node_drift_ppm, :"mynode@localhost"}) == {:ok, 0.0} + end + + test "local node returns {:ok, 0} even after reporting it as a remote" do + # If somehow a report arrives with the local node atom, it gets stored. + # But playout_offset_ns should still short-circuit to {:ok, 0}. + pid = start_alignment(local_node: :"mynode@localhost") + now_ns = :erlang.monotonic_time(:nanosecond) + GenServer.cast(pid, {:report_node_sync, :"mynode@localhost", 0, now_ns + 999_999}) + flush(pid) + assert GenServer.call(pid, {:playout_offset_ns, :"mynode@localhost"}) == {:ok, 0} + end + end +end diff --git a/server/test/burble/timing/clock_correlator_test.exs b/server/test/burble/timing/clock_correlator_test.exs new file mode 100644 index 0000000..5c9662d --- /dev/null +++ b/server/test/burble/timing/clock_correlator_test.exs @@ -0,0 +1,349 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) +# +# Tests for Burble.Timing.ClockCorrelator. + +defmodule Burble.Timing.ClockCorrelatorTest do + use ExUnit.Case, async: true + + alias Burble.Timing.ClockCorrelator + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + # Start a fresh, anonymous correlator for each test. 
+ defp start_correlator(opts \\ []) do + opts = Keyword.put_new(opts, :clock_rate, 48_000) + start_supervised!({ClockCorrelator, opts}) + end + + # Ideal nanoseconds per RTP tick at 48 000 Hz. + @ns_per_tick 1_000_000_000 / 48_000 + + # --------------------------------------------------------------------------- + # Empty-state guard + # --------------------------------------------------------------------------- + + describe "empty state" do + test "rtp_to_wall returns {:error, :no_sync_points} before any sync points" do + pid = start_correlator() + assert ClockCorrelator.rtp_to_wall(pid, 1_000) == {:error, :no_sync_points} + end + + test "wall_to_rtp returns {:error, :no_sync_points} before any sync points" do + pid = start_correlator() + assert ClockCorrelator.wall_to_rtp(pid, 1_000_000_000) == {:error, :no_sync_points} + end + + test "drift_ppm returns {:error, :insufficient_data} with fewer than two sync points" do + pid = start_correlator() + assert ClockCorrelator.drift_ppm(pid) == {:error, :insufficient_data} + end + + test "drift_ppm returns {:error, :insufficient_data} with exactly one sync point" do + pid = start_correlator() + ClockCorrelator.record_sync_point(pid, 0, 0) + # Allow the cast to be processed. + :sys.get_state(pid) + assert ClockCorrelator.drift_ppm(pid) == {:error, :insufficient_data} + end + end + + # --------------------------------------------------------------------------- + # Basic sync point recording and rtp_to_wall conversion + # --------------------------------------------------------------------------- + + describe "basic sync point + rtp_to_wall" do + test "single sync point: converts using ideal clock rate" do + pid = start_correlator() + + anchor_rtp = 96_000 + anchor_wall = 1_000_000_000 + + ClockCorrelator.record_sync_point(pid, anchor_rtp, anchor_wall) + :sys.get_state(pid) + + # 480 ticks ahead @ 48 000 Hz = 10 ms = 10_000_000 ns. 
+ query_rtp = anchor_rtp + 480 + {:ok, wall} = ClockCorrelator.rtp_to_wall(pid, query_rtp) + + expected = anchor_wall + round(480 * @ns_per_tick) + assert_in_delta wall, expected, 10 + end + + test "two sync points on ideal clock: conversion is exact" do + pid = start_correlator() + + # Anchor at t=0, then exactly 1 second later (48_000 ticks = 1 s). + ClockCorrelator.record_sync_point(pid, 0, 0) + ClockCorrelator.record_sync_point(pid, 48_000, 1_000_000_000) + :sys.get_state(pid) + + # Query 2 s from anchor. + {:ok, wall} = ClockCorrelator.rtp_to_wall(pid, 96_000) + assert_in_delta wall, 2_000_000_000, 10 + end + + test "rtp_to_wall is monotonically increasing for increasing RTP timestamps" do + pid = start_correlator() + + base_rtp = 10_000 + base_wall = 5_000_000_000 + + for i <- 0..9 do + ClockCorrelator.record_sync_point(pid, base_rtp + i * 480, base_wall + round(i * 480 * @ns_per_tick)) + end + + :sys.get_state(pid) + + walls = + for i <- 0..20 do + {:ok, w} = ClockCorrelator.rtp_to_wall(pid, base_rtp + i * 480) + w + end + + assert walls == Enum.sort(walls) + end + end + + # --------------------------------------------------------------------------- + # RTP wraparound handling + # --------------------------------------------------------------------------- + + describe "RTP wraparound" do + # The 32-bit RTP counter wraps at 4_294_967_296. We simulate a stream + # that crosses the boundary. + + @rtp_max 4_294_967_296 + + test "timestamps crossing 2^32 are treated as continuous" do + pid = start_correlator() + + # Place a sync point just before wraparound. + pre_wrap_rtp = @rtp_max - 4_800 + pre_wrap_wall = 1_000_000_000 + + ClockCorrelator.record_sync_point(pid, pre_wrap_rtp, pre_wrap_wall) + + # 4_800 ticks later the counter wraps to 0. + post_wrap_rtp = 0 + post_wrap_wall = pre_wrap_wall + round(4_800 * @ns_per_tick) + + ClockCorrelator.record_sync_point(pid, post_wrap_rtp, post_wrap_wall) + :sys.get_state(pid) + + # Query 480 ticks after the wrap point. 
+      query_rtp = 480
+      expected_wall = post_wrap_wall + round(480 * @ns_per_tick)
+
+      {:ok, result_wall} = ClockCorrelator.rtp_to_wall(pid, query_rtp)
+
+      # Allow 500 ns tolerance for rounding.
+      assert_in_delta result_wall, expected_wall, 500
+    end
+
+    test "multiple wraps: wall time keeps growing" do
+      pid = start_correlator()
+
+      # Simulate three wrap-around events.
+      ticks_per_wrap = @rtp_max
+
+      base_wall = 0
+
+      # Seed sync points across wraps. We feed raw (wrapped) RTP values.
+      # NOTE(review): because ticks_per_wrap == @rtp_max,
+      # rem(wrap * ticks_per_wrap, @rtp_max) evaluates to 0 for every
+      # iteration — all three sync points share rtp_raw == 0 and only the
+      # wall time advances. Whether this actually exercises the correlator's
+      # wraparound unwrapping is implementation-dependent; TODO confirm the
+      # intent (feeding distinct in-epoch offsets per wrap would make the
+      # scenario unambiguous).
+      for wrap <- 0..2 do
+        rtp_raw = rem(wrap * ticks_per_wrap, @rtp_max)
+        wall = base_wall + wrap * round(ticks_per_wrap * @ns_per_tick)
+        ClockCorrelator.record_sync_point(pid, rtp_raw, wall)
+      end
+
+      :sys.get_state(pid)
+
+      # Wall time should increase across wraps.
+      {:ok, w0} = ClockCorrelator.rtp_to_wall(pid, 0)
+      {:ok, w1} = ClockCorrelator.rtp_to_wall(pid, 48_000)
+      assert w1 > w0
+    end
+
+    test "wall_to_rtp round-trip survives wraparound region" do
+      pid = start_correlator()
+
+      pre_wrap_rtp = @rtp_max - 9_600
+      pre_wrap_wall = 2_000_000_000
+
+      ClockCorrelator.record_sync_point(pid, pre_wrap_rtp, pre_wrap_wall)
+      ClockCorrelator.record_sync_point(pid, 0, pre_wrap_wall + round(9_600 * @ns_per_tick))
+      ClockCorrelator.record_sync_point(pid, 9_600, pre_wrap_wall + round(19_200 * @ns_per_tick))
+      :sys.get_state(pid)
+
+      test_wall = pre_wrap_wall + round(14_400 * @ns_per_tick)
+      {:ok, recovered_rtp} = ClockCorrelator.wall_to_rtp(pid, test_wall)
+
+      # Expected raw RTP: 4800 ticks after wrap = 4800.
+      expected_rtp = 4_800
+      # Allow 2 tick tolerance (rounding in regression).
+      assert abs(recovered_rtp - expected_rtp) <= 2
+    end
+  end
+
+  # ---------------------------------------------------------------------------
+  # Drift estimation
+  # ---------------------------------------------------------------------------
+
+  describe "drift estimation" do
+    test "ideal clock yields ~0 PPM drift" do
+      pid = start_correlator()
+
+      # Feed 10 sync points on a perfect 48 000 Hz clock. 
+ for i <- 0..9 do + rtp = i * 48_000 + wall = i * 1_000_000_000 + ClockCorrelator.record_sync_point(pid, rtp, wall) + end + + :sys.get_state(pid) + + {:ok, ppm} = ClockCorrelator.drift_ppm(pid) + # Should be very close to 0. + assert abs(ppm) < 0.1 + end + + test "RTP clock running 100 PPM fast is detected" do + pid = start_correlator() + + # A 100 PPM fast RTP clock ticks at clock_rate * (1 + 100e-6) Hz. + # In practice: for every wall second, the RTP counter advances by + # clock_rate + clock_rate * 100e-6 ticks. + drift_factor = 1.0 + 100.0 / 1_000_000 + + for i <- 0..15 do + rtp = round(i * 48_000 * drift_factor) + wall = i * 1_000_000_000 + ClockCorrelator.record_sync_point(pid, rtp, wall) + end + + :sys.get_state(pid) + + {:ok, ppm} = ClockCorrelator.drift_ppm(pid) + + # The measured PPM should be close to -100 (wall slower → slope + # smaller → negative drift in our convention: slope - ideal < 0 + # because RTP ticks faster so fewer ns per tick). + # actual: slope = ns/tick = 1e9 / (48000 * drift_factor) + # ideal = 1e9 / 48000 + # (slope - ideal)/ideal = -100/(1e6 + 100) ≈ -100 PPM + assert_in_delta ppm, -100.0, 1.0 + end + + test "RTP clock running 50 PPM slow is detected" do + pid = start_correlator() + + drift_factor = 1.0 - 50.0 / 1_000_000 + + for i <- 0..15 do + rtp = round(i * 48_000 * drift_factor) + wall = i * 1_000_000_000 + ClockCorrelator.record_sync_point(pid, rtp, wall) + end + + :sys.get_state(pid) + + {:ok, ppm} = ClockCorrelator.drift_ppm(pid) + # Slope > ideal → positive PPM? 
Let's check: + # slope = 1e9 / (48000 * drift_factor) > ideal → (slope-ideal)/ideal > 0 → +50 PPM + assert_in_delta ppm, 50.0, 1.0 + end + end + + # --------------------------------------------------------------------------- + # wall_to_rtp round-trip accuracy + # --------------------------------------------------------------------------- + + describe "wall_to_rtp round-trip" do + test "round-trip rtp → wall → rtp is accurate within 2 ticks" do + pid = start_correlator() + + base_rtp = 100_000 + base_wall = 3_000_000_000 + + for i <- 0..19 do + ClockCorrelator.record_sync_point( + pid, + base_rtp + i * 480, + base_wall + round(i * 480 * @ns_per_tick) + ) + end + + :sys.get_state(pid) + + query_rtp = base_rtp + 2_400 + + {:ok, wall} = ClockCorrelator.rtp_to_wall(pid, query_rtp) + {:ok, recovered} = ClockCorrelator.wall_to_rtp(pid, wall) + + assert abs(recovered - query_rtp) <= 2 + end + + test "round-trip wall → rtp → wall is accurate within 1 µs" do + pid = start_correlator() + + base_rtp = 50_000 + base_wall = 1_500_000_000 + + for i <- 0..19 do + ClockCorrelator.record_sync_point( + pid, + base_rtp + i * 480, + base_wall + round(i * 480 * @ns_per_tick) + ) + end + + :sys.get_state(pid) + + query_wall = base_wall + 5_000_000 + + {:ok, rtp} = ClockCorrelator.wall_to_rtp(pid, query_wall) + {:ok, recovered_wall} = ClockCorrelator.rtp_to_wall(pid, rtp) + + # Within 1 µs (1000 ns). + assert_in_delta recovered_wall, query_wall, 1_000 + end + end + + # --------------------------------------------------------------------------- + # Sliding window (max_points eviction) + # --------------------------------------------------------------------------- + + describe "sliding window" do + test "window is capped at max_points (64)" do + pid = start_correlator() + + # Record 80 sync points. 
+ for i <- 0..79 do + ClockCorrelator.record_sync_point(pid, i * 480, i * round(480 * @ns_per_tick)) + end + + :sys.get_state(pid) + + state = :sys.get_state(pid) + assert length(state.sync_points) == 64 + end + + test "newest points are retained when window is full" do + pid = start_correlator() + + for i <- 0..79 do + ClockCorrelator.record_sync_point(pid, i * 480, i * round(480 * @ns_per_tick)) + end + + :sys.get_state(pid) + + # The most recent point is index 79. + state = :sys.get_state(pid) + {latest_rtp, _} = hd(state.sync_points) + # Unwrapped RTP for index 79 = 79 * 480. + assert latest_rtp == 79 * 480 + end + end +end diff --git a/server/test/burble_web/channels/room_channel_text_test.exs b/server/test/burble_web/channels/room_channel_text_test.exs new file mode 100644 index 0000000..0f2f086 --- /dev/null +++ b/server/test/burble_web/channels/room_channel_text_test.exs @@ -0,0 +1,306 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# +# Tests for BurbleWeb.RoomChannel — text messaging extensions. +# +# Covers the new "text:send", "text:typing", and "text:history" events +# added in Phase 3. These events are separate from the legacy "text" +# event (which uses NNTPSBackend) and use the in-memory MessageStore. +# +# Infrastructure notes: +# - Mirrors the setup pattern in Burble.E2E.SignalingTest. +# - NNTPSBackend is also started here because RoomChannel.join/3 may +# reach it for other in-flight operations; having it up avoids +# unrelated crashes during setup. +# - All tests run async: false because they share named ETS tables +# (Burble.Chat.MessageStore) and named processes. 
+ +defmodule BurbleWeb.Channels.RoomChannelTextTest do + use ExUnit.Case, async: false + use Phoenix.ChannelTest + + import Burble.TestHelpers + + @endpoint BurbleWeb.Endpoint + + # --------------------------------------------------------------------------- + # Setup + # --------------------------------------------------------------------------- + + setup do + Application.ensure_all_started(:phoenix_pubsub) + + start_supervised!({Phoenix.PubSub, name: Burble.PubSub}) + start_supervised!({Registry, keys: :unique, name: Burble.RoomRegistry}) + start_supervised!({DynamicSupervisor, name: Burble.RoomSupervisor, strategy: :one_for_one}) + start_supervised!(Burble.Presence) + start_supervised!(Burble.Media.Engine) + start_supervised!(Burble.Text.NNTPSBackend) + + # MessageStore — stop any existing instance first so we get a clean ETS table. + case Process.whereis(Burble.Chat.MessageStore) do + nil -> :ok + pid -> GenServer.stop(pid) + end + + start_supervised!(Burble.Chat.MessageStore) + + case BurbleWeb.Endpoint.start_link() do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + end + + :ok + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + defp guest_socket(display_name \\ "TestGuest") do + {:ok, guest} = Burble.Auth.create_guest_session(display_name) + + socket(:user_socket, %{ + user_id: guest.id, + display_name: guest.display_name, + is_guest: true + }) + end + + defp join_room(display_name \\ "Tester") do + sock = guest_socket(display_name) + room_id = generate_room_id() + + {:ok, _reply, chan} = + subscribe_and_join(sock, BurbleWeb.RoomChannel, "room:#{room_id}", %{ + "display_name" => display_name + }) + + {chan, room_id} + end + + # --------------------------------------------------------------------------- + # text:send — happy path + # --------------------------------------------------------------------------- + + 
describe "text:send" do
  test "broadcasts text:new to the room on success" do
    {chan, _room_id} = join_room("Alice")

    ref = push(chan, "text:send", %{"body" => "Hello, world!"})

    # Sender gets an :ok reply carrying the id of the newly stored message.
    assert_reply ref, :ok, %{id: id}
    assert is_binary(id) and byte_size(id) > 0

    # The room sees a text:new broadcast with the body and the sender handle.
    assert_broadcast "text:new", %{body: "Hello, world!", from: from}
    assert is_binary(from)

    leave(chan)
  end

  test "broadcast includes id, from, body, and timestamp fields" do
    {chan, _room_id} = join_room("Bob")

    push(chan, "text:send", %{"body" => "Timestamp test"})

    assert_broadcast "text:new", %{id: id, from: from, body: body, timestamp: ts}
    assert is_binary(id)
    assert is_binary(from)
    assert body == "Timestamp test"
    assert is_binary(ts)
    # Timestamp must be an ISO-8601 string.
    assert {:ok, _dt, _} = DateTime.from_iso8601(ts)

    leave(chan)
  end

  test "reply contains the message id" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:send", %{"body" => "ID check"})
    assert_reply ref, :ok, %{id: id}
    # IDs are 32-char hex: hex digits are single-byte ASCII, so the O(1)
    # byte_size/1 check is equivalent to (and cheaper than) String.length/1.
    assert byte_size(id) == 32, "expected 32-char hex ID, got: #{id}"

    leave(chan)
  end

  test "text:send with empty body returns invalid_text_payload error" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:send", %{"body" => ""})
    assert_reply ref, :error, %{reason: "invalid_text_payload"}

    leave(chan)
  end

  test "text:send with missing body returns invalid_text_payload error" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:send", %{})
    assert_reply ref, :error, %{reason: "invalid_text_payload"}

    leave(chan)
  end

  test "text:send with body exceeding 4096 bytes returns invalid_text_payload error" do
    {chan, _room_id} = join_room()

    # One byte over the limit must be rejected.
    oversized = String.duplicate("x", 4097)
    ref = push(chan, "text:send", %{"body" => oversized})
    assert_reply ref, :error, %{reason: "invalid_text_payload"}

    leave(chan)
  end

  test "text:send with exactly 4096-byte body is accepted" do
    {chan, _room_id} = join_room()

    # Boundary case: the limit itself is inclusive.
    max_body = String.duplicate("a", 4096)
    ref = push(chan, "text:send", %{"body" => max_body})
    assert_reply ref, :ok, %{id: _id}

    leave(chan)
  end

  test "sender also receives its own text:new (handler uses broadcast!, not broadcast_from!)" do
    # text:send uses broadcast! (not broadcast_from!) so the sender DOES receive
    # the broadcast. This test validates broadcast! semantics are in place.
    {chan, _room_id} = join_room("Self")

    push(chan, "text:send", %{"body" => "Echo test"})
    assert_broadcast "text:new", %{body: "Echo test"}

    leave(chan)
  end
end

# ---------------------------------------------------------------------------
# text:typing
# ---------------------------------------------------------------------------

describe "text:typing" do
  test "broadcasts text:typing indicator with from field" do
    {chan, _room_id} = join_room("Typer")

    push(chan, "text:typing", %{})

    assert_broadcast "text:typing", %{from: from}
    assert is_binary(from)

    leave(chan)
  end

  test "typing indicator is throttled: second push within 2s is ignored" do
    {chan, _room_id} = join_room("ThrottleTest")

    push(chan, "text:typing", %{})
    assert_broadcast "text:typing", %{}

    # Immediately send a second typing event — should be throttled.
    push(chan, "text:typing", %{})

    # The second broadcast must NOT arrive within a short window. 100ms is a
    # lower-bound probe of the (presumed 2s) server-side throttle interval;
    # waiting out the full interval would slow the suite for no extra signal.
    refute_broadcast "text:typing", %{}, 100

    leave(chan)
  end

  test "typing indicator does not reply (noreply)" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:typing", %{})

    # Phoenix.ChannelTest: push/3 returns a ref; if the handler returns
    # {:noreply, socket} there should be no :ok/:error reply on that ref.
    refute_reply ref, :ok, %{}, 100
    refute_reply ref, :error, %{}, 100

    leave(chan)
  end
end

# ---------------------------------------------------------------------------
# text:history
# ---------------------------------------------------------------------------

describe "text:history" do
  test "returns an empty list for a room with no history" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:history", %{"limit" => 10})
    assert_reply ref, :ok, %{messages: []}

    leave(chan)
  end

  test "returns stored messages after text:send" do
    {chan, _room_id} = join_room("HistoryUser")

    push(chan, "text:send", %{"body" => "First message"})
    push(chan, "text:send", %{"body" => "Second message"})

    # Allow broadcasts to propagate before querying history.
    assert_broadcast "text:new", %{}
    assert_broadcast "text:new", %{}

    ref = push(chan, "text:history", %{"limit" => 10})
    assert_reply ref, :ok, %{messages: messages}
    assert length(messages) >= 2

    bodies = Enum.map(messages, & &1.body)
    assert "First message" in bodies
    assert "Second message" in bodies

    leave(chan)
  end

  test "history messages include id, from, body, timestamp" do
    {chan, _room_id} = join_room()

    push(chan, "text:send", %{"body" => "Schema check"})
    assert_broadcast "text:new", %{}

    ref = push(chan, "text:history", %{"limit" => 5})
    assert_reply ref, :ok, %{messages: [msg | _]}

    assert Map.has_key?(msg, :id)
    assert Map.has_key?(msg, :from)
    assert Map.has_key?(msg, :body)
    assert Map.has_key?(msg, :timestamp)

    leave(chan)
  end

  test "history respects the limit parameter" do
    {chan, _room_id} = join_room()

    for i <- 1..10 do
      push(chan, "text:send", %{"body" => "msg #{i}"})
      assert_broadcast "text:new", %{}
    end

    ref = push(chan, "text:history", %{"limit" => 3})
    assert_reply ref, :ok, %{messages: messages}
    assert length(messages) == 3

    leave(chan)
  end

  test "text:history with invalid params returns error" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:history", %{})
    assert_reply ref, :error, %{reason: "invalid_history_params"}

    leave(chan)
  end

  test "text:history with limit 0 returns error" do
    {chan, _room_id} = join_room()

    ref = push(chan, "text:history", %{"limit" => 0})
    assert_reply ref, :error, %{reason: "invalid_history_params"}

    leave(chan)
  end
end
end

# -- NOTE(review): patch residue below — git submodule registration that shared
# -- the final mangled source line; it is diff metadata, not Elixir source.
# diff --git a/tools/affinescript b/tools/affinescript
# new file mode 160000
# index 0000000..c15855e
# --- /dev/null
# +++ b/tools/affinescript
# @@ -0,0 +1 @@
# +Subproject commit c15855e8bf2f14a5e16f3b4ccec18e25336522da