Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,15 +2312,21 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
logtrace.Debug(ctx, "Iterate batch store: dispatching to nodes", logtrace.Fields{"task_id": id, "nodes": len(knownNodes)})

// If there are no candidate nodes, there's nothing to fan out to. The caller
// already persisted the batch locally (see StoreBatch), so treat this as a
// no-op success rather than an error.
// already persisted the batch locally (see StoreBatch), but local-only
// persistence is not sufficient for network durability. Treat this as an
// error so callers do not finalize actions or delete source data under the
// assumption that replication occurred.
if len(knownNodes) == 0 {
logtrace.Info(ctx, "dht: batch store skipped (no candidate nodes)", logtrace.Fields{
logtrace.FieldModule: "dht",
"task_id": id,
"keys": len(values),
logtrace.Error(ctx, "dht: batch store skipped (no candidate nodes)", logtrace.Fields{
logtrace.FieldModule: "dht",
"task_id": id,
"keys": len(values),
"len_nodes": len(s.ht.nodes()),
"banned_nodes": len(ignoreList),
"routing_allow_ready": s.routingAllowReady.Load(),
"routing_allow_count": s.routingAllowCount.Load(),
})
return nil
return fmt.Errorf("no candidate nodes for batch store")
}
storeResponses := s.batchStoreNetwork(ctx, values, knownNodes, storageMap, typ)
for response := range storeResponses {
Expand Down
34 changes: 34 additions & 0 deletions p2p/kademlia/dht_batch_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package kademlia

import (
"context"
"strings"
"testing"
)

func TestIterateBatchStore_NoCandidateNodes_ReturnsError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ht, err := NewHashTable(&Options{
ID: []byte("self"),
IP: "0.0.0.0",
Port: 4445,
})
if err != nil {
t.Fatalf("NewHashTable: %v", err)
}

dht := &DHT{
ht: ht,
ignorelist: NewBanList(ctx),
}

err = dht.IterateBatchStore(ctx, [][]byte{[]byte("value")}, 0, "task")
if err == nil {
t.Fatalf("expected error, got nil")
}
if !strings.Contains(err.Error(), "no candidate nodes") {
t.Fatalf("unexpected error: %v", err)
}
}
10 changes: 10 additions & 0 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
return s.dht.Retrieve(ctx, key, localOnly...)
}

// PeersCount returns the current number of peers in the routing table.
// This is intentionally not part of the public Client interface; it is used by
// internal guardrails (e.g., registration) to avoid local-only stores.
func (s *p2p) PeersCount() int {
if s == nil || s.dht == nil {
return 0
}
return s.dht.PeersCount()
}

// BatchRetrieve retrive the data from the kademlia network
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) {

Expand Down
24 changes: 24 additions & 0 deletions p2p/p2p_peerscount_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package p2p

import "testing"

type peersCounter interface {
PeersCount() int
}

var _ peersCounter = (*p2p)(nil)

func TestPeersCount_NilReceiver_ReturnsZero(t *testing.T) {
var s *p2p
if got := s.PeersCount(); got != 0 {
t.Fatalf("expected 0, got %d", got)
}
}

func TestPeersCount_NilDHT_ReturnsZero(t *testing.T) {
s := &p2p{}
if got := s.PeersCount(); got != 0 {
t.Fatalf("expected 0, got %d", got)
}
}

12 changes: 12 additions & 0 deletions supernode/adaptors/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type StoreArtefactsRequest struct {
}

func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
// Registration must never proceed when the node is not connected to any peers.
// Otherwise, StoreBatch can devolve into local-only persistence and actions may
// be finalized without durable replication.
type peersCounter interface {
PeersCount() int
}
if pc, ok := p.p2p.(peersCounter); ok {
if peers := pc.PeersCount(); peers <= 0 {
return fmt.Errorf("p2p has zero peers; refusing to store artefacts (would be non-durable)")
}
}

idFilesBytes := totalBytes(req.IDFiles)
logtrace.Info(ctx, "store: p2p start", logtrace.Fields{
"taskID": req.TaskID,
Expand Down
54 changes: 54 additions & 0 deletions supernode/adaptors/p2p_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package adaptors

import (
"context"
"errors"
"strings"
"testing"

"github.com/LumeraProtocol/supernode/v2/p2p"
"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
"go.uber.org/mock/gomock"
)

type clientWithPeersCount struct {
p2p.Client
peers int
}

func (c clientWithPeersCount) PeersCount() int { return c.peers }

func TestStoreArtefacts_ZeroPeers_ReturnsError(t *testing.T) {
svc := NewP2PService(clientWithPeersCount{peers: 0}, nil)

err := svc.StoreArtefacts(context.Background(), StoreArtefactsRequest{TaskID: "task"}, nil)
if err == nil {
t.Fatalf("expected error, got nil")
}
if !strings.Contains(err.Error(), "zero peers") {
t.Fatalf("unexpected error: %v", err)
}
}

func TestStoreArtefacts_PeersPresent_DoesNotTripGuard(t *testing.T) {
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

store := rqstore.NewMockStore(ctrl)
storeErr := errors.New("store down")
store.EXPECT().StoreSymbolDirectory("task", "").Return(storeErr)

svc := NewP2PService(clientWithPeersCount{peers: 1}, store)

err := svc.StoreArtefacts(context.Background(), StoreArtefactsRequest{TaskID: "task"}, nil)
if err == nil {
t.Fatalf("expected error, got nil")
}
if strings.Contains(err.Error(), "zero peers") {
t.Fatalf("guard should not have fired, got: %v", err)
}
if !strings.Contains(err.Error(), storeErr.Error()) {
t.Fatalf("expected wrapped store error, got: %v", err)
}
}