From 38a5353214d8697dd0596f4423131ec197f291a7 Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Fri, 20 Mar 2026 11:55:33 -0400 Subject: [PATCH 1/2] geolocation; Immediatley send first ping to new targets --- .../telemetry/cmd/geoprobe-agent/main.go | 43 +++++++++++++--- .../telemetry/internal/geoprobe/pinger.go | 50 +++++++++++++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/controlplane/telemetry/cmd/geoprobe-agent/main.go b/controlplane/telemetry/cmd/geoprobe-agent/main.go index c2640e8c75..8e1556fd89 100644 --- a/controlplane/telemetry/cmd/geoprobe-agent/main.go +++ b/controlplane/telemetry/cmd/geoprobe-agent/main.go @@ -724,10 +724,13 @@ func runMeasurementLoop( for _, t := range update.Targets { newSet[t.String()] = t } + var newlyAdded []geoprobe.ProbeAddress for key, addr := range newSet { if _, exists := oldSet[key]; !exists { if err := pinger.AddProbe(ctx, addr); err != nil { log.Warn("Failed to add discovered target probe", "target", addr, "error", err) + } else { + newlyAdded = append(newlyAdded, addr) } } } @@ -741,6 +744,20 @@ func runMeasurementLoop( targets = update.Targets log.Info("Updated targets from discovery", "totalTargets", len(targets)) + // Immediately probe newly discovered targets so they don't + // have to wait for the next measurement ticker. + if len(newlyAdded) > 0 { + rttData := make(map[geoprobe.ProbeAddress]uint64, len(newlyAdded)) + for _, addr := range newlyAdded { + if rttNs, ok := pinger.MeasureOne(ctx, addr); ok { + rttData[addr] = rttNs + } + } + if len(rttData) > 0 { + sendCompositeOffsets(ctx, log, rttData, cache, signer, senderConn, getCurrentSlot) + } + } + case keyUpdate := <-inboundKeyCh: signedReflector.SetAuthorizedKeys(keyUpdate.Keys) log.Info("Updated signed TWAMP authorized keys from discovery", @@ -784,16 +801,33 @@ func runMeasurementCycle( log.Debug("target measurement result", "target", addr.Host, "rtt_ms", float64(rttNs)/1000000.0) } + sent := sendCompositeOffsets(ctx, log, rttData, cache, signer, senderConn, getCurrentSlot) + + log.Info("Completed measurement cycle", + "measured", len(rttData), + "sent", sent, + "total_targets", len(targets)) +} + +func sendCompositeOffsets( + ctx context.Context, + log *slog.Logger, + rttData map[geoprobe.ProbeAddress]uint64, + cache *offsetCache, + signer *geoprobe.OffsetSigner, + senderConn *net.UDPConn, + getCurrentSlot func(ctx context.Context) (uint64, error), +) int { dzdOffset := cache.GetBest() if dzdOffset == nil { log.Warn("No valid DZD offsets in cache, skipping composite generation") - return + return 0 } slot, err := getCurrentSlot(ctx) if err != nil { log.Error("Failed to get current slot", "error", err) - return + return 0 } log.Debug("fetched current slot", "slot", slot) @@ -835,8 +869,5 @@ func runMeasurementCycle( "ref_sender_pubkey", solana.PublicKeyFromBytes(dzdOffset.SenderPubkey[:])) } - log.Info("Completed measurement cycle", - "measured", len(rttData), - "sent", sentCount, - "total_targets", len(targets)) + return sentCount } diff --git a/controlplane/telemetry/internal/geoprobe/pinger.go b/controlplane/telemetry/internal/geoprobe/pinger.go index 5b68194d09..968b45f549 100644 --- a/controlplane/telemetry/internal/geoprobe/pinger.go +++ b/controlplane/telemetry/internal/geoprobe/pinger.go @@ -188,6 +188,56 @@ func (p *Pinger) probeWorker( } } +// MeasureOne measures a single probe and returns the best RTT in nanoseconds. +func (p *Pinger) MeasureOne(ctx context.Context, addr ProbeAddress) (uint64, bool) { + p.sendersMu.Lock() + entry, exists := p.senders[addr.String()] + p.sendersMu.Unlock() + if !exists { + p.log.Warn("MeasureOne called for unknown probe", "probe", addr.String()) + return 0, false + } + + probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout) + defer cancel() + + type probeResult struct { + rtt time.Duration + err error + } + ch := make(chan probeResult, 2) + go func() { + rtt, err := entry.warmupSender.Probe(probeCtx) + ch <- probeResult{rtt, err} + }() + go func() { + time.Sleep(DefaultWarmupDelay) + rtt, err := entry.sender.Probe(probeCtx) + ch <- probeResult{rtt, err} + }() + + var bestRTT time.Duration + ok := false + for range 2 { + r := <-ch + if r.err == nil && (!ok || r.rtt < bestRTT) { + bestRTT = r.rtt + ok = true + } + } + + if ok { + p.log.Debug("MeasureOne succeeded", "probe", addr.String(), "rtt", bestRTT) + } else { + p.log.Debug("MeasureOne failed", "probe", addr.String()) + } + + if !ok { + return 0, false + } + return uint64(bestRTT.Nanoseconds()), true +} + func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error) { p.sendersMu.Lock() sendersCopy := make([]*senderEntry, 0, len(p.senders)) From e3de956bab2d8939bc2c6241510c4b3181acb78f Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Mon, 23 Mar 2026 17:37:20 -0400 Subject: [PATCH 2/2] Add lightweight tests for MeasureOne --- .../internal/geoprobe/pinger_test.go | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/controlplane/telemetry/internal/geoprobe/pinger_test.go b/controlplane/telemetry/internal/geoprobe/pinger_test.go index f234850193..e7e68b337c 100644 --- a/controlplane/telemetry/internal/geoprobe/pinger_test.go +++ b/controlplane/telemetry/internal/geoprobe/pinger_test.go @@ -646,3 +646,67 @@ func TestPinger_DualProbe_BothFail(t *testing.T) { assert.NotContains(t, results, addr, "should not have result when both probes fail") } + +func TestPinger_MeasureOne_UnknownProbe(t *testing.T) { + t.Parallel() + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + pinger := NewPinger(&PingerConfig{ + Logger: logger, + ProbeTimeout: 1 * time.Second, + Interval: 1 * time.Second, + StaggerDelay: 1 * time.Millisecond, + }) + + addr := ProbeAddress{Host: "192.0.2.99", Port: 8923, TWAMPPort: 8925} + rtt, ok := pinger.MeasureOne(context.Background(), addr) + assert.False(t, ok) + assert.Equal(t, uint64(0), rtt) +} + +func TestPinger_MeasureOne_Success(t *testing.T) { + t.Parallel() + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + pinger := NewPinger(&PingerConfig{ + Logger: logger, + ProbeTimeout: 1 * time.Second, + Interval: 1 * time.Second, + StaggerDelay: 1 * time.Millisecond, + }) + + addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} + pinger.senders[addr.String()] = &senderEntry{ + addr: addr, + sender: &mockSender{rtt: 10 * time.Millisecond}, + warmupSender: &mockSender{rtt: 50 * time.Millisecond}, + } + + rtt, ok := pinger.MeasureOne(context.Background(), addr) + assert.True(t, ok) + assert.Equal(t, uint64((10 * time.Millisecond).Nanoseconds()), rtt, + "should return the lower RTT in nanoseconds") +} + +func TestPinger_MeasureOne_BothFail(t *testing.T) { + t.Parallel() + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + pinger := NewPinger(&PingerConfig{ + Logger: logger, + ProbeTimeout: 1 * time.Second, + Interval: 1 * time.Second, + StaggerDelay: 1 * time.Millisecond, + }) + + addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} + pinger.senders[addr.String()] = &senderEntry{ + addr: addr, + sender: &mockSender{err: context.DeadlineExceeded}, + warmupSender: &mockSender{err: context.DeadlineExceeded}, + } + + rtt, ok := pinger.MeasureOne(context.Background(), addr) + assert.False(t, ok) + assert.Equal(t, uint64(0), rtt) +}