diff --git a/.machine_readable/6a2/STATE.a2ml b/.machine_readable/6a2/STATE.a2ml index d81e1b2..96c7103 100644 --- a/.machine_readable/6a2/STATE.a2ml +++ b/.machine_readable/6a2/STATE.a2ml @@ -66,7 +66,7 @@ phase-1-audio = [ "DONE 2026-04-16: Server-side comfort noise injection after 3 silent frames (60ms at 20ms/frame)", "DONE 2026-04-16: REMB bitrate adaptation — Pipeline.update_bitrate/3 wired via Backend.io_adaptive_bitrate", "DONE 2026-04-16: Avow hash-chain linkage + ETS store + 10 property tests (commit 43669aa)", - "NEXT: Wire RTP-timestamp sync in media/peer.ex → Pipeline (precursor to PTP Phase 4, not blocking audio)" + "DONE 2026-04-21: Wire RTP-timestamp sync — peer.ex extracts packet.timestamp, pipeline.ex stores last_rtp_ts via record_rtp_timestamp/2 cast (Phase 4 PTP correlation ready)" ] [maintenance-status] diff --git a/client/web/src/Main.affine b/client/web/src/Main.affine index e5b984a..01d5c97 100644 --- a/client/web/src/Main.affine +++ b/client/web/src/Main.affine @@ -23,9 +23,10 @@ Console.log2("[Burble] Auth:", AuthState.displayName(app.auth)) @val @scope("window") external addPopStateListener: (@as("popstate") _, affine ('a => unit)) => unit = "addEventListener" +// The closure borrows app via &app (shared borrow) — does not consume it. addPopStateListener(_ => { let path: string = %raw(`window.location.pathname`) - App.handleUrlChange(app, path) + App.handleUrlChange(&app, path) }) // Set initial page title (borrows app.currentRoute). diff --git a/server/lib/burble/coprocessor/pipeline.ex b/server/lib/burble/coprocessor/pipeline.ex index e6dbda0..40e0145 100644 --- a/server/lib/burble/coprocessor/pipeline.ex +++ b/server/lib/burble/coprocessor/pipeline.ex @@ -140,6 +140,10 @@ defmodule Burble.Coprocessor.Pipeline do # Silence counter: frames since last non-nil inbound. Drives comfort # noise injection so peers don't hear dead air when a speaker pauses. silence_frames: 0, + # Last RTP timestamp received from the network — populated via + # record_rtp_timestamp/2 when peer.ex extracts it from incoming packets. + # Used by Phase 4 PTP correlation to map RTP clock → wall clock. + last_rtp_ts: 0, # Metrics frames_processed: 0, frames_dropped: 0, @@ -297,6 +301,27 @@ defmodule Burble.Coprocessor.Pipeline do {:reply, {:ok, health}, state} end + # --------------------------------------------------------------------------- + # RTP timestamp tracking (Phase 4 PTP precursor) + # --------------------------------------------------------------------------- + + @doc """ + Record the latest RTP timestamp received from the network. + + Called by `Burble.Media.Peer` each time an RTP packet arrives so the + pipeline knows the sender's RTP clock position. Phase 4 will correlate + this against the PTP hardware clock to derive end-to-end latency and + enable multi-node playout alignment. + """ + def record_rtp_timestamp(pipeline, rtp_ts) do + GenServer.cast(pipeline, {:rtp_timestamp, rtp_ts}) + end + + @impl true + def handle_cast({:rtp_timestamp, rtp_ts}, state) do + {:noreply, %{state | last_rtp_ts: rtp_ts}} + end + # --------------------------------------------------------------------------- # Bitrate adaptation (REMB feedback) # --------------------------------------------------------------------------- diff --git a/server/lib/burble/coprocessor/snif_backend.ex b/server/lib/burble/coprocessor/snif_backend.ex index dc967ac..13c0ea1 100644 --- a/server/lib/burble/coprocessor/snif_backend.ex +++ b/server/lib/burble/coprocessor/snif_backend.ex @@ -517,10 +517,10 @@ defmodule Burble.Coprocessor.SNIFBackend do result = Wasmex.call_function(pid, function, args) GenServer.stop(pid, :normal) result - {:error, reason} -> {:error, :wasm_load_failed, detail: reason} + {:error, reason} -> {:error, {:wasm_load_failed, reason}} end rescue - error -> {:error, :snif_exception, detail: error} + error -> {:error, {:snif_exception, error}} end end diff --git a/server/lib/burble/media/peer.ex b/server/lib/burble/media/peer.ex index 3a0b9c3..8220ad6 100644 --- a/server/lib/burble/media/peer.ex +++ b/server/lib/burble/media/peer.ex @@ -128,7 +128,10 @@ defmodule Burble.Media.Peer do recv_track_id: nil, outbound_tracks: outbound_tracks, pending_peers: [], - negotiating: false + negotiating: false, + # Pipeline pid for this peer — looked up lazily on first RTP packet. + # Used to forward RTP timestamps for Phase 4 PTP correlation. + pipeline_pid: nil } # Generate initial offer. @@ -193,10 +196,19 @@ defmodule Burble.Media.Peer do @impl true def handle_info({:ex_webrtc, pc, {:rtp, track_id, _rid, packet}}, %{pc: pc} = state) do # Received RTP from this peer — forward to all other peers in the room. - if track_id == state.recv_track_id do - # Tell the Engine to distribute this packet. - Burble.Media.Engine.distribute_rtp(state.room_id, state.peer_id, packet) - end + state = + if track_id == state.recv_track_id do + Burble.Media.Engine.distribute_rtp(state.room_id, state.peer_id, packet) + # Propagate RTP timestamp to the pipeline for Phase 4 PTP correlation. + # Resolve pipeline pid lazily so we don't fail if pipeline hasn't started. + pipeline_pid = state.pipeline_pid || resolve_pipeline(state.peer_id) + if pipeline_pid do + Burble.Coprocessor.Pipeline.record_rtp_timestamp(pipeline_pid, packet.timestamp) + end + %{state | pipeline_pid: pipeline_pid} + else + state + end {:noreply, state} end @@ -314,6 +326,15 @@ defmodule Burble.Media.Peer do {:via, Registry, {Burble.PeerRegistry, peer_id}} end + # Look up the pipeline GenServer for this peer via the CoprocessorRegistry. + # Returns the pid if found, nil if not yet started (non-fatal). + defp resolve_pipeline(peer_id) do + case Registry.lookup(Burble.CoprocessorRegistry, peer_id) do + [{pid, _}] -> pid + [] -> nil + end + end + defp add_sendonly_track(pc) do stream_id = MediaStreamTrack.generate_stream_id() track = MediaStreamTrack.new(:audio, [stream_id])