From b8f6607210abe1ab7370c03c751559cdcb77b441 Mon Sep 17 00:00:00 2001 From: Aron Gates Date: Thu, 12 Mar 2026 11:44:44 +0000 Subject: [PATCH 1/3] fix: prevent double-close inflating zstd pool release counters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In writeZstd(), the zstdReader was closed twice: once by the buffer (which takes ownership via NewCASBufferFromReader) and again by defer zstdReader.Close(). Since metricsDecoder.Close() had no double-close guard — unlike the underlying pooledDecoder — each duplicate close incremented releases_total without a corresponding acquisition, causing releases to permanently exceed acquisitions. Remove the redundant defer and add double-close protection to both metricsEncoder and metricsDecoder as a safety net. --- pkg/blobstore/grpcservers/byte_stream_server.go | 1 - pkg/zstd/metrics_pool.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/blobstore/grpcservers/byte_stream_server.go b/pkg/blobstore/grpcservers/byte_stream_server.go index 8e377bb7..4d0df86c 100644 --- a/pkg/blobstore/grpcservers/byte_stream_server.go +++ b/pkg/blobstore/grpcservers/byte_stream_server.go @@ -222,7 +222,6 @@ func (s *byteStreamServer) writeZstd(stream bytestream.ByteStream_WriteServer, r if err != nil { return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD decoder: %v", err) } - defer zstdReader.Close() if err := s.blobAccess.Put( ctx, diff --git a/pkg/zstd/metrics_pool.go b/pkg/zstd/metrics_pool.go index 08e204f5..893fc316 100644 --- a/pkg/zstd/metrics_pool.go +++ b/pkg/zstd/metrics_pool.go @@ -123,9 +123,14 @@ func (p *metricsPool) NewDecoder(ctx context.Context, r io.Reader) (Decoder, err type metricsEncoder struct { Encoder releases prometheus.Counter + closed bool } func (e *metricsEncoder) Close() error { + if e.closed { + return nil + } + e.closed = true err := e.Encoder.Close() e.releases.Inc() return err @@ -134,9 +139,14 @@ func (e *metricsEncoder) Close() error { type metricsDecoder struct { Decoder releases prometheus.Counter + closed bool } func (d *metricsDecoder) Close() { + if d.closed { + return + } + d.closed = true d.Decoder.Close() d.releases.Inc() } From 6a68e25cc6428b7dcde59c0348646f0f6aa2292c Mon Sep 17 00:00:00 2001 From: Aron Gates Date: Thu, 12 Mar 2026 13:06:55 +0000 Subject: [PATCH 2/3] fix: use nil guard instead of bool for double-close protection Per review feedback: set Encoder/Decoder to nil on close instead of using a separate bool field. This prevents calling any methods on the object after close and matches the pattern in pooledEncoder/pooledDecoder. --- pkg/zstd/metrics_pool.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/zstd/metrics_pool.go b/pkg/zstd/metrics_pool.go index 893fc316..6b3c8d0d 100644 --- a/pkg/zstd/metrics_pool.go +++ b/pkg/zstd/metrics_pool.go @@ -123,15 +123,14 @@ func (p *metricsPool) NewDecoder(ctx context.Context, r io.Reader) (Decoder, err type metricsEncoder struct { Encoder releases prometheus.Counter - closed bool } func (e *metricsEncoder) Close() error { - if e.closed { + if e.Encoder == nil { return nil } - e.closed = true err := e.Encoder.Close() + e.Encoder = nil e.releases.Inc() return err } @@ -139,14 +138,13 @@ func (e *metricsEncoder) Close() error { type metricsDecoder struct { Decoder releases prometheus.Counter - closed bool } func (d *metricsDecoder) Close() { - if d.closed { + if d.Decoder == nil { return } - d.closed = true d.Decoder.Close() + d.Decoder = nil d.releases.Inc() } From 76958239c418b6681c9efb47fe768efae15c554f Mon Sep 17 00:00:00 2001 From: Aron Gates Date: Thu, 12 Mar 2026 13:43:21 +0000 Subject: [PATCH 3/3] fix: use util.StatusWrap for decoder acquisition error Per review feedback: preserve the gRPC status code from Pool.NewDecoder() (e.g. codes.Canceled) instead of hardcoding codes.ResourceExhausted via status.Errorf. --- pkg/blobstore/grpcservers/byte_stream_server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/blobstore/grpcservers/byte_stream_server.go b/pkg/blobstore/grpcservers/byte_stream_server.go index 4d0df86c..a80baa72 100644 --- a/pkg/blobstore/grpcservers/byte_stream_server.go +++ b/pkg/blobstore/grpcservers/byte_stream_server.go @@ -9,6 +9,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" "github.com/buildbarn/bb-storage/pkg/digest" + "github.com/buildbarn/bb-storage/pkg/util" bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/genproto/googleapis/bytestream" @@ -220,7 +221,7 @@ func (s *byteStreamServer) writeZstd(stream bytestream.ByteStream_WriteServer, r zstdReader, err := bb_zstd.NewReadCloser(ctx, s.zstdPool, streamReader) if err != nil { - return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD decoder: %v", err) + return util.StatusWrap(err, "Failed to acquire ZSTD decoder") } if err := s.blobAccess.Put(