From e0907a8dd24c362b51eae3a4d93abacf008e50be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 14 Jun 2026 17:02:57 +0200 Subject: [PATCH 1/2] fix(proxy): strip AppendRequest size prefix in fetch/produce fan-out re-marshal encodeFetchRequest/encodeProduceRequest returned kmsg RequestFormatter.AppendRequest output, which already carries a 4-byte big-endian size prefix. forwardToBackend writes that payload via protocol.WriteFrame, which prepends its own size prefix. On any path that re-marshals (multi-broker fan-out where canUseOriginal is false, or a retry where originalPayload is nil'd) the on-wire frame became [size][size][header][body]. The broker reads the outer size, then parses the inner size bytes as apiKey/apiVersion, producing the observed "decode Produce v###: not enough data" and a dead consume path on the 3-broker azure-sim. The canUseOriginal branch was unaffected because originalPayload is the prefix-less ReadFrame payload. Fix: strip the leading 4 bytes that AppendRequest writes so WriteFrame is the single source of the size prefix. Add a real round-trip regression (encode -> WriteFrame -> ReadFrame -> ParseRequest, the exact broker decode path) for both fetch (v12, carries topic name) and produce (v9); it fails on the pre-fix code with the same family of decode garbage. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/proxy/main.go | 16 ++++- cmd/proxy/main_test.go | 133 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 2 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index a0b603d..c4bea05 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -1880,7 +1880,13 @@ func addFetchErrorForAllPartitions(resp *kmsg.FetchResponse, req *kmsg.FetchRequ // encodeProduceRequest serializes a produce request with header into a wire frame. func encodeProduceRequest(header *protocol.RequestHeader, req *kmsg.ProduceRequest) []byte { formatter := kmsg.NewRequestFormatter(kmsg.FormatterClientID(clientIDStr(header.ClientID))) - return formatter.AppendRequest(nil, req, header.CorrelationID) + b := formatter.AppendRequest(nil, req, header.CorrelationID) + // AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one; + // strip it here so the wire frame is single-prefixed (header+body only). + if len(b) < 4 { + return b + } + return b[4:] } // parseProduceResponse deserializes a produce response from wire bytes. @@ -1900,7 +1906,13 @@ func parseProduceResponse(data []byte, version int16) (*kmsg.ProduceResponse, er // encodeFetchRequest serializes a fetch request with header into a wire frame. func encodeFetchRequest(header *protocol.RequestHeader, req *kmsg.FetchRequest) []byte { formatter := kmsg.NewRequestFormatter(kmsg.FormatterClientID(clientIDStr(header.ClientID))) - return formatter.AppendRequest(nil, req, header.CorrelationID) + b := formatter.AppendRequest(nil, req, header.CorrelationID) + // AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one; + // strip it here so the wire frame is single-prefixed (header+body only). + if len(b) < 4 { + return b + } + return b[4:] } // parseFetchResponse deserializes a fetch response from wire bytes. diff --git a/cmd/proxy/main_test.go b/cmd/proxy/main_test.go index d69ade0..054d720 100644 --- a/cmd/proxy/main_test.go +++ b/cmd/proxy/main_test.go @@ -1180,3 +1180,136 @@ func TestForwardFetchRetriesOnBackendError(t *testing.T) { t.Fatalf("expected >=2 backend attempts (error then retry success), got %d", fb.attempts) } } + +// TestEncodeRequestNoDoubleLengthPrefix is a real round-trip regression for the +// double-length-prefix bug: encodeFetchRequest/encodeProduceRequest return the +// AppendRequest output, and forwardToBackend writes it via protocol.WriteFrame, +// which prepends a 4-byte size. If the encode funcs also keep AppendRequest's own +// size prefix, the on-wire frame is [size][size][header][body]; a broker reading +// it via protocol.ParseRequest sees the inner size's bytes as apiKey/apiVersion +// (the observed "decode Produce v###: not enough data"). This test pushes the +// encoded bytes through WriteFrame -> ReadFrame -> ParseRequest, exactly as the +// broker would, and asserts the apiVersion and topics survive intact. +func TestEncodeRequestNoDoubleLengthPrefix(t *testing.T) { + clientID := "proxy-test" + + t.Run("fetch", func(t *testing.T) { + // Fetch v12 is the last version that carries the topic NAME on the wire + // (v13+ switches to TopicID UUIDs), so the name survives a round-trip and + // makes a meaningful single-prefix assertion. + header := &protocol.RequestHeader{ + APIKey: protocol.APIKeyFetch, + APIVersion: 12, + CorrelationID: 4242, + ClientID: &clientID, + } + req := kmsg.NewPtrFetchRequest() + req.SetVersion(12) + req.MaxWaitMillis = 500 + req.MinBytes = 1 + req.MaxBytes = 1048576 + for _, topicName := range []string{"events.er1_items", "citizen.audit.queries"} { + ft := kmsg.NewFetchRequestTopic() + ft.Topic = topicName + for p := int32(0); p < 3; p++ { + fp := kmsg.NewFetchRequestTopicPartition() + fp.Partition = p + fp.FetchOffset = int64(p) * 10 + fp.PartitionMaxBytes = 524288 + ft.Partitions = append(ft.Partitions, fp) + } + req.Topics = append(req.Topics, ft) + } + + encoded := encodeFetchRequest(header, req) + + var buf bytes.Buffer + if err := protocol.WriteFrame(&buf, encoded); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + frame, err := protocol.ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload) + if err != nil { + t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err) + } + if gotHeader.APIKey != protocol.APIKeyFetch { + t.Fatalf("apiKey: want %d got %d (double prefix)", protocol.APIKeyFetch, gotHeader.APIKey) + } + if gotHeader.APIVersion != 12 { + t.Fatalf("apiVersion: want 12 got %d (double prefix garbles version)", gotHeader.APIVersion) + } + if gotHeader.CorrelationID != 4242 { + t.Fatalf("correlationID: want 4242 got %d", gotHeader.CorrelationID) + } + fr, ok := parsedReq.(*kmsg.FetchRequest) + if !ok { + t.Fatalf("parsed request is %T, not *kmsg.FetchRequest", parsedReq) + } + if len(fr.Topics) != 2 { + t.Fatalf("topics: want 2 got %d", len(fr.Topics)) + } + if fr.Topics[0].Topic != "events.er1_items" { + t.Fatalf("topic[0]: want events.er1_items got %q", fr.Topics[0].Topic) + } + if len(fr.Topics[0].Partitions) != 3 { + t.Fatalf("topic[0] partitions: want 3 got %d", len(fr.Topics[0].Partitions)) + } + }) + + t.Run("produce", func(t *testing.T) { + header := &protocol.RequestHeader{ + APIKey: protocol.APIKeyProduce, + APIVersion: 9, + CorrelationID: 99, + ClientID: &clientID, + } + req := kmsg.NewPtrProduceRequest() + req.SetVersion(9) + req.TimeoutMillis = 1000 + for _, topicName := range []string{"events.er1_items", "citizen.audit.queries"} { + pt := kmsg.NewProduceRequestTopic() + pt.Topic = topicName + for p := int32(0); p < 2; p++ { + pp := kmsg.NewProduceRequestTopicPartition() + pp.Partition = p + pp.Records = []byte{0x01, 0x02, 0x03} + pt.Partitions = append(pt.Partitions, pp) + } + req.Topics = append(req.Topics, pt) + } + + encoded := encodeProduceRequest(header, req) + + var buf bytes.Buffer + if err := protocol.WriteFrame(&buf, encoded); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + frame, err := protocol.ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload) + if err != nil { + t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err) + } + if gotHeader.APIKey != protocol.APIKeyProduce { + t.Fatalf("apiKey: want %d got %d (double prefix)", protocol.APIKeyProduce, gotHeader.APIKey) + } + if gotHeader.APIVersion != 9 { + t.Fatalf("apiVersion: want 9 got %d (double prefix garbles version)", gotHeader.APIVersion) + } + pr, ok := parsedReq.(*kmsg.ProduceRequest) + if !ok { + t.Fatalf("parsed request is %T, not *kmsg.ProduceRequest", parsedReq) + } + if len(pr.Topics) != 2 { + t.Fatalf("topics: want 2 got %d", len(pr.Topics)) + } + if pr.Topics[1].Topic != "citizen.audit.queries" { + t.Fatalf("topic[1]: want citizen.audit.queries got %q", pr.Topics[1].Topic) + } + }) +} From a8b2b22fcd41e060914003319209f0e2c7956e90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Tue, 16 Jun 2026 00:13:09 +0200 Subject: [PATCH 2/2] test(proxy): assert single length-prefix and cover fan-out in double-prefix regression Strengthen TestEncodeRequestNoDoubleLengthPrefix so it proves the single-length-prefix invariant directly instead of only checking the decoded header. For each case, after protocol.WriteFrame the test now asserts the first 4 bytes equal big-endian(len(rest)) AND that rest equals the encodeFetchRequest/encodeProduceRequest output byte-for-byte (exactly one prefix), then ParseRequest round-trips with the correct apiKey/apiVersion. Cases added: single-partition fetch, multi-partition fetch, a multi-broker fan-out (sub-requests built the way groupFetchPartitionsByBroker builds them, each encoded and asserted to frame singly), and an empty-partition fetch. The fan-out case also asserts no two sub-request encodings share a backing array, confirming no buffer aliasing on the hot path (each encode does a fresh AppendRequest(nil, ...) and returns a sub-slice of that buffer). go test ./cmd/proxy/... passes; negative-tested by reverting the b[4:] strip in both encode funcs, which makes every case fail with the same decode garbage, then restored. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/proxy/main_test.go | 313 ++++++++++++++++++++++++++++++++--------- 1 file changed, 243 insertions(+), 70 deletions(-) diff --git a/cmd/proxy/main_test.go b/cmd/proxy/main_test.go index 054d720..d996375 100644 --- a/cmd/proxy/main_test.go +++ b/cmd/proxy/main_test.go @@ -22,6 +22,7 @@ import ( "io" "log/slog" "net" + "sort" "sync" "testing" "time" @@ -1181,81 +1182,229 @@ func TestForwardFetchRetriesOnBackendError(t *testing.T) { } } +// assertSinglePrefixFrame drives encoded bytes through the exact wire path +// forwardToBackend uses: WriteFrame (which prepends a 4-byte big-endian size) -> +// ReadFrame -> ParseRequest (the broker decode path). It asserts the +// single-length-prefix invariant explicitly: +// +// 1. After WriteFrame, the first 4 bytes equal big-endian(len(rest)), AND +// 2. rest (the frame payload the broker reads back) equals the encode output +// byte-for-byte. Together these prove there is EXACTLY ONE size prefix on the +// wire. If the encode funcs kept AppendRequest's own prefix, the wire frame +// would be [size][size][header][body]; the outer size would then be 4 larger +// than len(encoded) and the round-tripped payload would carry a spurious +// leading 4 bytes, so both asserts catch the regression. +// +// It returns the parsed header and request for the caller to assert on. +func assertSinglePrefixFrame(t *testing.T, encoded []byte) (*protocol.RequestHeader, kmsg.Request) { + t.Helper() + + var buf bytes.Buffer + if err := protocol.WriteFrame(&buf, encoded); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + wire := buf.Bytes() + if len(wire) < 4 { + t.Fatalf("framed bytes too short: %d", len(wire)) + } + + // Assert 1: the on-wire size prefix is exactly len(encoded), not len(encoded)+4. + gotSize := binary.BigEndian.Uint32(wire[:4]) + if int(gotSize) != len(encoded) { + t.Fatalf("on-wire size prefix=%d, want %d (len of encode output); a value of %d would mean a double prefix", + gotSize, len(encoded), len(encoded)+4) + } + // Assert 2: the framed payload equals the encode output byte-for-byte. This is + // the strict single-prefix proof: WriteFrame adds exactly one prefix and the + // encode output carries none of its own. + rest := wire[4:] + if !bytes.Equal(rest, encoded) { + t.Fatalf("frame payload (len %d) does not equal encode output (len %d) byte-for-byte; double prefix?", + len(rest), len(encoded)) + } + + frame, err := protocol.ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if !bytes.Equal(frame.Payload, encoded) { + t.Fatalf("ReadFrame payload does not match encode output byte-for-byte; double prefix?") + } + gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload) + if err != nil { + t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err) + } + return gotHeader, parsedReq +} + +// makeFetchRequestTyped builds a fetch v12 request (the last version that carries +// the topic NAME on the wire; v13+ switches to TopicID UUIDs) with the given +// topic -> partitions layout, so the name survives a round-trip and makes a +// meaningful single-prefix assertion. +func makeFetchRequestTyped(version int16, topics map[string][]int32) *kmsg.FetchRequest { + req := kmsg.NewPtrFetchRequest() + req.SetVersion(version) + req.ReplicaID = -1 + req.MaxWaitMillis = 500 + req.MinBytes = 1 + req.MaxBytes = 1048576 + req.SessionEpoch = -1 + // Sort topic names so encode output is deterministic across runs. + names := make([]string, 0, len(topics)) + for name := range topics { + names = append(names, name) + } + sort.Strings(names) + for _, name := range names { + ft := kmsg.NewFetchRequestTopic() + ft.Topic = name + for _, p := range topics[name] { + fp := kmsg.NewFetchRequestTopicPartition() + fp.Partition = p + fp.FetchOffset = int64(p) * 10 + fp.PartitionMaxBytes = 524288 + ft.Partitions = append(ft.Partitions, fp) + } + req.Topics = append(req.Topics, ft) + } + return req +} + // TestEncodeRequestNoDoubleLengthPrefix is a real round-trip regression for the -// double-length-prefix bug: encodeFetchRequest/encodeProduceRequest return the -// AppendRequest output, and forwardToBackend writes it via protocol.WriteFrame, -// which prepends a 4-byte size. If the encode funcs also keep AppendRequest's own -// size prefix, the on-wire frame is [size][size][header][body]; a broker reading -// it via protocol.ParseRequest sees the inner size's bytes as apiKey/apiVersion -// (the observed "decode Produce v###: not enough data"). This test pushes the -// encoded bytes through WriteFrame -> ReadFrame -> ParseRequest, exactly as the -// broker would, and asserts the apiVersion and topics survive intact. +// double-length-prefix bug on the REQUEST re-marshal path: +// encodeFetchRequest/encodeProduceRequest produce the bytes forwardToBackend +// writes via protocol.WriteFrame, which prepends a 4-byte size. If the encode +// funcs also kept AppendRequest's own size prefix, the on-wire frame would be +// [size][size][header][body]; a broker reading it via protocol.ParseRequest would +// see the inner size's bytes as apiKey/apiVersion (the observed "decode Produce +// v###: not enough data"). Every sub-test pushes the encoded bytes through +// WriteFrame -> ReadFrame -> ParseRequest exactly as the broker would, and asserts +// a SINGLE length prefix (see assertSinglePrefixFrame) plus that the header and +// topics survive intact. func TestEncodeRequestNoDoubleLengthPrefix(t *testing.T) { clientID := "proxy-test" - - t.Run("fetch", func(t *testing.T) { - // Fetch v12 is the last version that carries the topic NAME on the wire - // (v13+ switches to TopicID UUIDs), so the name survives a round-trip and - // makes a meaningful single-prefix assertion. - header := &protocol.RequestHeader{ + fetchHeader := func(corr int32) *protocol.RequestHeader { + return &protocol.RequestHeader{ APIKey: protocol.APIKeyFetch, APIVersion: 12, - CorrelationID: 4242, + CorrelationID: corr, ClientID: &clientID, } - req := kmsg.NewPtrFetchRequest() - req.SetVersion(12) - req.MaxWaitMillis = 500 - req.MinBytes = 1 - req.MaxBytes = 1048576 - for _, topicName := range []string{"events.er1_items", "citizen.audit.queries"} { - ft := kmsg.NewFetchRequestTopic() - ft.Topic = topicName - for p := int32(0); p < 3; p++ { - fp := kmsg.NewFetchRequestTopicPartition() - fp.Partition = p - fp.FetchOffset = int64(p) * 10 - fp.PartitionMaxBytes = 524288 - ft.Partitions = append(ft.Partitions, fp) - } - req.Topics = append(req.Topics, ft) - } + } - encoded := encodeFetchRequest(header, req) + t.Run("fetch_single_partition", func(t *testing.T) { + req := makeFetchRequestTyped(12, map[string][]int32{"events.er1_items": {0}}) + header := fetchHeader(1) - var buf bytes.Buffer - if err := protocol.WriteFrame(&buf, encoded); err != nil { - t.Fatalf("WriteFrame: %v", err) - } - frame, err := protocol.ReadFrame(&buf) - if err != nil { - t.Fatalf("ReadFrame: %v", err) + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req)) + + if gotHeader.APIKey != protocol.APIKeyFetch || gotHeader.APIVersion != 12 || gotHeader.CorrelationID != 1 { + t.Fatalf("header garbled: key=%d ver=%d corr=%d", gotHeader.APIKey, gotHeader.APIVersion, gotHeader.CorrelationID) } - gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload) - if err != nil { - t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err) + fr := parsedReq.(*kmsg.FetchRequest) + if len(fr.Topics) != 1 || fr.Topics[0].Topic != "events.er1_items" { + t.Fatalf("topics: %+v", fr.Topics) } - if gotHeader.APIKey != protocol.APIKeyFetch { - t.Fatalf("apiKey: want %d got %d (double prefix)", protocol.APIKeyFetch, gotHeader.APIKey) + if len(fr.Topics[0].Partitions) != 1 { + t.Fatalf("partitions: want 1 got %d", len(fr.Topics[0].Partitions)) } + }) + + t.Run("fetch_multi_partition", func(t *testing.T) { + req := makeFetchRequestTyped(12, map[string][]int32{ + "events.er1_items": {0, 1, 2}, + "citizen.audit.queries": {0, 1, 2}, + }) + header := fetchHeader(4242) + + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req)) + if gotHeader.APIVersion != 12 { t.Fatalf("apiVersion: want 12 got %d (double prefix garbles version)", gotHeader.APIVersion) } if gotHeader.CorrelationID != 4242 { t.Fatalf("correlationID: want 4242 got %d", gotHeader.CorrelationID) } - fr, ok := parsedReq.(*kmsg.FetchRequest) - if !ok { - t.Fatalf("parsed request is %T, not *kmsg.FetchRequest", parsedReq) - } + fr := parsedReq.(*kmsg.FetchRequest) if len(fr.Topics) != 2 { t.Fatalf("topics: want 2 got %d", len(fr.Topics)) } - if fr.Topics[0].Topic != "events.er1_items" { - t.Fatalf("topic[0]: want events.er1_items got %q", fr.Topics[0].Topic) + for _, ft := range fr.Topics { + if len(ft.Partitions) != 3 { + t.Fatalf("topic %q partitions: want 3 got %d", ft.Topic, len(ft.Partitions)) + } + } + }) + + t.Run("fetch_empty_partitions", func(t *testing.T) { + // An empty topic / no records still produces a well-formed fetch sub-request + // with zero partitions; it must frame singly too. This is the shape a fetch + // against an empty topic takes after groupFetchPartitionsByBroker filtering. + req := makeFetchRequestTyped(12, map[string][]int32{"events.er1_items": {}}) + header := fetchHeader(7) + + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req)) + + if gotHeader.APIVersion != 12 { + t.Fatalf("apiVersion: want 12 got %d", gotHeader.APIVersion) } - if len(fr.Topics[0].Partitions) != 3 { - t.Fatalf("topic[0] partitions: want 3 got %d", len(fr.Topics[0].Partitions)) + fr := parsedReq.(*kmsg.FetchRequest) + if len(fr.Topics) != 1 || len(fr.Topics[0].Partitions) != 0 { + t.Fatalf("want 1 topic with 0 partitions, got %+v", fr.Topics) + } + }) + + t.Run("fetch_multi_broker_fanout", func(t *testing.T) { + // Reproduce the canUseOriginal == false hot path: groupFetchPartitionsByBroker + // splits a fetch into one sub-request per owning broker, each of which is + // independently encodeFetchRequest'd and WriteFrame'd. With no router every + // partition lands under the round-robin group, so to exercise a genuine + // multi-broker split we build the sub-requests the same way the grouper does + // (one *kmsg.FetchRequest per broker, settings copied from the parent) and + // assert EACH sub-request frames singly and round-trips. + full := makeFetchRequestTyped(12, map[string][]int32{ + "events.er1_items": {0, 1, 2, 3}, + "citizen.audit.queries": {0, 1}, + }) + // Owner map mimicking three brokers owning disjoint partitions. + owner := func(topic string, part int32) string { + return []string{"broker-0", "broker-1", "broker-2"}[int(part)%3] + } + subReqs := splitFetchByOwnerForTest(full, owner) + if len(subReqs) != 3 { + t.Fatalf("expected 3 broker sub-requests, got %d", len(subReqs)) + } + + header := fetchHeader(31337) + encodings := make([][]byte, 0, len(subReqs)) + for addr, sub := range subReqs { + enc := encodeFetchRequest(header, sub) + encodings = append(encodings, enc) + gotHeader, parsedReq := assertSinglePrefixFrame(t, enc) + if gotHeader.APIVersion != 12 { + t.Fatalf("broker %s sub-request apiVersion: want 12 got %d", addr, gotHeader.APIVersion) + } + fr, ok := parsedReq.(*kmsg.FetchRequest) + if !ok { + t.Fatalf("broker %s: parsed %T, not *kmsg.FetchRequest", addr, parsedReq) + } + if len(fr.Topics) == 0 { + t.Fatalf("broker %s sub-request has no topics", addr) + } + } + + // No buffer aliasing on the fan-out hot path: each encodeFetchRequest calls + // AppendRequest(nil, ...) (a fresh allocation) and returns b[4:] (a sub-slice + // of that fresh buffer), so distinct sub-requests must not share backing + // storage. Distinct lengths or distinct backing arrays both prove this; we + // assert no two sub-request encodings alias the same backing array. + for i := 0; i < len(encodings); i++ { + for j := i + 1; j < len(encodings); j++ { + if len(encodings[i]) > 0 && len(encodings[j]) > 0 && + &encodings[i][0] == &encodings[j][0] { + t.Fatalf("fan-out buffer aliasing: sub-request %d and %d share backing array", i, j) + } + } } }) @@ -1281,30 +1430,15 @@ func TestEncodeRequestNoDoubleLengthPrefix(t *testing.T) { req.Topics = append(req.Topics, pt) } - encoded := encodeProduceRequest(header, req) + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeProduceRequest(header, req)) - var buf bytes.Buffer - if err := protocol.WriteFrame(&buf, encoded); err != nil { - t.Fatalf("WriteFrame: %v", err) - } - frame, err := protocol.ReadFrame(&buf) - if err != nil { - t.Fatalf("ReadFrame: %v", err) - } - gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload) - if err != nil { - t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err) - } if gotHeader.APIKey != protocol.APIKeyProduce { t.Fatalf("apiKey: want %d got %d (double prefix)", protocol.APIKeyProduce, gotHeader.APIKey) } if gotHeader.APIVersion != 9 { t.Fatalf("apiVersion: want 9 got %d (double prefix garbles version)", gotHeader.APIVersion) } - pr, ok := parsedReq.(*kmsg.ProduceRequest) - if !ok { - t.Fatalf("parsed request is %T, not *kmsg.ProduceRequest", parsedReq) - } + pr := parsedReq.(*kmsg.ProduceRequest) if len(pr.Topics) != 2 { t.Fatalf("topics: want 2 got %d", len(pr.Topics)) } @@ -1313,3 +1447,42 @@ func TestEncodeRequestNoDoubleLengthPrefix(t *testing.T) { } }) } + +// splitFetchByOwnerForTest mirrors how groupFetchPartitionsByBroker builds one +// *kmsg.FetchRequest per owning broker (settings copied from the parent, topics +// and partitions distributed by owner). It is owner-map-driven so the test does +// not need a live etcd-backed PartitionRouter to exercise a multi-broker fan-out. +func splitFetchByOwnerForTest(req *kmsg.FetchRequest, owner func(topic string, part int32) string) map[string]*kmsg.FetchRequest { + groups := make(map[string]*kmsg.FetchRequest) + topicIdx := make(map[string]map[string]int) + for _, topic := range req.Topics { + for _, part := range topic.Partitions { + addr := owner(topic.Topic, part.Partition) + sub, ok := groups[addr] + if !ok { + sub = &kmsg.FetchRequest{ + Version: req.Version, + ReplicaID: req.ReplicaID, + MaxWaitMillis: req.MaxWaitMillis, + MinBytes: req.MinBytes, + MaxBytes: req.MaxBytes, + IsolationLevel: req.IsolationLevel, + SessionID: req.SessionID, + SessionEpoch: req.SessionEpoch, + } + groups[addr] = sub + topicIdx[addr] = make(map[string]int) + } + idx, ok := topicIdx[addr][topic.Topic] + if !ok { + idx = len(sub.Topics) + st := kmsg.NewFetchRequestTopic() + st.Topic = topic.Topic + sub.Topics = append(sub.Topics, st) + topicIdx[addr][topic.Topic] = idx + } + sub.Topics[idx].Partitions = append(sub.Topics[idx].Partitions, part) + } + } + return groups +}