Skip to content

[CORE-16137]: SR on kafka RPCs#30046

Draft
oleiman wants to merge 27 commits intodevfrom
sr/noticket/sr-on-kdrpc
Draft

[CORE-16137]: SR on kafka RPCs#30046
oleiman wants to merge 27 commits intodevfrom
sr/noticket/sr-on-kdrpc

Conversation

@oleiman
Copy link
Copy Markdown
Member

@oleiman oleiman commented Apr 2, 2026

Builds on

Actually add the RPC transport variant, integrate and add to tests.

The transport is selected at startup via schema_registry_use_rpc (default:
true, gated on cluster version >= v26.2.1). The kafka client remains
available as a fallback. The RPC path requires no explicit auth config and bypasses
the kafka API completely.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

Improvements

  • Adds the option to configure Schema Registry to use internal RPCs for interacting with the schemas topic. No explicit authN/Z or ephemeral credentials required.

@oleiman oleiman self-assigned this Apr 2, 2026
@oleiman oleiman requested a review from a team as a code owner April 2, 2026 05:44
@oleiman oleiman added the claude-review Adding this label to a PR will trigger a workflow to review the code using claude. label Apr 2, 2026
Copilot AI review requested due to automatic review settings April 2, 2026 05:44
Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/pandaproxy/schema_registry/rpc_transport.cc
Comment on lines +2009 to 2017
, schema_registry_use_rpc(
*this,
"schema_registry_use_rpc",
"Produce schema registry messages using internal Redpanda RPCs. When "
"disabled, produce schema registry messages using a Kafka client "
"instead.",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
true)
, cloud_storage_enabled(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: default true for a new feature? This means all existing deployments will switch to the RPC transport path on upgrade (assuming cluster version is sufficient). While the feature-gating on v26_1_1 in api.cc is a good safety net for mixed-version clusters, defaulting to true is aggressive for a brand new code path.

Consider defaulting to false for the initial release and switching to true once the RPC path has been validated in production. This gives operators an opt-in experience rather than a surprise behavioral change on upgrade.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤠

Comment thread src/v/pandaproxy/schema_registry/rpc_transport.cc Outdated
Comment thread src/v/kafka/data/rpc/client.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/api.cc Outdated
Comment thread src/v/kafka/data/rpc/service.cc
Comment thread src/v/pandaproxy/schema_registry/seq_writer.cc Outdated
@claude
Copy link
Copy Markdown

claude Bot commented Apr 2, 2026

PR Review: SR on kafka RPCs

Summary

This PR introduces a polymorphic transport abstraction for schema registry's internal _schemas topic I/O, with two implementations:

  • kafka_client_transport (legacy): uses the Kafka client with full auth/ACL management
  • rpc_transport (new): uses internal kafka::data::rpc::client, bypassing Kafka auth overhead

The refactoring is well-structured — the transport interface is clean, the separation of concerns is good, and the leadership-aware retry policy (leader_mitigating_retry_policy) is a nice improvement over blind exponential backoff. The produce_with_offset, get_single_partition_offsets, and retry-mitigating consume additions to the RPC client are well-motivated by the schema registry's single-partition access pattern.

Test coverage is solid: new C++ unit tests for both rpc_transport and produce_with_offset, plus Python integration tests running the full SR test suites against both transport modes, and a leadership-transfer stress test.

Issues Found

Potential bugs:

  1. Inverted exception retry logic in retry_with_backoff (pre-existing, but surfaced by this refactor) — when a future fails with an exception, the code returns immediately when retries are available and retries when they're exhausted. See inline comment with suggested fix.
  2. rpc_transport::consume_range empty batch handling — throws on an empty batches vector, but this could be a legitimate response after compaction/truncation. Should this break the loop instead?

Robustness concerns:
3. consume_range has no overall timeout/iteration cap — individual RPCs timeout at 5s, but the outer loop could spin indefinitely on slow-advancing offsets.
4. Config default trueschema_registry_use_rpc defaults to true, meaning all deployments switch to the new RPC path on upgrade. The cluster-version gate is good, but consider defaulting to false for the initial release to give operators an opt-in path.

Minor/Nits:
5. Race between prepare() term snapshot and the actual leader lookup in do_produce_once/do_consume_once — benign but worth documenting.
6. monostate branch in the kafka client error callback during api::start() — works correctly but a comment would help.

What looks good

  • Clean transport abstraction with sensible default implementations for optional methods
  • Leadership-aware retry with mitigate_not_leader using notification-based waiting rather than polling
  • Proper serde versioning (kafka_topic_data_result v0→v1 with compat_version 0)
  • _delete_versions_cache in seq_writer correctly handles the retry-after-collision edge case
  • Comprehensive test matrix: both transport modes tested across local/remote leader configurations
  • Good feature gating: cluster version check + graceful fallback with logging

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new “transport” abstraction for Schema Registry internal topic I/O, enabling an internal Kafka data RPC-based path (no client auth overhead) alongside the existing kafka::client-based path, and updates tests to explicitly select transport mode.

Changes:

  • Introduce pandaproxy::schema_registry::transport with rpc_transport and kafka_client_transport implementations, wired through Schema Registry api/service/seq_writer.
  • Extend kafka data RPC to return base/last offsets from produce, add leadership-mitigating retries, and add targeted unit tests.
  • Update rptest coverage to explicitly set schema_registry_use_rpc per test and add RPC-transport variants/stress coverage.

Reviewed changes

Copilot reviewed 44 out of 44 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tests/rptest/tests/tls_version_test.py Explicitly disables SR RPC transport for TLS version tests.
tests/rptest/tests/tls_metrics_test.py Explicitly disables SR RPC transport for TLS metrics tests.
tests/rptest/tests/security_report_test.py Forces SR RPC transport off for security-report related SR tests.
tests/rptest/tests/schema_registry_test.py Requires explicit schema_registry_use_rpc, adds RPC variants + stress test.
tests/rptest/tests/rpk_registry_test.py Disables SR RPC transport for rpk registry tests.
tests/rptest/tests/redpanda_oauth_test.py Disables SR RPC transport for OAuth test cluster config.
tests/rptest/tests/metrics_reporter_test.py Enables SR RPC transport for metrics reporter SR usage.
tests/rptest/tests/crl_test.py Disables SR RPC transport for CRL-related TLS config.
tests/rptest/tests/cluster_linking_topic_syncing_test.py Enables SR RPC transport for cluster-linking topic syncing test config.
tests/rptest/tests/audit_log_test.py Disables SR RPC transport for audit log test setup.
tests/rptest/tests/admin_api_auth_test.py Disables SR RPC transport for auto-auth admin API test setup.
src/v/redpanda/application_runtime.cc Passes kafka data RPC client into Schema Registry API wiring.
src/v/pandaproxy/schema_registry/transport.h Adds transport interface for SR internal topic I/O.
src/v/pandaproxy/schema_registry/test/utils.h Adds noop_transport for seq_writer-related tests.
src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc Adds unit tests for SR rpc_transport behavior.
src/v/pandaproxy/schema_registry/test/consume_to_store.cc Switches tests from dummy kafka client to noop transport.
src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc Switches 3rd-party compatibility test to noop transport.
src/v/pandaproxy/schema_registry/test/BUILD Adds deps for transport and new rpc_transport gtest.
src/v/pandaproxy/schema_registry/service.h Replaces kafka client dependency with transport pointer; delegates mitigation/ephemeral checks.
src/v/pandaproxy/schema_registry/service.cc Routes internal topic reads/writes via transport; adds startup retry loop for topic metadata race.
src/v/pandaproxy/schema_registry/seq_writer.h Replaces kafka client with transport; adds delete-subject retry cache state.
src/v/pandaproxy/schema_registry/seq_writer.cc Uses transport for HWM/consume/produce; implements delete-subject retry caching behavior.
src/v/pandaproxy/schema_registry/rpc_transport.h Declares RPC-based SR transport using kafka data RPC client.
src/v/pandaproxy/schema_registry/rpc_transport.cc Implements RPC-based SR internal topic produce/consume/HWM.
src/v/pandaproxy/schema_registry/kafka_client_transport.h Introduces kafka::client-backed SR transport (legacy path).
src/v/pandaproxy/schema_registry/kafka_client_transport.cc Implements legacy transport, including ephemeral-credential mitigation logic.
src/v/pandaproxy/schema_registry/fwd.h Adds forward declarations for new transport types.
src/v/pandaproxy/schema_registry/BUILD Adds build targets/deps for transport and implementations.
src/v/pandaproxy/schema_registry/api.h Adds rpc client input + transport variant storage for SR API.
src/v/pandaproxy/schema_registry/api.cc Selects RPC vs Kafka-client transport using config + feature table; wires transport into service/seq_writer.
src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc Adds tests for produce_with_offset returning base/last offsets.
src/v/kafka/data/rpc/test/deps.h Extends fake leader cache with leader-term + not-leader mitigation hooks.
src/v/kafka/data/rpc/service.h Documents produce() return semantics for offsets.
src/v/kafka/data/rpc/service.cc Returns base/last offsets in produce reply; computes base from last+record_count.
src/v/kafka/data/rpc/serde.h Bumps kafka_topic_data_result serde version and adds optional base/last offsets.
src/v/kafka/data/rpc/serde.cc Extends formatting to include optional base/last offsets.
src/v/kafka/data/rpc/deps.h Extends leader cache interface with leader-term lookup and mitigation API.
src/v/kafka/data/rpc/deps.cc Implements leadership-change wait mitigation via partition_leaders_table notifications.
src/v/kafka/data/rpc/client.h Adds produce_with_offset and get_single_partition_offsets; adds retry_mitigating helpers.
src/v/kafka/data/rpc/client.cc Adds leadership-mitigating retry policy and exposes new client methods.
src/v/kafka/data/rpc/BUILD Adds expiring_promise dependency for mitigation implementation.
src/v/config/configuration.h Adds schema_registry_use_rpc tunable property.
src/v/config/configuration.cc Defines schema_registry_use_rpc property and help text (default true).
src/v/cluster_link/tests/deps.h Updates fake leader cache to satisfy new leader-term/mitigation interface.

Comment thread src/v/kafka/data/rpc/client.cc
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/rpc_transport.cc Outdated
Comment thread tests/rptest/tests/schema_registry_test.py
@oleiman oleiman marked this pull request as draft April 3, 2026 02:31
@oleiman oleiman force-pushed the sr/noticket/sr-on-kdrpc branch from 529dcbe to 8852936 Compare April 3, 2026 03:09
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 3, 2026

/ci-repeat 1
tests/rptest/tests/schema_registry_test.py

@oleiman oleiman closed this Apr 3, 2026
@oleiman oleiman deleted the sr/noticket/sr-on-kdrpc branch April 3, 2026 03:33
@oleiman oleiman restored the sr/noticket/sr-on-kdrpc branch April 3, 2026 03:42
@oleiman oleiman reopened this Apr 3, 2026
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 3, 2026

/ci-repeat 1
tests/rptest/tests/schema_registry_test.py

@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 3, 2026

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase

@oleiman oleiman force-pushed the sr/noticket/sr-on-kdrpc branch from 8852936 to 5f0fbab Compare April 3, 2026 06:53
@vbotbuildovich
Copy link
Copy Markdown
Collaborator

Retry command for Build#82735

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest.test_no_errors_during_leadership_transfers

@oleiman oleiman force-pushed the sr/noticket/sr-on-kdrpc branch from 5f0fbab to 2bda853 Compare April 3, 2026 16:58
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 3, 2026

/ci-repeat 3
skip-units
dt-repeat=20
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest
tests/rptest/tests/schema_registry_test.py::SchemaRegistryKafkaClientTransportStressTest

oleiman added 10 commits April 23, 2026 20:26
Reliable version that tries to mitigate not_leader errors

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Rework fake_partition_leader_cache to track per-ntp term and expose
clear_leader/bump_term/set_recovery_callback. The recovery callback
runs synchronously inside wait_for_term_change(), so tests can drive
a "leader lost -> election -> recovered" scenario without timing
races. The delegator now forwards wait_for_term_change to the
underlying fake instead of no-oping.

Add recovery tests for produce_with_leader_mitigation, consume, and
get_single_partition_offsets that assert the retry succeeds and that
the not_leader branch was taken, by counting wait_for_term_change
invocations and verifying stale_term snapshots advance across
attempts.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
do_delete_subject_impermanent called get_versions(include_deleted::no)
before is_subject_deleted(). On retry after a write collision where
the subject was already soft-deleted by the winning writer,
get_versions throws subject_not_found, which propagates as HTTP 404
to the client.

Fix by checking is_subject_deleted first and returning the full
version list via include_deleted::yes on that branch.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the sr/noticket/sr-on-kdrpc branch from 6bab74c to 6b916cf Compare April 24, 2026 08:16
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 24, 2026

/ci-repeat 1

oleiman added 9 commits April 24, 2026 11:22
Some of this might be better placed under k/d/rpc/tests, but this is the
specific functionality required by schema registry itself.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Defaults to true.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Pimpl wrapper for kafka_client_transport and rpc_transport.

Dispatch to various methods as appropriate.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
And plumb into seq_writer & service.

Transport selected at runtime dependent on config & cluster version.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
SchemaRegistryEndpoints now asserts that every subclass explicitly sets
schema_registry_use_rpc.

RPC-transport-enabled tests:
- SchemaRegistryModeNotMutableTest
- SchemaRegistryModeMutableTest
- SchemaRegistryContextRpcTransportTest
- SchemaRegistryBasicAuthRpcTransportTest
- SchemaRegistryRpcTransportTest
- SchemaRegistryAutoAuthTest
- SchemaRegistryConfluentClient
- SchemaRegistryCompatibilityModes
- SchemaRegistryACLTest
- SchemaRegistryContextAuthzRpcTransportTest
- SchemaRegistryRpcTransportStressTest
- ClusterLinkingSchemaRegistry
- SchemaRegistryContextMetricsTest

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
kafka/data/rpc & pp/schema_registry

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the sr/noticket/sr-on-kdrpc branch from 6b916cf to f542f88 Compare April 24, 2026 18:22
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 24, 2026

/ci-repeat 1
skip-units
dt-repeat=30
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest

@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 24, 2026

/ci-repeat 3
skip-redpanda-build
skip-units
skip-rebase
dt-repeat=100
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest

@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 28, 2026

/ci-repeat 3
dt-repeat=100
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest

@vbotbuildovich
Copy link
Copy Markdown
Collaborator

vbotbuildovich commented Apr 28, 2026

Retry command for Build#83723

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest.test_no_errors_during_leadership_transfers

use a similar scheme but don't bother with the term checks. the longer polling
period was all we really care aboutl still falls back to exp backoff if err
is anything other than not leader
@oleiman oleiman force-pushed the sr/noticket/sr-on-kdrpc branch from 7132823 to f9c43f0 Compare April 28, 2026 03:22
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 28, 2026

/ci-repeat 3
skip-units
dt-repeat=100
tests/rptest/tests/schema_registry_test.py::SchemaRegistryRpcTransportStressTest

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/build area/redpanda claude-review Adding this label to a PR will trigger a workflow to review the code using claude.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants