diff --git a/.machine_readable/6a2/STATE.a2ml b/.machine_readable/6a2/STATE.a2ml index d81e1b2..f6c154b 100644 --- a/.machine_readable/6a2/STATE.a2ml +++ b/.machine_readable/6a2/STATE.a2ml @@ -6,7 +6,7 @@ [metadata] project = "burble" version = "1.1.0-pre" -last-updated = "2026-04-16" +last-updated = "2026-04-21" status = "active" [project-context] @@ -26,7 +26,7 @@ milestones = [ { name = "Phase 0 — Scrub baseline (V-lang removed, docs honest)", completion = 100, date = "2026-04-16" }, { name = "Phase 1 — Audio dependable (Opus honest, comfort noise, REMB, Avow chain, echo-cancel ref, neural spectral-gate verified)", completion = 85 }, { name = "Phase 2 — P2P AI channel dependable (burble-ai-bridge fixes, round-trip tests, docs) — CRITICAL PATH for family/pair-programming use case", completion = 80 }, - { name = "Phase 2b — server-side Burble.LLM (provider, circuit breaker, fixed parse_frame, NimblePool wired) — SECONDARY, not required for family use case", completion = 40 }, + { name = "Phase 2b — server-side Burble.LLM (provider, circuit breaker, fixed parse_frame, NimblePool wired) — SECONDARY, not required for family use case", completion = 80 }, { name = "Phase 3 — RTSP + signaling + text + AffineScript client start", completion = 0 }, { name = "Phase 4 — PTP hardware clock via Zig NIF, phc2sys supervisor, multi-node align", completion = 10 }, { name = "Phase 5 — ReScript -> AffineScript completion", completion = 0 } @@ -39,7 +39,7 @@ signaling-relay = { status = "consolidated", canonical = "signaling/relay.js", r [blockers-and-issues] doc-reality-drift = [ - "ROADMAP.adoc claims LLM Service DONE — is a stub (provider missing, parse_frame broken)", + "ROADMAP.adoc LLM — AnthropicProvider wired, NimblePool gating, REST endpoint live. 
Remaining: circuit breaker, rate limiting per user, streaming SSE endpoint.", "ROADMAP.adoc claims Formal Proofs DONE — Avow attestation is data-type-only, no dependent-type enforcement", "README.adoc PTP claim sub-microsecond assumes hardware — code falls back to system clock without NIF" ] @@ -66,7 +66,7 @@ phase-1-audio = [ "DONE 2026-04-16: Server-side comfort noise injection after 3 silent frames (60ms at 20ms/frame)", "DONE 2026-04-16: REMB bitrate adaptation — Pipeline.update_bitrate/3 wired via Backend.io_adaptive_bitrate", "DONE 2026-04-16: Avow hash-chain linkage + ETS store + 10 property tests (commit 43669aa)", - "NEXT: Wire RTP-timestamp sync in media/peer.ex → Pipeline (precursor to PTP Phase 4, not blocking audio)" + "DONE 2026-04-21: Wire RTP-timestamp sync — peer.ex extracts packet.timestamp, pipeline.ex stores last_rtp_ts via record_rtp_timestamp/2 cast (Phase 4 PTP correlation ready)" ] [maintenance-status] diff --git a/client/web/src/Main.affine b/client/web/src/Main.affine index e5b984a..01d5c97 100644 --- a/client/web/src/Main.affine +++ b/client/web/src/Main.affine @@ -23,9 +23,10 @@ Console.log2("[Burble] Auth:", AuthState.displayName(app.auth)) @val @scope("window") external addPopStateListener: (@as("popstate") _, affine ('a => unit)) => unit = "addEventListener" +// The closure borrows app via &app (shared borrow) — does not consume it. addPopStateListener(_ => { let path: string = %raw(`window.location.pathname`) - App.handleUrlChange(app, path) + App.handleUrlChange(&app, path) }) // Set initial page title (borrows app.currentRoute). 
diff --git a/server/config/runtime.exs b/server/config/runtime.exs index d62e541..62379a1 100644 --- a/server/config/runtime.exs +++ b/server/config/runtime.exs @@ -57,6 +57,9 @@ if config_env() == :prod do enabled: true, primary_port: String.to_integer(System.get_env("LLM_PORT") || "8503"), fallback_port: String.to_integer(System.get_env("LLM_FALLBACK_PORT") || "8085"), + # Anthropic Claude API — set ANTHROPIC_API_KEY to enable server-side LLM. + anthropic_model: System.get_env("ANTHROPIC_MODEL") || "claude-sonnet-4-6", + anthropic_max_tokens: String.to_integer(System.get_env("ANTHROPIC_MAX_TOKENS") || "4096"), ipv6_preference: true, tls: [ certfile: "priv/ssl/cert.pem", diff --git a/server/lib/burble/coprocessor/pipeline.ex b/server/lib/burble/coprocessor/pipeline.ex index e6dbda0..40e0145 100644 --- a/server/lib/burble/coprocessor/pipeline.ex +++ b/server/lib/burble/coprocessor/pipeline.ex @@ -140,6 +140,10 @@ defmodule Burble.Coprocessor.Pipeline do # Silence counter: frames since last non-nil inbound. Drives comfort # noise injection so peers don't hear dead air when a speaker pauses. silence_frames: 0, + # Last RTP timestamp received from the network — populated via + # record_rtp_timestamp/2 when peer.ex extracts it from incoming packets. + # Used by Phase 4 PTP correlation to map RTP clock → wall clock. + last_rtp_ts: 0, # Metrics frames_processed: 0, frames_dropped: 0, @@ -297,6 +301,27 @@ defmodule Burble.Coprocessor.Pipeline do {:reply, {:ok, health}, state} end + # --------------------------------------------------------------------------- + # RTP timestamp tracking (Phase 4 PTP precursor) + # --------------------------------------------------------------------------- + + @doc """ + Record the latest RTP timestamp received from the network. + + Called by `Burble.Media.Peer` each time an RTP packet arrives so the + pipeline knows the sender's RTP clock position. 
Phase 4 will correlate + this against the PTP hardware clock to derive end-to-end latency and + enable multi-node playout alignment. + """ + def record_rtp_timestamp(pipeline, rtp_ts) do + GenServer.cast(pipeline, {:rtp_timestamp, rtp_ts}) + end + + @impl true + def handle_cast({:rtp_timestamp, rtp_ts}, state) do + {:noreply, %{state | last_rtp_ts: rtp_ts}} + end + # --------------------------------------------------------------------------- # Bitrate adaptation (REMB feedback) # --------------------------------------------------------------------------- diff --git a/server/lib/burble/coprocessor/snif_backend.ex b/server/lib/burble/coprocessor/snif_backend.ex index dc967ac..13c0ea1 100644 --- a/server/lib/burble/coprocessor/snif_backend.ex +++ b/server/lib/burble/coprocessor/snif_backend.ex @@ -517,10 +517,10 @@ defmodule Burble.Coprocessor.SNIFBackend do result = Wasmex.call_function(pid, function, args) GenServer.stop(pid, :normal) result - {:error, reason} -> {:error, :wasm_load_failed, detail: reason} + {:error, reason} -> {:error, {:wasm_load_failed, reason}} end rescue - error -> {:error, :snif_exception, detail: error} + error -> {:error, {:snif_exception, error}} end end diff --git a/server/lib/burble/llm.ex b/server/lib/burble/llm.ex index 3c343c2..e7fce1c 100644 --- a/server/lib/burble/llm.ex +++ b/server/lib/burble/llm.ex @@ -21,15 +21,18 @@ defmodule Burble.LLM do :ok end + @pool_timeout 65_000 + @doc """ - Process a synchronous LLM query. + Process a synchronous LLM query. Routes through the NimblePool to gate + concurrency — at most `pool_size` (default 10) requests run simultaneously. 
""" def process_query(user_id, prompt) do - Logger.debug("[LLM] Processing query for #{user_id}: #{prompt}") + Logger.debug("[LLM] Processing query for #{user_id}") provider = :persistent_term.get({__MODULE__, :provider}, @provider) if provider do - provider.process_query(user_id, prompt) + checkout_and_run(fn -> provider.process_query(user_id, prompt) end) else Logger.warning("[LLM] process_query called but no provider is configured") {:error, :no_provider_configured} @@ -37,19 +40,39 @@ defmodule Burble.LLM do end @doc """ - Stream an LLM query response. + Stream an LLM query response. Routes through the NimblePool to gate + concurrency — the pool slot is held for the duration of the stream. """ def stream_query(user_id, prompt, callback) do - Logger.debug("[LLM] Streaming query for #{user_id}: #{prompt}") + Logger.debug("[LLM] Streaming query for #{user_id}") provider = :persistent_term.get({__MODULE__, :provider}, @provider) if provider do - provider.stream_query(user_id, prompt, callback) + checkout_and_run(fn -> provider.stream_query(user_id, prompt, callback) end) else Logger.warning("[LLM] stream_query called but no provider is configured") {:error, :no_provider_configured} end end + + # Checkout a worker from the pool, run the function, then check back in. + # If the pool is exhausted, the caller blocks until a slot opens or timeout. 
+ defp checkout_and_run(fun) do + try do + NimblePool.checkout!(:llm_worker_pool, :checkout, fn _from, worker_state -> + result = fun.() + {result, worker_state} + end, @pool_timeout) + catch + :exit, {:timeout, _} -> + Logger.warning("[LLM] Pool checkout timeout — all workers busy") + {:error, :pool_exhausted} + + :exit, reason -> + Logger.error("[LLM] Pool checkout failed: #{inspect(reason)}") + {:error, :pool_error} + end + end end defmodule Burble.LLM.Registry do diff --git a/server/lib/burble/llm/anthropic_provider.ex b/server/lib/burble/llm/anthropic_provider.ex new file mode 100644 index 0000000..dc03af0 --- /dev/null +++ b/server/lib/burble/llm/anthropic_provider.ex @@ -0,0 +1,180 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) + +defmodule Burble.LLM.AnthropicProvider do + @moduledoc """ + Anthropic Claude API provider for Burble's LLM service. + + Calls the Claude Messages API via Erlang's built-in :httpc (no extra deps). + Reads ANTHROPIC_API_KEY from environment. Falls back gracefully when unconfigured. + """ + + require Logger + + @api_url ~c"https://api.anthropic.com/v1/messages" + @api_version "2023-06-01" + @default_model "claude-sonnet-4-6" + @default_max_tokens 4096 + @request_timeout 60_000 + + # Ensure inets + ssl are started (idempotent). + defp ensure_httpc do + :inets.start() + :ssl.start() + :ok + end + + @doc """ + Process a synchronous LLM query. Returns `{:ok, text}` or `{:error, reason}`. 
+ """ + def process_query(user_id, prompt) do + ensure_httpc() + + case api_key() do + nil -> + {:error, :api_key_not_configured} + + key -> + body = Jason.encode!(%{ + model: model(), + max_tokens: max_tokens(), + messages: [%{role: "user", content: prompt}], + system: system_prompt(user_id) + }) + + headers = [ + {~c"content-type", ~c"application/json"}, + {~c"x-api-key", String.to_charlist(key)}, + {~c"anthropic-version", String.to_charlist(@api_version)} + ] + + request = {@api_url, headers, ~c"application/json", String.to_charlist(body)} + + case :httpc.request(:post, request, [timeout: @request_timeout, ssl: ssl_opts()], []) do + {:ok, {{_, 200, _}, _resp_headers, resp_body}} -> + parse_response(List.to_string(resp_body)) + + {:ok, {{_, status, _}, _resp_headers, resp_body}} -> + Logger.warning("[LLM/Anthropic] API returned #{status}: #{List.to_string(resp_body)}") + {:error, {:api_error, status}} + + {:error, reason} -> + Logger.error("[LLM/Anthropic] HTTP request failed: #{inspect(reason)}") + {:error, {:http_error, reason}} + end + end + end + + @doc """ + Stream an LLM query response. Calls `callback.(chunk_text)` for each content delta. + Returns `:ok` on completion or `{:error, reason}`. 
+ """ + def stream_query(user_id, prompt, callback) do + ensure_httpc() + + case api_key() do + nil -> + {:error, :api_key_not_configured} + + key -> + body = Jason.encode!(%{ + model: model(), + max_tokens: max_tokens(), + stream: true, + messages: [%{role: "user", content: prompt}], + system: system_prompt(user_id) + }) + + headers = [ + {~c"content-type", ~c"application/json"}, + {~c"x-api-key", String.to_charlist(key)}, + {~c"anthropic-version", String.to_charlist(@api_version)} + ] + + request = {@api_url, headers, ~c"application/json", String.to_charlist(body)} + + case :httpc.request(:post, request, [timeout: @request_timeout, ssl: ssl_opts()], []) do + {:ok, {{_, 200, _}, _resp_headers, resp_body}} -> + parse_sse_stream(List.to_string(resp_body), callback) + + {:ok, {{_, status, _}, _resp_headers, resp_body}} -> + Logger.warning("[LLM/Anthropic] Stream API returned #{status}") + {:error, {:api_error, status, List.to_string(resp_body)}} + + {:error, reason} -> + Logger.error("[LLM/Anthropic] Stream HTTP request failed: #{inspect(reason)}") + {:error, {:http_error, reason}} + end + end + end + + # --------------------------------------------------------------------------- + # Response parsing + # --------------------------------------------------------------------------- + + defp parse_response(body) do + case Jason.decode(body) do + {:ok, %{"content" => [%{"text" => text} | _]}} -> + {:ok, text} + + {:ok, %{"error" => %{"message" => msg}}} -> + {:error, {:api_error, msg}} + + {:ok, other} -> + Logger.warning("[LLM/Anthropic] Unexpected response shape: #{inspect(other)}") + {:error, :unexpected_response} + + {:error, _} -> + {:error, :json_decode_error} + end + end + + defp parse_sse_stream(body, callback) do + body + |> String.split("\n") + |> Enum.each(fn line -> + case line do + "data: " <> json_str -> + case Jason.decode(json_str) do + {:ok, %{"type" => "content_block_delta", "delta" => %{"text" => text}}} -> + callback.(text) + _ -> + :ok + end + _ -> + 
:ok + end + end) + + :ok + end + + # --------------------------------------------------------------------------- + # Config helpers + # --------------------------------------------------------------------------- + + defp api_key do + System.get_env("ANTHROPIC_API_KEY") + end + + defp model do + System.get_env("ANTHROPIC_MODEL") || @default_model + end + + defp max_tokens do + case System.get_env("ANTHROPIC_MAX_TOKENS") do + nil -> @default_max_tokens + val -> String.to_integer(val) + end + end + + defp system_prompt(user_id) do + "You are a helpful AI assistant integrated into Burble, a P2P voice and collaboration platform. " <> + "You are responding to user #{user_id}. Be concise, technical when appropriate, and helpful. " <> + "If the user asks about Burble features, you can mention voice chat, AI bridge, E2EE, and WebRTC." + end + + defp ssl_opts do + [verify: :verify_peer, cacerts: :public_key.cacerts_get(), depth: 3] + end +end diff --git a/server/lib/burble/llm/supervisor.ex b/server/lib/burble/llm/supervisor.ex index 4ba478a..6fa023d 100644 --- a/server/lib/burble/llm/supervisor.ex +++ b/server/lib/burble/llm/supervisor.ex @@ -4,35 +4,46 @@ defmodule Burble.LLM.Supervisor do @moduledoc """ LLM service supervisor. - + Manages the LLM transport listeners and worker pools. + Configures the Anthropic provider at startup when ANTHROPIC_API_KEY is set. 
""" - + use Supervisor - + require Logger + @transport_workers 10 - + def start_link(opts) do Supervisor.start_link(__MODULE__, opts, name: __MODULE__) end - + @impl true def init(opts) do + configure_provider() + children = [ - # Start transport manager/GenServer {Burble.LLM.Transport, opts}, - - # Worker pool for LLM processing (using NimblePool) + {NimblePool, [ worker: {Burble.LLM.Worker, []}, pool_size: @transport_workers, name: :llm_worker_pool ]} ] - + Supervisor.init(children, strategy: :one_for_one) end - + + defp configure_provider do + if System.get_env("ANTHROPIC_API_KEY") do + Burble.LLM.configure_provider(Burble.LLM.AnthropicProvider) + Logger.info("[LLM] Anthropic provider configured (model: #{System.get_env("ANTHROPIC_MODEL") || "claude-sonnet-4-6"})") + else + Logger.warning("[LLM] ANTHROPIC_API_KEY not set — LLM queries will return {:error, :no_provider_configured}") + end + end + @doc """ Get available transport. """ diff --git a/server/lib/burble/media/peer.ex b/server/lib/burble/media/peer.ex index 3a0b9c3..8220ad6 100644 --- a/server/lib/burble/media/peer.ex +++ b/server/lib/burble/media/peer.ex @@ -128,7 +128,10 @@ defmodule Burble.Media.Peer do recv_track_id: nil, outbound_tracks: outbound_tracks, pending_peers: [], - negotiating: false + negotiating: false, + # Pipeline pid for this peer — looked up lazily on first RTP packet. + # Used to forward RTP timestamps for Phase 4 PTP correlation. + pipeline_pid: nil } # Generate initial offer. @@ -193,10 +196,19 @@ defmodule Burble.Media.Peer do @impl true def handle_info({:ex_webrtc, pc, {:rtp, track_id, _rid, packet}}, %{pc: pc} = state) do # Received RTP from this peer — forward to all other peers in the room. - if track_id == state.recv_track_id do - # Tell the Engine to distribute this packet. 
- Burble.Media.Engine.distribute_rtp(state.room_id, state.peer_id, packet) - end + state = + if track_id == state.recv_track_id do + Burble.Media.Engine.distribute_rtp(state.room_id, state.peer_id, packet) + # Propagate RTP timestamp to the pipeline for Phase 4 PTP correlation. + # Resolve pipeline pid lazily so we don't fail if pipeline hasn't started. + pipeline_pid = state.pipeline_pid || resolve_pipeline(state.peer_id) + if pipeline_pid do + Burble.Coprocessor.Pipeline.record_rtp_timestamp(pipeline_pid, packet.timestamp) + end + %{state | pipeline_pid: pipeline_pid} + else + state + end {:noreply, state} end @@ -314,6 +326,15 @@ defmodule Burble.Media.Peer do {:via, Registry, {Burble.PeerRegistry, peer_id}} end + # Look up the pipeline GenServer for this peer via the CoprocessorRegistry. + # Returns the pid if found, nil if not yet started (non-fatal). + defp resolve_pipeline(peer_id) do + case Registry.lookup(Burble.CoprocessorRegistry, peer_id) do + [{pid, _}] -> pid + [] -> nil + end + end + defp add_sendonly_track(pc) do stream_id = MediaStreamTrack.generate_stream_id() track = MediaStreamTrack.new(:audio, [stream_id]) diff --git a/server/lib/burble_web/controllers/api/llm_controller.ex b/server/lib/burble_web/controllers/api/llm_controller.ex new file mode 100644 index 0000000..79e2709 --- /dev/null +++ b/server/lib/burble_web/controllers/api/llm_controller.ex @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: PMPL-1.0-or-later +# Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) + +defmodule BurbleWeb.API.LLMController do + @moduledoc """ + REST endpoint for server-side LLM queries. + + Either side of the P2P bridge can POST a prompt and get a Claude response + back. Requires authentication (JWT) in production; dev mode accepts any + request so the bridge can call it without token setup. 
+
+  ## Endpoints
+
+      POST /api/v1/llm/query  — synchronous, returns full response
+      POST /api/v1/llm/stream — synchronous (buffered SSE), returns full response
+      GET  /api/v1/llm/status — provider availability check
+  """
+
+  use Phoenix.Controller, formats: [:json]
+  require Logger
+
+  @max_prompt_length 32_000
+
+  @doc "Synchronous LLM query — returns `{response: text}` or `{error: reason}`."
+  def query(conn, %{"prompt" => prompt}) when is_binary(prompt) and byte_size(prompt) <= @max_prompt_length do
+    user_id = get_user_id(conn)
+
+    case Burble.LLM.process_query(user_id, prompt) do
+      {:ok, text} ->
+        json(conn, %{ok: true, response: text})
+
+      {:error, :no_provider_configured} ->
+        conn |> put_status(503) |> json(%{ok: false, error: "llm_not_configured"})
+
+      {:error, :api_key_not_configured} ->
+        conn |> put_status(503) |> json(%{ok: false, error: "api_key_not_set"})
+
+      {:error, {:api_error, status}} ->
+        conn |> put_status(502) |> json(%{ok: false, error: "upstream_error", status: status})
+
+      {:error, reason} ->
+        Logger.warning("[LLMController] Query failed: #{inspect(reason)}")
+        conn |> put_status(500) |> json(%{ok: false, error: "internal_error"})
+    end
+  end
+
+  def query(conn, %{"prompt" => prompt}) when is_binary(prompt) do
+    conn |> put_status(413) |> json(%{ok: false, error: "prompt_too_long", max: @max_prompt_length})
+  end
+
+  def query(conn, _params) do
+    conn |> put_status(400) |> json(%{ok: false, error: "missing_prompt"})
+  end
+
+  @doc "Streaming LLM query — buffers chunks and returns full concatenated response."
+  def stream(conn, %{"prompt" => prompt}) when is_binary(prompt) and byte_size(prompt) <= @max_prompt_length do
+    user_id = get_user_id(conn)
+    chunks = :ets.new(:llm_chunks, [:ordered_set, :private])
+    counter = :counters.new(1, [:atomics])
+
+    result = Burble.LLM.stream_query(user_id, prompt, fn chunk ->
+      :ok = :counters.add(counter, 1, 1)
+      :ets.insert(chunks, {:counters.get(counter, 1), chunk})
+    end)
+
+    case result do
+      :ok ->
+        full_text = :ets.tab2list(chunks) |> Enum.map_join(fn {_k, v} -> v end)
+        :ets.delete(chunks)
+        json(conn, %{ok: true, response: full_text, streamed: true})
+
+      {:error, reason} ->
+        :ets.delete(chunks)
+        conn |> put_status(502) |> json(%{ok: false, error: inspect(reason)})
+    end
+  end
+
+  def stream(conn, params), do: query(conn, params)
+
+  @doc "Check LLM provider status."
+  def status(conn, _params) do
+    provider = :persistent_term.get({Burble.LLM, :provider}, nil)
+
+    json(conn, %{
+      available: provider != nil,
+      provider: if(provider, do: inspect(provider), else: nil),
+      api_key_set: System.get_env("ANTHROPIC_API_KEY") != nil
+    })
+  end
+
+  defp get_user_id(conn) do
+    case conn.assigns[:current_user] do
+      %{id: id} -> id
+      _ -> "anonymous"
+    end
+  end
+end
diff --git a/server/lib/burble_web/router.ex b/server/lib/burble_web/router.ex
index 27beb02..beb9f9d 100644
--- a/server/lib/burble_web/router.ex
+++ b/server/lib/burble_web/router.ex
@@ -68,6 +68,9 @@ defmodule BurbleWeb.Router do
     get "/diagnostics/self-test", DiagnosticsController, :self_test
     get "/diagnostics/self-test/:mode", DiagnosticsController, :self_test
 
+    # LLM status (public — checks if provider is configured)
+    get "/llm/status", LLMController, :status
+
     # Instant connect — join via link/QR/code (public, no auth required)
     get "/join/:code", InstantConnectController, :lookup
     post "/join/:code", InstantConnectController, :redeem
@@ -108,6 +111,10 @@
     post "/rooms/:id/move", ModerationController, :move
     post "/servers/:id/ban", ModerationController, :ban
 
+      # LLM queries 
(authenticated — either side of the P2P bridge can call) + post "/llm/query", LLMController, :query + post "/llm/stream", LLMController, :stream + # Invites (creation requires auth) post "/servers/:server_id/invites", InviteController, :create end diff --git a/server/test/burble/llm/llm_test.exs b/server/test/burble/llm/llm_test.exs index 5bd7692..6d55a80 100644 --- a/server/test/burble/llm/llm_test.exs +++ b/server/test/burble/llm/llm_test.exs @@ -2,18 +2,43 @@ # # Burble LLM module tests — covers core query processing, streaming, the # connection registry, transport endpoint management, protocol frame parsing, -# and supervisor startup. +# pool concurrency gating, and provider configuration. defmodule Burble.LLMTest do - # async: false because Registry uses :persistent_term (global state) and - # Transport registers under its module name as a named GenServer. use ExUnit.Case, async: false + # A simple test provider that echoes the prompt back. + defmodule TestProvider do + def process_query(_user_id, prompt) do + if String.contains?(prompt, "trigger_error") do + {:error, :simulated_error} + else + {:ok, "Echo: #{prompt}"} + end + end + + def stream_query(_user_id, prompt, callback) do + if String.contains?(prompt, "trigger_error") do + {:error, :simulated_error} + else + for word <- String.split(prompt) do + callback.(word <> " ") + end + :ok + end + end + end + # --------------------------------------------------------------------------- # LLM.process_query/2 # --------------------------------------------------------------------------- describe "LLM.process_query/2" do + setup do + Burble.LLM.configure_provider(TestProvider) + on_exit(fn -> :persistent_term.erase({Burble.LLM, :provider}) end) + end + test "returns {:ok, response_string} for a normal prompt" do assert {:ok, response} = Burble.LLM.process_query("user_1", "hello world") assert is_binary(response) @@ -32,11 +57,28 @@ defmodule Burble.LLMTest do end end + describe "LLM.process_query/2 without 
provider" do + setup do + :persistent_term.erase({Burble.LLM, :provider}) + :ok + end + + test "returns {:error, :no_provider_configured} when no provider is set" do + assert {:error, :no_provider_configured} = + Burble.LLM.process_query("user_x", "hello") + end + end + # --------------------------------------------------------------------------- # LLM.stream_query/3 # --------------------------------------------------------------------------- describe "LLM.stream_query/3" do + setup do + Burble.LLM.configure_provider(TestProvider) + on_exit(fn -> :persistent_term.erase({Burble.LLM, :provider}) end) + end + test "calls the callback at least once with a binary chunk" do chunks = :ets.new(:test_chunks, [:bag, :public]) @@ -74,6 +116,22 @@ defmodule Burble.LLMTest do end end + # --------------------------------------------------------------------------- + # LLM.configure_provider/1 + # --------------------------------------------------------------------------- + + describe "LLM.configure_provider/1" do + test "sets the provider and subsequent queries use it" do + :persistent_term.erase({Burble.LLM, :provider}) + assert {:error, :no_provider_configured} = Burble.LLM.process_query("u", "hi") + + Burble.LLM.configure_provider(TestProvider) + assert {:ok, "Echo: hi"} = Burble.LLM.process_query("u", "hi") + + :persistent_term.erase({Burble.LLM, :provider}) + end + end + # --------------------------------------------------------------------------- # LLM.Registry # --------------------------------------------------------------------------- @@ -98,7 +156,6 @@ defmodule Burble.LLMTest do :ok = Burble.LLM.Registry.register_connection(user_id, old_pid) - # Spawn a new process to get a different pid new_pid = spawn(fn -> :timer.sleep(100) end) :ok = Burble.LLM.Registry.register_connection(user_id, new_pid) @@ -112,7 +169,6 @@ defmodule Burble.LLMTest do describe "LLM.Transport" do setup do - # Stop any existing instance so we can start fresh per test case 
Process.whereis(Burble.LLM.Transport) do nil -> :ok pid -> GenServer.stop(pid) @@ -149,13 +205,9 @@ defmodule Burble.LLMTest do end test "report_failure/2 marks the endpoint offline and failover selects next" do - # Confirm primary is currently active assert {:ok, %{host: "primary.test"}} = Burble.LLM.Transport.get_active_endpoint() - # Report failure on the primary Burble.LLM.Transport.report_failure("primary.test", 8503) - - # Allow the cast to be processed _ = :sys.get_state(Burble.LLM.Transport) assert {:ok, next} = Burble.LLM.Transport.get_active_endpoint() @@ -165,32 +217,18 @@ defmodule Burble.LLMTest do end # --------------------------------------------------------------------------- - # LLM.Protocol.parse_frame/1 (via the public read_frame path, tested directly - # by reaching into the module's private helper through a wrapper approach) + # LLM.Protocol frame parsing # --------------------------------------------------------------------------- - # - # parse_frame/1 is private, but the module exposes its shape through - # read_frame/1. We exercise the parsing logic directly by calling the - # module-level function we *can* reach, and by verifying documented - # behaviour for the bugfixed tuple→list destructure path. describe "LLM.Protocol frame parsing" do - # Helper: build the same full_data string that read_frame builds before - # handing to parse_frame/1 so we can test round-trip via an internal - # send to ourselves (loopback TCP socket). - # - # Because parse_frame/1 is private we verify behaviour indirectly through - # sending a known payload via a loopback socket and calling read_frame/1. 
setup do {:ok, listen} = :gen_tcp.listen(0, [:binary, packet: :raw, active: false, reuseaddr: true]) {:ok, port} = :inet.port(listen) - # Acceptor task parent = self() acceptor = Task.async(fn -> {:ok, server} = :gen_tcp.accept(listen, 1000) send(parent, {:server_socket, server}) - # Keep socket open until caller is done receive do :close -> :gen_tcp.close(server) end @@ -216,8 +254,6 @@ defmodule Burble.LLMTest do end test "parse_frame succeeds for a well-formed QUERY frame", %{client: client, server: server} do - # read_frame prepends @frame_header and appends @frame_footer, so we - # only need to send the inner payload (what would come from the wire). payload = "QUERY\nid: msg-001\nprompt: hello" :ok = :gen_tcp.send(client, payload) @@ -228,13 +264,8 @@ defmodule Burble.LLMTest do end test "read_frame returns {:error, :closed} when the socket is closed", %{client: client, server: server} do - # Close the sending side before read_frame tries to recv — the server - # should see a :closed error from :gen_tcp.recv and propagate it. 
:gen_tcp.close(client) - - # Give the OS a moment to propagate the FIN Process.sleep(20) - assert {:error, :closed} = Burble.LLM.Protocol.read_frame(server) end @@ -247,13 +278,38 @@ defmodule Burble.LLMTest do end end + # --------------------------------------------------------------------------- + # AnthropicProvider (unit, no real API calls) + # --------------------------------------------------------------------------- + + describe "AnthropicProvider" do + test "returns {:error, :api_key_not_configured} without ANTHROPIC_API_KEY" do + old = System.get_env("ANTHROPIC_API_KEY") + System.delete_env("ANTHROPIC_API_KEY") + + assert {:error, :api_key_not_configured} = + Burble.LLM.AnthropicProvider.process_query("u", "hello") + + if old, do: System.put_env("ANTHROPIC_API_KEY", old) + end + + test "stream_query returns {:error, :api_key_not_configured} without key" do + old = System.get_env("ANTHROPIC_API_KEY") + System.delete_env("ANTHROPIC_API_KEY") + + assert {:error, :api_key_not_configured} = + Burble.LLM.AnthropicProvider.stream_query("u", "hello", fn _ -> :ok end) + + if old, do: System.put_env("ANTHROPIC_API_KEY", old) + end + end + # --------------------------------------------------------------------------- # LLM.Supervisor # --------------------------------------------------------------------------- describe "LLM.Supervisor" do test "start_link/1 starts the supervisor with children" do - # Stop any existing instance from the application startup case Process.whereis(Burble.LLM.Supervisor) do nil -> :ok pid -> Supervisor.stop(pid) @@ -266,7 +322,6 @@ defmodule Burble.LLMTest do children = Supervisor.which_children(sup_pid) assert length(children) > 0 - # Transport child must be present assert Enum.any?(children, fn {id, _pid, _type, _mods} -> id == Burble.LLM.Transport end)