From 2eba6aaf2d097d36f0396fbee641da2ac56227e1 Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Wed, 4 Mar 2026 16:59:09 -0800
Subject: [PATCH 1/7] k/d/rpc: Hold the gate open during client retry loop

Signed-off-by: Oren Leiman
---
 src/v/kafka/data/rpc/client.cc | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc
index e9ccad996bdb1..e813a6e05efcd 100644
--- a/src/v/kafka/data/rpc/client.cc
+++ b/src/v/kafka/data/rpc/client.cc
@@ -147,7 +147,8 @@ client::client(
 template<typename Func>
 std::invoke_result_t<Func> client::retry(Func&& func) {
-    return retry_with_backoff(std::forward<Func>(func), &_as);
+    auto holder = _gate.hold();
+    co_return co_await retry_with_backoff(std::forward<Func>(func), &_as);
 }
 
 ss::future<cluster::errc> client::produce(

From c2be22f912c905c18dccb1dd47541b40aa50bab4 Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Sat, 14 Mar 2026 02:18:08 -0700
Subject: [PATCH 2/7] k/d/rpc/client: Extend retry_with_backoff w/ RetryPolicy

Also adds partition_operation_failed to the list of retriable errors

Signed-off-by: Oren Leiman
---
 src/v/kafka/data/rpc/client.cc | 80 ++++++++++++++++++++++++----------
 1 file changed, 56 insertions(+), 24 deletions(-)

diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc
index e813a6e05efcd..c2ffb6f7f0f88 100644
--- a/src/v/kafka/data/rpc/client.cc
+++ b/src/v/kafka/data/rpc/client.cc
@@ -30,60 +30,91 @@ namespace kafka::data::rpc {
 namespace {
 constexpr auto timeout = std::chrono::seconds(1);
 constexpr int max_client_retries = 5;
+constexpr auto base_backoff_duration = 100ms;
+constexpr auto max_backoff_duration = base_backoff_duration
+                                      * max_client_retries;
 
 template<typename T>
 concept ResponseWithErrorCode = requires(T resp) {
     { resp.ec } -> std::same_as<cluster::errc>;
 };
 
-template<typename Func>
-std::invoke_result_t<Func> retry_with_backoff(Func func, ss::abort_source* as) {
-    constexpr auto base_backoff_duration = 100ms;
-    constexpr auto max_backoff_duration = base_backoff_duration
-                                          * max_client_retries;
-    auto backoff = ::make_exponential_backoff_policy(
+/// Extracts the cluster::errc from a response of any supported type.
+template<typename T>
+cluster::errc extract_errc(const T& r) {
+    if constexpr (std::is_same_v<T, cluster::errc>) {
+        return r;
+    } else if constexpr (outcome::is_basic_result_v<T>) {
+        return r.has_error() ? r.error() : cluster::errc::success;
+    } else if constexpr (ResponseWithErrorCode<T>) {
+        return r.ec;
+    } else {
+        static_assert(
+          base::unsupported_type<T>::value, "unsupported response type");
+    }
+}
+
+/// Policy for retry_with_backoff: controls what happens before each attempt
+/// (prepare_for_request) and how retriable errors are mitigated between
+/// retries.
+template<typename T>
+concept RetryPolicy = requires(T ctx, cluster::errc ec) {
+    { ctx.prepare_for_request() } -> std::same_as<void>;
+    { ctx.handle_error(ec) } -> std::same_as<ss::future<>>;
+    { ctx.as() } -> std::same_as<ss::abort_source&>;
+};
+
+/// Default policy: blind exponential backoff on retriable errors.
+struct backoff_retry_policy {
+    ss::abort_source& _as;
+    backoff_policy _backoff = make_exponential_backoff_policy(
       base_backoff_duration, max_backoff_duration);
+
+    explicit backoff_retry_policy(ss::abort_source& as)
+      : _as(as) {}
+
+    void prepare_for_request() {}
+
+    ss::future<> handle_error(cluster::errc) {
+        auto dur = _backoff.current_backoff_duration();
+        _backoff.next_backoff();
+        return ss::sleep_abortable(dur, _as);
+    }
+
+    ss::abort_source& as() { return _as; }
+};
+
+template<typename Func, RetryPolicy Policy>
+std::invoke_result_t<Func> retry_with_backoff(Func func, Policy policy) {
     int attempts = 0;
     while (true) {
         ++attempts;
-        co_await ss::sleep_abortable(
-          backoff.current_backoff_duration(), *as);
+        policy.prepare_for_request();
         using result_type = ss::futurize<std::invoke_result_t<Func>>::value_type;
         auto fut = co_await ss::coroutine::as_future(
           ss::futurize_invoke(func));
-        backoff.next_backoff();
         if (fut.failed()) {
            if (attempts >= max_client_retries) {
                 co_return co_await std::move(fut);
             }
             auto ex = fut.get_exception();
             vlog(log.debug, "Retrying after error: {}", ex);
+            co_await policy.handle_error(cluster::errc::timeout);
             continue;
         }
         result_type r = fut.get();
-        cluster::errc ec = cluster::errc::success;
-        if constexpr (std::is_same_v<result_type, cluster::errc>) {
-            ec = r;
-        } else if constexpr (outcome::is_basic_result_v<result_type>) {
-            ec = r.has_error() ? r.error() : cluster::errc::success;
-        } else if constexpr (ResponseWithErrorCode<result_type>) {
-            ec = r.ec;
-        } else {
-            static_assert(
-              base::unsupported_type<result_type>::value,
-              "unsupported response type");
-        }
+        auto ec = extract_errc(r);
         switch (ec) {
         case cluster::errc::not_leader:
         case cluster::errc::timeout:
-            // We've run out of retries, return our error
+        case cluster::errc::partition_operation_failed:
             if (attempts >= max_client_retries) {
                 co_return r;
             }
+            co_await policy.handle_error(ec);
             break;
         case cluster::errc::success:
-            // Don't retry arbitrary error codes.
         default:
             co_return r;
         }
     }
 }
@@ -148,7 +179,8 @@ client::client(
 template<typename Func>
 std::invoke_result_t<Func> client::retry(Func&& func) {
     auto holder = _gate.hold();
-    co_return co_await retry_with_backoff(std::forward<Func>(func), &_as);
+    co_return co_await retry_with_backoff(
+      std::forward<Func>(func), backoff_retry_policy{_as});
 }
 
 ss::future<cluster::errc> client::produce(

From 780732f75cf39ae52b4be595f7e21f6abb4151bd Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Sat, 14 Mar 2026 02:28:18 -0700
Subject: [PATCH 3/7] k/d/rpc/client: Introduce retry_with_leader_mitigation

Retry wrapper that backs off, then gives leadership a chance to change
before the next attempt. Useful in situations like servicing schema
registry requests, where burning through rpc retries too quickly means
failing the request altogether.

This is a rather crude approximation of the metadata refresh baked into
the kafka client. It could be made more proactive, but that's a larger
change.
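The intended call pattern looks roughly like this (sketch only; the real
call sites arrive in later commits of this series):

    // e.g. a produce path that would rather wait out leadership churn
    // than fail the caller's request outright:
    co_return co_await retry_with_leader_mitigation(
      [this, &req]() { return do_produce_once(req.share()); });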
Signed-off-by: Oren Leiman --- src/v/kafka/data/rpc/BUILD | 1 + src/v/kafka/data/rpc/client.cc | 44 ++++++++++++++++++++++++++++++++++ src/v/kafka/data/rpc/client.h | 3 +++ 3 files changed, 48 insertions(+) diff --git a/src/v/kafka/data/rpc/BUILD b/src/v/kafka/data/rpc/BUILD index 5a78e3b24a563..4d3f4e75b816e 100644 --- a/src/v/kafka/data/rpc/BUILD +++ b/src/v/kafka/data/rpc/BUILD @@ -50,6 +50,7 @@ redpanda_cc_library( "//src/v/kafka/data:partition_proxy", "//src/v/model", "//src/v/raft", + "//src/v/random:time_jitter", "//src/v/rpc", "//src/v/serde", "//src/v/serde:map", diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc index c2ffb6f7f0f88..3d391bccf7d74 100644 --- a/src/v/kafka/data/rpc/client.cc +++ b/src/v/kafka/data/rpc/client.cc @@ -13,6 +13,7 @@ #include "kafka/data/rpc/rpc_service.h" #include "logger.h" +#include "random/simple_time_jitter.h" #include "rpc/connection_cache.h" #include "ssx/async_algorithm.h" #include "utils/backoff_policy.h" @@ -33,6 +34,11 @@ constexpr int max_client_retries = 5; constexpr auto base_backoff_duration = 100ms; constexpr auto max_backoff_duration = base_backoff_duration * max_client_retries; +/// Per-attempt sleep applied on not_leader mitigations: gives enough time +/// to propagate a new leader between attempts. Sized so 4 mitigations stay +/// safely under typical RPC caller deadlines (e.g. SR's 10s HTTP timeout). +constexpr auto mitigation_interval = std::chrono::seconds(2); +constexpr auto mitigation_jitter = 500ms; template concept ResponseWithErrorCode = requires(T resp) { @@ -84,6 +90,37 @@ struct backoff_retry_policy { ss::abort_source& as() { return _as; } }; +/// Retry policy for callers that need to ride out leadership churn (e.g. +/// schema registry RPCs). Behaves like backoff_retry_policy on transient +/// errors, but additionally sleeps a jittered interval on not_leader so +/// leadership has time to stabilize before the next attempt — without +/// burning through the retry budget faster than the local leader cache +/// gets refreshed. 
+struct leader_mitigating_retry_policy { + ss::abort_source& _as; + backoff_policy _backoff = make_exponential_backoff_policy( + base_backoff_duration, max_backoff_duration); + simple_time_jitter _interval{ + mitigation_interval, mitigation_jitter}; + + explicit leader_mitigating_retry_policy(ss::abort_source& as) + : _as(as) {} + + void prepare_for_request() {} + + ss::future<> handle_error(cluster::errc ec) { + auto dur = _backoff.current_backoff_duration(); + _backoff.next_backoff(); + co_await ss::sleep_abortable(dur, _as); + if (ec == cluster::errc::not_leader) { + co_await ss::sleep_abortable( + _interval.next_duration(), _as); + } + } + + ss::abort_source& as() { return _as; } +}; + template std::invoke_result_t retry_with_backoff(Func func, Policy policy) { int attempts = 0; @@ -183,6 +220,13 @@ std::invoke_result_t client::retry(Func&& func) { std::forward(func), backoff_retry_policy{_as}); } +template +std::invoke_result_t client::retry_with_leader_mitigation(Func&& func) { + auto holder = _gate.hold(); + co_return co_await retry_with_backoff( + std::forward(func), leader_mitigating_retry_policy{_as}); +} + ss::future client::produce( model::topic_partition tp, ss::chunked_fifo batches) { if (batches.empty()) { diff --git a/src/v/kafka/data/rpc/client.h b/src/v/kafka/data/rpc/client.h index def9c9aef71fa..4d8e0cafcfda7 100644 --- a/src/v/kafka/data/rpc/client.h +++ b/src/v/kafka/data/rpc/client.h @@ -100,6 +100,9 @@ class client { template std::invoke_result_t retry(Func&&); + template + std::invoke_result_t retry_with_leader_mitigation(Func&&); + model::node_id _self; std::unique_ptr _leaders; std::unique_ptr _topic_creator; From 126d7f5f59ac7fe779285e7291c4359da01bffac Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 23 Feb 2026 17:58:06 -0800 Subject: [PATCH 4/7] k/d/rpc: Produce RPC returns base & last offset of produced batches Required for usage in SR Signed-off-by: Oren Leiman --- src/v/kafka/data/rpc/client.cc | 31 ++++++++++++++++++------------- src/v/kafka/data/rpc/client.h | 12 +++++++++++- src/v/kafka/data/rpc/serde.cc | 8 +++++++- src/v/kafka/data/rpc/serde.h | 15 +++++++++++++-- src/v/kafka/data/rpc/service.cc | 31 +++++++++++++++++++++++++++++-- src/v/kafka/data/rpc/service.h | 1 + src/v/kafka/data/rpc/test/deps.h | 4 ++-- 7 files changed, 81 insertions(+), 21 deletions(-) diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc index 3d391bccf7d74..b214896551911 100644 --- a/src/v/kafka/data/rpc/client.cc +++ b/src/v/kafka/data/rpc/client.cc @@ -42,7 +42,7 @@ constexpr auto mitigation_jitter = 500ms; template concept ResponseWithErrorCode = requires(T resp) { - { resp.ec } -> std::same_as; + { resp.ec } -> std::same_as; }; /// Extracts the cluster::errc from a response of any supported type. 
@@ -235,8 +235,9 @@ ss::future client::produce( produce_request req; req.topic_data.emplace_back(std::move(tp), std::move(batches)); req.timeout = timeout; - co_return co_await retry( - [this, &req]() { return do_produce_once(req.share()); }); + co_return ( + co_await retry([this, &req]() { return do_produce_once(req.share()); })) + .ec; } ss::future @@ -244,33 +245,37 @@ client::produce(model::topic_partition tp, model::record_batch batch) { produce_request req; req.topic_data.emplace_back(std::move(tp), std::move(batch)); req.timeout = timeout; - co_return co_await retry( - [this, &req]() { return do_produce_once(req.share()); }); + co_return ( + co_await retry([this, &req]() { return do_produce_once(req.share()); })) + .ec; } -ss::future client::do_produce_once(produce_request req) { +ss::future client::do_produce_once(produce_request req) { vassert( req.topic_data.size() == 1, "expected a single batch: {}", req.topic_data.size()); - const auto& tp = req.topic_data.front().tp; + model::ktp ktp{ + req.topic_data.front().tp.topic, req.topic_data.front().tp.partition}; auto leader = _leaders->get_leader_node( - model::topic_namespace_view(model::kafka_namespace, tp.topic), - tp.partition); + ktp.as_tn_view(), ktp.get_partition()); if (!leader) { - co_return cluster::errc::not_leader; + co_return produce_result{.ec = cluster::errc::not_leader}; } vlog(log.trace, "do_produce_once_request(node={}): {}", *leader, req); auto reply = co_await ( *leader == _self ? do_local_produce(std::move(req)) : do_remote_produce(*leader, std::move(req))); - vlog(log.trace, "do_produce_once_reply(node={}): {}", *leader, req); + vlog(log.trace, "do_produce_once_reply(node={}): {}", *leader, reply); vassert( reply.results.size() == 1, "expected a single result: {}", reply.results.size()); - - co_return reply.results.front().err; + const auto& front = reply.results.front(); + co_return produce_result{ + .ec = front.err, + .base_offset = front.base_offset, + .last_offset = front.last_offset}; } ss::future client::create_topic( diff --git a/src/v/kafka/data/rpc/client.h b/src/v/kafka/data/rpc/client.h index 4d8e0cafcfda7..465d5b582a750 100644 --- a/src/v/kafka/data/rpc/client.h +++ b/src/v/kafka/data/rpc/client.h @@ -22,6 +22,16 @@ namespace kafka::data::rpc { +/// Result of a produce operation that includes both base and last offset. +/// +/// base_offset and last_offset are nullopt if @p ec != success or if the +/// request contained zero records. +struct produce_result { + cluster::errc ec{cluster::errc::success}; + std::optional base_offset; + std::optional last_offset; +}; + /** * A client for kafka data plane rpcs. 
* @@ -88,7 +98,7 @@ class client { model::timeout_clock::duration timeout); private: - ss::future do_produce_once(produce_request); + ss::future do_produce_once(produce_request); ss::future do_local_produce(produce_request); ss::future do_remote_produce(model::node_id, produce_request); diff --git a/src/v/kafka/data/rpc/serde.cc b/src/v/kafka/data/rpc/serde.cc index eba225d3a7e27..3597ddc08ba00 100644 --- a/src/v/kafka/data/rpc/serde.cc +++ b/src/v/kafka/data/rpc/serde.cc @@ -57,7 +57,13 @@ fmt::iterator produce_request::format_to(fmt::iterator it) const { } fmt::iterator kafka_topic_data_result::format_to(fmt::iterator it) const { - return fmt::format_to(it, "{{ tp: {}, err: {} }}", tp, err); + return fmt::format_to( + it, + "{{ tp: {}, err: {}, base_offset: {}, last_offset: {} }}", + tp, + err, + base_offset, + last_offset); } fmt::iterator produce_reply::format_to(fmt::iterator it) const { diff --git a/src/v/kafka/data/rpc/serde.h b/src/v/kafka/data/rpc/serde.h index 199c3568b0e46..8888fd4bd7dfc 100644 --- a/src/v/kafka/data/rpc/serde.h +++ b/src/v/kafka/data/rpc/serde.h @@ -66,17 +66,28 @@ struct produce_request struct kafka_topic_data_result : serde::envelope< kafka_topic_data_result, - serde::version<0>, + serde::version<1>, serde::compat_version<0>> { kafka_topic_data_result() = default; kafka_topic_data_result(model::topic_partition tp, cluster::errc ec) : tp(std::move(tp)) , err(ec) {} + kafka_topic_data_result( + model::topic_partition tp, + cluster::errc ec, + model::offset base_offset, + model::offset last_offset) + : tp(std::move(tp)) + , err(ec) + , base_offset(base_offset) + , last_offset(last_offset) {} model::topic_partition tp; cluster::errc err{cluster::errc::success}; + std::optional base_offset; + std::optional last_offset; - auto serde_fields() { return std::tie(tp, err); } + auto serde_fields() { return std::tie(tp, err, base_offset, last_offset); } fmt::iterator format_to(fmt::iterator it) const; }; diff --git a/src/v/kafka/data/rpc/service.cc b/src/v/kafka/data/rpc/service.cc index 596f1a4ed09d3..fd40718dca23c 100644 --- a/src/v/kafka/data/rpc/service.cc +++ b/src/v/kafka/data/rpc/service.cc @@ -249,9 +249,36 @@ local_service::consume( ss::future local_service::produce( kafka_topic_data data, model::timeout_clock::duration timeout) { auto ktp = model::ktp(data.tp.topic, data.tp.partition); + // Count records before moving batches — needed to convert the + // last_offset returned by replicate() into a base_offset. + // Same arithmetic as kafka/server/handlers/produce.cc. Int64 + // accumulator because record_count() returns int32_t per batch and + // a produce request may include many batches. + int64_t total_records = 0; + for (const auto& b : data.batches) { + total_records += b.record_count(); + } auto result = co_await produce(ktp, std::move(data.batches), timeout); - auto ec = result.has_error() ? result.error() : cluster::errc::success; - co_return kafka_topic_data_result(data.tp, ec); + if (result.has_error()) { + co_return kafka_topic_data_result(std::move(data.tp), result.error()); + } + if (total_records == 0) { + co_return kafka_topic_data_result( + std::move(data.tp), cluster::errc::success); + } + auto last_offset = result.value(); + // The raft replicate batcher assigns contiguous offsets to all records + // in a single replicate() call (see replicate_batcher::propagate_result). 
+ auto base_offset = model::offset{last_offset() - (total_records - 1)}; + vassert( + base_offset >= model::offset{0}, + "derived base_offset {} underflowed from last_offset {} and " + "total_records {}", + base_offset, + last_offset, + total_records); + co_return kafka_topic_data_result( + std::move(data.tp), cluster::errc::success, base_offset, last_offset); } ss::future> local_service::produce( diff --git a/src/v/kafka/data/rpc/service.h b/src/v/kafka/data/rpc/service.h index 9ed4f7c5eab67..efe56be4c0bb4 100644 --- a/src/v/kafka/data/rpc/service.h +++ b/src/v/kafka/data/rpc/service.h @@ -49,6 +49,7 @@ class local_service { ss::future produce(kafka_topic_data, model::timeout_clock::duration); + /// Returns the last_offset (kafka space) of the replicated data. ss::future> produce( model::any_ntp auto, ss::chunked_fifo, diff --git a/src/v/kafka/data/rpc/test/deps.h b/src/v/kafka/data/rpc/test/deps.h index b2d5fdfc5ca9d..7896121a3bd78 100644 --- a/src/v/kafka/data/rpc/test/deps.h +++ b/src/v/kafka/data/rpc/test/deps.h @@ -156,7 +156,7 @@ class in_memory_proxy : public kafka::partition_proxy::impl { ss::future> replicate( chunked_vector batches, raft::replicate_options) final { - auto offset = latest_offset(); + auto offset = model::next_offset(latest_offset()); for (const auto& batch : batches) { auto b = batch.copy(); b.header().base_offset = offset++; @@ -202,7 +202,7 @@ class in_memory_proxy : public kafka::partition_proxy::impl { private: model::offset latest_offset() { - auto o = model::offset(0); + auto o = model::offset(-1); for (const auto& b : *_produced_batches) { if (b.ntp == _ntp) { o = b.batch.last_offset(); From cad3b646388b4d46f4815898e9300a842e88ae72 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Sat, 11 Apr 2026 22:36:24 -0700 Subject: [PATCH 5/7] k/d/rpc/client: Introduce produce_with_leader_mitigation - Retry with leader mitication - Includes the produced batch offsets in the response - Operates on a single batch Geared toward the write pattern in schema registry Also updates in-memory partition proxy implementation (rpc/test/deps) to serve "real" high watermark and LSO based on whatever's been produced. Signed-off-by: Oren Leiman --- src/v/kafka/data/rpc/client.cc | 9 +++ src/v/kafka/data/rpc/client.h | 5 ++ src/v/kafka/data/rpc/test/deps.h | 8 +- .../data/rpc/test/kafka_data_rpc_test.cc | 73 ++++++++++++++++++- 4 files changed, 89 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc index b214896551911..6dceca091299b 100644 --- a/src/v/kafka/data/rpc/client.cc +++ b/src/v/kafka/data/rpc/client.cc @@ -250,6 +250,15 @@ client::produce(model::topic_partition tp, model::record_batch batch) { .ec; } +ss::future client::produce_with_leader_mitigation( + model::topic_partition tp, model::record_batch b) { + produce_request req; + req.topic_data.emplace_back(std::move(tp), std::move(b)); + req.timeout = timeout; + co_return co_await retry_with_leader_mitigation( + [this, &req]() { return do_produce_once(req.share()); }); +} + ss::future client::do_produce_once(produce_request req) { vassert( req.topic_data.size() == 1, diff --git a/src/v/kafka/data/rpc/client.h b/src/v/kafka/data/rpc/client.h index 465d5b582a750..35250b76d491e 100644 --- a/src/v/kafka/data/rpc/client.h +++ b/src/v/kafka/data/rpc/client.h @@ -69,6 +69,11 @@ class client { ss::future produce(model::topic_partition, model::record_batch); + /// Produce a single batch and return a produce_result containing both + /// base_offset and last_offset on success. 
+ ss::future produce_with_leader_mitigation( + model::topic_partition, model::record_batch); + ss::future create_topic( model::topic_namespace_view, cluster::topic_properties, diff --git a/src/v/kafka/data/rpc/test/deps.h b/src/v/kafka/data/rpc/test/deps.h index 7896121a3bd78..0ecf771937a0d 100644 --- a/src/v/kafka/data/rpc/test/deps.h +++ b/src/v/kafka/data/rpc/test/deps.h @@ -97,9 +97,11 @@ class in_memory_proxy : public kafka::partition_proxy::impl { model::offset start_offset() const final { throw std::runtime_error("unimplemented"); } - model::offset high_watermark() const final { return model::offset(102); } + model::offset high_watermark() const final { + return model::next_offset(latest_offset()); + } checked last_stable_offset() const final { - return model::offset(101); + return latest_offset(); } kafka::leader_epoch leader_epoch() const final { throw std::runtime_error("unimplemented"); @@ -201,7 +203,7 @@ class in_memory_proxy : public kafka::partition_proxy::impl { } private: - model::offset latest_offset() { + model::offset latest_offset() const { auto o = model::offset(-1); for (const auto& b : *_produced_batches) { if (b.ntp == _ntp) { diff --git a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc index 76e82bb001dcf..5002f1c094797 100644 --- a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc +++ b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc @@ -119,6 +119,14 @@ class KafkaDataRpcTest : public ::testing::TestWithParam { .get(); } + produce_result produce_with_leader_mitigation( + const model::ntp& ntp, model::record_batch batch) { + return _kd->client() + .local() + .produce_with_leader_mitigation(ntp.tp, std::move(batch)) + .get(); + } + std::optional local_find_topic_cfg(model::topic_namespace_view tp_ns) { return _kd->local_metadata_cache()->find_topic_cfg(tp_ns); @@ -158,6 +166,14 @@ class KafkaDataRpcTest : public ::testing::TestWithParam { .get(); } + kafka::offset get_hwm(const model::ntp& ntp) { + auto res = get_partition_offsets( + model::ktp(ntp.tp.topic, ntp.tp.partition)); + vassert(res.has_value(), "get_partition_offsets failed"); + return res.value()[ntp.tp.topic][ntp.tp.partition] + .offsets.high_watermark; + } + result consume( model::topic_partition tp, kafka::offset start_offset, @@ -251,9 +267,9 @@ TEST_P(KafkaDataRpcTest, ClientCanRequestPartitionOffsets) { auto p_offsets = offsets[ntp.tp.topic][ntp.tp.partition]; EXPECT_EQ(p_offsets.err, cluster::errc::success); - // expect the hardcoded values from the in-memory proxy - EXPECT_EQ(p_offsets.offsets.high_watermark, kafka::offset(102)); - EXPECT_EQ(p_offsets.offsets.last_stable_offset, kafka::offset(101)); + // No records produced — HWM is 0 (next of -1), LSO is -1 + EXPECT_EQ(p_offsets.offsets.high_watermark, kafka::offset(0)); + EXPECT_EQ(p_offsets.offsets.last_stable_offset, kafka::offset(-1)); auto not_existing = make_ntp("bar"); @@ -338,6 +354,57 @@ TEST_P(KafkaDataRpcTest, ProduceRejectsUnderMemoryPressure) { EXPECT_EQ(result, cluster::errc::timeout); } +TEST_P(KafkaDataRpcTest, ProduceWithLeaderMitigationSingleRecord) { + auto ntp = make_ntp("single_rec"); + create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic)); + EXPECT_EQ(get_hwm(ntp), kafka::offset(0)); + + auto batch = model::test::make_random_batch({.count = 1, .records = 1}); + auto r = produce_with_leader_mitigation(ntp, std::move(batch)); + ASSERT_EQ(r.ec, cluster::errc::success); + ASSERT_TRUE(r.base_offset.has_value()); + ASSERT_TRUE(r.last_offset.has_value()); + 
EXPECT_EQ(r.base_offset, r.last_offset); + EXPECT_EQ(get_hwm(ntp), kafka::offset(model::next_offset(*r.last_offset))); + + // Second produce must land at the next offset. + auto batch2 = model::test::make_random_batch({.count = 1, .records = 1}); + auto r2 = produce_with_leader_mitigation(ntp, std::move(batch2)); + ASSERT_EQ(r2.ec, cluster::errc::success); + EXPECT_EQ(*r2.base_offset, *r.last_offset + model::offset{1}); + EXPECT_EQ(get_hwm(ntp), kafka::offset(model::next_offset(*r2.last_offset))); +} + +TEST_P(KafkaDataRpcTest, ProduceWithLeaderMitigationMultiRecord) { + auto ntp = make_ntp("multi_rec"); + create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic)); + EXPECT_EQ(get_hwm(ntp), kafka::offset(0)); + + constexpr int num_records = 3; + auto batch = model::test::make_random_batch({.count = num_records}); + ASSERT_EQ(batch.record_count(), num_records); + auto r = produce_with_leader_mitigation(ntp, std::move(batch)); + ASSERT_EQ(r.ec, cluster::errc::success); + ASSERT_TRUE(r.base_offset.has_value()); + ASSERT_TRUE(r.last_offset.has_value()); + EXPECT_EQ(*r.last_offset, *r.base_offset + model::offset{num_records - 1}); + EXPECT_EQ(get_hwm(ntp), kafka::offset(model::next_offset(*r.last_offset))); +} + +TEST_P(KafkaDataRpcTest, ProduceWithLeaderMitigationRetries) { + auto ntp = make_ntp("retry_produce"); + create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic)); + + // Inject timeouts on the first 2 attempts; the basic retry policy + // should back off and succeed on the 3rd attempt. + set_errors_to_inject(2); + auto batch = model::test::make_random_batch({.count = 1, .records = 1}); + auto r = produce_with_leader_mitigation(ntp, std::move(batch)); + ASSERT_EQ(r.ec, cluster::errc::success); + ASSERT_TRUE(r.base_offset.has_value()); + ASSERT_TRUE(r.last_offset.has_value()); + EXPECT_EQ(get_hwm(ntp), kafka::offset(model::next_offset(*r.last_offset))); +} INSTANTIATE_TEST_SUITE_P( WorksLocallyAndRemotely, KafkaDataRpcTest, From 43998a945fea08414e07ed89844bf27d7ebfb14c Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Sat, 14 Mar 2026 02:43:45 -0700 Subject: [PATCH 6/7] k/d/rpc/client: Introduce get_single_partition_offsets Reliable version that tries to mitigate not_leader errors Signed-off-by: Oren Leiman --- src/v/kafka/data/rpc/client.cc | 58 +++++++++++++++++++ src/v/kafka/data/rpc/client.h | 9 +++ .../data/rpc/test/kafka_data_rpc_test.cc | 50 ++++++++++++++++ 3 files changed, 117 insertions(+) diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc index 6dceca091299b..a1ec37c8b78db 100644 --- a/src/v/kafka/data/rpc/client.cc +++ b/src/v/kafka/data/rpc/client.cc @@ -524,6 +524,64 @@ client::get_remote_partition_offsets( co_return ret_t(std::move(result.value().partition_offsets)); } +ss::future> +client::get_single_partition_offsets(model::topic_partition tp) { + co_return co_await retry_with_leader_mitigation( + [this, tp]() { return do_get_single_partition_offsets_once(tp); }); +} + +ss::future> +client::do_get_single_partition_offsets_once(model::topic_partition tp) { + using ret_t = result; + model::topic_namespace_view tp_ns(model::kafka_namespace, tp.topic); + + auto topic_cfg = _metadata_cache->find_topic_cfg(tp_ns); + if (!topic_cfg) { + co_return ret_t(cluster::errc::topic_not_exists); + } + + auto leader = _leaders->get_leader_node(tp_ns, tp.partition); + if (!leader) { + co_return ret_t(cluster::errc::not_leader); + } + + // Build the single-partition request in the same wire format the + // existing get_offsets RPC expects. 
+ chunked_vector topics; + topic_partitions tps; + tps.topic = tp.topic; + tps.partitions.push_back(tp.partition); + topics.push_back(std::move(tps)); + + partition_offsets_map offsets_map; + if (*leader == _self) { + offsets_map = co_await _local_service->local().get_offsets( + std::move(topics)); + } else { + auto remote_result = co_await get_remote_partition_offsets( + *leader, std::move(topics)); + if (remote_result.has_error()) { + co_return ret_t(remote_result.error()); + } + offsets_map = std::move(remote_result.value()); + } + + // Extract the single result. + auto topic_it = offsets_map.find(tp.topic); + if (topic_it == offsets_map.end()) { + co_return ret_t(cluster::errc::not_leader); + } + auto part_it = topic_it->second.find(tp.partition); + if (part_it == topic_it->second.end()) { + co_return ret_t(cluster::errc::not_leader); + } + auto& por = part_it->second; + if (por.err != cluster::errc::success) { + co_return ret_t(por.err); + } + co_return ret_t(por.offsets); +} + ss::future> client::consume( model::topic_partition tp, kafka::offset start_offset, diff --git a/src/v/kafka/data/rpc/client.h b/src/v/kafka/data/rpc/client.h index 35250b76d491e..8366a4c92592e 100644 --- a/src/v/kafka/data/rpc/client.h +++ b/src/v/kafka/data/rpc/client.h @@ -94,6 +94,12 @@ class client { ss::future> get_partition_offsets(chunked_vector); + /// Get offsets for a single partition. Retries with leadership + /// mitigation — suitable for latency-sensitive callers like schema + /// registry that always target a single known partition. + ss::future> + get_single_partition_offsets(model::topic_partition); + ss::future> consume( model::topic_partition, kafka::offset start_offset, @@ -108,6 +114,9 @@ class client { ss::future do_remote_produce(model::node_id, produce_request); + ss::future> + do_get_single_partition_offsets_once(model::topic_partition); + ss::future> get_remote_partition_offsets( model::node_id, chunked_vector topics); diff --git a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc index 5002f1c094797..25610be5b8fb4 100644 --- a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc +++ b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc @@ -174,6 +174,14 @@ class KafkaDataRpcTest : public ::testing::TestWithParam { .offsets.high_watermark; } + result + get_single_partition_offsets(model::topic_partition tp) { + return _kd->client() + .local() + .get_single_partition_offsets(std::move(tp)) + .get(); + } + result consume( model::topic_partition tp, kafka::offset start_offset, @@ -405,6 +413,48 @@ TEST_P(KafkaDataRpcTest, ProduceWithLeaderMitigationRetries) { ASSERT_TRUE(r.last_offset.has_value()); EXPECT_EQ(get_hwm(ntp), kafka::offset(model::next_offset(*r.last_offset))); } + +TEST_P(KafkaDataRpcTest, ClientCanGetSinglePartitionOffsets) { + auto ntp = make_ntp("single_offsets"); + create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic)); + + auto res = get_single_partition_offsets(ntp.tp); + ASSERT_TRUE(res.has_value()); + // No records produced — HWM is 0 (next of -1), LSO is -1 + EXPECT_EQ(res.value().high_watermark, kafka::offset(0)); + EXPECT_EQ(res.value().last_stable_offset, kafka::offset(-1)); + + // After producing a record, offsets should advance. 
+ auto batch = model::test::make_random_batch({.count = 1, .records = 1}); + auto pr = produce_with_leader_mitigation(ntp, std::move(batch)); + ASSERT_EQ(pr.ec, cluster::errc::success); + + auto res2 = get_single_partition_offsets(ntp.tp); + ASSERT_TRUE(res2.has_value()); + EXPECT_EQ( + res2.value().high_watermark, + kafka::offset(model::next_offset(*pr.last_offset))); +} + +TEST_P(KafkaDataRpcTest, GetSinglePartitionOffsetsReturnsTopicNotExists) { + auto ntp = make_ntp("does-not-exist"); + auto res = get_single_partition_offsets(ntp.tp); + ASSERT_TRUE(res.has_error()); + EXPECT_EQ(res.error(), cluster::errc::topic_not_exists); +} + +TEST_P(KafkaDataRpcTest, GetSinglePartitionOffsetsRetries) { + auto ntp = make_ntp("retry_offsets"); + create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic)); + + // Inject timeouts on the first 2 attempts; the basic retry policy + // should back off and succeed on the 3rd attempt. + set_errors_to_inject(2); + auto res = get_single_partition_offsets(ntp.tp); + ASSERT_TRUE(res.has_value()); + EXPECT_EQ(res.value().high_watermark, kafka::offset(0)); + EXPECT_EQ(res.value().last_stable_offset, kafka::offset(-1)); +} INSTANTIATE_TEST_SUITE_P( WorksLocallyAndRemotely, KafkaDataRpcTest, From 97ad3b918da1688aab8dfee1303a0b9fda7fb6ae Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Sat, 14 Mar 2026 02:44:38 -0700 Subject: [PATCH 7/7] k/d/rpc/client: Add retry_with_leader_mitigation to consume Signed-off-by: Oren Leiman --- src/v/kafka/data/rpc/client.cc | 50 ++++++++++++------- src/v/kafka/data/rpc/client.h | 8 +++ .../data/rpc/test/kafka_data_rpc_test.cc | 18 +++++++ 3 files changed, 58 insertions(+), 18 deletions(-) diff --git a/src/v/kafka/data/rpc/client.cc b/src/v/kafka/data/rpc/client.cc index a1ec37c8b78db..ecdbe02cf1e0b 100644 --- a/src/v/kafka/data/rpc/client.cc +++ b/src/v/kafka/data/rpc/client.cc @@ -583,6 +583,20 @@ client::do_get_single_partition_offsets_once(model::topic_partition tp) { } ss::future> client::consume( + model::topic_partition tp, + kafka::offset start_offset, + kafka::offset max_offset, + size_t min_bytes, + size_t max_bytes, + model::timeout_clock::duration timeout) { + co_return co_await retry_with_leader_mitigation( + [this, tp, start_offset, max_offset, min_bytes, max_bytes, timeout]() { + return do_consume_once( + tp, start_offset, max_offset, min_bytes, max_bytes, timeout); + }); +} + +ss::future> client::do_consume_once( model::topic_partition tp, kafka::offset start_offset, kafka::offset max_offset, @@ -590,10 +604,9 @@ ss::future> client::consume( size_t max_bytes, model::timeout_clock::duration timeout) { using ret_t = result; + model::topic_namespace_view tp_ns(model::kafka_namespace, tp.topic); - // Check if topic exists first - auto topic_cfg = _metadata_cache->find_topic_cfg( - model::topic_namespace_view(model::kafka_namespace, tp.topic)); + auto topic_cfg = _metadata_cache->find_topic_cfg(tp_ns); if (!topic_cfg) { consume_reply reply; reply.tp = tp; @@ -601,25 +614,25 @@ ss::future> client::consume( co_return ret_t(std::move(reply)); } - // Find the leader for this partition - auto ktp = model::ktp(tp.topic, tp.partition); - auto leader = _leaders->get_leader_node( - model::topic_namespace_view(model::kafka_namespace, tp.topic), - tp.partition); - + auto leader = _leaders->get_leader_node(tp_ns, tp.partition); if (!leader) { - consume_reply reply; - reply.tp = tp; - reply.err = cluster::errc::not_leader; - co_return ret_t(std::move(reply)); + co_return ret_t(cluster::errc::not_leader); } consume_request req( tp, 
start_offset, max_offset, min_bytes, max_bytes, timeout); + auto is_retriable = [](cluster::errc ec) -> bool { + return ec == cluster::errc::not_leader || ec == cluster::errc::timeout + || ec == cluster::errc::partition_operation_failed; + }; + // If leader is local, call local service if (*leader == _self) { auto reply = co_await _local_service->local().consume(std::move(req)); + if (is_retriable(reply.err)) { + co_return ret_t(reply.err); + } co_return ret_t(std::move(reply)); } @@ -643,12 +656,13 @@ ss::future> client::consume( }); if (result.has_error()) { - consume_reply reply; - reply.tp = tp; - reply.err = map_errc(result.assume_error()); - co_return ret_t(std::move(reply)); + co_return ret_t(map_errc(result.assume_error())); } - co_return ret_t(std::move(result.value())); + auto reply = std::move(result).value(); + if (is_retriable(reply.err)) { + co_return ret_t(reply.err); + } + co_return ret_t(std::move(reply)); } } // namespace kafka::data::rpc diff --git a/src/v/kafka/data/rpc/client.h b/src/v/kafka/data/rpc/client.h index 8366a4c92592e..2258f5defe623 100644 --- a/src/v/kafka/data/rpc/client.h +++ b/src/v/kafka/data/rpc/client.h @@ -121,6 +121,14 @@ class client { get_remote_partition_offsets( model::node_id, chunked_vector topics); + ss::future> do_consume_once( + model::topic_partition, + kafka::offset start_offset, + kafka::offset max_offset, + size_t min_bytes, + size_t max_bytes, + model::timeout_clock::duration timeout); + template std::invoke_result_t retry(Func&&); diff --git a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc index 25610be5b8fb4..50623014b5352 100644 --- a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc +++ b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc @@ -455,6 +455,24 @@ TEST_P(KafkaDataRpcTest, GetSinglePartitionOffsetsRetries) { EXPECT_EQ(res.value().high_watermark, kafka::offset(0)); EXPECT_EQ(res.value().last_stable_offset, kafka::offset(-1)); } + +TEST_P(KafkaDataRpcTest, ConsumeRetries) { + auto ntp = make_ntp("retry_consume"); + create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic)); + + auto batches = record_batches::make(); + cluster::errc ec = produce(ntp, batches); + ASSERT_EQ(ec, cluster::errc::success); + + // Inject timeouts on the first 2 consume attempts; the basic retry + // policy should back off and succeed on the 3rd attempt. + set_errors_to_inject(2); + auto r = consume(ntp.tp, kafka::offset(0), kafka::offset::max()); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(r.value().err, cluster::errc::success); + EXPECT_EQ(r.value().batches.size(), batches.size()); +} + INSTANTIATE_TEST_SUITE_P( WorksLocallyAndRemotely, KafkaDataRpcTest,
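A rough caller-side sketch of how the new client APIs compose (hypothetical
wrapper, not part of this series; the function and its name are illustrative
only):

    // Produce one batch, riding out leadership churn, and return the offset
    // assigned to the batch's first record.
    ss::future<result<model::offset>> write_one_batch(
      kafka::data::rpc::client& client,
      model::topic_partition tp,
      model::record_batch batch) {
        auto res = co_await client.produce_with_leader_mitigation(
          std::move(tp), std::move(batch));
        if (res.ec != cluster::errc::success) {
            co_return res.ec;
        }
        // On success with a non-empty batch, base_offset and last_offset
        // are both engaged.
        co_return res.base_offset.value();
    }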