From 7fa78b843b858e4a60617388e39a7385681f5694 Mon Sep 17 00:00:00 2001 From: "F." Date: Wed, 25 Feb 2026 00:57:31 +0100 Subject: [PATCH] refactor(transport): node-aware HTTP requests with helpers + safer response cleanup - Introduce resolveBaseURL, newNodeRequest, and doTrusted to build node-aware requests and centralize transport behavior. - Migrate ForwardGet/Set/Remove, Health, FetchMerkle, and ListKeys to the new helpers. - Drain and close HTTP response bodies to improve connection reuse and avoid leaks. - hypercache: add fallback size estimation for backends without a serializer. - cluster: derive node ID using binary.LittleEndian instead of manual bit shifts. - Style: minor formatting/tidy changes across non-functional areas. build: bump Go toolchain to 1.26.0; update golangci-lint to v2.10.1 and buf to 1.65.0 chore(deps): upgrade modules (e.g., fiber v3.1.0, sectools v1.2.3, x/{crypto,net,sys,text}); refresh go.sum --- .golangci.yaml | 2 +- .pre-commit/golangci-lint-hook | 2 +- .pre-commit/unit-test-hook | 2 +- .project-settings.env | 6 +- Makefile | 9 +- go.mod | 20 +- go.sum | 40 +-- hypercache.go | 3 + internal/cluster/node.go | 6 +- pkg/backend/dist_http_transport.go | 235 +++++++++++++----- pkg/backend/dist_latency.go | 1 + pkg/backend/dist_memory.go | 1 + pkg/backend/dist_transport.go | 1 + pkg/backend/inmemory.go | 1 + pkg/backend/redis.go | 5 + pkg/cache/item.go | 1 + pkg/cache/v2/cmap_test.go | 1 + pkg/cache/v2/item.go | 1 + pkg/eviction/arc_test.go | 1 + pkg/eviction/cawolfu.go | 2 + service.go | 1 + ...ache_distmemory_heartbeat_sampling_test.go | 1 + tests/hypercache_distmemory_heartbeat_test.go | 3 +- ...cache_distmemory_remove_readrepair_test.go | 9 + ...hypercache_distmemory_stale_quorum_test.go | 1 + tests/hypercache_http_merkle_test.go | 1 + tests/hypercache_mgmt_dist_test.go | 3 +- tests/integration/dist_phase1_test.go | 4 +- .../dist_rebalance_replica_diff_test.go | 1 + tests/integration/dist_rebalance_test.go | 35 +++ 30 files changed, 290 insertions(+), 109 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 7839fab..190190d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -42,7 +42,7 @@ run: # Define the Go version limit. # Mainly related to generics support since go1.18. # Default: use Go version from the go.mod file, fallback on the env var `GOVERSION`, fallback on 1.17 - go: "1.25.5" + go: "1.26.0" linters: # Enable specific linter diff --git a/.pre-commit/golangci-lint-hook b/.pre-commit/golangci-lint-hook index 765ce26..8c91ece 100755 --- a/.pre-commit/golangci-lint-hook +++ b/.pre-commit/golangci-lint-hook @@ -23,7 +23,7 @@ if [[ -f "${ROOT_DIR}/.project-settings.env" ]]; then # shellcheck disable=SC1090 source "${ROOT_DIR}/.project-settings.env" fi -GOLANGCI_LINT_VERSION="${GOLANGCI_LINT_VERSION:-v2.7.2}" +GOLANGCI_LINT_VERSION="${GOLANGCI_LINT_VERSION:-v2.10.1}" # ####################################### # Install dependencies to run the pre-commit hook diff --git a/.pre-commit/unit-test-hook b/.pre-commit/unit-test-hook index ae2a4e9..56d91e1 100755 --- a/.pre-commit/unit-test-hook +++ b/.pre-commit/unit-test-hook @@ -21,7 +21,7 @@ hook() { local root_dir root_dir=$(git rev-parse --show-toplevel) - local toolchain_version="1.25.6" + local toolchain_version="1.26.0" if [[ -f "${root_dir}/.project-settings.env" ]]; then # shellcheck disable=SC1090 source "${root_dir}/.project-settings.env" diff --git a/.project-settings.env b/.project-settings.env index 9377da4..9e6a104 100644 --- a/.project-settings.env +++ b/.project-settings.env @@ -1,5 +1,5 @@ -GOLANGCI_LINT_VERSION=v2.8.0 -BUF_VERSION=v1.63.0 -GO_VERSION=1.25.6 +GOLANGCI_LINT_VERSION=v2.10.1 +BUF_VERSION=v1.65.0 +GO_VERSION=1.26.0 GCI_PREFIX=github.com/hyp3rd/hypercache PROTO_ENABLED=true diff --git a/Makefile b/Makefile index 7cf0805..a6bb1e5 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ include .project-settings.env -GOLANGCI_LINT_VERSION ?= v2.7.2 -BUF_VERSION ?= v1.61.0 -GO_VERSION ?= 1.25.6 +GOLANGCI_LINT_VERSION ?= v2.10.1 +BUF_VERSION ?= v1.65.0 +GO_VERSION ?= 1.26.0 GCI_PREFIX ?= github.com/hyp3rd/hypercache PROTO_ENABLED ?= true @@ -28,8 +28,7 @@ run-example: go run ./__examples/$(group)/*.go $(ARGS) update-deps: - go get -v -u ./... - go mod tidy + go get -u -t ./... && go mod tidy -v && go mod verify prepare-toolchain: prepare-base-tools diff --git a/go.mod b/go.mod index 2dbcd25..b0734b7 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,13 @@ module github.com/hyp3rd/hypercache -go 1.25.6 +go 1.26.0 require ( github.com/cespare/xxhash/v2 v2.3.0 github.com/goccy/go-json v0.10.5 - github.com/gofiber/fiber/v3 v3.0.0 + github.com/gofiber/fiber/v3 v3.1.0 github.com/hyp3rd/ewrap v1.3.7 - github.com/hyp3rd/sectools v1.2.2 + github.com/hyp3rd/sectools v1.2.3 github.com/longbridgeapp/assert v1.1.0 github.com/redis/go-redis/v9 v9.18.0 github.com/shamaton/msgpack/v2 v2.4.0 @@ -21,10 +21,10 @@ require ( github.com/andybalholm/brotli v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/gofiber/schema v1.6.0 // indirect - github.com/gofiber/utils/v2 v2.0.0 // indirect + github.com/gofiber/schema v1.7.0 // indirect + github.com/gofiber/utils/v2 v2.0.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.18.3 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -36,10 +36,10 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.69.0 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/crypto v0.47.0 // indirect - golang.org/x/net v0.49.0 // indirect - golang.org/x/sys v0.40.0 // indirect - golang.org/x/text v0.33.0 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.34.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index be2f7ad..2c89f2a 100644 --- a/go.sum +++ b/go.sum @@ -19,22 +19,22 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= -github.com/gofiber/fiber/v3 v3.0.0 h1:GPeCG8X60L42wLKrzgeewDHBr6pE6veAvwaXsqD3Xjk= -github.com/gofiber/fiber/v3 v3.0.0/go.mod h1:kVZiO/AwyT5Pq6PgC8qRCJ+j/BHrMy5jNw1O9yH38aY= -github.com/gofiber/schema v1.6.0 h1:rAgVDFwhndtC+hgV7Vu5ItQCn7eC2mBA4Eu1/ZTiEYY= -github.com/gofiber/schema v1.6.0/go.mod h1:WNZWpQx8LlPSK7ZaX0OqOh+nQo/eW2OevsXs1VZfs/s= -github.com/gofiber/utils/v2 v2.0.0 h1:SCC3rpsEDWupFSHtc0RKxg/BKgV0s1qKfZg9Jv6D0sM= -github.com/gofiber/utils/v2 v2.0.0/go.mod h1:xF9v89FfmbrYqI/bQUGN7gR8ZtXot2jxnZvmAUtiavE= +github.com/gofiber/fiber/v3 v3.1.0 h1:1p4I820pIa+FGxfwWuQZ5rAyX0WlGZbGT6Hnuxt6hKY= +github.com/gofiber/fiber/v3 v3.1.0/go.mod h1:n2nYQovvL9z3Too/FGOfgtERjW3GQcAUqgfoezGBZdU= +github.com/gofiber/schema v1.7.0 h1:yNM+FNRZjyYEli9Ey0AXRBrAY9jTnb+kmGs3lJGPvKg= +github.com/gofiber/schema v1.7.0/go.mod h1:A/X5Ffyru4p9eBdp99qu+nzviHzQiZ7odLT+TwxWhbk= +github.com/gofiber/utils/v2 v2.0.2 h1:ShRRssz0F3AhTlAQcuEj54OEDtWF7+HJDwEi/aa6QLI= +github.com/gofiber/utils/v2 v2.0.2/go.mod h1:+9Ub4NqQ+IaJoTliq5LfdmOJAA/Hzwf4pXOxOa3RrJ0= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hyp3rd/ewrap v1.3.7 h1:3uS7osww2dHI/08/rihGdtEzaSSgdtH4l9HubtMe6Io= github.com/hyp3rd/ewrap v1.3.7/go.mod h1:9IhBgb6LhJDrgVNdx4vZ9SIw6eXxiopjZb2HvbG8pgA= -github.com/hyp3rd/sectools v1.2.2 h1:I6zdSTKSvSOlVd2D6CNSuWAjBZMLMeMMdl5JS3C8smI= -github.com/hyp3rd/sectools v1.2.2/go.mod h1:oK0jWZoPUk4EPsK24n0uIz54552vaJjflVF1VXEKwbE= -github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= -github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/hyp3rd/sectools v1.2.3 h1:XElGIhLOWPJxVLyLPzfKASYjs+3yEkDN48JeSw/Wvjo= +github.com/hyp3rd/sectools v1.2.3/go.mod h1:iwl65boK1VNhwvRNSQDItdD5xon8W1l+ox4JFTe5WbI= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -62,8 +62,8 @@ github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0t github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/shamaton/msgpack/v2 v2.4.0 h1:O5Z08MRmbo0lA9o2xnQ4TXx6teJbPqEurqcCOQ8Oi/4= github.com/shamaton/msgpack/v2 v2.4.0/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI= -github.com/shamaton/msgpack/v3 v3.0.0 h1:xl40uxWkSpwBCSTvS5wyXvJRsC6AcVcYeox9PspKiZg= -github.com/shamaton/msgpack/v3 v3.0.0/go.mod h1:DcQG8jrdrQCIxr3HlMYkiXdMhK+KfN2CitkyzsQV4uc= +github.com/shamaton/msgpack/v3 v3.1.0 h1:jsk0vEAqVvvS9+fTZ5/EcQ9tz860c9pWxJ4Iwecz8gU= +github.com/shamaton/msgpack/v3 v3.1.0/go.mod h1:DcQG8jrdrQCIxr3HlMYkiXdMhK+KfN2CitkyzsQV4uc= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -92,15 +92,15 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= -golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/hypercache.go b/hypercache.go index cc09485..f1e6a8b 100644 --- a/hypercache.go +++ b/hypercache.go @@ -839,6 +839,9 @@ func (hyperCache *HyperCache[T]) setItemSize(item *cache.Item) error { return nil } + + default: + // Fall back to generic size estimation for backends without a serializer. } return item.SetSize() diff --git a/internal/cluster/node.go b/internal/cluster/node.go index 2533a78..5cd4bf8 100644 --- a/internal/cluster/node.go +++ b/internal/cluster/node.go @@ -1,6 +1,7 @@ package cluster import ( + "encoding/binary" "encoding/hex" "errors" "fmt" @@ -23,7 +24,6 @@ const ( // internal constants. const ( nodeIDBytes = 8 - byteShift = 8 // bits per byte for id derivation ) func (s NodeState) String() string { @@ -60,9 +60,7 @@ func NewNode(id, addr string) *Node { hv := xxhash.Sum64String(addr) b := make([]byte, nodeIDBytes) - for i := range nodeIDBytes { - b[i] = byte(hv >> (byteShift * i)) - } + binary.LittleEndian.PutUint64(b, hv) id = hex.EncodeToString(b) } diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go index f61ece2..0a86f44 100644 --- a/pkg/backend/dist_http_transport.go +++ b/pkg/backend/dist_http_transport.go @@ -3,9 +3,11 @@ package backend import ( "bytes" "context" - "fmt" "io" "net/http" + "net/url" + "strconv" + "strings" "time" "github.com/goccy/go-json" @@ -40,11 +42,6 @@ const ( // ForwardSet sends a Set/Replicate request to a remote node. func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn - base, ok := t.baseURLFn(nodeID) - if !ok { - return sentinel.ErrBackendNotFound - } - reqBody := httpSetRequest{ Key: item.Key, Value: item.Value, @@ -60,19 +57,31 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item } // prefer canonical endpoint; legacy /internal/cache/set still served - hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, base+"/internal/set", bytes.NewReader(payloadBytes)) + hreq, err := t.newNodeRequest(ctx, http.MethodPost, nodeID, "/internal/set", nil, bytes.NewReader(payloadBytes)) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } hreq.Header.Set("Content-Type", "application/json") - resp, err := t.client.Do(hreq) + resp, err := t.doTrusted(hreq) if err != nil { - return ewrap.Wrap(err, errMsgDoRequest) + return err } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck + defer func() { + _, copyErr := io.Copy(io.Discard, resp.Body) + if copyErr != nil { + // Best-effort drain to keep connections reusable. + _ = copyErr + } + + closeErr := resp.Body.Close() + if closeErr != nil { + // Best-effort close on deferred cleanup. + _ = closeErr + } + }() if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -92,23 +101,32 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item // ForwardGet fetches a single item from a remote node. func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error) { //nolint:ireturn - base, ok := t.baseURLFn(nodeID) - if !ok { - return nil, false, sentinel.ErrBackendNotFound - } - // prefer canonical endpoint - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/internal/get?key=%s", base, key), nil) + hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/internal/get", url.Values{ + "key": {key}, + }, nil) if err != nil { return nil, false, ewrap.Wrap(err, errMsgNewRequest) } - resp, err := t.client.Do(hreq) + resp, err := t.doTrusted(hreq) if err != nil { - return nil, false, ewrap.Wrap(err, errMsgDoRequest) + return nil, false, err } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck + defer func() { + _, copyErr := io.Copy(io.Discard, resp.Body) + if copyErr != nil { + // Best-effort drain to keep connections reusable. + _ = copyErr + } + + closeErr := resp.Body.Close() + if closeErr != nil { + // Best-effort close on deferred cleanup. + _ = closeErr + } + }() if resp.StatusCode == http.StatusNotFound { return nil, false, sentinel.ErrBackendNotFound @@ -181,28 +199,33 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn // ForwardRemove propagates a delete operation to a remote node. func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error { //nolint:ireturn - base, ok := t.baseURLFn(nodeID) - if !ok { - return sentinel.ErrBackendNotFound - } - // prefer canonical endpoint - hreq, err := http.NewRequestWithContext( - ctx, - http.MethodDelete, - fmt.Sprintf("%s/internal/del?key=%s&replicate=%t", base, key, replicate), - nil, - ) + hreq, err := t.newNodeRequest(ctx, http.MethodDelete, nodeID, "/internal/del", url.Values{ + "key": {key}, + "replicate": {strconv.FormatBool(replicate)}, + }, nil) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } - resp, err := t.client.Do(hreq) + resp, err := t.doTrusted(hreq) if err != nil { - return ewrap.Wrap(err, errMsgDoRequest) + return err } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck + defer func() { + _, copyErr := io.Copy(io.Discard, resp.Body) + if copyErr != nil { + // Best-effort drain to keep connections reusable. + _ = copyErr + } + + closeErr := resp.Body.Close() + if closeErr != nil { + // Best-effort close on deferred cleanup. + _ = closeErr + } + }() if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -217,22 +240,29 @@ func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID, key strin // Health performs a health probe against a remote node. func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { //nolint:ireturn - base, ok := t.baseURLFn(nodeID) - if !ok { - return sentinel.ErrBackendNotFound - } - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/health", nil) + hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/health", nil, nil) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } - resp, err := t.client.Do(hreq) + resp, err := t.doTrusted(hreq) if err != nil { - return ewrap.Wrap(err, errMsgDoRequest) + return err } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck + defer func() { + _, copyErr := io.Copy(io.Discard, resp.Body) + if copyErr != nil { + // Best-effort drain to keep connections reusable. + _ = copyErr + } + + closeErr := resp.Body.Close() + if closeErr != nil { + // Best-effort close on deferred cleanup. + _ = closeErr + } + }() if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -251,22 +281,29 @@ func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*Me return nil, errNoTransport } - base, ok := t.baseURLFn(nodeID) - if !ok { - return nil, sentinel.ErrBackendNotFound - } - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/internal/merkle", nil) + hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/internal/merkle", nil, nil) if err != nil { return nil, ewrap.Wrap(err, errMsgNewRequest) } - resp, err := t.client.Do(hreq) + resp, err := t.doTrusted(hreq) if err != nil { - return nil, ewrap.Wrap(err, errMsgDoRequest) + return nil, err } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck + defer func() { + _, copyErr := io.Copy(io.Discard, resp.Body) + if copyErr != nil { + // Best-effort drain to keep connections reusable. + _ = copyErr + } + + closeErr := resp.Body.Close() + if closeErr != nil { + // Best-effort close on deferred cleanup. + _ = closeErr + } + }() if resp.StatusCode == http.StatusNotFound { return nil, sentinel.ErrBackendNotFound @@ -294,22 +331,29 @@ func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*Me // ListKeys returns all keys from a remote node (expensive; used for tests / anti-entropy fallback). func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]string, error) { //nolint:ireturn - base, ok := t.baseURLFn(nodeID) - if !ok { - return nil, sentinel.ErrBackendNotFound - } - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/internal/keys", nil) + hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/internal/keys", nil, nil) if err != nil { return nil, ewrap.Wrap(err, errMsgNewRequest) } - resp, err := t.client.Do(hreq) + resp, err := t.doTrusted(hreq) if err != nil { - return nil, ewrap.Wrap(err, errMsgDoRequest) + return nil, err } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck + defer func() { + _, copyErr := io.Copy(io.Discard, resp.Body) + if copyErr != nil { + // Best-effort drain to keep connections reusable. + _ = copyErr + } + + closeErr := resp.Body.Close() + if closeErr != nil { + // Best-effort close on deferred cleanup. + _ = closeErr + } + }() if resp.StatusCode >= statusThreshold { return nil, ewrap.Newf("list keys status %d", resp.StatusCode) @@ -328,3 +372,74 @@ func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]stri return body.Keys, nil } + +func (t *DistHTTPTransport) resolveBaseURL(nodeID string) (*url.URL, error) { //nolint:ireturn + if t == nil || t.baseURLFn == nil { + return nil, errNoTransport + } + + base, ok := t.baseURLFn(nodeID) + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + parsed, err := url.Parse(base) + if err != nil { + return nil, ewrap.Wrap(err, "parse base url") + } + + if !parsed.IsAbs() || parsed.Host == "" { + return nil, ewrap.Newf("invalid base url for node %q", nodeID) + } + + switch strings.ToLower(parsed.Scheme) { + case "http", "https": + default: + return nil, ewrap.Newf("unsupported base url scheme %q", parsed.Scheme) + } + + return parsed, nil +} + +func (t *DistHTTPTransport) newNodeRequest( + ctx context.Context, + method, nodeID, endpointPath string, + query url.Values, + body io.Reader, +) (*http.Request, error) { + base, err := t.resolveBaseURL(nodeID) + if err != nil { + return nil, err + } + + target, err := url.JoinPath(base.String(), strings.TrimPrefix(endpointPath, "/")) + if err != nil { + return nil, ewrap.Wrap(err, "join base url path") + } + + targetURL, err := url.Parse(target) + if err != nil { + return nil, ewrap.Wrap(err, "parse target url") + } + + if len(query) > 0 { + targetURL.RawQuery = query.Encode() + } + + req, err := http.NewRequestWithContext(ctx, method, targetURL.String(), body) + if err != nil { + return nil, ewrap.Wrap(err, "create new request") + } + + return req, nil +} + +func (t *DistHTTPTransport) doTrusted(hreq *http.Request) (*http.Response, error) { + // URL is built from the internal cluster resolver and scheme/host validated in newNodeRequest. + resp, err := t.client.Do(hreq) // #nosec G704 -- trusted cluster node URL, not user-supplied arbitrary endpoint + if err != nil { + return nil, ewrap.Wrap(err, errMsgDoRequest) + } + + return resp, nil +} diff --git a/pkg/backend/dist_latency.go b/pkg/backend/dist_latency.go index 1a58817..433a00e 100644 --- a/pkg/backend/dist_latency.go +++ b/pkg/backend/dist_latency.go @@ -57,6 +57,7 @@ func (c *distLatencyCollector) observe(op distOp, d time.Duration) { return } } + // +Inf bucket c.buckets[op][len(latencyBuckets)].Add(1) } diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 3e92ad1..b6acb36 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -1310,6 +1310,7 @@ func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { if tomb.version >= it.Version { return } + // remote has newer version; clear tombstone (key resurrected intentionally) delete(sh.tombs, key) atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) diff --git a/pkg/backend/dist_transport.go b/pkg/backend/dist_transport.go index e26a1c9..6ccef99 100644 --- a/pkg/backend/dist_transport.go +++ b/pkg/backend/dist_transport.go @@ -51,6 +51,7 @@ func (t *InProcessTransport) ForwardSet(ctx context.Context, nodeID string, item if !ok { return sentinel.ErrBackendNotFound } + // direct apply bypasses ownership check (already routed) b.applySet(ctx, item, replicate) diff --git a/pkg/backend/inmemory.go b/pkg/backend/inmemory.go index d9e1563..17d5340 100644 --- a/pkg/backend/inmemory.go +++ b/pkg/backend/inmemory.go @@ -59,6 +59,7 @@ func (cacheBackend *InMemory) Get(_ context.Context, key string) (*cache.Item, b if !ok { return nil, false } + // return the item return item, true } diff --git a/pkg/backend/redis.go b/pkg/backend/redis.go index 0f14ebc..d371730 100644 --- a/pkg/backend/redis.go +++ b/pkg/backend/redis.go @@ -40,10 +40,12 @@ func NewRedis(redisOptions ...Option[Redis]) (IBackend[Redis], error) { if rb.rdb == nil { return nil, sentinel.ErrNilClient } + // Check if the `capacity` is valid if rb.capacity < 0 { return nil, sentinel.ErrInvalidCapacity } + // Check if the `keysSetName` is empty if rb.keysSetName == "" { rb.keysSetName = constants.RedisBackend @@ -98,6 +100,7 @@ func (cacheBackend *Redis) Get(ctx context.Context, key string) (*cache.Item, bo if !isMember { return nil, false } + // Get a transient item from pool, but clone before returning to caller pooled := cacheBackend.itemPoolManager.Get() @@ -110,11 +113,13 @@ func (cacheBackend *Redis) Get(ctx context.Context, key string) (*cache.Item, bo return nil, false } + // Deserialize the item err = cacheBackend.Serializer.Unmarshal(data, pooled) if err != nil { return nil, false } + // Clone into a new heap object to avoid returning a pooled pointer out := *pooled cacheBackend.itemPoolManager.Put(pooled) diff --git a/pkg/cache/item.go b/pkg/cache/item.go index eb3e48f..9b10a7f 100644 --- a/pkg/cache/item.go +++ b/pkg/cache/item.go @@ -46,6 +46,7 @@ func (m *ItemPoolManager) Put(item *Item) { if item == nil { return } + // Zero the struct to avoid retaining large references across pool reuses *item = Item{} m.pool.Put(item) diff --git a/pkg/cache/v2/cmap_test.go b/pkg/cache/v2/cmap_test.go index 1d80bab..7d88824 100644 --- a/pkg/cache/v2/cmap_test.go +++ b/pkg/cache/v2/cmap_test.go @@ -24,6 +24,7 @@ func TestNew(t *testing.T) { return } + // no hasher field in v2 shards } } diff --git a/pkg/cache/v2/item.go b/pkg/cache/v2/item.go index 2c26ed9..536adf9 100644 --- a/pkg/cache/v2/item.go +++ b/pkg/cache/v2/item.go @@ -51,6 +51,7 @@ func (m *ItemPoolManager) Put(it *Item) { if it == nil { return } + // Zero to avoid retaining references across pool reuses *it = Item{} m.pool.Put(it) diff --git a/pkg/eviction/arc_test.go b/pkg/eviction/arc_test.go index f0e4f62..db93a0c 100644 --- a/pkg/eviction/arc_test.go +++ b/pkg/eviction/arc_test.go @@ -55,6 +55,7 @@ func TestARC_Delete_RemovesResidentAndGhost(t *testing.T) { if _, ok := arc.Get("a"); ok { t.Fatalf("expected 'a' deleted") } + // create a ghost by forcing eviction arc.Set("c", 3) arc.Delete("b") // whether resident or ghost, Delete should handle it diff --git a/pkg/eviction/cawolfu.go b/pkg/eviction/cawolfu.go index d3fd7a1..f491fa5 100644 --- a/pkg/eviction/cawolfu.go +++ b/pkg/eviction/cawolfu.go @@ -73,6 +73,7 @@ func (c *CAWOLFU) Evict() (string, bool) { return evictedKey, true } + // If map/list out of sync, forcibly clean up resetCAWOLFUNode(node) c.nodePool.Put(node) @@ -228,6 +229,7 @@ func (c *CAWOLFU) moveToFront(node *CAWOLFUNode) { if node == nil || node == c.list.head { return } + // Remove node from its current position if node == c.list.tail { c.list.tail = node.prev diff --git a/service.go b/service.go index 94efd10..fc7ffdd 100644 --- a/service.go +++ b/service.go @@ -56,6 +56,7 @@ func ApplyMiddleware(svc Service, mw ...Middleware) Service { for _, m := range mw { svc = m(svc) } + // Return the decorated service return svc } diff --git a/tests/hypercache_distmemory_heartbeat_sampling_test.go b/tests/hypercache_distmemory_heartbeat_sampling_test.go index 0330e60..157df69 100644 --- a/tests/hypercache_distmemory_heartbeat_sampling_test.go +++ b/tests/hypercache_distmemory_heartbeat_sampling_test.go @@ -64,6 +64,7 @@ func TestHeartbeatSamplingAndTransitions(t *testing.T) { //nolint:paralleltest if mfinal.NodesDead == 0 { t.Fatalf("expected at least one dead transition, got 0") } + // ensure membership version advanced beyond initial additions (>= number of transitions + initial upserts) snap := b1.DistMembershipSnapshot() verAny := snap["version"] diff --git a/tests/hypercache_distmemory_heartbeat_test.go b/tests/hypercache_distmemory_heartbeat_test.go index 5908db0..a48d08b 100644 --- a/tests/hypercache_distmemory_heartbeat_test.go +++ b/tests/hypercache_distmemory_heartbeat_test.go @@ -11,7 +11,7 @@ import ( // TestDistMemoryHeartbeatLiveness spins up three nodes with a fast heartbeat interval // and validates suspect -> removal transitions plus success/failure metrics. -func TestDistMemoryHeartbeatLiveness(t *testing.T) { //nolint:paralleltest,tparallel +func TestDistMemoryHeartbeatLiveness(t *testing.T) { //nolint:paralleltest interval := 30 * time.Millisecond suspectAfter := 2 * interval deadAfter := 4 * interval @@ -112,6 +112,7 @@ func TestDistMemoryHeartbeatLiveness(t *testing.T) { //nolint:paralleltest,tpara if !sawSuspect { t.Fatalf("node2 never became suspect") } + // ensure removed for _, n := range membership.List() { if n.ID == n2.ID { diff --git a/tests/hypercache_distmemory_remove_readrepair_test.go b/tests/hypercache_distmemory_remove_readrepair_test.go index 51f86e3..60bdce5 100644 --- a/tests/hypercache_distmemory_remove_readrepair_test.go +++ b/tests/hypercache_distmemory_remove_readrepair_test.go @@ -52,6 +52,7 @@ func TestDistMemoryRemoveReplication(t *testing.T) { if err != nil { t.Fatalf("valid: %v", err) } + // write via primary if owners[0] == b1.LocalNodeID() { // local helper we add below err := b1.Set(context.Background(), item) @@ -64,6 +65,7 @@ func TestDistMemoryRemoveReplication(t *testing.T) { t.Fatalf("set: %v", err) } } + // assert item readable from both nodes if _, ok := b1.Get(context.Background(), key); !ok { t.Fatalf("b1 missing pre-remove") @@ -72,6 +74,7 @@ func TestDistMemoryRemoveReplication(t *testing.T) { if _, ok := b2.Get(context.Background(), key); !ok { t.Fatalf("b2 missing pre-remove") } + // remove via primary if owners[0] == b1.LocalNodeID() { err := b1.Remove(context.Background(), key) @@ -110,6 +113,7 @@ func TestDistMemoryReadRepair(t *testing.T) { if err != nil { t.Fatalf("valid: %v", err) } + // write via primary if owners[0] == b1.LocalNodeID() { err := b1.Set(context.Background(), item) @@ -122,6 +126,7 @@ func TestDistMemoryReadRepair(t *testing.T) { t.Fatalf("set: %v", err) } } + // determine replica node (owners[1]) and drop local copy there manually if len(owners) < 2 { t.Skip("replication factor <2") @@ -134,6 +139,7 @@ func TestDistMemoryReadRepair(t *testing.T) { } else { b2.DebugDropLocal(key) } + // ensure dropped locally if replica == b1.LocalNodeID() && b1.LocalContains(key) { t.Fatalf("replica still has key after drop") @@ -142,6 +148,7 @@ func TestDistMemoryReadRepair(t *testing.T) { if replica == b2.LocalNodeID() && b2.LocalContains(key) { t.Fatalf("replica still has key after drop") } + // issue Get from a non-owner node to trigger forwarding, then verify owners repaired. // choose a requester: use node that is neither primary nor replica if possible; with 2 nodes this means primary forwards to replica or // vice versa. @@ -155,6 +162,7 @@ func TestDistMemoryReadRepair(t *testing.T) { if _, ok := requester.Get(context.Background(), key); !ok { t.Fatalf("get for read-repair failed") } + // after forwarding, both owners should have key locally again if owners[0] == b1.LocalNodeID() && !b1.LocalContains(key) { t.Fatalf("primary missing after read repair") @@ -171,6 +179,7 @@ func TestDistMemoryReadRepair(t *testing.T) { if replica == b2.LocalNodeID() && !b2.LocalContains(key) { t.Fatalf("replica missing after read repair") } + // metrics should show at least one read repair var repaired bool if replica == b1.LocalNodeID() { diff --git a/tests/hypercache_distmemory_stale_quorum_test.go b/tests/hypercache_distmemory_stale_quorum_test.go index 0706adc..60c8a81 100644 --- a/tests/hypercache_distmemory_stale_quorum_test.go +++ b/tests/hypercache_distmemory_stale_quorum_test.go @@ -92,6 +92,7 @@ func TestDistMemoryStaleQuorum(t *testing.T) { if !ok { t.Fatalf("quorum get failed") } + // Value stored as interface{} may be string (not []byte) in this test if sval, okCast := got.Value.(string); !okCast || sval != "v2" { t.Fatalf("expected quorum to return newer version v2, got=%v (type %T)", got.Value, got.Value) diff --git a/tests/hypercache_http_merkle_test.go b/tests/hypercache_http_merkle_test.go index b67a671..93cca06 100644 --- a/tests/hypercache_http_merkle_test.go +++ b/tests/hypercache_http_merkle_test.go @@ -66,6 +66,7 @@ func TestHTTPFetchMerkle(t *testing.T) { item := &cache.Item{Key: httpKey(i), Value: []byte("v"), Version: uint64(i + 1), Origin: "n1", LastUpdated: time.Now()} b1.DebugInject(item) } + // ensure HTTP merkle endpoint reachable resp, err := http.Get("http://" + b1.LocalNodeAddr() + "/internal/merkle") if err != nil { diff --git a/tests/hypercache_mgmt_dist_test.go b/tests/hypercache_mgmt_dist_test.go index aae0244..49ebee9 100644 --- a/tests/hypercache_mgmt_dist_test.go +++ b/tests/hypercache_mgmt_dist_test.go @@ -60,6 +60,7 @@ func TestManagementHTTPDistMemory(t *testing.T) { //nolint:paralleltest if e, hasErr := metricsBody["error"]; hasErr { t.Fatalf("/dist/metrics returned error: %v", e) } + // else fail t.Errorf("/dist/metrics missing ForwardGet field") } @@ -103,7 +104,7 @@ func waitForMgmt(t *testing.T, hc *hypercache.HyperCache[backend.DistMemory]) st for time.Now().Before(deadline) { addr = hc.ManagementHTTPAddress() if addr != "" { - resp, err := http.Get("http://" + addr + "/health") //nolint:noctx,gosec + resp, err := http.Get("http://" + addr + "/health") //nolint:noctx if err == nil && resp.StatusCode == http.StatusOK { _ = resp.Body.Close() diff --git a/tests/integration/dist_phase1_test.go b/tests/integration/dist_phase1_test.go index 16d50f8..c43e0ff 100644 --- a/tests/integration/dist_phase1_test.go +++ b/tests/integration/dist_phase1_test.go @@ -102,8 +102,6 @@ func TestDistPhase1BasicQuorum(t *testing.T) { } Done: - - fmt.Println("phase1 basic quorum scaffolding complete") } // valueOK returns true if the stored value matches logical "v1" across supported encodings. @@ -140,6 +138,7 @@ func valueOK(v any) bool { //nolint:ireturn if len(x) == 0 { return false } + // try as string literal var s string @@ -155,6 +154,7 @@ func valueOK(v any) bool { //nolint:ireturn } } } + // fall back to raw compare return string(x) == "v1" || string(x) == "\"v1\"" diff --git a/tests/integration/dist_rebalance_replica_diff_test.go b/tests/integration/dist_rebalance_replica_diff_test.go index cadb7f6..654d675 100644 --- a/tests/integration/dist_rebalance_replica_diff_test.go +++ b/tests/integration/dist_rebalance_replica_diff_test.go @@ -35,6 +35,7 @@ func TestDistRebalanceReplicaDiff(t *testing.T) { k := cacheKey(i) it := &cache.Item{Key: k, Value: []byte("v"), Version: 1, Origin: "A", LastUpdated: time.Now()} + err := nodeA.Set(ctx, it) if err != nil { t.Fatalf("set %s: %v", k, err) diff --git a/tests/integration/dist_rebalance_test.go b/tests/integration/dist_rebalance_test.go index 090759b..d7326c1 100644 --- a/tests/integration/dist_rebalance_test.go +++ b/tests/integration/dist_rebalance_test.go @@ -2,6 +2,9 @@ package integration import ( "context" + "fmt" + "io" + "net/http" "strconv" "testing" "time" @@ -186,6 +189,8 @@ func mustDistNode( t.Fatalf("new dist memory: %v", err) } + waitForDistNodeHealth(t, addr) + return bm.(*backend.DistMemory) } @@ -222,3 +227,33 @@ func ownedPrimaryCount(dm *backend.DistMemory, keys []string) int { return c } + +func waitForDistNodeHealth(t *testing.T, addr string) { + t.Helper() + + client := &http.Client{Timeout: 100 * time.Millisecond} + healthURL := "http://" + addr + "/health" + deadline := time.Now().Add(3 * time.Second) + + var lastErr error + + for time.Now().Before(deadline) { + resp, err := client.Get(healthURL) + if err == nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + return + } + + lastErr = fmt.Errorf("unexpected status %d", resp.StatusCode) + } else { + lastErr = err + } + + time.Sleep(10 * time.Millisecond) + } + + t.Fatalf("node %s health not ready: %v", addr, lastErr) +}