diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index a0b603d..c4bea05 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -1880,7 +1880,13 @@ func addFetchErrorForAllPartitions(resp *kmsg.FetchResponse, req *kmsg.FetchRequ // encodeProduceRequest serializes a produce request with header into a wire frame. func encodeProduceRequest(header *protocol.RequestHeader, req *kmsg.ProduceRequest) []byte { formatter := kmsg.NewRequestFormatter(kmsg.FormatterClientID(clientIDStr(header.ClientID))) - return formatter.AppendRequest(nil, req, header.CorrelationID) + b := formatter.AppendRequest(nil, req, header.CorrelationID) + // AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one; + // strip it here so the wire frame is single-prefixed (header+body only). + if len(b) < 4 { + return b + } + return b[4:] } // parseProduceResponse deserializes a produce response from wire bytes. @@ -1900,7 +1906,13 @@ func parseProduceResponse(data []byte, version int16) (*kmsg.ProduceResponse, er // encodeFetchRequest serializes a fetch request with header into a wire frame. func encodeFetchRequest(header *protocol.RequestHeader, req *kmsg.FetchRequest) []byte { formatter := kmsg.NewRequestFormatter(kmsg.FormatterClientID(clientIDStr(header.ClientID))) - return formatter.AppendRequest(nil, req, header.CorrelationID) + b := formatter.AppendRequest(nil, req, header.CorrelationID) + // AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one; + // strip it here so the wire frame is single-prefixed (header+body only). + if len(b) < 4 { + return b + } + return b[4:] } // parseFetchResponse deserializes a fetch response from wire bytes. diff --git a/cmd/proxy/main_test.go b/cmd/proxy/main_test.go index d69ade0..d996375 100644 --- a/cmd/proxy/main_test.go +++ b/cmd/proxy/main_test.go @@ -22,6 +22,7 @@ import ( "io" "log/slog" "net" + "sort" "sync" "testing" "time" @@ -1180,3 +1181,308 @@ func TestForwardFetchRetriesOnBackendError(t *testing.T) { t.Fatalf("expected >=2 backend attempts (error then retry success), got %d", fb.attempts) } } + +// assertSinglePrefixFrame drives encoded bytes through the exact wire path +// forwardToBackend uses: WriteFrame (which prepends a 4-byte big-endian size) -> +// ReadFrame -> ParseRequest (the broker decode path). It asserts the +// single-length-prefix invariant explicitly: +// +// 1. After WriteFrame, the first 4 bytes equal big-endian(len(rest)), AND +// 2. rest (the frame payload the broker reads back) equals the encode output +// byte-for-byte. Together these prove there is EXACTLY ONE size prefix on the +// wire. If the encode funcs kept AppendRequest's own prefix, the wire frame +// would be [size][size][header][body]; the outer size would then be 4 larger +// than len(encoded) and the round-tripped payload would carry a spurious +// leading 4 bytes, so both asserts catch the regression. +// +// It returns the parsed header and request for the caller to assert on. +func assertSinglePrefixFrame(t *testing.T, encoded []byte) (*protocol.RequestHeader, kmsg.Request) { + t.Helper() + + var buf bytes.Buffer + if err := protocol.WriteFrame(&buf, encoded); err != nil { + t.Fatalf("WriteFrame: %v", err) + } + wire := buf.Bytes() + if len(wire) < 4 { + t.Fatalf("framed bytes too short: %d", len(wire)) + } + + // Assert 1: the on-wire size prefix is exactly len(encoded), not len(encoded)+4. + gotSize := binary.BigEndian.Uint32(wire[:4]) + if int(gotSize) != len(encoded) { + t.Fatalf("on-wire size prefix=%d, want %d (len of encode output); a value of %d would mean a double prefix", + gotSize, len(encoded), len(encoded)+4) + } + // Assert 2: the framed payload equals the encode output byte-for-byte. This is + // the strict single-prefix proof: WriteFrame adds exactly one prefix and the + // encode output carries none of its own. + rest := wire[4:] + if !bytes.Equal(rest, encoded) { + t.Fatalf("frame payload (len %d) does not equal encode output (len %d) byte-for-byte; double prefix?", + len(rest), len(encoded)) + } + + frame, err := protocol.ReadFrame(&buf) + if err != nil { + t.Fatalf("ReadFrame: %v", err) + } + if !bytes.Equal(frame.Payload, encoded) { + t.Fatalf("ReadFrame payload does not match encode output byte-for-byte; double prefix?") + } + gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload) + if err != nil { + t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err) + } + return gotHeader, parsedReq +} + +// makeFetchRequestTyped builds a fetch v12 request (the last version that carries +// the topic NAME on the wire; v13+ switches to TopicID UUIDs) with the given +// topic -> partitions layout, so the name survives a round-trip and makes a +// meaningful single-prefix assertion. +func makeFetchRequestTyped(version int16, topics map[string][]int32) *kmsg.FetchRequest { + req := kmsg.NewPtrFetchRequest() + req.SetVersion(version) + req.ReplicaID = -1 + req.MaxWaitMillis = 500 + req.MinBytes = 1 + req.MaxBytes = 1048576 + req.SessionEpoch = -1 + // Sort topic names so encode output is deterministic across runs. + names := make([]string, 0, len(topics)) + for name := range topics { + names = append(names, name) + } + sort.Strings(names) + for _, name := range names { + ft := kmsg.NewFetchRequestTopic() + ft.Topic = name + for _, p := range topics[name] { + fp := kmsg.NewFetchRequestTopicPartition() + fp.Partition = p + fp.FetchOffset = int64(p) * 10 + fp.PartitionMaxBytes = 524288 + ft.Partitions = append(ft.Partitions, fp) + } + req.Topics = append(req.Topics, ft) + } + return req +} + +// TestEncodeRequestNoDoubleLengthPrefix is a real round-trip regression for the +// double-length-prefix bug on the REQUEST re-marshal path: +// encodeFetchRequest/encodeProduceRequest produce the bytes forwardToBackend +// writes via protocol.WriteFrame, which prepends a 4-byte size. If the encode +// funcs also kept AppendRequest's own size prefix, the on-wire frame would be +// [size][size][header][body]; a broker reading it via protocol.ParseRequest would +// see the inner size's bytes as apiKey/apiVersion (the observed "decode Produce +// v###: not enough data"). Every sub-test pushes the encoded bytes through +// WriteFrame -> ReadFrame -> ParseRequest exactly as the broker would, and asserts +// a SINGLE length prefix (see assertSinglePrefixFrame) plus that the header and +// topics survive intact. +func TestEncodeRequestNoDoubleLengthPrefix(t *testing.T) { + clientID := "proxy-test" + fetchHeader := func(corr int32) *protocol.RequestHeader { + return &protocol.RequestHeader{ + APIKey: protocol.APIKeyFetch, + APIVersion: 12, + CorrelationID: corr, + ClientID: &clientID, + } + } + + t.Run("fetch_single_partition", func(t *testing.T) { + req := makeFetchRequestTyped(12, map[string][]int32{"events.er1_items": {0}}) + header := fetchHeader(1) + + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req)) + + if gotHeader.APIKey != protocol.APIKeyFetch || gotHeader.APIVersion != 12 || gotHeader.CorrelationID != 1 { + t.Fatalf("header garbled: key=%d ver=%d corr=%d", gotHeader.APIKey, gotHeader.APIVersion, gotHeader.CorrelationID) + } + fr := parsedReq.(*kmsg.FetchRequest) + if len(fr.Topics) != 1 || fr.Topics[0].Topic != "events.er1_items" { + t.Fatalf("topics: %+v", fr.Topics) + } + if len(fr.Topics[0].Partitions) != 1 { + t.Fatalf("partitions: want 1 got %d", len(fr.Topics[0].Partitions)) + } + }) + + t.Run("fetch_multi_partition", func(t *testing.T) { + req := makeFetchRequestTyped(12, map[string][]int32{ + "events.er1_items": {0, 1, 2}, + "citizen.audit.queries": {0, 1, 2}, + }) + header := fetchHeader(4242) + + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req)) + + if gotHeader.APIVersion != 12 { + t.Fatalf("apiVersion: want 12 got %d (double prefix garbles version)", gotHeader.APIVersion) + } + if gotHeader.CorrelationID != 4242 { + t.Fatalf("correlationID: want 4242 got %d", gotHeader.CorrelationID) + } + fr := parsedReq.(*kmsg.FetchRequest) + if len(fr.Topics) != 2 { + t.Fatalf("topics: want 2 got %d", len(fr.Topics)) + } + for _, ft := range fr.Topics { + if len(ft.Partitions) != 3 { + t.Fatalf("topic %q partitions: want 3 got %d", ft.Topic, len(ft.Partitions)) + } + } + }) + + t.Run("fetch_empty_partitions", func(t *testing.T) { + // An empty topic / no records still produces a well-formed fetch sub-request + // with zero partitions; it must frame singly too. This is the shape a fetch + // against an empty topic takes after groupFetchPartitionsByBroker filtering. + req := makeFetchRequestTyped(12, map[string][]int32{"events.er1_items": {}}) + header := fetchHeader(7) + + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req)) + + if gotHeader.APIVersion != 12 { + t.Fatalf("apiVersion: want 12 got %d", gotHeader.APIVersion) + } + fr := parsedReq.(*kmsg.FetchRequest) + if len(fr.Topics) != 1 || len(fr.Topics[0].Partitions) != 0 { + t.Fatalf("want 1 topic with 0 partitions, got %+v", fr.Topics) + } + }) + + t.Run("fetch_multi_broker_fanout", func(t *testing.T) { + // Reproduce the canUseOriginal == false hot path: groupFetchPartitionsByBroker + // splits a fetch into one sub-request per owning broker, each of which is + // independently encodeFetchRequest'd and WriteFrame'd. With no router every + // partition lands under the round-robin group, so to exercise a genuine + // multi-broker split we build the sub-requests the same way the grouper does + // (one *kmsg.FetchRequest per broker, settings copied from the parent) and + // assert EACH sub-request frames singly and round-trips. + full := makeFetchRequestTyped(12, map[string][]int32{ + "events.er1_items": {0, 1, 2, 3}, + "citizen.audit.queries": {0, 1}, + }) + // Owner map mimicking three brokers owning disjoint partitions. + owner := func(topic string, part int32) string { + return []string{"broker-0", "broker-1", "broker-2"}[int(part)%3] + } + subReqs := splitFetchByOwnerForTest(full, owner) + if len(subReqs) != 3 { + t.Fatalf("expected 3 broker sub-requests, got %d", len(subReqs)) + } + + header := fetchHeader(31337) + encodings := make([][]byte, 0, len(subReqs)) + for addr, sub := range subReqs { + enc := encodeFetchRequest(header, sub) + encodings = append(encodings, enc) + gotHeader, parsedReq := assertSinglePrefixFrame(t, enc) + if gotHeader.APIVersion != 12 { + t.Fatalf("broker %s sub-request apiVersion: want 12 got %d", addr, gotHeader.APIVersion) + } + fr, ok := parsedReq.(*kmsg.FetchRequest) + if !ok { + t.Fatalf("broker %s: parsed %T, not *kmsg.FetchRequest", addr, parsedReq) + } + if len(fr.Topics) == 0 { + t.Fatalf("broker %s sub-request has no topics", addr) + } + } + + // No buffer aliasing on the fan-out hot path: each encodeFetchRequest calls + // AppendRequest(nil, ...) (a fresh allocation) and returns b[4:] (a sub-slice + // of that fresh buffer), so distinct sub-requests must not share backing + // storage. Distinct lengths or distinct backing arrays both prove this; we + // assert no two sub-request encodings alias the same backing array. + for i := 0; i < len(encodings); i++ { + for j := i + 1; j < len(encodings); j++ { + if len(encodings[i]) > 0 && len(encodings[j]) > 0 && + &encodings[i][0] == &encodings[j][0] { + t.Fatalf("fan-out buffer aliasing: sub-request %d and %d share backing array", i, j) + } + } + } + }) + + t.Run("produce", func(t *testing.T) { + header := &protocol.RequestHeader{ + APIKey: protocol.APIKeyProduce, + APIVersion: 9, + CorrelationID: 99, + ClientID: &clientID, + } + req := kmsg.NewPtrProduceRequest() + req.SetVersion(9) + req.TimeoutMillis = 1000 + for _, topicName := range []string{"events.er1_items", "citizen.audit.queries"} { + pt := kmsg.NewProduceRequestTopic() + pt.Topic = topicName + for p := int32(0); p < 2; p++ { + pp := kmsg.NewProduceRequestTopicPartition() + pp.Partition = p + pp.Records = []byte{0x01, 0x02, 0x03} + pt.Partitions = append(pt.Partitions, pp) + } + req.Topics = append(req.Topics, pt) + } + + gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeProduceRequest(header, req)) + + if gotHeader.APIKey != protocol.APIKeyProduce { + t.Fatalf("apiKey: want %d got %d (double prefix)", protocol.APIKeyProduce, gotHeader.APIKey) + } + if gotHeader.APIVersion != 9 { + t.Fatalf("apiVersion: want 9 got %d (double prefix garbles version)", gotHeader.APIVersion) + } + pr := parsedReq.(*kmsg.ProduceRequest) + if len(pr.Topics) != 2 { + t.Fatalf("topics: want 2 got %d", len(pr.Topics)) + } + if pr.Topics[1].Topic != "citizen.audit.queries" { + t.Fatalf("topic[1]: want citizen.audit.queries got %q", pr.Topics[1].Topic) + } + }) +} + +// splitFetchByOwnerForTest mirrors how groupFetchPartitionsByBroker builds one +// *kmsg.FetchRequest per owning broker (settings copied from the parent, topics +// and partitions distributed by owner). It is owner-map-driven so the test does +// not need a live etcd-backed PartitionRouter to exercise a multi-broker fan-out. +func splitFetchByOwnerForTest(req *kmsg.FetchRequest, owner func(topic string, part int32) string) map[string]*kmsg.FetchRequest { + groups := make(map[string]*kmsg.FetchRequest) + topicIdx := make(map[string]map[string]int) + for _, topic := range req.Topics { + for _, part := range topic.Partitions { + addr := owner(topic.Topic, part.Partition) + sub, ok := groups[addr] + if !ok { + sub = &kmsg.FetchRequest{ + Version: req.Version, + ReplicaID: req.ReplicaID, + MaxWaitMillis: req.MaxWaitMillis, + MinBytes: req.MinBytes, + MaxBytes: req.MaxBytes, + IsolationLevel: req.IsolationLevel, + SessionID: req.SessionID, + SessionEpoch: req.SessionEpoch, + } + groups[addr] = sub + topicIdx[addr] = make(map[string]int) + } + idx, ok := topicIdx[addr][topic.Topic] + if !ok { + idx = len(sub.Topics) + st := kmsg.NewFetchRequestTopic() + st.Topic = topic.Topic + sub.Topics = append(sub.Topics, st) + topicIdx[addr][topic.Topic] = idx + } + sub.Topics[idx].Partitions = append(sub.Topics[idx].Partitions, part) + } + } + return groups +}