diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go
index 04b95f8c..41cba389 100644
--- a/p2p/kademlia/dht.go
+++ b/p2p/kademlia/dht.go
@@ -2312,15 +2312,21 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
 	logtrace.Debug(ctx, "Iterate batch store: dispatching to nodes", logtrace.Fields{"task_id": id, "nodes": len(knownNodes)})
 
 	// If there are no candidate nodes, there's nothing to fan out to. The caller
-	// already persisted the batch locally (see StoreBatch), so treat this as a
-	// no-op success rather than an error.
+	// already persisted the batch locally (see StoreBatch), but local-only
+	// persistence is not sufficient for network durability. Treat this as an
+	// error so callers do not finalize actions or delete source data under the
+	// assumption that replication occurred.
 	if len(knownNodes) == 0 {
-		logtrace.Info(ctx, "dht: batch store skipped (no candidate nodes)", logtrace.Fields{
-			logtrace.FieldModule: "dht",
-			"task_id":            id,
-			"keys":               len(values),
+		logtrace.Error(ctx, "dht: batch store skipped (no candidate nodes)", logtrace.Fields{
+			logtrace.FieldModule:  "dht",
+			"task_id":             id,
+			"keys":                len(values),
+			"len_nodes":           len(s.ht.nodes()),
+			"banned_nodes":        len(ignoreList),
+			"routing_allow_ready": s.routingAllowReady.Load(),
+			"routing_allow_count": s.routingAllowCount.Load(),
 		})
-		return nil
+		return fmt.Errorf("no candidate nodes for batch store")
 	}
 	storeResponses := s.batchStoreNetwork(ctx, values, knownNodes, storageMap, typ)
 	for response := range storeResponses {
diff --git a/p2p/kademlia/dht_batch_store_test.go b/p2p/kademlia/dht_batch_store_test.go
new file mode 100644
index 00000000..25965b3c
--- /dev/null
+++ b/p2p/kademlia/dht_batch_store_test.go
@@ -0,0 +1,34 @@
+package kademlia
+
+import (
+	"context"
+	"strings"
+	"testing"
+)
+
+func TestIterateBatchStore_NoCandidateNodes_ReturnsError(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ht, err := NewHashTable(&Options{
+		ID:   []byte("self"),
+		IP:   "0.0.0.0",
+		Port: 4445,
+	})
+	if err != nil {
+		t.Fatalf("NewHashTable: %v", err)
+	}
+
+	dht := &DHT{
+		ht:         ht,
+		ignorelist: NewBanList(ctx),
+	}
+
+	err = dht.IterateBatchStore(ctx, [][]byte{[]byte("value")}, 0, "task")
+	if err == nil {
+		t.Fatalf("expected error, got nil")
+	}
+	if !strings.Contains(err.Error(), "no candidate nodes") {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
diff --git a/p2p/p2p.go b/p2p/p2p.go
index 584fd787..f7387bce 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -139,6 +139,16 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
 	return s.dht.Retrieve(ctx, key, localOnly...)
 }
 
+// PeersCount returns the current number of peers in the routing table.
+// This is intentionally not part of the public Client interface; it is used by
+// internal guardrails (e.g., registration) to avoid local-only stores.
+func (s *p2p) PeersCount() int {
+	if s == nil || s.dht == nil {
+		return 0
+	}
+	return s.dht.PeersCount()
+}
+
 // BatchRetrieve retrive the data from the kademlia network
 func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) {
diff --git a/p2p/p2p_peerscount_test.go b/p2p/p2p_peerscount_test.go
new file mode 100644
index 00000000..de3e0dfd
--- /dev/null
+++ b/p2p/p2p_peerscount_test.go
@@ -0,0 +1,24 @@
+package p2p
+
+import "testing"
+
+type peersCounter interface {
+	PeersCount() int
+}
+
+var _ peersCounter = (*p2p)(nil)
+
+func TestPeersCount_NilReceiver_ReturnsZero(t *testing.T) {
+	var s *p2p
+	if got := s.PeersCount(); got != 0 {
+		t.Fatalf("expected 0, got %d", got)
+	}
+}
+
+func TestPeersCount_NilDHT_ReturnsZero(t *testing.T) {
+	s := &p2p{}
+	if got := s.PeersCount(); got != 0 {
+		t.Fatalf("expected 0, got %d", got)
+	}
+}
+
diff --git a/supernode/adaptors/p2p.go b/supernode/adaptors/p2p.go
index 2900652d..036ec24a 100644
--- a/supernode/adaptors/p2p.go
+++ b/supernode/adaptors/p2p.go
@@ -45,6 +45,18 @@ type StoreArtefactsRequest struct {
 }
 
 func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
+	// Registration must never proceed when the node is not connected to any peers.
+	// Otherwise, StoreBatch can devolve into local-only persistence and actions may
+	// be finalized without durable replication.
+	type peersCounter interface {
+		PeersCount() int
+	}
+	if pc, ok := p.p2p.(peersCounter); ok {
+		if peers := pc.PeersCount(); peers <= 0 {
+			return fmt.Errorf("p2p has zero peers; refusing to store artefacts (would be non-durable)")
+		}
+	}
+
 	idFilesBytes := totalBytes(req.IDFiles)
 	logtrace.Info(ctx, "store: p2p start", logtrace.Fields{
 		"taskID": req.TaskID,
diff --git a/supernode/adaptors/p2p_test.go b/supernode/adaptors/p2p_test.go
new file mode 100644
index 00000000..66a94d81
--- /dev/null
+++ b/supernode/adaptors/p2p_test.go
@@ -0,0 +1,54 @@
+package adaptors
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"testing"
+
+	"github.com/LumeraProtocol/supernode/v2/p2p"
+	"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
+	"go.uber.org/mock/gomock"
+)
+
+type clientWithPeersCount struct {
+	p2p.Client
+	peers int
+}
+
+func (c clientWithPeersCount) PeersCount() int { return c.peers }
+
+func TestStoreArtefacts_ZeroPeers_ReturnsError(t *testing.T) {
+	svc := NewP2PService(clientWithPeersCount{peers: 0}, nil)
+
+	err := svc.StoreArtefacts(context.Background(), StoreArtefactsRequest{TaskID: "task"}, nil)
+	if err == nil {
+		t.Fatalf("expected error, got nil")
+	}
+	if !strings.Contains(err.Error(), "zero peers") {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestStoreArtefacts_PeersPresent_DoesNotTripGuard(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	t.Cleanup(ctrl.Finish)
+
+	store := rqstore.NewMockStore(ctrl)
+	storeErr := errors.New("store down")
+	store.EXPECT().StoreSymbolDirectory("task", "").Return(storeErr)
+
+	svc := NewP2PService(clientWithPeersCount{peers: 1}, store)
+
+	err := svc.StoreArtefacts(context.Background(), StoreArtefactsRequest{TaskID: "task"}, nil)
+	if err == nil {
+		t.Fatalf("expected error, got nil")
+	}
+	if strings.Contains(err.Error(), "zero peers") {
+		t.Fatalf("guard should not have fired, got: %v", err)
+	}
+	if !strings.Contains(err.Error(), storeErr.Error()) {
+		t.Fatalf("expected wrapped store error, got: %v", err)
+	}
+}
+
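
A minimal caller-side sketch of the contract this patch enforces. The
artefactStorer interface, registerWithGuard, and zeroPeerStorer below are
illustrative stand-ins, not code from this repository: the real adaptor takes a
StoreArtefactsRequest and log fields. The point demonstrated is that
StoreArtefacts errors, including the new zero-peers guard, must propagate so an
action is never finalized without durable replication.

package main

import (
	"context"
	"errors"
	"fmt"
)

// artefactStorer is a simplified stand-in for the adaptor in
// supernode/adaptors/p2p.go (the real signature takes a request and fields).
type artefactStorer interface {
	StoreArtefacts(ctx context.Context) error
}

// registerWithGuard models the intended caller behavior: on store failure the
// registration is aborted rather than finalized, and source data is kept so
// the store can be retried once peers are available.
func registerWithGuard(ctx context.Context, s artefactStorer) error {
	if err := s.StoreArtefacts(ctx); err != nil {
		// Artefacts are not durably replicated; do not finalize the action.
		return fmt.Errorf("registration aborted, artefacts not replicated: %w", err)
	}
	return nil
}

// zeroPeerStorer simulates the guard's failure mode for this demo.
type zeroPeerStorer struct{}

func (zeroPeerStorer) StoreArtefacts(context.Context) error {
	return errors.New("p2p has zero peers; refusing to store artefacts (would be non-durable)")
}

func main() {
	if err := registerWithGuard(context.Background(), zeroPeerStorer{}); err != nil {
		fmt.Println(err) // registration aborted, artefacts not replicated: p2p has zero peers; ...
	}
}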