Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (

MainnetRevenueDistributionProgramID = "dzrevZC94tBLwuHw1dyynZxaXTWyp7yocsinyEVPtt4"
MainnetGeolocationProgramID = "8H7nS6eZiuf7rGQtz3PPz2q9m4eJRL37PPM678KHnspG"
MainnetReservationProgramID = "dzshrr3yL57SB13sJPYHYo3TV8Bo1i1FxkyrZr3bKNE"

// Testnet constants.
TestnetLedgerPublicRPCURL = "https://doublezerolocalnet.rpcpool.com/8a4fd3f4-0977-449f-88c7-63d4b0f10f16"
Expand All @@ -27,6 +28,7 @@ const (
TestnetTelemetryFlowIngestURL = "http://telemetry-flow-in.testnet.doublezero.xyz"
TestnetTelemetryStateIngestURL = "http://telemetry-state-in.testnet.doublezero.xyz"
TestnetGeolocationProgramID = "3AG2BCA7gAm47Q6xZzPQcUUYvnBjxAvPKnPz919cxHF4"
TestnetReservationProgramID = "dzshrr3yL57SB13sJPYHYo3TV8Bo1i1FxkyrZr3bKNE"
TestnetTelemetryGNMITunnelServerAddr = "gnmic-testnet.doublezero.xyz:443"

// Devnet constants.
Expand All @@ -39,6 +41,7 @@ const (
DevnetTelemetryFlowIngestURL = "http://telemetry-flow-in.devnet.doublezero.xyz"
DevnetTelemetryStateIngestURL = "http://telemetry-state-in.devnet.doublezero.xyz"
DevnetGeolocationProgramID = "EXUUFfAjjuXnaBtsAMLsJX18ynnNHPwtkmk33bLVVoCm"
DevnetReservationProgramID = "" // TODO: set when deployed to devnet
DevnetTelemetryGNMITunnelServerAddr = "gnmic-devnet.doublezero.xyz:443"

// Localnet constants.
Expand All @@ -52,5 +55,6 @@ const (
LocalnetTelemetryFlowIngestURL = "http://localhost:8911"
LocalnetTelemetryStateIngestURL = "http://localhost:8911"
LocalnetGeolocationProgramID = "36WA9nUCsJaAQL5h44WYoLezDpocy8Q71NZbtrUN8DyC"
LocalnetReservationProgramID = "" // TODO: set when deployed to localnet
LocalnetTelemetryGNMITunnelServerAddr = "localhost:50051"
)
12 changes: 12 additions & 0 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type NetworkConfig struct {
TelemetryStateIngestURL string
TelemetryGNMITunnelServerAddr string
GeolocationProgramID solana.PublicKey
ReservationProgramID string
}

func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
Expand Down Expand Up @@ -63,6 +64,7 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
RevenueDistributionProgramID: revenueDistributionProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ReservationProgramID: MainnetReservationProgramID,
DeviceLocalASN: MainnetDeviceLocalASN,
TwoZOracleURL: MainnetTwoZOracleURL,
SolanaRPCURL: MainnetSolanaRPC,
Expand Down Expand Up @@ -94,6 +96,7 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
TelemetryProgramID: telemetryProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ReservationProgramID: TestnetReservationProgramID,
DeviceLocalASN: TestnetDeviceLocalASN,
TwoZOracleURL: TestnetTwoZOracleURL,
SolanaRPCURL: TestnetSolanaRPC,
Expand Down Expand Up @@ -125,6 +128,7 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
TelemetryProgramID: telemetryProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ReservationProgramID: DevnetReservationProgramID,
DeviceLocalASN: DevnetDeviceLocalASN,
TwoZOracleURL: DevnetTwoZOracleURL,
SolanaRPCURL: TestnetSolanaRPC,
Expand Down Expand Up @@ -156,6 +160,7 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
TelemetryProgramID: telemetryProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ReservationProgramID: LocalnetReservationProgramID,
DeviceLocalASN: LocalnetDeviceLocalASN,
TwoZOracleURL: LocalnetTwoZOracleURL,
SolanaRPCURL: LocalnetSolanaRPC,
Expand All @@ -168,6 +173,13 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
return nil, fmt.Errorf("invalid environment %q, must be one of: %s, %s, %s", env, EnvMainnetBeta, EnvTestnet, EnvDevnet)
}

// Validate reservation program ID if set (empty means not yet deployed to this env).
if config.ReservationProgramID != "" {
if _, err := solana.PublicKeyFromBase58(config.ReservationProgramID); err != nil {
return nil, fmt.Errorf("failed to parse reservation program ID: %w", err)
}
}

ledgerRPCURL := os.Getenv("DZ_LEDGER_RPC_URL")
if ledgerRPCURL != "" {
config.LedgerPublicRPCURL = ledgerRPCURL
Expand Down
3 changes: 3 additions & 0 deletions config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestConfig_NetworkConfigForEnv(t *testing.T) {
TelemetryStateIngestURL: config.MainnetTelemetryStateIngestURL,
TelemetryGNMITunnelServerAddr: config.MainnetTelemetryGNMITunnelServerAddr,
GeolocationProgramID: solana.MustPublicKeyFromBase58(config.MainnetGeolocationProgramID),
ReservationProgramID: config.MainnetReservationProgramID,
},
},
{
Expand All @@ -50,6 +51,7 @@ func TestConfig_NetworkConfigForEnv(t *testing.T) {
TelemetryStateIngestURL: config.MainnetTelemetryStateIngestURL,
TelemetryGNMITunnelServerAddr: config.MainnetTelemetryGNMITunnelServerAddr,
GeolocationProgramID: solana.MustPublicKeyFromBase58(config.MainnetGeolocationProgramID),
ReservationProgramID: config.MainnetReservationProgramID,
},
},
{
Expand All @@ -67,6 +69,7 @@ func TestConfig_NetworkConfigForEnv(t *testing.T) {
TelemetryStateIngestURL: config.TestnetTelemetryStateIngestURL,
TelemetryGNMITunnelServerAddr: config.TestnetTelemetryGNMITunnelServerAddr,
GeolocationProgramID: solana.MustPublicKeyFromBase58(config.TestnetGeolocationProgramID),
ReservationProgramID: config.TestnetReservationProgramID,
},
},
{
Expand Down
10 changes: 8 additions & 2 deletions e2e/internal/qa/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ type Client struct {
// Exported as a simple configuration field (unlike publicIP which uses a setter
// because it has a non-nil invariant enforced by SetPublicIP).
ClientIP string

// Settlement config passed to doublezero-solana shreds commands.
SolanaRPCURL string
ReservationProgramID string
}

func NewClient(ctx context.Context, log *slog.Logger, hostname string, port int, networkConfig *config.NetworkConfig, devices map[string]*Device, allocateAddr bool) (*Client, error) {
Expand Down Expand Up @@ -133,8 +137,10 @@ func NewClient(ctx context.Context, log *slog.Logger, hostname string, port int,
serviceability: serviceabilityClient,
devices: devices,

Host: hostname,
AllocateAddr: allocateAddr,
Host: hostname,
AllocateAddr: allocateAddr,
SolanaRPCURL: networkConfig.SolanaRPCURL,
ReservationProgramID: networkConfig.ReservationProgramID,
}, nil
}

Expand Down
99 changes: 99 additions & 0 deletions e2e/internal/qa/client_settlement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package qa

import (
"context"
"fmt"
"math"

pb "github.com/malbeclabs/doublezero/e2e/proto/qa/gen/pb-go"
"google.golang.org/protobuf/types/known/emptypb"
)

// FeedEnable calls the FeedEnable RPC to start the doublezerod reconciler.
func (c *Client) FeedEnable(ctx context.Context) error {
c.log.Debug("Enabling reconciler", "host", c.Host)
resp, err := c.grpcClient.FeedEnable(ctx, &emptypb.Empty{})
if err != nil {
return fmt.Errorf("failed to enable reconciler on host %s: %w", c.Host, err)
}
if !resp.GetSuccess() {
return fmt.Errorf("enable failed on host %s: %s", c.Host, resp.GetOutput())
}
c.log.Debug("Reconciler enabled", "host", c.Host)
return nil
}

// ClosestDevice returns the reachable device with the lowest average latency.
// It calls GetLatency and looks up the result in the client's devices map.
func (c *Client) ClosestDevice(ctx context.Context) (*Device, error) {
latencies, err := c.GetLatency(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latency on host %s: %w", c.Host, err)
}

var bestLatency *pb.Latency
var bestAvg uint64 = math.MaxUint64
for _, l := range latencies {
if !l.Reachable {
continue
}
if l.AvgLatencyNs < bestAvg {
bestAvg = l.AvgLatencyNs
bestLatency = l
}
}
if bestLatency == nil {
return nil, fmt.Errorf("no reachable devices found on host %s", c.Host)
}

// Look up device by code in the devices map.
device, ok := c.devices[bestLatency.DeviceCode]
if !ok {
return nil, fmt.Errorf("closest device %q (pk=%s) not found in devices map on host %s", bestLatency.DeviceCode, bestLatency.DevicePk, c.Host)
}

c.log.Debug("Determined closest device", "host", c.Host, "deviceCode", device.Code, "avgLatencyNs", bestAvg)
return device, nil
}

// FeedSeatPay calls the FeedSeatPay RPC to pay for a seat on a device.
// The client's public IP is auto-filled. Instant allocation is the default.
func (c *Client) FeedSeatPay(ctx context.Context, devicePubkey string, amount string) error {
c.log.Debug("Paying for seat", "host", c.Host, "device", devicePubkey, "amount", amount)
resp, err := c.grpcClient.FeedSeatPay(ctx, &pb.FeedSeatPayRequest{
DevicePubkey: devicePubkey,
ClientIp: c.publicIP.To4().String(),
Amount: amount,
SolanaRpcUrl: c.SolanaRPCURL,
ReservationProgramId: c.ReservationProgramID,
})
if err != nil {
return fmt.Errorf("failed to pay for seat on host %s: %w", c.Host, err)
}
if !resp.GetSuccess() {
c.log.Error("Seat payment failed", "host", c.Host, "device", devicePubkey, "output", resp.GetOutput())
return fmt.Errorf("seat payment failed on host %s: %s", c.Host, resp.GetOutput())
}
c.log.Debug("Seat payment successful", "host", c.Host, "device", devicePubkey)
return nil
}

// FeedSeatWithdraw calls the FeedSeatWithdraw RPC to withdraw a seat from a device.
// Instant withdrawal is the default.
func (c *Client) FeedSeatWithdraw(ctx context.Context, devicePubkey string) error {
c.log.Debug("Withdrawing seat", "host", c.Host, "device", devicePubkey)
resp, err := c.grpcClient.FeedSeatWithdraw(ctx, &pb.FeedSeatWithdrawRequest{
DevicePubkey: devicePubkey,
ClientIp: c.publicIP.To4().String(),
SolanaRpcUrl: c.SolanaRPCURL,
ReservationProgramId: c.ReservationProgramID,
})
if err != nil {
return fmt.Errorf("failed to withdraw seat on host %s: %w", c.Host, err)
}
if !resp.GetSuccess() {
return fmt.Errorf("seat withdrawal failed on host %s: %s", c.Host, resp.GetOutput())
}
c.log.Debug("Seat withdrawal successful", "host", c.Host, "device", devicePubkey)
return nil
}
85 changes: 85 additions & 0 deletions e2e/internal/rpc/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,91 @@ func (q *QAAgent) Disconnect(ctx context.Context, req *emptypb.Empty) (*pb.Resul
return res, nil
}

// FeedEnable implements the FeedEnable RPC, which enables the reconciler on doublezerod.
// This is equivalent to running `doublezero enable`.
func (q *QAAgent) FeedEnable(ctx context.Context, req *emptypb.Empty) (*pb.Result, error) {
q.log.Debug("Received FeedEnable request")
cmd := exec.CommandContext(ctx, "doublezero", "enable")
res, err := runCmd(cmd)
if err != nil {
q.log.Error("Failed to enable reconciler", "output", res.GetOutput())
return res, fmt.Errorf("failed to enable reconciler: %w", err)
}
q.log.Debug("Reconciler enabled", "output", res.GetOutput())
return res, nil
}

// FeedSeatPay implements the FeedSeatPay RPC, which pays for a seat on a device.
// This executes `doublezero-solana shreds pay` with the provided parameters.
// Instant allocation is always the default in the CLI (--now was removed).
// --accept-partial-epoch is hardcoded to prevent the interactive prompt from
// hanging in non-interactive QA test runs.
func (q *QAAgent) FeedSeatPay(ctx context.Context, req *pb.FeedSeatPayRequest) (*pb.Result, error) {
if req.GetDevicePubkey() == "" {
return nil, fmt.Errorf("device_pubkey is required")
}
if req.GetClientIp() == "" {
return nil, fmt.Errorf("client_ip is required")
}
if req.GetAmount() == "" {
return nil, fmt.Errorf("amount is required")
}
q.log.Debug("Received SeatPay request", "device", req.GetDevicePubkey(), "clientIP", req.GetClientIp(), "amount", req.GetAmount())

args := []string{"shreds", "pay", "--device", req.GetDevicePubkey(), "--client-ip", req.GetClientIp(), "--amount", req.GetAmount(), "--accept-partial-epoch"}
if req.GetSolanaRpcUrl() != "" {
args = append(args, "--url", req.GetSolanaRpcUrl())
}

cmdCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

cmd := exec.CommandContext(cmdCtx, "doublezero-solana", args...)
if req.GetReservationProgramId() != "" {
cmd.Env = append(cmd.Environ(), "RESERVATION_PROGRAM_ID="+req.GetReservationProgramId())
}
res, err := runCmd(cmd)
if err != nil {
q.log.Error("Failed to pay for seat", "device", req.GetDevicePubkey(), "output", res.GetOutput())
return res, fmt.Errorf("failed to pay for seat on device %s: %w", req.GetDevicePubkey(), err)
}
q.log.Debug("Seat payment successful", "device", req.GetDevicePubkey(), "output", res.GetOutput())
return res, nil
}

// FeedSeatWithdraw implements the FeedSeatWithdraw RPC, which withdraws a seat from a device.
// This executes `doublezero-solana shreds withdraw` with the provided parameters.
// Instant withdrawal is always the default in the CLI (--unsafe-now was removed).
func (q *QAAgent) FeedSeatWithdraw(ctx context.Context, req *pb.FeedSeatWithdrawRequest) (*pb.Result, error) {
if req.GetDevicePubkey() == "" {
return nil, fmt.Errorf("device_pubkey is required")
}
if req.GetClientIp() == "" {
return nil, fmt.Errorf("client_ip is required")
}
q.log.Debug("Received SeatWithdraw request", "device", req.GetDevicePubkey(), "clientIP", req.GetClientIp())

args := []string{"shreds", "withdraw", "--device", req.GetDevicePubkey(), "--client-ip", req.GetClientIp()}
if req.GetSolanaRpcUrl() != "" {
args = append(args, "--url", req.GetSolanaRpcUrl())
}

cmdCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

cmd := exec.CommandContext(cmdCtx, "doublezero-solana", args...)
if req.GetReservationProgramId() != "" {
cmd.Env = append(cmd.Environ(), "RESERVATION_PROGRAM_ID="+req.GetReservationProgramId())
}
res, err := runCmd(cmd)
if err != nil {
q.log.Error("Failed to withdraw seat", "device", req.GetDevicePubkey(), "output", res.GetOutput())
return res, fmt.Errorf("failed to withdraw seat on device %s: %w", req.GetDevicePubkey(), err)
}
q.log.Debug("Seat withdrawal successful", "device", req.GetDevicePubkey(), "output", res.GetOutput())
return res, nil
}

type StatusResponse struct {
Response struct {
TunnelName string `json:"tunnel_name"`
Expand Down
20 changes: 20 additions & 0 deletions e2e/proto/qa/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ service QAAgentService {
rpc MulticastLeave(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc MulticastReport(MulticastReportRequest) returns (MulticastReportResult);
rpc MulticastSend(MulticastSendRequest) returns (google.protobuf.Empty);
rpc FeedEnable(google.protobuf.Empty) returns (Result);
rpc FeedSeatPay(FeedSeatPayRequest) returns (Result);
rpc FeedSeatWithdraw(FeedSeatWithdrawRequest) returns (Result);
}

message ConnectUnicastRequest {
Expand Down Expand Up @@ -203,3 +206,20 @@ message MulticastSendRequest {
uint32 port = 2;
uint32 duration = 3; // in seconds
}

message FeedSeatPayRequest {
string device_pubkey = 1;
string client_ip = 2;
string amount = 3;
bool instant = 4;
string solana_rpc_url = 5;
string reservation_program_id = 6;
}

message FeedSeatWithdrawRequest {
string device_pubkey = 1;
string client_ip = 2;
bool instant = 3;
string solana_rpc_url = 4;
string reservation_program_id = 5;
}
Loading
Loading