diff --git a/cmd/config/default.go b/cmd/config/default.go index 6da11ce2cc..6da72e8812 100644 --- a/cmd/config/default.go +++ b/cmd/config/default.go @@ -36,6 +36,8 @@ var defaultConfig = harmonyconfig.HarmonyConfig{ DiscConcurrency: nodeconfig.DefaultP2PConcurrency, MaxConnsPerIP: nodeconfig.DefaultMaxConnPerIP, DisablePrivateIPScan: false, + PeerScoreRetention: nodeconfig.DefaultPeerScoreRetention, + PeerMinScore: nodeconfig.DefaultPeerMinScore, MaxPeers: nodeconfig.DefaultMaxPeers, ConnManagerLowWatermark: nodeconfig.DefaultConnManagerLowWatermark, ConnManagerHighWatermark: nodeconfig.DefaultConnManagerHighWatermark, diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 2bca6f3847..751748f078 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -609,6 +609,8 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, DiscConcurrency: hc.P2P.DiscConcurrency, MaxConnPerIP: hc.P2P.MaxConnsPerIP, DisablePrivateIPScan: hc.P2P.DisablePrivateIPScan, + PeerScoreRetention: hc.P2P.PeerScoreRetention, + PeerMinScore: hc.P2P.PeerMinScore, MaxPeers: hc.P2P.MaxPeers, ConnManagerLowWatermark: hc.P2P.ConnManagerLowWatermark, ConnManagerHighWatermark: hc.P2P.ConnManagerHighWatermark, diff --git a/internal/configs/bootnode/bootnode.go b/internal/configs/bootnode/bootnode.go index 71c9186a7d..31b7639502 100644 --- a/internal/configs/bootnode/bootnode.go +++ b/internal/configs/bootnode/bootnode.go @@ -76,6 +76,8 @@ type P2pConfig struct { DiscConcurrency int // Discovery Concurrency value MaxConnsPerIP int DisablePrivateIPScan bool + PeerScoreRetention time.Duration + PeerMinScore float64 MaxPeers int64 // In order to disable Connection Manager, it only needs to // set both the high and low watermarks to zero. 
In this way, diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index 1232703445..065a2d14ff 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -121,6 +121,8 @@ type P2pConfig struct { DiscConcurrency int // Discovery Concurrency value MaxConnsPerIP int DisablePrivateIPScan bool + PeerScoreRetention time.Duration + PeerMinScore float64 MaxPeers int64 // In order to disable Connection Manager, it only needs to // set both the high and low watermarks to zero. In this way, diff --git a/internal/configs/node/network.go b/internal/configs/node/network.go index 2b401968e3..73690b2742 100644 --- a/internal/configs/node/network.go +++ b/internal/configs/node/network.go @@ -69,6 +69,11 @@ const ( DefaultP2PConcurrency = 0 // DefaultMaxConnPerIP is the maximum number of connections to/from a remote IP DefaultMaxConnPerIP = 10 + // DefaultPeerScoreRetention controls how long peer score records are kept. + // Zero disables peer score persistence and score-based connection gating. + DefaultPeerScoreRetention = time.Duration(0) + // DefaultPeerMinScore is the minimum score accepted by peer score gating. 
+ DefaultPeerMinScore = 0.0 // DefaultMaxPeers is the maximum number of remote peers, with 0 representing no limit DefaultMaxPeers = 0 // DefaultConnManagerLowWatermark is the lowest number of connections that'll be maintained in connection manager diff --git a/p2p/host.go b/p2p/host.go index c89fbc9189..84d0f606c8 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -1,6 +1,7 @@ package p2p import ( + "container/list" "context" "encoding/binary" "fmt" @@ -133,6 +134,8 @@ type HostConfig struct { DiscConcurrency int MaxConnPerIP int DisablePrivateIPScan bool + PeerScoreRetention time.Duration + PeerMinScore float64 MaxPeers int64 ConnManagerLowWatermark int ConnManagerHighWatermark int @@ -168,6 +171,7 @@ func init() { trustedPeersAddedCounter, trustedPeersDnsResolvedCounter, trustedPeersConnectFailuresCounter, + peerScoreGaugeVec, ) } @@ -196,8 +200,88 @@ var ( Name: "trusted_peer_connect_failures_total", Help: "Total number of failed attempts to establish P2P host-level connections with trusted peers", }) + peerScoreGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "hmy", + Subsystem: "p2p", + Name: "peer_score", + Help: "current peer score by local instance and remote peer", + }, + []string{"local_p2p_id", "remote_p2p_id"}, + ) + peerScoreMetricMu sync.Mutex + peerScoreMetricList = list.New() + peerScoreMetricIdx = make(map[peerScoreMetricKey]*list.Element) ) +const maxPeerScoreMetricSeries = 2048 + +type peerScoreMetricKey struct { + localPeerID string + remotePeerID string +} + +// SetPeerScoreMetric sets the canonical P2P peer score metric. 
+func SetPeerScoreMetric(localPeerID, remotePeerID string, score float64) {
+	// Both label values are required; a call missing either one is ignored.
+	if localPeerID == "" || remotePeerID == "" {
+		return
+	}
+	key := peerScoreMetricKey{localPeerID: localPeerID, remotePeerID: remotePeerID}
+
+	// The lock covers the gauge update as well as the index bookkeeping. If it
+	// were released in between, a concurrent DeletePeerScoreMetric could drop
+	// the index entry before Set runs, leaving a gauge series that is no longer
+	// tracked and therefore never evicted or deleted.
+	peerScoreMetricMu.Lock()
+	defer peerScoreMetricMu.Unlock()
+
+	if elem, tracked := peerScoreMetricIdx[key]; tracked {
+		// Known series: mark it as most recently updated.
+		peerScoreMetricList.MoveToBack(elem)
+	} else {
+		if len(peerScoreMetricIdx) >= maxPeerScoreMetricSeries {
+			// At capacity: drop the least recently updated series first.
+			if oldest := peerScoreMetricList.Front(); oldest != nil {
+				oldestKey := oldest.Value.(peerScoreMetricKey)
+				delete(peerScoreMetricIdx, oldestKey)
+				peerScoreMetricList.Remove(oldest)
+				peerScoreGaugeVec.Delete(prometheus.Labels{
+					"local_p2p_id":  oldestKey.localPeerID,
+					"remote_p2p_id": oldestKey.remotePeerID,
+				})
+			}
+		}
+		peerScoreMetricIdx[key] = peerScoreMetricList.PushBack(key)
+	}
+	peerScoreGaugeVec.With(prometheus.Labels{
+		"local_p2p_id":  localPeerID,
+		"remote_p2p_id": remotePeerID,
+	}).Set(score)
+}
+
+// DeletePeerScoreMetric removes the peer score series for the given local and
+// remote peer IDs from the LRU index and from the gauge. A call with an empty
+// ID is ignored.
+func DeletePeerScoreMetric(localPeerID, remotePeerID string) {
+	if localPeerID == "" || remotePeerID == "" {
+		return
+	}
+	key := peerScoreMetricKey{localPeerID: localPeerID, remotePeerID: remotePeerID}
+
+	// Index and gauge are changed under one lock so they cannot diverge.
+	peerScoreMetricMu.Lock()
+	defer peerScoreMetricMu.Unlock()
+	if elem, tracked := peerScoreMetricIdx[key]; tracked {
+		delete(peerScoreMetricIdx, key)
+		peerScoreMetricList.Remove(elem)
+	}
+	peerScoreGaugeVec.Delete(prometheus.Labels{
+		"local_p2p_id":  localPeerID,
+		"remote_p2p_id": remotePeerID,
+	})
+}
+
 // NewHost ..
func NewHost(cfg HostConfig) (Host, error) { var ( @@ -233,9 +317,7 @@ func NewHost(cfg HostConfig) (Host, error) { if err != nil { return nil, fmt.Errorf("failed to open peerstore: %w", err) } - var scoreRetention time.Duration - // TODO: add scoreRetention to configs (for now, it is zero and so, peer scoring is disabled) - scoreRetention = 0 + scoreRetention := cfg.PeerScoreRetention logger := log.New() ps, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, basePs, datastore, scoreRetention) if err != nil { @@ -256,6 +338,9 @@ func NewHost(cfg HostConfig) (Host, error) { // Prevent dialing of public addresses connGtr = gating.AddBlocking(connGtr, cfg.DisablePrivateIPScan) } + if scoreRetention > 0 { + connGtr = gating.AddScoring(connGtr, ps, cfg.PeerMinScore) + } connGtr = gating.AddBanExpiry(connGtr, ps, clock.SystemClock) connGtr = gating.AddMetering(connGtr) @@ -1295,6 +1380,7 @@ func (host *HostV2) Connected(net libp2p_network.Network, conn libp2p_network.Co // called when a connection closed func (host *HostV2) Disconnected(net libp2p_network.Network, conn libp2p_network.Conn) { host.logger.Debug().Interface("node", conn.RemotePeer()).Msg("peer disconnected") + DeletePeerScoreMetric(host.GetID().String(), conn.RemotePeer().String()) for _, function := range host.onDisconnects.GetAll() { if err := function(conn); err != nil { diff --git a/p2p/store/iface.go b/p2p/store/iface.go index a83b5ae06e..8ab1172921 100644 --- a/p2p/store/iface.go +++ b/p2p/store/iface.go @@ -83,6 +83,22 @@ type PeerScores struct { ReqResp ReqRespScores `json:"reqResp"` } +const ( + // Req/resp contribution weights for the combined peer score. + peerScoreValidResponseWeight = 1.0 + peerScoreErrorResponsePenalty = 1.0 + peerScoreRejectedPayloadPenalty = 2.0 +) + +// ComputePeerScore returns the combined peer score used by gating and metrics. +// It combines gossip score and req/resp behavior with explicit penalties. 
+func ComputePeerScore(scores PeerScores) float64 { + return scores.Gossip.Total + + (scores.ReqResp.ValidResponses * peerScoreValidResponseWeight) - + (scores.ReqResp.ErrorResponses * peerScoreErrorResponsePenalty) - + (scores.ReqResp.RejectedPayloads * peerScoreRejectedPayloadPenalty) +} + // ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information type ScoreDatastore interface { // GetPeerScores returns the current scores for the specified peer diff --git a/p2p/store/scorebook.go b/p2p/store/scorebook.go index 170053b862..d1cde9a004 100644 --- a/p2p/store/scorebook.go +++ b/p2p/store/scorebook.go @@ -85,7 +85,7 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) { if err != nil { return 0, err } - return scores.Gossip.Total, nil + return ComputePeerScore(scores), nil } func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error) { diff --git a/p2p/store/scorebook_test.go b/p2p/store/scorebook_test.go index 3a4d9887cb..2d10e8d31d 100644 --- a/p2p/store/scorebook_test.go +++ b/p2p/store/scorebook_test.go @@ -274,7 +274,7 @@ func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expecte score, err := store.GetPeerScore(id) require.NoError(t, err) - require.Equal(t, expected.Gossip.Total, score) + require.Equal(t, ComputePeerScore(expected), score) } func createMemoryStore(t *testing.T) ExtendedPeerstore { diff --git a/p2p/stream/protocols/sync/client.go b/p2p/stream/protocols/sync/client.go index 7f551b5a12..893c314e1a 100644 --- a/p2p/stream/protocols/sync/client.go +++ b/p2p/stream/protocols/sync/client.go @@ -2,6 +2,7 @@ package sync import ( "context" + stderrs "errors" "fmt" "strconv" "strings" @@ -10,18 +11,85 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/store" + 
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager" "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" syncpb "github.com/harmony-one/harmony/p2p/stream/protocols/sync/message" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + libp2p_peer "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" protobuf "google.golang.org/protobuf/proto" ) +const ( + peerScoreValidResponsesCap = 100 + peerScoreErrorResponsesCap = 100 + peerScoreRejectedPayloadsCap = 100 +) + +func (p *Protocol) trackRequestScore(stid sttypes.StreamID, reqErr error) { + if p == nil || p.config.Host == nil || stid == "" { + return + } + p2pHost := p.config.Host.GetP2PHost() + if p2pHost == nil || p2pHost.Peerstore() == nil { + return + } + scoreStore, ok := p2pHost.Peerstore().(store.ScoreDatastore) + if !ok || scoreStore == nil { + return + } + + var diff store.ScoreDiff + if reqErr == nil { + diff = store.IncrementValidResponses{Cap: peerScoreValidResponsesCap} + } else { + diff = classifyRequestScoreDiff(reqErr) + } + if diff == nil { + return + } + + scores, err := scoreStore.SetScore(libp2p_peer.ID(stid), diff) + if err != nil { + p.logger.Debug().Err(err).Str("streamID", string(stid)).Msg("failed to update peer score") + return + } + + p2p.SetPeerScoreMetric(p.config.Host.GetID().String(), string(stid), store.ComputePeerScore(scores)) +} + +func classifyRequestScoreDiff(reqErr error) store.ScoreDiff { + // Caller cancellation should not penalize peers. + if stderrs.Is(reqErr, context.Canceled) { + return nil + } + + switch requestmanager.ClassifyRequestError(reqErr) { + case requestmanager.RequestErrorSkip: + return nil + case requestmanager.RequestErrorCritical: + return store.IncrementRejectedPayloads{Cap: peerScoreRejectedPayloadsCap} + } + + // RequestErrorLow and parser-level failures: separate malformed payloads from transient errors. 
+ lower := strings.ToLower(reqErr.Error()) + if strings.Contains(lower, "not sync response") || + strings.Contains(lower, "response not ") || + strings.Contains(lower, "commit sigs size not expected") || + strings.Contains(lower, "rlp") { + return store.IncrementRejectedPayloads{Cap: peerScoreRejectedPayloadsCap} + } + return store.IncrementErrorResponses{Cap: peerScoreErrorResponsesCap} +} + // GetBlocksByNumber do getBlocksByNumberRequest through sync stream protocol. // Return the block as result, target stream id, and error func (p *Protocol) GetBlocksByNumber(ctx context.Context, bns []uint64, opts ...Option) (blocks []*types.Block, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getBlocksByNumber") defer p.doMetricPostClientRequest("getBlocksByNumber", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(bns) == 0 { err = fmt.Errorf("zero block numbers requested") @@ -48,6 +116,7 @@ func (p *Protocol) GetBlocksByNumber(ctx context.Context, bns []uint64, opts ... 
func (p *Protocol) GetRawBlocksByNumber(ctx context.Context, bns []uint64, opts ...Option) (blockBytes [][]byte, sigBytes [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getBlocksByNumber") defer p.doMetricPostClientRequest("getBlocksByNumber", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(bns) == 0 { err = fmt.Errorf("zero block numbers requested") @@ -79,6 +148,7 @@ func (p *Protocol) GetRawBlocksByNumber(ctx context.Context, bns []uint64, opts func (p *Protocol) GetCurrentBlockNumber(ctx context.Context, opts ...Option) (bn uint64, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getBlockNumber") defer p.doMetricPostClientRequest("getBlockNumber", err, timer) + defer func() { p.trackRequestScore(stid, err) }() req := newGetBlockNumberRequest() @@ -96,6 +166,7 @@ func (p *Protocol) GetCurrentBlockNumber(ctx context.Context, opts ...Option) (b func (p *Protocol) GetBlockHashes(ctx context.Context, bns []uint64, opts ...Option) (hashes []common.Hash, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getBlockHashes") defer p.doMetricPostClientRequest("getBlockHashes", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(bns) == 0 { err = fmt.Errorf("zero block numbers requested") @@ -119,6 +190,7 @@ func (p *Protocol) GetBlockHashes(ctx context.Context, bns []uint64, opts ...Opt func (p *Protocol) GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...Option) (blocks []*types.Block, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getBlocksByHashes") defer p.doMetricPostClientRequest("getBlocksByHashes", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(hs) == 0 { err = fmt.Errorf("zero block hashes requested") @@ -141,6 +213,7 @@ func (p *Protocol) GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts func (p *Protocol) GetRawBlocksByHashes(ctx context.Context, hs []common.Hash, opts 
...Option) (blockBytes [][]byte, sigBytes [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getBlocksByHashes") defer p.doMetricPostClientRequest("getBlocksByHashes", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(hs) == 0 { err = fmt.Errorf("empty block hashes requested") @@ -173,6 +246,7 @@ func (p *Protocol) GetRawBlocksByHashes(ctx context.Context, hs []common.Hash, o func (p *Protocol) GetReceipts(ctx context.Context, hs []common.Hash, opts ...Option) (receipts []types.Receipts, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getReceipts") defer p.doMetricPostClientRequest("getReceipts", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(hs) == 0 { err = fmt.Errorf("zero receipt hashes requested") @@ -196,6 +270,7 @@ func (p *Protocol) GetReceipts(ctx context.Context, hs []common.Hash, opts ...Op func (p *Protocol) GetNodeData(ctx context.Context, hs []common.Hash, opts ...Option) (data [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getNodeData") defer p.doMetricPostClientRequest("getNodeData", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if len(hs) == 0 { err = fmt.Errorf("zero node data hashes requested") @@ -219,6 +294,7 @@ func (p *Protocol) GetNodeData(ctx context.Context, hs []common.Hash, opts ...Op func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getAccountRange") defer p.doMetricPostClientRequest("getAccountRange", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if bytes == 0 { err = fmt.Errorf("zero account ranges bytes requested") @@ -242,6 +318,7 @@ func (p *Protocol) GetAccountRange(ctx context.Context, root common.Hash, origin func (p *Protocol) GetStorageRanges(ctx 
context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getStorageRanges") defer p.doMetricPostClientRequest("getStorageRanges", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if bytes == 0 { err = fmt.Errorf("zero storage ranges bytes requested") @@ -277,6 +354,7 @@ func (p *Protocol) GetStorageRanges(ctx context.Context, root common.Hash, accou func (p *Protocol) GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...Option) (codes [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getByteCodes") defer p.doMetricPostClientRequest("getByteCodes", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if bytes == 0 { err = fmt.Errorf("zero bytecode bytes requested") @@ -304,6 +382,7 @@ func (p *Protocol) GetByteCodes(ctx context.Context, hs []common.Hash, bytes uin func (p *Protocol) GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...Option) (nodes [][]byte, stid sttypes.StreamID, err error) { timer := p.doMetricClientRequest("getTrieNodes") defer p.doMetricPostClientRequest("getTrieNodes", err, timer) + defer func() { p.trackRequestScore(stid, err) }() if bytes == 0 { err = fmt.Errorf("zero trie nodes bytes requested")