Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ All notable changes to this project will be documented in this file.
- Telemetry
- Embed LocationOffsets from parent DZDs in signed TWAMP replies so inbound probes carry geolocation context, and make signed TWAMP replies more like LocationOffsets to couple with a new double-probe system for inbound probing.
- Client
- Add `doublezero_connection_info` Prometheus metric exposing connection metadata (user_type, network, current_device, metro, tunnel_name, tunnel_src, tunnel_dst) ([#3201](https://github.com/malbeclabs/doublezero/pull/3201))
- Increase default onchain fetch timeout from 20s to 60s to improve resilience on high-latency RPC paths; add `-reconciler-fetch-timeout` flag to allow operators to override
- Add prometheus metrics for onchain RPC fetches: fetch duration histogram, result counter (success/error with stale cache/error with no cache), and stale cache age gauge
- Increase default route liveness probe interval (TxMin/RxMin) from 300ms to 1s and raise MaxTxCeil from 1s to 3s to preserve backoff headroom
Expand Down
34 changes: 34 additions & 0 deletions client/doublezerod/internal/manager/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (n *NetlinkManager) ServeProvision(w http.ResponseWriter, r *http.Request)
return
}

n.updateConnectionInfoMetric()
_, _ = w.Write([]byte(`{"status": "ok"}`))
}

Expand Down Expand Up @@ -97,6 +98,7 @@ func (n *NetlinkManager) ServeRemove(w http.ResponseWriter, r *http.Request) {
return
}

n.updateConnectionInfoMetric()
_, _ = w.Write([]byte(`{"status": "ok"}`))
}

Expand Down Expand Up @@ -167,6 +169,38 @@ func (n *NetlinkManager) ServeV2Status(w http.ResponseWriter, _ *http.Request) {
})
}

// updateConnectionInfoMetric resets and repopulates the doublezero_connection_info
// gauge with current service metadata.
func (n *NetlinkManager) updateConnectionInfoMetric() {
metricConnectionInfo.Reset()

statuses, err := n.Status()
if err != nil || len(statuses) == 0 {
return
}

enriched := n.enrichStatuses(statuses)
for _, svc := range enriched {
metricConnectionInfo.WithLabelValues(
svc.UserType.String(),
n.network,
svc.CurrentDevice,
svc.Metro,
svc.TunnelName,
ipString(svc.TunnelSrc),
ipString(svc.TunnelDst),
).Set(1)
}
}

// ipString returns the string representation of an IP, or empty string if nil.
func ipString(ip net.IP) string {
if ip == nil {
return ""
}
return ip.String()
}

// latencyToleranceNS matches the CLI's LATENCY_TOLERANCE_NS (5ms).
const latencyToleranceNS int64 = 5_000_000

Expand Down
3 changes: 3 additions & 0 deletions client/doublezerod/internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ func (n *NetlinkManager) reconcilerTeardown() {
}
// Clear cached tunnel src so a fresh lookup is done on next enable.
n.tunnelSrcCache = make(map[string]net.IP)
metricConnectionInfo.Reset()
}

func (n *NetlinkManager) reconcile(ctx context.Context) {
Expand Down Expand Up @@ -535,6 +536,8 @@ func (n *NetlinkManager) reconcile(ctx context.Context) {
// Reconcile unicast and multicast services
n.reconcileService(wantUnicast, n.HasUnicastService(), serviceUnicast, api.UserTypeIBRL, devicesByPK, mcastGroupsByPK, allPrefixes, data.Config)
n.reconcileService(wantMulticast, n.HasMulticastService(), serviceMulticast, api.UserTypeMulticast, devicesByPK, mcastGroupsByPK, allPrefixes, data.Config)

n.updateConnectionInfoMetric()
}

func (n *NetlinkManager) reconcileService(
Expand Down
8 changes: 8 additions & 0 deletions client/doublezerod/internal/manager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ var (
},
[]string{labelServiceType},
)

metricConnectionInfo = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "doublezero_connection_info",
Help: "Connection metadata for active DoubleZero services",
},
[]string{"user_type", "network", "current_device", "metro", "tunnel_name", "tunnel_src", "tunnel_dst"},
)
)
62 changes: 62 additions & 0 deletions client/doublezerod/internal/manager/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/malbeclabs/doublezero/client/doublezerod/internal/pim"
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
"github.com/prometheus/client_golang/prometheus/testutil"
)

// --- test mocks ---
Expand Down Expand Up @@ -1263,6 +1264,67 @@ func TestServeV2Status_Enrichment(t *testing.T) {
}
}

func TestConnectionInfoMetric(t *testing.T) {
devicePK := [32]byte{1}
exchangePK := [32]byte{2}
clientIP := net.IPv4(1, 2, 3, 4).To4()

device := testDevice(devicePK, [4]uint8{5, 6, 7, 8}, [][5]uint8{{10, 0, 0, 0, 24}})
device.ExchangePubKey = exchangePK
device.Code = "dz1"
device.Status = serviceability.DeviceStatusActivated

user := testUser([4]uint8{1, 2, 3, 4}, devicePK, serviceability.UserTypeIBRL, serviceability.UserStatusActivated)

fetcher := &mockFetcher{
data: &serviceability.ProgramData{
Config: testConfig(),
Devices: []serviceability.Device{device},
Users: []serviceability.User{user},
Exchanges: []serviceability.Exchange{
{PubKey: exchangePK, Name: "Amsterdam"},
},
},
}

dir := t.TempDir()
n := newTestNLM(fetcher,
WithClientIP(clientIP),
WithPollInterval(time.Hour),
WithStateDir(dir),
WithEnabled(true),
WithNetwork("testnet"),
)

// Reset metric to avoid cross-test pollution from promauto global state.
metricConnectionInfo.Reset()

t.Run("populated_after_reconcile", func(t *testing.T) {
n.reconcile(context.Background())

count := testutil.CollectAndCount(metricConnectionInfo)
if count != 1 {
t.Fatalf("expected 1 metric series, got %d", count)
}

val := testutil.ToFloat64(metricConnectionInfo.WithLabelValues(
"IBRL", "testnet", "dz1", "Amsterdam", "doublezero0", "1.2.3.4", "5.6.7.8",
))
if val != 1 {
t.Fatalf("expected metric value 1, got %f", val)
}
})

t.Run("cleared_after_teardown", func(t *testing.T) {
n.reconcilerTeardown()

count := testutil.CollectAndCount(metricConnectionInfo)
if count != 0 {
t.Fatalf("expected 0 metric series after teardown, got %d", count)
}
})
}

func TestServeV2Status_NoFetcher(t *testing.T) {
dir := t.TempDir()
// Create NLM with a nil-data fetcher to verify enrichment handles missing data gracefully.
Expand Down
Loading