diff --git a/controlplane/telemetry/cmd/geoprobe-agent/main.go b/controlplane/telemetry/cmd/geoprobe-agent/main.go
index b4bc7ca2b..4b3104e55 100644
--- a/controlplane/telemetry/cmd/geoprobe-agent/main.go
+++ b/controlplane/telemetry/cmd/geoprobe-agent/main.go
@@ -150,7 +150,7 @@ func parseAllowedPubkeys(s string) ([][32]byte, error) {
 // offsetCache stores recent DZD offsets keyed by sender (device) pubkey.
 type offsetCache struct {
 	mu      sync.RWMutex
-	entries map[[32]byte]*cachedOffset
+	entries map[[32]byte]*cachedSender
 	maxAge  time.Duration
 }
 
@@ -159,9 +159,25 @@ type cachedOffset struct {
 	receivedAt time.Time
 }
 
+// expired reports whether co is nil or was received more than maxAge ago.
+func (co *cachedOffset) expired(maxAge time.Duration) bool {
+	if co == nil {
+		return true
+	}
+	return time.Since(co.receivedAt) > maxAge
+}
+
+// cachedSender holds up to two offsets for one sender: the preferred entry
+// and a fallback that is used once the preferred entry has expired.
+type cachedSender struct {
+	best   *cachedOffset // lowest-RTT offset accepted while non-expired
+	backup *cachedOffset // higher-RTT fallback, replaced once older than maxAge/2
+}
+
+// newOffsetCache returns an empty cache whose entries expire after maxAge.
 func newOffsetCache(maxAge time.Duration) *offsetCache {
 	return &offsetCache{
-		entries: make(map[[32]byte]*cachedOffset),
+		entries: make(map[[32]byte]*cachedSender),
 		maxAge:  maxAge,
 	}
 }
@@ -169,24 +185,61 @@ func newOffsetCache(maxAge time.Duration) *offsetCache {
 func (c *offsetCache) Put(offset *geoprobe.LocationOffset) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	c.entries[offset.SenderPubkey] = &cachedOffset{
+
+	slots, found := c.entries[offset.SenderPubkey]
+	if !found {
+		slots = &cachedSender{}
+		c.entries[offset.SenderPubkey] = slots
+	}
+
+	fresh := &cachedOffset{
 		offset:     *offset,
 		receivedAt: time.Now(),
 	}
+
+	// An expired best is dropped: a still-valid backup moves up into its
+	// slot, and the backup slot is vacated either way.
+	if slots.best.expired(c.maxAge) {
+		slots.best = nil
+		if !slots.backup.expired(c.maxAge) {
+			slots.best = slots.backup
+		}
+		slots.backup = nil
+	}
+
+	switch {
+	case slots.best == nil:
+		// No valid offset is cached for this sender.
+		slots.best = fresh
+	case offset.RttNs <= slots.best.offset.RttNs:
+		// Equal or lower RTT than best: take over the best slot.
+		slots.best = fresh
+	case slots.backup.expired(c.maxAge) ||
+		offset.RttNs <= slots.backup.offset.RttNs ||
+		time.Since(slots.backup.receivedAt) > c.maxAge/2:
+		// Higher RTT than best: store it as backup if that slot is unusable,
+		// if it is at least as good as the current backup, or if the current
+		// backup is older than maxAge/2 and would not last long once promoted.
+		slots.backup = fresh
+	}
 }
 
+// Get returns a copy of the sender's first non-expired slot, trying best
+// before backup, or nil if the sender is unknown or both slots are expired.
 func (c *offsetCache) Get(senderPubkey [32]byte) *geoprobe.LocationOffset {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
-	entry, ok := c.entries[senderPubkey]
-	if !ok {
+	slots, found := c.entries[senderPubkey]
+	if !found {
 		return nil
 	}
-	if time.Since(entry.receivedAt) > c.maxAge {
-		return nil
+	for _, slot := range [2]*cachedOffset{slots.best, slots.backup} {
+		if !slot.expired(c.maxAge) {
+			valid := slot.offset
+			return &valid
+		}
 	}
-	result := entry.offset
-	return &result
+	return nil
 }
 
 // GetBest returns the non-expired offset with the shortest RttNs.
@@ -195,13 +248,16 @@ func (c *offsetCache) GetBest() *geoprobe.LocationOffset {
 	defer c.mu.RUnlock()
 
 	var best *geoprobe.LocationOffset
-	for _, entry := range c.entries {
-		if time.Since(entry.receivedAt) > c.maxAge {
-			continue
-		}
-		if best == nil || entry.offset.RttNs < best.RttNs {
-			e := entry.offset
-			best = &e
+	for _, sender := range c.entries {
+		// Consider both slots: backup may still be valid after best has expired.
+		for _, entry := range []*cachedOffset{sender.best, sender.backup} {
+			if entry.expired(c.maxAge) {
+				continue
+			}
+			if best == nil || entry.offset.RttNs < best.RttNs {
+				e := entry.offset
+				best = &e
+			}
 		}
 	}
 	return best
@@ -212,8 +268,9 @@ func (c *offsetCache) Evict() int {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	evicted := 0
-	for key, entry := range c.entries {
-		if time.Since(entry.receivedAt) > c.maxAge {
+	for key, sender := range c.entries {
+		// Drop a sender only when neither slot holds a non-expired offset.
+		if sender.best.expired(c.maxAge) && sender.backup.expired(c.maxAge) {
 			delete(c.entries, key)
 			evicted++
 		}
diff --git a/controlplane/telemetry/cmd/geoprobe-agent/main_test.go b/controlplane/telemetry/cmd/geoprobe-agent/main_test.go
index ba5807fdf..ec5c1bd38 100644
--- a/controlplane/telemetry/cmd/geoprobe-agent/main_test.go
+++ b/controlplane/telemetry/cmd/geoprobe-agent/main_test.go
@@ -78,8 +78,172 @@ func TestOffsetCache_PutReplaces(t *testing.T) {
 	if got == nil {
 		t.Fatal("expected offset, got nil")
 	}
+	// With two-slot cache, lower RTT stays as best.
+	if got.RttNs != 1000 {
+		t.Errorf("expected RttNs=1000 (best kept), got %d", got.RttNs)
+	}
+}
+
+func TestOffsetCache_PutKeepsBest(t *testing.T) {
+	cache := newOffsetCache(1 * time.Hour)
+
+	pubkey := [32]byte{1}
+
+	cache.Put(makeTestOffset(pubkey, 1000))
+	cache.Put(makeTestOffset(pubkey, 2000))
+
+	got := cache.Get(pubkey)
+	if got == nil {
+		t.Fatal("expected offset, got nil")
+	}
+	if got.RttNs != 1000 {
+		t.Errorf("expected RttNs=1000 (best kept over higher RTT), got %d", got.RttNs)
+	}
+}
+
+func TestOffsetCache_PutLowerRTTReplacesBest(t *testing.T) {
+	cache := newOffsetCache(1 * time.Hour)
+
+	pubkey := [32]byte{1}
+
+	cache.Put(makeTestOffset(pubkey, 2000))
+	cache.Put(makeTestOffset(pubkey, 1000))
+
+	got := cache.Get(pubkey)
+	if got == nil {
+		t.Fatal("expected offset, got nil")
+	}
+	if got.RttNs != 1000 {
+		t.Errorf("expected RttNs=1000 (lower RTT replaces best), got %d", got.RttNs)
+	}
+}
+
+func TestOffsetCache_BackupPromotion(t *testing.T) {
+	cache := newOffsetCache(100 * time.Millisecond)
+
+	pubkey := [32]byte{1}
+
+	// Put best (low RTT).
+	cache.Put(makeTestOffset(pubkey, 500))
+
+	// Insert the backup well after best so that it outlives best; entries
+	// inserted back-to-back would expire together and leave nothing to promote.
+	time.Sleep(60 * time.Millisecond)
+	cache.Put(makeTestOffset(pubkey, 2000))
+
+	// Wait for best to expire while backup is still valid.
+	time.Sleep(50 * time.Millisecond)
+
+	// Put a higher-RTT value to trigger promotion of backup.
+	cache.Put(makeTestOffset(pubkey, 3000))
+
+	got := cache.Get(pubkey)
+	if got == nil {
+		t.Fatal("expected offset after backup promotion, got nil")
+	}
+	// Backup (2000) must now be best; the new value (3000) only fills the
+	// vacated backup slot because its RTT is higher.
+	if got.RttNs != 2000 {
+		t.Errorf("expected RttNs=2000 after backup promotion, got %d", got.RttNs)
+	}
+}
+
+func TestOffsetCache_BackupRefreshInHalfWindow(t *testing.T) {
+	cache := newOffsetCache(100 * time.Millisecond)
+
+	pubkey := [32]byte{1}
+
+	// Put best (low RTT).
+	cache.Put(makeTestOffset(pubkey, 500))
+
+	// Wait past half-maxAge (>50ms).
+	time.Sleep(60 * time.Millisecond)
+
+	// Put a higher RTT offset; best is kept and the empty backup slot takes
+	// the new entry, which therefore outlives best.
+	cache.Put(makeTestOffset(pubkey, 2000))
+
+	got := cache.Get(pubkey)
+	if got == nil {
+		t.Fatal("expected offset, got nil")
+	}
+	// Best should still be the low RTT.
+	if got.RttNs != 500 {
+		t.Errorf("expected RttNs=500 (best unchanged), got %d", got.RttNs)
+	}
+
+	// Wait for best to expire; backup should still be valid since it was
+	// inserted 60ms after best.
+	time.Sleep(50 * time.Millisecond)
+
+	got = cache.Get(pubkey)
+	if got == nil {
+		t.Fatal("expected backup offset after best expired, got nil")
+	}
 	if got.RttNs != 2000 {
-		t.Errorf("expected RttNs=2000, got %d", got.RttNs)
+		t.Errorf("expected RttNs=2000 (backup still valid), got %d", got.RttNs)
+	}
+}
+
+func TestOffsetCache_GetBestWithTwoSlots(t *testing.T) {
+	cache := newOffsetCache(1 * time.Hour)
+
+	// Sender 1: best=500, backup=1500.
+	cache.Put(makeTestOffset([32]byte{1}, 500))
+	cache.Put(makeTestOffset([32]byte{1}, 1500))
+
+	// Sender 2: best=300, backup=2000.
+	cache.Put(makeTestOffset([32]byte{2}, 300))
+	cache.Put(makeTestOffset([32]byte{2}, 2000))
+
+	// Sender 3: best=800, backup=3000.
+	cache.Put(makeTestOffset([32]byte{3}, 800))
+	cache.Put(makeTestOffset([32]byte{3}, 3000))
+
+	best := cache.GetBest()
+	if best == nil {
+		t.Fatal("expected best offset, got nil")
+	}
+	if best.RttNs != 300 {
+		t.Errorf("expected global best RttNs=300, got %d", best.RttNs)
+	}
+	if best.SenderPubkey != [32]byte{2} {
+		t.Errorf("expected sender pubkey {2}, got %v", best.SenderPubkey)
+	}
+}
+
+func TestOffsetCache_EvictBothSlots(t *testing.T) {
+	cache := newOffsetCache(10 * time.Millisecond)
+
+	pubkey := [32]byte{1}
+
+	// Put best and backup.
+	cache.Put(makeTestOffset(pubkey, 500))
+	cache.Put(makeTestOffset(pubkey, 2000))
+
+	// Also add a non-expiring sender.
+	pubkey2 := [32]byte{2}
+	cache.Put(makeTestOffset(pubkey2, 1000))
+
+	// Wait for all entries of sender 1 to expire.
+	time.Sleep(15 * time.Millisecond)
+
+	// Refresh sender 2 so it stays valid.
+	cache.Put(makeTestOffset(pubkey2, 1000))
+
+	evicted := cache.Evict()
+	if evicted != 1 {
+		t.Errorf("expected 1 evicted (sender with both slots expired), got %d", evicted)
+	}
+
+	// Sender 1 should be gone.
+	if got := cache.Get(pubkey); got != nil {
+		t.Errorf("expected sender 1 evicted, got %+v", got)
+	}
+
+	// Sender 2 should still be present.
+	if got := cache.Get(pubkey2); got == nil {
+		t.Error("expected sender 2 still present")
 	}
 }
 