Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 73 additions & 18 deletions controlplane/telemetry/cmd/geoprobe-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func parseAllowedPubkeys(s string) ([][32]byte, error) {
// offsetCache stores recent DZD offsets keyed by sender (device) pubkey.
// Offsets older than maxAge are treated as expired by the cache's methods.
// NOTE(review): entries is listed twice with different value types; this looks
// like the before/after lines of a diff — confirm which declaration is current.
type offsetCache struct {
// mu guards entries; Put takes the write lock and Get takes the read lock.
mu sync.RWMutex
entries map[[32]byte]*cachedOffset
entries map[[32]byte]*cachedSender
// maxAge is the age beyond which a cached offset counts as expired.
maxAge time.Duration
}

Expand All @@ -159,34 +159,87 @@ type cachedOffset struct {
receivedAt time.Time
}

// expired reports whether co should no longer be served: either co is nil
// (an empty slot) or it was received more than maxAge ago. It is safe to call
// on a nil receiver.
func (co *cachedOffset) expired(maxAge time.Duration) bool {
	if co == nil {
		return true
	}
	age := time.Since(co.receivedAt)
	return age > maxAge
}

// cachedSender holds the two offset slots kept for a single sender. Either
// slot may be nil; a nil slot is treated the same as an expired one.
type cachedSender struct {
// best is served first by Get and is replaced by Put whenever a new offset
// has an RTT lower than or equal to it.
best *cachedOffset // lowest RTT seen in the full maxAge window
// backup is served by Get only when best has expired, and is promoted to
// best by the next Put after best expires.
backup *cachedOffset // lowest RTT seen in the recent half-maxAge window
}

// newOffsetCache returns an empty offsetCache that treats offsets older than
// maxAge as expired.
func newOffsetCache(maxAge time.Duration) *offsetCache {
return &offsetCache{
entries: make(map[[32]byte]*cachedOffset),
entries: make(map[[32]byte]*cachedSender),
maxAge: maxAge,
}
}

// Put records offset under its SenderPubkey. Each sender keeps at most two
// offsets: best (the lowest-RTT offset that has not expired) and backup (a
// higher-RTT offset that can take over once best expires). The offset is
// copied, and stamped with the time of the call.
func (c *offsetCache) Put(offset *geoprobe.LocationOffset) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries[offset.SenderPubkey] = &cachedOffset{

// Look up the sender's slots, creating an empty pair on first use.
sender, ok := c.entries[offset.SenderPubkey]
if !ok {
sender = &cachedSender{}
c.entries[offset.SenderPubkey] = sender
}

now := time.Now()
entry := &cachedOffset{
offset: *offset,
receivedAt: time.Now(),
receivedAt: now,
}

// If best is expired (or empty), promote backup to best when backup is still
// valid, otherwise drop best. Either way the backup slot ends up empty.
if sender.best.expired(c.maxAge) {
if !sender.backup.expired(c.maxAge) {
sender.best = sender.backup
} else {
sender.best = nil
}
sender.backup = nil
}

// If best is nil (nothing valid was left to promote), the new offset
// becomes best unconditionally.
if sender.best == nil {
sender.best = entry
return
}

if offset.RttNs <= sender.best.offset.RttNs {
// New offset is better than or equal to best: replace best. On a tie the
// newer offset wins, which restarts its expiry clock.
sender.best = entry
} else {
// New offset has higher RTT than best, so it can only go in the backup slot.
// It is taken when the slot is empty or expired (expired is true for a nil
// slot), when it matches or beats the current backup's RTT, or when the
// current backup is older than half of maxAge — so that a backup promoted
// later is not already close to expiry.
halfMaxAge := c.maxAge / 2
if sender.backup.expired(c.maxAge) || offset.RttNs <= sender.backup.offset.RttNs {
sender.backup = entry
} else if time.Since(sender.backup.receivedAt) > halfMaxAge {
// Backup is stale (older than half-maxAge): replace to keep it fresh.
sender.backup = entry
}
}
}

// Get returns the cached offset for senderPubkey, preferring the best slot and
// falling back to backup when best has expired. It returns nil when the sender
// is unknown or both slots have expired. The returned pointer refers to a copy
// of the cached struct, not to the cached value itself.
func (c *offsetCache) Get(senderPubkey [32]byte) *geoprobe.LocationOffset {
c.mu.RLock()
defer c.mu.RUnlock()
entry, ok := c.entries[senderPubkey]
sender, ok := c.entries[senderPubkey]
if !ok {
return nil
}
if time.Since(entry.receivedAt) > c.maxAge {
return nil
if !sender.best.expired(c.maxAge) {
result := sender.best.offset
return &result
}
result := entry.offset
return &result
// Best is empty or expired: serve backup if it is still valid. Get holds only
// the read lock, so it does not promote backup; that happens in Put.
if !sender.backup.expired(c.maxAge) {
result := sender.backup.offset
return &result
}
return nil
}

// GetBest returns the non-expired offset with the shortest RttNs.
Expand All @@ -195,13 +248,15 @@ func (c *offsetCache) GetBest() *geoprobe.LocationOffset {
defer c.mu.RUnlock()

var best *geoprobe.LocationOffset
for _, entry := range c.entries {
if time.Since(entry.receivedAt) > c.maxAge {
continue
}
if best == nil || entry.offset.RttNs < best.RttNs {
e := entry.offset
best = &e
for _, sender := range c.entries {
for _, entry := range []*cachedOffset{sender.best, sender.backup} {
if entry.expired(c.maxAge) {
continue
}
if best == nil || entry.offset.RttNs < best.RttNs {
e := entry.offset
best = &e
}
}
}
return best
Expand All @@ -212,8 +267,8 @@ func (c *offsetCache) Evict() int {
c.mu.Lock()
defer c.mu.Unlock()
evicted := 0
for key, entry := range c.entries {
if time.Since(entry.receivedAt) > c.maxAge {
for key, sender := range c.entries {
if sender.best.expired(c.maxAge) && sender.backup.expired(c.maxAge) {
delete(c.entries, key)
evicted++
}
Expand Down
163 changes: 162 additions & 1 deletion controlplane/telemetry/cmd/geoprobe-agent/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,169 @@ func TestOffsetCache_PutReplaces(t *testing.T) {
if got == nil {
t.Fatal("expected offset, got nil")
}
// With two-slot cache, lower RTT stays as best.
if got.RttNs != 1000 {
t.Errorf("expected RttNs=1000 (best kept), got %d", got.RttNs)
}
}

// TestOffsetCache_PutKeepsBest checks that a later, higher-RTT offset from the
// same sender does not displace the lower-RTT offset returned by Get.
func TestOffsetCache_PutKeepsBest(t *testing.T) {
	sender := [32]byte{1}
	c := newOffsetCache(time.Hour)

	// Low RTT first, then a worse one for the same sender.
	c.Put(makeTestOffset(sender, 1000))
	c.Put(makeTestOffset(sender, 2000))

	switch got := c.Get(sender); {
	case got == nil:
		t.Fatal("expected offset, got nil")
	case got.RttNs != 1000:
		t.Errorf("expected RttNs=1000 (best kept over higher RTT), got %d", got.RttNs)
	}
}

// TestOffsetCache_PutLowerRTTReplacesBest checks that an offset with a lower
// RTT than the current best takes over the best slot.
func TestOffsetCache_PutLowerRTTReplacesBest(t *testing.T) {
	const (
		worseRTT  = 2000
		betterRTT = 1000
	)
	key := [32]byte{1}
	c := newOffsetCache(time.Hour)

	// Worse offset arrives first; the better one must replace it.
	c.Put(makeTestOffset(key, worseRTT))
	c.Put(makeTestOffset(key, betterRTT))

	if got := c.Get(key); got == nil {
		t.Fatal("expected offset, got nil")
	} else if got.RttNs != betterRTT {
		t.Errorf("expected RttNs=1000 (lower RTT replaces best), got %d", got.RttNs)
	}
}

// TestOffsetCache_BackupPromotion verifies that once best has expired, the
// next Put promotes the still-valid backup into the best slot, so Get returns
// the backup rather than the newer, higher-RTT offset.
//
// Best and backup are stored at different times on purpose: if both were
// stored together they would expire together and promotion would never run.
func TestOffsetCache_BackupPromotion(t *testing.T) {
	cache := newOffsetCache(200 * time.Millisecond)

	pubkey := [32]byte{1}

	// Put best (low RTT).
	cache.Put(makeTestOffset(pubkey, 500))

	// Let more than half of maxAge pass so the backup stored next outlives best.
	time.Sleep(120 * time.Millisecond)

	// Put backup (higher RTT).
	cache.Put(makeTestOffset(pubkey, 2000))

	// Wait for best to expire (at least 220ms old) while backup (about 100ms
	// old) is still within maxAge.
	time.Sleep(100 * time.Millisecond)

	// Put a value worse than the backup to trigger promotion. If promotion is
	// skipped and this value lands in best instead, Get returns 3000 and the
	// assertion below fails.
	cache.Put(makeTestOffset(pubkey, 3000))

	got := cache.Get(pubkey)
	if got == nil {
		t.Fatal("expected offset after backup promotion, got nil")
	}
	if got.RttNs != 2000 {
		t.Errorf("expected RttNs=2000 (promoted backup), got %d", got.RttNs)
	}
}

// TestOffsetCache_BackupRefreshInHalfWindow checks that a higher-RTT offset
// stored well after best is kept as backup and is served by Get once best has
// expired. The test relies on wall-clock sleeps relative to a 100ms maxAge.
func TestOffsetCache_BackupRefreshInHalfWindow(t *testing.T) {
cache := newOffsetCache(100 * time.Millisecond)

pubkey := [32]byte{1}

// Put best (low RTT).
cache.Put(makeTestOffset(pubkey, 500))

// Wait past half-maxAge (>50ms).
time.Sleep(60 * time.Millisecond)

// Put a higher RTT offset. The backup slot is still empty at this point, so
// the offset is stored there; it is about 60ms younger than best.
cache.Put(makeTestOffset(pubkey, 2000))

got := cache.Get(pubkey)
if got == nil {
t.Fatal("expected offset, got nil")
}
// Best should still be the low RTT.
if got.RttNs != 500 {
t.Errorf("expected RttNs=500 (best unchanged), got %d", got.RttNs)
}

// Wait until best is past maxAge (at least 110ms old) while the backup,
// stored about 50ms ago, is still valid.
time.Sleep(50 * time.Millisecond)

got = cache.Get(pubkey)
if got == nil {
t.Fatal("expected backup offset after best expired, got nil")
}
if got.RttNs != 2000 {
t.Errorf("expected RttNs=2000, got %d", got.RttNs)
t.Errorf("expected RttNs=2000 (backup still valid), got %d", got.RttNs)
}
}

// TestOffsetCache_GetBestWithTwoSlots checks that GetBest returns the
// lowest-RTT offset across every sender when each sender holds both a best
// and a backup entry.
func TestOffsetCache_GetBestWithTwoSlots(t *testing.T) {
	var (
		first  = [32]byte{1}
		second = [32]byte{2}
		third  = [32]byte{3}
	)
	c := newOffsetCache(time.Hour)

	// For each sender the lower RTT goes in first (best) and the higher RTT
	// second (backup).
	c.Put(makeTestOffset(first, 500))
	c.Put(makeTestOffset(first, 1500))

	c.Put(makeTestOffset(second, 300))
	c.Put(makeTestOffset(second, 2000))

	c.Put(makeTestOffset(third, 800))
	c.Put(makeTestOffset(third, 3000))

	winner := c.GetBest()
	if winner == nil {
		t.Fatal("expected best offset, got nil")
	}

	// The global minimum is the second sender's best entry.
	if winner.RttNs != 300 {
		t.Errorf("expected global best RttNs=300, got %d", winner.RttNs)
	}
	if winner.SenderPubkey != second {
		t.Errorf("expected sender pubkey {2}, got %v", winner.SenderPubkey)
	}
}

// TestOffsetCache_EvictBothSlots checks that Evict removes a sender only when
// both of its slots have expired, and leaves a sender with a fresh entry alone.
func TestOffsetCache_EvictBothSlots(t *testing.T) {
	staleKey := [32]byte{1}
	freshKey := [32]byte{2}
	c := newOffsetCache(10 * time.Millisecond)

	// The stale sender gets both a best and a backup entry.
	c.Put(makeTestOffset(staleKey, 500))
	c.Put(makeTestOffset(staleKey, 2000))

	// The second sender is stored now and refreshed again after the sleep.
	c.Put(makeTestOffset(freshKey, 1000))

	// Let everything stored so far age past maxAge.
	time.Sleep(15 * time.Millisecond)

	// Refresh the second sender so it holds a non-expired entry.
	c.Put(makeTestOffset(freshKey, 1000))

	if n := c.Evict(); n != 1 {
		t.Errorf("expected 1 evicted (sender with both slots expired), got %d", n)
	}

	// The stale sender must be gone.
	if got := c.Get(staleKey); got != nil {
		t.Errorf("expected sender 1 evicted, got %+v", got)
	}

	// The refreshed sender must survive.
	if c.Get(freshKey) == nil {
		t.Error("expected sender 2 still present")
	}
}

Expand Down
Loading