[CORE-16136]: Kafka RPC: Minor improvements geared toward Schema Registry use case #30255

Open
oleiman wants to merge 7 commits into dev from kdrpc/noticket/tweaks-for-sr-use

Conversation

@oleiman
Member

@oleiman oleiman commented Apr 22, 2026

Extends kafka::data::rpc::client with leader-mitigating retry behavior and
single-batch/single-partition APIs for the Schema Registry use case — latency-sensitive
callers that can't afford to burn RPC retries against stale leader caches.

  • Produce RPC now returns base/last offsets of replicated batches.
  • New partition_leader_cache primitives: get_leader_term, wait_for_term_change.
  • New leader_mitigating_retry_policy: on not_leader, poll with longer fixed backoff to let the metadata refresh; falls back to plain backoff otherwise.
  • New client APIs: produce_with_leader_mitigation (single batch → produce_result) and
    get_single_partition_offsets. consume gets the same leader-mitigating retry (usage sketch below).
  • Test fake gains per-ntp term tracking + a set_recovery_callback hook so recovery tests
    drive the not_leader → election → recovered scenario deterministically.
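
For concreteness, here is a rough usage sketch of the new client APIs listed above. The signatures, the result shape, and the helper name are assumptions for illustration only, not the declarations in client.h.

```cpp
// Illustrative usage only; names and signatures below are assumed.
ss::future<> write_one_schema(
  kafka::data::rpc::client& client, model::ntp ntp, model::record_batch batch) {
    // Single batch in, produce_result out; not_leader is retried with the
    // leader-mitigating policy rather than plain exponential backoff.
    auto res = co_await client.produce_with_leader_mitigation(
      ntp, std::move(batch));
    if (res.has_error()) {
        co_return; // a real caller would surface res.error()
    }
    // base_offset/last_offset may be nullopt even on success (e.g. the reply
    // came from a node still speaking the v0 serde), so handle that case.
    if (!res.value().last_offset.has_value()) {
        // fall back to an explicit lookup for read-your-writes behavior
        auto offsets = co_await client.get_single_partition_offsets(ntp);
        (void)offsets;
    }
    co_return;
}
```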

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

@oleiman oleiman self-assigned this Apr 22, 2026
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch from a874b54 to 6abd7f0 Compare April 22, 2026 23:35
@oleiman oleiman added the claude-review label Apr 22, 2026
@oleiman oleiman requested a review from Copilot April 22, 2026 23:37
@oleiman oleiman mentioned this pull request Apr 22, 2026
7 tasks
Contributor

Copilot AI left a comment

Pull request overview

This PR enhances the internal Kafka data RPC client/service to better support Schema Registry-style workflows by returning produce offsets, adding single-partition offset lookup, and introducing term-aware leader-mitigating retries (also applied to consume).

Changes:

  • Extend produce responses to optionally include base_offset/last_offset and plumb them through client/service.
  • Add leader-mitigating retry policy built on leader+term snapshots and leadership-change notifications.
  • Add get_single_partition_offsets and apply leader mitigation to consume; update/extend unit tests accordingly.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.

File Description
src/v/kafka/data/rpc/client.h Adds produce_result, new client APIs, and retry helper declarations.
src/v/kafka/data/rpc/client.cc Implements leader-mitigating retry policy, produce_with_leader_mitigation, get_single_partition_offsets, and leader-mitigated consume.
src/v/kafka/data/rpc/deps.h Extends partition_leader_cache interface with leader+term lookup and term-change wait API.
src/v/kafka/data/rpc/deps.cc Implements term-change waiting via partition leader table notifications.
src/v/kafka/data/rpc/service.h Documents produce overload behavior (offset semantics).
src/v/kafka/data/rpc/service.cc Computes base/last offsets for produce results and returns them to callers.
src/v/kafka/data/rpc/serde.h Bumps kafka_topic_data_result serde version and adds optional offset fields.
src/v/kafka/data/rpc/serde.cc Updates formatting to include optional offsets in logs/debug output.
src/v/kafka/data/rpc/BUILD Adds utils:expiring_promise dependency for term-change waiting.
src/v/kafka/data/rpc/test/deps.h Updates in-memory proxy offset behavior; extends fake leader caches for new interface.
src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc Updates offset expectations and adds tests for offset-returning produce.
src/v/cluster_link/tests/deps.h Updates test fakes to satisfy the expanded partition_leader_cache interface.
Comments suppressed due to low confidence (1)

src/v/kafka/data/rpc/test/deps.h:167

  • in_memory_proxy::replicate advances the next base offset by +1 per batch (offset++), but record_batch::last_offset() depends on last_offset_delta (record count). If a single replicate() call contains multiple batches with record_count()>1, subsequent batches will get overlapping offsets and the returned last_offset/HWM will be inconsistent. Consider advancing offset by the batch’s record count (or batch.last_offset_delta() + 1) after assigning base_offset to better mirror the real replicate batcher behavior.
        auto offset = model::next_offset(latest_offset());
        for (const auto& batch : batches) {
            auto b = batch.copy();
            b.header().base_offset = offset++;
            _produced_batches->emplace_back(_ntp, std::move(b));
        }
        co_return _produced_batches->back().batch.last_offset();
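
A minimal sketch of the adjustment the comment asks for, reusing the names from the quoted snippet; this is illustrative, not the actual test fake:

```cpp
// Advance the base offset by each batch's record count so offsets stay
// contiguous per record rather than per batch (method names assumed).
auto offset = model::next_offset(latest_offset());
for (const auto& batch : batches) {
    auto b = batch.copy();
    b.header().base_offset = offset;
    offset = model::offset(offset() + b.record_count());
    _produced_batches->emplace_back(_ntp, std::move(b));
}
co_return _produced_batches->back().batch.last_offset();
```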

Comment thread src/v/kafka/data/rpc/client.h Outdated
Comment thread src/v/kafka/data/rpc/client.cc
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch 2 times, most recently from 46988d3 to 21e1598 Compare April 23, 2026 03:29
@oleiman oleiman added and removed the claude-review label Apr 23, 2026
@oleiman oleiman requested a review from Copilot April 23, 2026 03:37
Comment thread src/v/kafka/data/rpc/client.cc Outdated
Comment thread src/v/kafka/data/rpc/service.cc
@claude

claude Bot commented Apr 23, 2026

Review summary

Nice refactor — pulling retry into a policy trait is cleaner than the old inline switch, and the leader_mitigating_retry_policy design (register-then-check to close the notification race, per-attempt term snapshot) is sound. The test fake additions (arm_leader_mitigation, recorded_stale_terms) are a thoughtful way to cover the mitigation path deterministically.

Potentially real bugs

  1. partition_operation_failed in consume is swallowed (client.cc:243 + the two consume branches). The retry switch now treats partition_operation_failed as retriable, but only not_leader/timeout in reply.err get translated into an error result. A local-service reply with err == partition_operation_failed comes back as a success result<consume_reply, cluster::errc>, so extract_errc sees success and the retry never fires. Produce works because produce_result is picked up directly by ResponseWithErrorCode; consume is inconsistent. (A sketch of the missing translation follows this list.)

  2. Exception-path mitigation is always treated as timeout (client.cc:222). A reset connection to the old leader is one of the most common "leader changed" signals, and under the new policy it takes the blind-backoff branch instead of wait_for_term_change. For the SR use case this PR is aimed at, mapping ::rpc::errc::disconnected_endpoint / connection_timeout to not_leader at this call site would make the mitigation actually fire where it matters most.

  3. Abort-source path not handled in wait_for_term_change (deps.cc). If the abort fires during the wait, expiring_promise surfaces ss::abort_requested_exception, which escapes through mitigate and the public consume/produce APIs during shutdown. Either catch it or document it.
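
To make item 1 concrete, here is a sketch of the kind of translation it asks for; the type and enumerator names are taken from the discussion above, not from client.cc.

```cpp
// Hypothetical normalization step: fold retriable codes carried inside a
// "successful" consume_reply into the result's error channel so that
// extract_errc (and therefore the retry policy) can see them.
result<consume_reply, cluster::errc> normalize(consume_reply reply) {
    switch (reply.err) {
    case cluster::errc::not_leader:
    case cluster::errc::timeout:
    case cluster::errc::partition_operation_failed:
        return reply.err; // retry fires on these
    default:
        return std::move(reply); // success, or a non-retriable error for the caller
    }
}
```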

Behavior changes worth calling out in the PR description

  • consume now returns error(errc) where it previously returned success(consume_reply{err=...}) after retry exhaustion. Any caller unpacking .value() to inspect the error will break. Please grep call sites.
  • retry_with_backoff no longer sleeps before attempt #1. Applies to every existing caller of retry, not just the new leader-mitigating variant. Probably a win, but it's a steady-state latency change.

Smaller things

  • total_records in service.cc should be int64_t — you're summing int32_t record counts across all batches.
  • do_get_single_partition_offsets_once returns not_leader when the server's response doesn't contain the requested partition (client.cc:394/398). That path is a contract violation, not a leadership issue, and not_leader is retriable — a misbehaving server will burn the full retry budget. partition_operation_failed or vassert would be more honest.
  • The set_recovery_callback stub in cluster_link/tests/deps.h is dead code (not virtual, not overriding, unused by cluster_link tests).
  • The "freshest possible state" comment in leader_mitigating_retry_policy::mitigate overstates the behavior: we always pay the backoff first. Consider skipping the sleep when the term has already advanced.

Things I liked

  • client::retry now holds _gate around the retry loop — closes a real shutdown-ordering hole.
  • The docstring on prepare() explaining the intentional TOCTOU between snapshot and dispatch is exactly the kind of comment that's worth writing.
  • Test coverage for the mitigation path is solid, including the stale-term-sequence invariant assertion.

The serde version bump (0→1, compat 0) looks correct — new fields at the tail, nullopt defaults for v0 readers.
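
For readers less familiar with the envelope pattern, a minimal sketch of what "new optional fields at the tail with a version bump" looks like; the field names are assumptions, not the actual serde.h definition.

```cpp
// Assumed shape for illustration: version bumped 0 -> 1, compat_version kept
// at 0, new optional fields appended after the existing ones.
struct kafka_topic_data_result
  : serde::envelope<
      kafka_topic_data_result,
      serde::version<1>,
      serde::compat_version<0>> {
    cluster::errc ec{};
    // Appended at the tail: a v0 reader skips the trailing bytes it does not
    // know about, and a v1 reader decoding a v0 payload sees std::nullopt.
    std::optional<model::offset> base_offset;
    std::optional<model::offset> last_offset;

    auto serde_fields() { return std::tie(ec, base_offset, last_offset); }
};
```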

Comment thread src/v/kafka/data/rpc/client.cc Outdated
Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/kafka/data/rpc/deps.cc Outdated
Comment thread src/v/cluster_link/tests/deps.h Outdated
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

src/v/kafka/data/rpc/test/deps.h:167

  • in_memory_proxy::replicate increments offset by 1 per batch, but Kafka/raft offsets are per record. If batches contains multiple record_batches with record_count() > 1, this will assign overlapping/non-contiguous offsets and last_offset() will not match the total record count. Consider advancing offset by each batch's record_count() (or by model::next_offset(b.last_offset())) after setting base_offset to keep offsets contiguous across all records in the replicate call.
        auto offset = model::next_offset(latest_offset());
        for (const auto& batch : batches) {
            auto b = batch.copy();
            b.header().base_offset = offset++;
            _produced_batches->emplace_back(_ntp, std::move(b));
        }
        co_return _produced_batches->back().batch.last_offset();

Comment thread src/v/kafka/data/rpc/service.cc Outdated
Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/kafka/data/rpc/client.cc Outdated
@oleiman oleiman changed the title Kafka RPC: Minor improvements geared toward Schema Registry use case [CORE-16136]: Kafka RPC: Minor improvements geared toward Schema Registry use case Apr 23, 2026
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch 2 times, most recently from 11d0884 to eee8a6e Compare April 23, 2026 05:18
@oleiman oleiman marked this pull request as ready for review April 23, 2026 05:22
Comment thread src/v/kafka/data/rpc/deps.cc Outdated
Comment thread src/v/kafka/data/rpc/service.cc Outdated
@claude

claude Bot commented Apr 23, 2026

Review summary

Nice refactor of retry_with_backoff into a RetryPolicy concept — the split between backoff_retry_policy and leader_mitigating_retry_policy is clean, the snapshot-per-attempt invariant is well-reasoned, and the test fake's set_recovery_callback / recorded_stale_terms hooks drive deterministic tests rather than timing-dependent ones. The version bump on kafka_topic_data_result keeps compat_version=0, so wire compatibility is preserved.

Higher-level observations

  • Behavioral change for client::consume: previously one-shot, now retried with leader mitigation (up to ~20s worst-case). Existing non-SR callers inherit this. Also, retriable errors that were previously surfaced inside consume_reply.err now become result<>::error — callers inspecting reply.err for those codes will miss them. (Inline comment on client.cc:613-626.)

  • Worst-case latency: max_client_retries=5 × mitigation_timeout=5s = up to ~20s of blocking per call. For "latency-sensitive callers" this may still be too long; consider plumbing an overall deadline through the retry loop, or a shorter default mitigation_timeout. (Inline comment on client.cc:33.)

  • Exception path skips leadership wait: in retry_with_backoff, failed futures always call mitigate(cluster::errc::timeout) — so raft/rpc exceptions caused by a leader change never take the wait_for_term_change path. Worth classifying the exception or documenting the intent. (Inline comment on client.cc:162.)

  • produce_result nullopt semantics: base_offset/last_offset can be nullopt not just on error but also when responding from a v0 peer during a rolling upgrade. SR callers must handle "success but no offsets". Worth documenting. (Inline comment on client.h:25-28.)

  • Derived base_offset arithmetic: last_offset - (total_records - 1) assumes contiguous offsets across all batches in a request. Robust only if the replication layer never inserts gaps. Ideally the actual base offset should come from raft instead of being derived. (Inline comment on service.cc:271; a worked sketch follows this list.)

  • Offsets response missing a partition entry is mapped to not_leader: this triggers a full wait_for_term_change (up to 5s) on a response shape mismatch. Suggest a different errc. (Inline comment on client.cc:597-604.)

  • Notification callback safety in wait_for_term_change: expiring_promise::set_value is idempotent (good), and lw_shared_ptr<promise> keeps the promise alive past the caller. Confirm register_leadership_change_notification fires only on the same shard; cross-shard callbacks would be unsafe here. (Inline comment on deps.cc:68-75.)
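
A self-contained illustration of the derivation questioned in the base_offset bullet above, using plain integers in place of the model types:

```cpp
#include <cstdint>
#include <vector>

// If replication only reports the last offset of the appended run, the base
// offset can be derived, but only when offsets are contiguous across every
// batch in the request (no gaps inserted by the replication layer).
int64_t derive_base_offset(
  const std::vector<int32_t>& records_per_batch, int64_t last_offset) {
    int64_t total_records = 0; // 64-bit accumulator when summing int32_t counts
    for (int32_t n : records_per_batch) {
        total_records += n;
    }
    return last_offset - (total_records - 1);
}
```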

Minor / nits

  • retry now holds _gate.hold() before invoking the loop — good catch, this was needed for orderly shutdown.
  • do_produce_once's trace log fix (was logging req in the reply line, now reply) is a nice drive-by.
  • The ResponseWithErrorCode concept now uses cluster::errc& instead of cluster::errc. If the old form was failing silently (concept never satisfied), this effectively enables a new code path — worth a sanity check that no callers relied on it being unreachable. (A standalone illustration follows this list.)
  • fake_partition_leader_cache::recorded_stale_terms returning a static empty vector by const-ref is fine for tests but standard hygiene is returning by value or optional<const vector&>; not worth blocking.
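
The cluster::errc& vs cluster::errc point is easy to reproduce in isolation. The concept below is a stand-in written for this illustration, not the real client.h definition:

```cpp
#include <concepts>

enum class errc { success = 0, not_leader }; // stand-in for cluster::errc

template <typename T>
concept ResponseWithErrorCode = requires(T r) {
    // decltype((r.ec)) is errc& for an lvalue member, so constraining on the
    // plain value type is never satisfied and the concept fails silently:
    //   { r.ec } -> std::same_as<errc>;   // never satisfied
    { r.ec } -> std::same_as<errc&>; // matches an lvalue member of type errc
};

struct produce_result { errc ec; };
static_assert(ResponseWithErrorCode<produce_result>);
```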

Tests

Good coverage of the new behaviors (single/multi-record produce, retries via error injection, set_recovery_callback driving the not_leader → election path, stale-term snapshot invariant). The test fake's wait_for_term_change semantics mirror the production cache reasonably well.

Overall direction looks good — main asks are documenting the produce_result nullopt contract for SR callers, tightening the error semantics in do_get_single_partition_offsets_once, and confirming the downstream impact of making consume retry by default.

@vbotbuildovich
Collaborator

Retry command for Build#83595

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_with_restart@{"storage_mode":"cloud"}

@vbotbuildovich
Collaborator

vbotbuildovich commented Apr 23, 2026

CI test results

test results on build#83595

  • FLAKY(PASS): ShadowLinkingReplicationTests.test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "cloud", "with_failures": true} (integration, passed 10/11). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0012, p0=1.0000, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83595#019dbb9c-617b-47e5-8cf7-b4412d01cbb0 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
  • FLAKY(PASS): ShadowLinkingReplicationTests.test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "tiered_cloud", "with_failures": true} (integration, passed 10/11). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0012, p0=1.0000, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83595#019dbb9c-617e-4228-bd75-7f5607bfb8ce History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
  • FLAKY(FAIL): ShadowLinkingReplicationTests.test_with_restart {"storage_mode": "cloud"} (integration, passed 5/11). Test FAILS after retries. Significant increase in flaky rate (baseline=0.0152, p0=0.0000, reject_threshold=0.0100). Job: https://buildkite.com/redpanda/redpanda/builds/83595#019dbb9c-617b-47e5-8cf7-b4412d01cbb0 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart

test results on build#83608

  • FLAKY(PASS): ShadowLinkingReplicationTests.test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "local", "with_failures": true} (integration, passed 10/11). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0012, p0=1.0000, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83608#019dbcd5-8cf1-40e8-89d4-6832bc26d288 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
  • FLAKY(PASS): ShadowLinkingReplicationTests.test_with_restart {"storage_mode": "cloud"} (integration, passed 28/31). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0166, p0=0.0887, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.4114, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83608#019dbcd8-8a53-4e26-93af-827391f9e494 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart
  • FLAKY(PASS): WriteCachingFailureInjectionE2ETest.test_crash_all {"use_transactions": false} (integration, passed 25/31). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0807, p0=0.0898, reject_threshold=0.0100; adj_baseline=0.2230, p1=0.3121, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83608#019dbcd8-8a50-4a47-84d0-b1e48f277da0 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=WriteCachingFailureInjectionE2ETest&test_method=test_crash_all

test results on build#83654

  • FLAKY(PASS): ShadowLinkingReplicationTests.test_with_restart {"storage_mode": "cloud"} (integration, passed 10/11). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0214, p0=1.0000, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83654#019dc11a-b76c-4aa3-a1fc-83fcbdf40eb7 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart
  • FLAKY(PASS): ShadowLinkingReplicationTests.test_with_restart {"storage_mode": "tiered_cloud"} (integration, passed 10/11). Test PASSES after retries. No significant increase in flaky rate (baseline=0.0214, p0=1.0000, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000). Job: https://buildkite.com/redpanda/redpanda/builds/83654#019dc11a-b768-4c00-9872-e53211362be6 History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart

test results on build#83663

  • FLAKY(FAIL): ShadowLinkingReplicationTests.test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "cloud"} (integration, passed 9/11). Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100). Job: https://buildkite.com/redpanda/redpanda/builds/83663#019dc27c-e459-4107-9d41-2b90eb505d4c History: https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic

@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch from f18d1e6 to ecd3db7 Compare April 23, 2026 23:56
@oleiman oleiman marked this pull request as draft April 24, 2026 08:23
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch from ecd3db7 to e71eb75 Compare April 24, 2026 19:48
@oleiman oleiman marked this pull request as ready for review April 24, 2026 19:48
Member

@dotnwat dotnwat left a comment

yeh lgtm! just a few questions

Comment thread src/v/kafka/data/rpc/deps.cc Outdated
Comment thread src/v/kafka/data/rpc/client.cc Outdated
Comment thread src/v/kafka/data/rpc/client.cc Outdated
Comment on lines +91 to +93
/// Leadership-aware policy: on not_leader, waits for a leadership change
/// notification using a term snapshot taken before the attempt. Falls back
/// to backoff on timeout errors.
Member

what i'm a little confused about here is why we are doing anything with terms when we have a mechanism for receiving updates on leadership change (i think this was used in the previous commit).

Member Author

the scenario I observed is something like:

  • get leader from table
  • dispatch request to leader
  • request returns not_leader for whatever reason
  • backoff sleep
    • while we're sleeping, term advances
  • needlessly subscribe to leadership change

so the idea is to grab the current term before submitting the request, and if we get not_leader then we say "wait for a leadership change unless the term has already advanced". Maybe there's a simpler way to approach it.
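
In pseudocode, the snapshot-then-wait idea looks roughly like the sketch below; the primitives are the ones named in the PR description, but the exact signatures are assumptions, and (as the rest of this thread notes) the design was later simplified to fixed-interval polling.

```cpp
// Illustrative only: snapshot the term before dispatch; on not_leader, skip
// the wait if the term already advanced while we were backing off.
ss::future<> mitigate_not_leader(
  partition_leader_cache& leaders,
  const model::ntp& ntp,
  model::term_id term_at_dispatch,
  std::chrono::milliseconds timeout) {
    auto current = leaders.get_leader_term(ntp); // assumed: optional term
    if (current.has_value() && *current > term_at_dispatch) {
        co_return; // leadership already changed; retry immediately
    }
    // Otherwise block (bounded by timeout) until the leader table reports a
    // term newer than the one we dispatched against.
    co_await leaders.wait_for_term_change(ntp, term_at_dispatch, timeout);
}
```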

Member

the part i'm struggling with is that i'm not exactly sure what we are trying to optimize here. if we get a not_leader, then we use a backoff loop to poll the leader again. but if it keeps getting the same answer from the leader table, then it's wasted work?

Member Author

nuked. still uses a similar retry policy for not_leader errors specifically (longer fixed polling on a not_leader error, fallback to exp backoff on anything else) on the requests that needed them.

but as discussed offline the leader notification w/ fixed timeout from the local table is functionally no different from polling.
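
The policy that survived reduces to a branch on the error code. A sketch of that shape, with placeholder names and intervals rather than the actual values in client.cc:

```cpp
// Placeholder intervals for illustration; the real values live in client.cc.
constexpr auto fixed_poll_interval = std::chrono::milliseconds(500);
constexpr auto base_backoff = std::chrono::milliseconds(50);

ss::future<> mitigate(cluster::errc ec, int attempt) {
    if (ec == cluster::errc::not_leader) {
        // Longer fixed polling: give the local leader table time to refresh
        // before re-resolving the leader and retrying.
        co_await ss::sleep(fixed_poll_interval);
    } else {
        // Anything else falls back to exponential backoff.
        co_await ss::sleep(base_backoff * (1 << attempt));
    }
}
```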

Comment thread src/v/kafka/data/rpc/client.cc
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch 2 times, most recently from 2086f48 to dc81d59 Compare April 25, 2026 02:11
@oleiman oleiman requested a review from dotnwat April 25, 2026 02:20
@vbotbuildovich
Collaborator

Retry command for Build#83663

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":true,"source_cluster_spec":{"cluster_type":"redpanda"},"storage_mode":"cloud"}

@oleiman
Member Author

oleiman commented Apr 25, 2026

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":true,"source_cluster_spec":{"cluster_type":"redpanda"},"storage_mode":"cloud"}

Also adds partition_operation_failed to list of retriable errors

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch from dc81d59 to 08c11e8 Compare April 28, 2026 05:25
@oleiman oleiman requested a review from Copilot April 28, 2026 05:28
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (1)

src/v/kafka/data/rpc/test/deps.h:166

  • in_memory_proxy::replicate() assigns base offsets by doing offset++ per batch. This breaks Kafka offset semantics when a replicate call contains multiple batches and any batch has record_count() > 1: subsequent batches can overlap earlier batches’ record offsets and HWM/LSO become inconsistent. Update the offset advancement to account for each batch’s last_offset (or record_count) so the next batch starts at model::next_offset(prev_last_offset).
        auto offset = model::next_offset(latest_offset());
        for (const auto& batch : batches) {
            auto b = batch.copy();
            b.header().base_offset = offset++;
            _produced_batches->emplace_back(_ntp, std::move(b));
        }

Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/kafka/data/rpc/client.cc Outdated
Comment thread src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc
Comment thread src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch from 08c11e8 to a048a05 Compare April 28, 2026 05:41
oleiman added 5 commits April 27, 2026 23:12
Retry wrapper that backs off then waits for a leadership change notification.
Useful in situations like servicing schema registry requests, where burning
through rpc retries too quickly means failing the request altogether.

This is a rather crude approximation of the metadata refresh baked into the
kafka client. Potentially could be improved to be more proactive, but that's
a larger change.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Required for usage in SR

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
- Retry with leader mitigation
- Includes the produced batch offsets in the response
- Operates on a single batch

Geared toward the write pattern in schema registry

Also updates in-memory partition proxy implementation (rpc/test/deps) to
serve "real" high watermark and LSO based on whatever's been produced.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Reliable version that tries to mitigate not_leader errors

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the kdrpc/noticket/tweaks-for-sr-use branch from a048a05 to 97ad3b9 Compare April 28, 2026 06:13
Labels

area/build, area/redpanda, claude-review