Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ All notable changes to this project will be documented in this file.

### Changes

- Onchain Programs
- Add `target_update_count` field to GeoProbe account, incremented on `AddTarget` and `RemoveTarget`; uses `BorshDeserializeIncremental` so existing accounts default to 0 (non-breaking)
- SDK
- Add `TargetUpdateCount` field to Go GeoProbe struct with backward-compatible deserialization
- Telemetry
- Skip expensive `GetGeolocationUsers` RPC scan in geoprobe-agent when the probe's `target_update_count` is unchanged, with a forced full refresh every ~5 minutes as safety net

## [v0.13.0](https://github.com/malbeclabs/doublezero/compare/client/v0.12.0...client/v0.13.0) - 2026-03-20

### Breaking
Expand Down
31 changes: 19 additions & 12 deletions controlplane/telemetry/cmd/geoprobe-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -533,19 +534,24 @@ func main() {
}
}()

// Shared counter: parent discovery writes the GeoProbe target_update_count on each
// poll; target discovery reads it to skip expensive full scans when unchanged.
var probeTargetUpdateCount atomic.Uint32

// Run parent DZD discovery if program IDs are configured.
if parentDiscoveryEnabled {
parentUpdateCh := make(chan geoprobe.ParentUpdate, 1)
geoProbeClient := geoprobe.NewRPCGeoProbeClient(rpcClient, geolocationProgramID)
deviceResolver := geoprobe.NewRPCDeviceResolver(rpcClient, serviceabilityProgramID)

pd, err := geoprobe.NewParentDiscovery(&geoprobe.ParentDiscoveryConfig{
GeoProbePubkey: geoProbePubkey,
Client: geoProbeClient,
Resolver: deviceResolver,
CLIParents: cliParentAuthorities,
Interval: parentDiscoveryInterval,
Logger: log,
GeoProbePubkey: geoProbePubkey,
Client: geoProbeClient,
Resolver: deviceResolver,
CLIParents: cliParentAuthorities,
Interval: parentDiscoveryInterval,
Logger: log,
ProbeTargetUpdateCount: &probeTargetUpdateCount,
})
if err != nil {
log.Error("Failed to create parent discovery", "error", err)
Expand Down Expand Up @@ -578,12 +584,13 @@ func main() {
if !geolocationProgramID.IsZero() {
geolocationUserClient := geolocation.New(log, rpcClient, geolocationProgramID)
td, err := geoprobe.NewTargetDiscovery(&geoprobe.TargetDiscoveryConfig{
GeoProbePubkey: geoProbePubkey,
Client: geolocationUserClient,
CLITargets: targets,
CLIAllowedKeys: allowedKeys,
Interval: discoveryInterval,
Logger: log,
GeoProbePubkey: geoProbePubkey,
Client: geolocationUserClient,
CLITargets: targets,
CLIAllowedKeys: allowedKeys,
Interval: discoveryInterval,
Logger: log,
ProbeTargetUpdateCount: &probeTargetUpdateCount,
})
if err != nil {
log.Error("Failed to create target discovery", "error", err)
Expand Down
45 changes: 27 additions & 18 deletions controlplane/telemetry/internal/geoprobe/onchain_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"sync/atomic"
"time"

"github.com/gagliardetto/solana-go"
Expand Down Expand Up @@ -38,22 +39,24 @@ type ParentUpdate struct {

// ParentDiscoveryConfig holds configuration for parent discovery.
type ParentDiscoveryConfig struct {
GeoProbePubkey solana.PublicKey
Client GeoProbeAccountClient
Resolver DeviceResolver
CLIParents map[[32]byte][32]byte // static parents from --additional-parent
Interval time.Duration
Logger *slog.Logger
GeoProbePubkey solana.PublicKey
Client GeoProbeAccountClient
Resolver DeviceResolver
CLIParents map[[32]byte][32]byte // static parents from --additional-parent
Interval time.Duration
Logger *slog.Logger
ProbeTargetUpdateCount *atomic.Uint32 // shared counter for target discovery change detection
}

// ParentDiscovery polls the GeoProbe account and resolves parent devices.
type ParentDiscovery struct {
log *slog.Logger
geoProbePubkey solana.PublicKey
client GeoProbeAccountClient
resolver DeviceResolver
cliParents map[[32]byte][32]byte
interval time.Duration
log *slog.Logger
geoProbePubkey solana.PublicKey
client GeoProbeAccountClient
resolver DeviceResolver
cliParents map[[32]byte][32]byte
interval time.Duration
probeTargetUpdateCount *atomic.Uint32

cachedParentDevices []solana.PublicKey
tickCount uint64
Expand Down Expand Up @@ -83,12 +86,13 @@ func NewParentDiscovery(cfg *ParentDiscoveryConfig) (*ParentDiscovery, error) {
}

return &ParentDiscovery{
log: cfg.Logger,
geoProbePubkey: cfg.GeoProbePubkey,
client: cfg.Client,
resolver: cfg.Resolver,
cliParents: cliParents,
interval: cfg.Interval,
log: cfg.Logger,
geoProbePubkey: cfg.GeoProbePubkey,
client: cfg.Client,
resolver: cfg.Resolver,
cliParents: cliParents,
interval: cfg.Interval,
probeTargetUpdateCount: cfg.ProbeTargetUpdateCount,
}, nil
}

Expand Down Expand Up @@ -152,6 +156,11 @@ func (d *ParentDiscovery) discover(ctx context.Context) (*ParentUpdate, error) {
return d.cliOnlyUpdate(), nil
}

// Publish the probe's target_update_count for target discovery change detection.
if d.probeTargetUpdateCount != nil {
d.probeTargetUpdateCount.Store(probe.TargetUpdateCount)
}

// Check if parent device set changed since last poll.
if !forceFullRefresh && pubkeySlicesEqual(d.cachedParentDevices, probe.ParentDevices) {
d.log.Debug("Parent device set unchanged, skipping resolution",
Expand Down
44 changes: 44 additions & 0 deletions controlplane/telemetry/internal/geoprobe/onchain_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"log/slog"
"os"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -611,3 +612,46 @@ func TestPubkeySlicesEqual(t *testing.T) {
})
}
}

func TestParentDiscovery_StoresTargetUpdateCount(t *testing.T) {
t.Parallel()

geoProbePK := solana.NewWallet().PublicKey()
parentDevicePK := solana.NewWallet().PublicKey()
var metricsKey [32]byte
metricsKey = solana.NewWallet().PublicKey()

client := &mockGeoProbeAccountClient{
probe: &geolocation.GeoProbe{
AccountType: geolocation.AccountTypeGeoProbe,
ParentDevices: []solana.PublicKey{parentDevicePK},
TargetUpdateCount: 42,
},
}

resolver := &mockDeviceResolver{
devices: map[solana.PublicKey]*serviceability.Device{
parentDevicePK: {
PublicIp: [4]uint8{10, 0, 0, 1},
MetricsPublisherPubKey: metricsKey,
},
},
}

var counter atomic.Uint32
pd, err := NewParentDiscovery(&ParentDiscoveryConfig{
Logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
Client: client,
Resolver: resolver,
GeoProbePubkey: geoProbePK,
Interval: 10 * time.Millisecond,
ProbeTargetUpdateCount: &counter,
})
require.NoError(t, err)

update, err := pd.discover(context.Background())
require.NoError(t, err)
require.NotNil(t, update)

assert.Equal(t, uint32(42), counter.Load())
}
77 changes: 54 additions & 23 deletions controlplane/telemetry/internal/geoprobe/target_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"sort"
"sync/atomic"
"time"

"github.com/gagliardetto/solana-go"
Expand All @@ -27,30 +28,38 @@ type InboundKeyUpdate struct {
Keys [][32]byte
}

// targetDiscoveryFullRefreshEvery controls how often a full GeolocationUser scan
// is forced regardless of whether the GeoProbe target_update_count has changed.
// At the default 60s interval, 5 means a full refresh every ~5 minutes.
const targetDiscoveryFullRefreshEvery = 5

// TargetDiscoveryConfig holds configuration for target discovery.
type TargetDiscoveryConfig struct {
GeoProbePubkey solana.PublicKey
Client GeolocationUserClient
CLITargets []ProbeAddress
CLIAllowedKeys [][32]byte
Interval time.Duration
Logger *slog.Logger
GeoProbePubkey solana.PublicKey
Client GeolocationUserClient
CLITargets []ProbeAddress
CLIAllowedKeys [][32]byte
Interval time.Duration
Logger *slog.Logger
ProbeTargetUpdateCount *atomic.Uint32 // shared counter from parent discovery
}

// TargetDiscovery polls GeolocationUser accounts and sends target/key updates
// when changes are detected. It filters for activated, paid users whose targets
// reference this probe's pubkey.
type TargetDiscovery struct {
log *slog.Logger
geoProbePubkey solana.PublicKey
client GeolocationUserClient
cliTargets []ProbeAddress
cliAllowedKeys [][32]byte
interval time.Duration

cachedTargets []ProbeAddress
cachedInboundKeys [][32]byte
tickCount uint64
log *slog.Logger
geoProbePubkey solana.PublicKey
client GeolocationUserClient
cliTargets []ProbeAddress
cliAllowedKeys [][32]byte
interval time.Duration
probeTargetUpdateCount *atomic.Uint32

cachedTargets []ProbeAddress
cachedInboundKeys [][32]byte
tickCount uint64
lastSeenTargetUpdateCount uint32
}

// NewTargetDiscovery creates a new TargetDiscovery instance.
Expand All @@ -69,12 +78,13 @@ func NewTargetDiscovery(cfg *TargetDiscoveryConfig) (*TargetDiscovery, error) {
}

return &TargetDiscovery{
log: cfg.Logger,
geoProbePubkey: cfg.GeoProbePubkey,
client: cfg.Client,
cliTargets: cfg.CLITargets,
cliAllowedKeys: cfg.CLIAllowedKeys,
interval: cfg.Interval,
log: cfg.Logger,
geoProbePubkey: cfg.GeoProbePubkey,
client: cfg.Client,
cliTargets: cfg.CLITargets,
cliAllowedKeys: cfg.CLIAllowedKeys,
interval: cfg.Interval,
probeTargetUpdateCount: cfg.ProbeTargetUpdateCount,
}, nil
}

Expand Down Expand Up @@ -111,6 +121,11 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T
return
}

// nil targets means the scan was skipped (target_update_count unchanged).
if targets == nil && inboundKeys == nil {
return
}

if !probeAddressSlicesEqual(targets, d.cachedTargets) {
d.cachedTargets = targets
select {
Expand All @@ -131,10 +146,21 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T
}

// discover performs a single discovery cycle: fetch users, filter, extract targets/keys,
// merge with CLI values.
// merge with CLI values. Returns nil, nil, nil when the scan is skipped.
func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, [][32]byte, error) {
forceFullRefresh := d.tickCount%targetDiscoveryFullRefreshEvery == 0
d.tickCount++

if d.probeTargetUpdateCount != nil && !forceFullRefresh {
current := d.probeTargetUpdateCount.Load()
if current == d.lastSeenTargetUpdateCount && d.tickCount > 1 {
d.log.Debug("GeoProbe target_update_count unchanged, skipping target scan",
"targetUpdateCount", current)
return nil, nil, nil
}
d.lastSeenTargetUpdateCount = current
}

users, err := d.client.GetGeolocationUsers(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch GeolocationUser accounts: %w", err)
Expand Down Expand Up @@ -195,6 +221,11 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, [][32]b
mergedTargets := mergeProbes(d.cliTargets, onchainTargets)
mergedKeys := mergeKeys(d.cliAllowedKeys, onchainKeys)

// Sync lastSeenTargetUpdateCount after a full scan (covers forced refresh path).
if d.probeTargetUpdateCount != nil {
d.lastSeenTargetUpdateCount = d.probeTargetUpdateCount.Load()
}

d.log.Debug("Target discovery tick",
"users", len(users),
"onchainOutbound", len(onchainTargets),
Expand Down
Loading
Loading