Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions controlplane/telemetry/cmd/geoprobe-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,10 +724,13 @@ func runMeasurementLoop(
for _, t := range update.Targets {
newSet[t.String()] = t
}
var newlyAdded []geoprobe.ProbeAddress
for key, addr := range newSet {
if _, exists := oldSet[key]; !exists {
if err := pinger.AddProbe(ctx, addr); err != nil {
log.Warn("Failed to add discovered target probe", "target", addr, "error", err)
} else {
newlyAdded = append(newlyAdded, addr)
}
}
}
Expand All @@ -741,6 +744,20 @@ func runMeasurementLoop(
targets = update.Targets
log.Info("Updated targets from discovery", "totalTargets", len(targets))

// Immediately probe newly discovered targets so they don't
// have to wait for the next measurement ticker.
if len(newlyAdded) > 0 {
rttData := make(map[geoprobe.ProbeAddress]uint64, len(newlyAdded))
for _, addr := range newlyAdded {
if rttNs, ok := pinger.MeasureOne(ctx, addr); ok {
rttData[addr] = rttNs
}
}
if len(rttData) > 0 {
sendCompositeOffsets(ctx, log, rttData, cache, signer, senderConn, getCurrentSlot)
}
}

case keyUpdate := <-inboundKeyCh:
signedReflector.SetAuthorizedKeys(keyUpdate.Keys)
log.Info("Updated signed TWAMP authorized keys from discovery",
Expand Down Expand Up @@ -784,16 +801,33 @@ func runMeasurementCycle(
log.Debug("target measurement result", "target", addr.Host, "rtt_ms", float64(rttNs)/1000000.0)
}

sent := sendCompositeOffsets(ctx, log, rttData, cache, signer, senderConn, getCurrentSlot)

log.Info("Completed measurement cycle",
"measured", len(rttData),
"sent", sent,
"total_targets", len(targets))
}

func sendCompositeOffsets(
ctx context.Context,
log *slog.Logger,
rttData map[geoprobe.ProbeAddress]uint64,
cache *offsetCache,
signer *geoprobe.OffsetSigner,
senderConn *net.UDPConn,
getCurrentSlot func(ctx context.Context) (uint64, error),
) int {
dzdOffset := cache.GetBest()
if dzdOffset == nil {
log.Warn("No valid DZD offsets in cache, skipping composite generation")
return
return 0
}

slot, err := getCurrentSlot(ctx)
if err != nil {
log.Error("Failed to get current slot", "error", err)
return
return 0
}

log.Debug("fetched current slot", "slot", slot)
Expand Down Expand Up @@ -835,8 +869,5 @@ func runMeasurementCycle(
"ref_sender_pubkey", solana.PublicKeyFromBytes(dzdOffset.SenderPubkey[:]))
}

log.Info("Completed measurement cycle",
"measured", len(rttData),
"sent", sentCount,
"total_targets", len(targets))
return sentCount
}
50 changes: 50 additions & 0 deletions controlplane/telemetry/internal/geoprobe/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,56 @@ func (p *Pinger) probeWorker(
}
}

// MeasureOne measures a single probe and returns the best RTT in nanoseconds.
func (p *Pinger) MeasureOne(ctx context.Context, addr ProbeAddress) (uint64, bool) {
p.sendersMu.Lock()
entry, exists := p.senders[addr.String()]
p.sendersMu.Unlock()
if !exists {
p.log.Warn("MeasureOne called for unknown probe", "probe", addr.String())
return 0, false
}

probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout)
defer cancel()

type probeResult struct {
rtt time.Duration
err error
}
ch := make(chan probeResult, 2)
go func() {
rtt, err := entry.warmupSender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()
go func() {
time.Sleep(DefaultWarmupDelay)
rtt, err := entry.sender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()

var bestRTT time.Duration
ok := false
for range 2 {
r := <-ch
if r.err == nil && (!ok || r.rtt < bestRTT) {
bestRTT = r.rtt
ok = true
}
}

if ok {
p.log.Debug("MeasureOne succeeded", "probe", addr.String(), "rtt", bestRTT)
} else {
p.log.Debug("MeasureOne failed", "probe", addr.String())
}

if !ok {
return 0, false
}
return uint64(bestRTT.Nanoseconds()), true
}

func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error) {
p.sendersMu.Lock()
sendersCopy := make([]*senderEntry, 0, len(p.senders))
Expand Down
Loading