[CORE-16136]: Kafka RPC: Minor improvements geared toward Schema Registry use case #30255
Conversation
Force-pushed from a874b54 to 6abd7f0
Pull request overview
This PR enhances the internal Kafka data RPC client/service to better support Schema Registry-style workflows by returning produce offsets, adding single-partition offset lookup, and introducing term-aware leader-mitigating retries (also applied to consume).
Changes:
- Extend produce responses to optionally include `base_offset`/`last_offset` and plumb them through client/service (sketched below).
- Add a leader-mitigating retry policy built on leader+term snapshots and leadership-change notifications.
- Add `get_single_partition_offsets` and apply leader mitigation to `consume`; update/extend unit tests accordingly.
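For orientation, a rough sketch of what the offset-bearing produce result could look like. The field names `base_offset`/`last_offset` come from the change list above; the struct name, headers, and exact types here are assumptions, not the PR's actual declaration in client.h.

```cpp
// Hypothetical sketch only: mirrors the "optional base_offset/last_offset"
// described above. model::offset is Redpanda's offset type; the struct name
// and layout are assumptions, not the real kafka/data/rpc/client.h code.
#include <optional>

#include "model/fundamental.h" // model::offset (Redpanda)

struct produce_result {
    // Offset assigned to the first record of the produced batch, when the
    // serving node is new enough to report it.
    std::optional<model::offset> base_offset;
    // Offset assigned to the last record of the produced batch.
    std::optional<model::offset> last_offset;
};
```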
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/v/kafka/data/rpc/client.h | Adds produce_result, new client APIs, and retry helper declarations. |
| src/v/kafka/data/rpc/client.cc | Implements leader-mitigating retry policy, produce_with_leader_mitigation, get_single_partition_offsets, and leader-mitigated consume. |
| src/v/kafka/data/rpc/deps.h | Extends partition_leader_cache interface with leader+term lookup and term-change wait API. |
| src/v/kafka/data/rpc/deps.cc | Implements term-change waiting via partition leader table notifications. |
| src/v/kafka/data/rpc/service.h | Documents produce overload behavior (offset semantics). |
| src/v/kafka/data/rpc/service.cc | Computes base/last offsets for produce results and returns them to callers. |
| src/v/kafka/data/rpc/serde.h | Bumps kafka_topic_data_result serde version and adds optional offset fields. |
| src/v/kafka/data/rpc/serde.cc | Updates formatting to include optional offsets in logs/debug output. |
| src/v/kafka/data/rpc/BUILD | Adds utils:expiring_promise dependency for term-change waiting. |
| src/v/kafka/data/rpc/test/deps.h | Updates in-memory proxy offset behavior; extends fake leader caches for new interface. |
| src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc | Updates offset expectations and adds tests for offset-returning produce. |
| src/v/cluster_link/tests/deps.h | Updates test fakes to satisfy the expanded partition_leader_cache interface. |
Comments suppressed due to low confidence (1)
src/v/kafka/data/rpc/test/deps.h:167
`in_memory_proxy::replicate` advances the next base offset by +1 per batch (`offset++`), but `record_batch::last_offset()` depends on `last_offset_delta` (record count). If a single replicate() call contains multiple batches with `record_count() > 1`, subsequent batches will get overlapping offsets and the returned last_offset/HWM will be inconsistent. Consider advancing `offset` by the batch's record count (or `batch.last_offset_delta() + 1`) after assigning `base_offset` to better mirror the real replicate batcher behavior.
auto offset = model::next_offset(latest_offset());
for (const auto& batch : batches) {
auto b = batch.copy();
b.header().base_offset = offset++;
_produced_batches->emplace_back(_ntp, std::move(b));
}
co_return _produced_batches->back().batch.last_offset();
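A minimal sketch of the adjustment the comment suggests, reusing the fake's variables: assign the base offset, then advance past the batch's last record so consecutive batches stay contiguous. Illustrative only; the real replicate batcher may handle this differently.

```cpp
auto offset = model::next_offset(latest_offset());
for (const auto& batch : batches) {
    auto b = batch.copy();
    // Assign the batch's base offset first...
    b.header().base_offset = offset;
    // ...then advance past the batch's last record (last_offset() is
    // base_offset + last_offset_delta), so a multi-record batch doesn't
    // overlap the next batch's offsets.
    offset = model::next_offset(b.last_offset());
    _produced_batches->emplace_back(_ntp, std::move(b));
}
co_return _produced_batches->back().batch.last_offset();
```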
Force-pushed from 46988d3 to 21e1598
Review summary
Nice refactor — pulling retry into a policy trait is cleaner than the old inline switch.
Potentially real bugs
Behavior changes worth calling out in the PR description
Smaller things
Things I liked
The serde version bump (0→1, compat 0) looks correct — new fields at the tail, nullopt defaults for v0 readers.
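To illustrate the pattern being praised, a hedged sketch of a serde envelope with the version bumped and new optional fields appended at the tail. The pre-existing fields and exact names are placeholders, not the real kafka_topic_data_result layout.

```cpp
#include <optional>
#include <tuple>

#include "cluster/errc.h"      // cluster::errc (Redpanda)
#include "model/fundamental.h" // model::offset (Redpanda)
#include "serde/envelope.h"    // serde::envelope (Redpanda)

// Sketch of the tail-append versioning pattern; the field set is illustrative.
struct kafka_topic_data_result
  : serde::envelope<
      kafka_topic_data_result,
      serde::version<1>,          // bumped from 0 for the new fields
      serde::compat_version<0>> { // older readers can still decode
    cluster::errc ec{cluster::errc::success};   // placeholder pre-existing field
    std::optional<model::offset> base_offset{}; // new in v1
    std::optional<model::offset> last_offset{}; // new in v1

    // A v1 reader decoding a v0 payload leaves the trailing fields at their
    // defaults (nullopt), which is the behavior the review calls out.
    auto serde_fields() { return std::tie(ec, base_offset, last_offset); }
};
```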
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/v/kafka/data/rpc/test/deps.h:167
`in_memory_proxy::replicate` increments `offset` by 1 per batch, but Kafka/raft offsets are per record. If `batches` contains multiple record_batches with `record_count() > 1`, this will assign overlapping/non-contiguous offsets and `last_offset()` will not match the total record count. Consider advancing `offset` by each batch's `record_count()` (or by `model::next_offset(b.last_offset())`) after setting `base_offset` to keep offsets contiguous across all records in the replicate call.
auto offset = model::next_offset(latest_offset());
for (const auto& batch : batches) {
auto b = batch.copy();
b.header().base_offset = offset++;
_produced_batches->emplace_back(_ntp, std::move(b));
}
co_return _produced_batches->back().batch.last_offset();
Force-pushed from 11d0884 to eee8a6e
Review summary
Nice refactor of …
Higher-level observations
Minor / nits
Tests
Good coverage of the new behaviors (single/multi-record produce, retries via error injection, …). Overall direction looks good — main asks are documenting the …
Retry command for Build#83595: please wait until all jobs are finished before running the slash command
CI test results
test results on build#83595
test results on build#83608
test results on build#83654
test results on build#83663
Force-pushed from f18d1e6 to ecd3db7
Force-pushed from ecd3db7 to e71eb75
dotnwat left a comment
yeh lgtm! just a few questions
/// Leadership-aware policy: on not_leader, waits for a leadership change
/// notification using a term snapshot taken before the attempt. Falls back
/// to backoff on timeout errors.
what i'm a little confused about here is why we are doing anything with terms when we have a mechanism for receiving updates on leadership change (i think this was used in the previous commit).
the scenario I observed is something like:
- get leader from table
- dispatch request to leader
- request returns not_leader for whatever reason
- backoff sleep
- while we're sleeping, term advances
- needlessly subscribe to leadership change
so the idea is to grab the current term before submitting the request, and if we get not_leader then we say "wait for a leadership change unless the term has already advanced". Maybe there's a simpler way to approach it.
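Roughly, the sequence being described — illustrative only; `leader_and_term()`, `current_term()`, and `wait_for_term_change()` mirror the deps.h additions listed in the file summary, but the exact signatures are assumptions:

```cpp
#include <chrono>

#include <seastar/core/future.hh>

#include "cluster/errc.h" // cluster::errc (Redpanda); ss = seastar per seastarx.h

// Illustrative sketch of "snapshot the term, only wait for a leadership
// change if it hasn't already advanced". Not the PR's actual code.
ss::future<cluster::errc>
dispatch_with_term_snapshot(partition_leader_cache& cache, auto send) {
    // Snapshot leader + term before dispatching the request.
    auto [leader, term] = cache.leader_and_term();
    auto ec = co_await send(leader);
    if (ec != cluster::errc::not_leader) {
        co_return ec;
    }
    if (cache.current_term() > term) {
        // The term already advanced while the request was in flight; no
        // need to subscribe for a notification, just let the caller retry.
        co_return ec;
    }
    // Otherwise wait (bounded) for a leadership change before retrying.
    co_await cache.wait_for_term_change(term, std::chrono::seconds{5});
    co_return ec;
}
```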
the part i'm struggling with is that i'm not exactly sure what we are trying to optimize here. if we get a not_leader, then we use a backoff loop to poll the leader again. but if it keeps getting the same answer from the leader table, then it's wasted work?
nuked. still uses a similar retry policy for not_leader errors specifically (longer fixed polling on not_leader, fallback to exp backoff on anything else) on the requests that needed it.
but as discussed offline the leader notification w/ fixed timeout from the local table is functionally no different from polling.
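For contrast, the shape of the policy that landed, as described in the comment above — fixed, longer polling on not_leader and exponential backoff for everything else. Names and constants here are illustrative, not the PR's.

```cpp
#include <algorithm>
#include <chrono>

#include "cluster/errc.h" // cluster::errc (Redpanda)

// Illustrative delay schedule only; the real policy's intervals and caps
// may differ.
std::chrono::milliseconds retry_delay(cluster::errc ec, int attempt) {
    using namespace std::chrono_literals;
    if (ec == cluster::errc::not_leader) {
        // Leadership will change on its own (an election); poll the local
        // leader table at a fixed, relatively long interval.
        return 500ms;
    }
    // Anything else: exponential backoff with a cap.
    auto d = 100ms * (1 << std::min(attempt, 6));
    return std::min<std::chrono::milliseconds>(d, 5s);
}
```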
Force-pushed from 2086f48 to dc81d59
Retry command for Build#83663: please wait until all jobs are finished before running the slash command
/ci-repeat 1
Also adds partition_operation_failed to the list of retriable errors.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Force-pushed from dc81d59 to 08c11e8
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
src/v/kafka/data/rpc/test/deps.h:166
- in_memory_proxy::replicate() assigns base offsets by doing offset++ per batch. This breaks Kafka offset semantics when a replicate call contains multiple batches and any batch has record_count() > 1: subsequent batches can overlap earlier batches’ record offsets and HWM/LSO become inconsistent. Update the offset advancement to account for each batch’s last_offset (or record_count) so the next batch starts at model::next_offset(prev_last_offset).
auto offset = model::next_offset(latest_offset());
for (const auto& batch : batches) {
auto b = batch.copy();
b.header().base_offset = offset++;
_produced_batches->emplace_back(_ntp, std::move(b));
}
Force-pushed from 08c11e8 to a048a05
Retry wrapper that backs off, then waits for a leadership change notification. Useful in situations like servicing schema registry requests, where burning through rpc retries too quickly means failing the request altogether. This is a rather crude approximation of the metadata refresh baked into the kafka client. It could potentially be improved to be more proactive, but that's a larger change.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Required for usage in SR.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
- Retry with leader mitigation
- Includes the produced batch offsets in the response
- Operates on a single batch

Geared toward the write pattern in schema registry.

Also updates the in-memory partition proxy implementation (rpc/test/deps) to serve "real" high watermark and LSO based on whatever's been produced.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Reliable version that tries to mitigate not_leader errors.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Force-pushed from a048a05 to 97ad3b9
Extends kafka::data::rpc::client with leader-mitigating retry behavior and
single-batch/single-partition APIs for the Schema Registry use case — latency-sensitive
callers that can't afford to burn RPC retries against stale leader caches.
Adds get_single_partition_offsets; consume gets the same leader-mitigating retry.
Unit tests drive the not_leader → election → recovered scenario deterministically.
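For orientation, a hypothetical call-site sketch of the write path this enables. produce_with_leader_mitigation and get_single_partition_offsets are named in the file summary above, but the parameter lists and result shape shown here are guesses, not the actual client API.

```cpp
// Hypothetical usage only: all signatures here are assumed.
ss::future<model::offset> append_schema_record(
  kafka::data::rpc::client& client,
  model::topic_partition tp,
  model::record_batch batch) {
    // Single-batch produce; the result now carries the assigned offsets,
    // so the caller learns where the schema record landed without a
    // follow-up offset lookup.
    auto res = co_await client.produce_with_leader_mitigation(
      tp, std::move(batch));
    co_return res.last_offset.value_or(model::offset{});
}
```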
Backports Required
Release Notes