    , schema_registry_use_rpc(
        *this,
        "schema_registry_use_rpc",
        "Produce schema registry messages using internal Redpanda RPCs. When "
        "disabled, produce schema registry messages using a Kafka client "
        "instead.",
        {.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
        true)
    , cloud_storage_enabled(
Question: default true for a new feature? This means all existing deployments will switch to the RPC transport path on upgrade (assuming cluster version is sufficient). While the feature-gating on v26_1_1 in api.cc is a good safety net for mixed-version clusters, defaulting to true is aggressive for a brand new code path.
Consider defaulting to false for the initial release and switching to true once the RPC path has been validated in production. This gives operators an opt-in experience rather than a surprise behavioral change on upgrade.
PR Review: SR on kafka RPCs
Summary
This PR introduces a polymorphic
The refactoring is well-structured: the transport interface is clean, the separation of concerns is good, and the leadership-aware retry policy (
Test coverage is solid: new C++ unit tests for both
Issues Found
Potential bugs:
Robustness concerns:
Minor/Nits:
What looks good
Pull request overview
Adds a new “transport” abstraction for Schema Registry internal topic I/O, enabling an internal Kafka data RPC-based path (no client auth overhead) alongside the existing kafka::client-based path, and updates tests to explicitly select transport mode.
Changes:
- Introduce `pandaproxy::schema_registry::transport` with `rpc_transport` and `kafka_client_transport` implementations, wired through Schema Registry `api`/`service`/`seq_writer`.
- Extend kafka data RPC to return base/last offsets from produce, add leadership-mitigating retries, and add targeted unit tests.
- Update rptest coverage to explicitly set `schema_registry_use_rpc` per test and add RPC-transport variants/stress coverage.
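To illustrate the shape of the abstraction summarized above, here is a minimal synchronous sketch. It is not the PR's actual interface: the method names (`produce`, `consume`, `high_watermark`), the `in_memory_transport` stand-in, and `seq_writer_sketch` are all hypothetical, and the real transport is asynchronous.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the SR transport interface.
// Names and signatures are assumptions made for illustration only.
struct transport {
    virtual ~transport() = default;
    // Append a record to the internal topic and return its offset.
    virtual int64_t produce(std::string record) = 0;
    // Return all records at or after offset `from`.
    virtual std::vector<std::string> consume(int64_t from) const = 0;
    // Offset one past the last record written.
    virtual int64_t high_watermark() const = 0;
};

// In-memory implementation, used here only to exercise the interface.
class in_memory_transport final : public transport {
public:
    int64_t produce(std::string record) override {
        _log.push_back(std::move(record));
        return static_cast<int64_t>(_log.size()) - 1;
    }
    std::vector<std::string> consume(int64_t from) const override {
        assert(from >= 0);
        if (from >= high_watermark()) {
            return {};
        }
        return std::vector<std::string>(_log.begin() + from, _log.end());
    }
    int64_t high_watermark() const override {
        return static_cast<int64_t>(_log.size());
    }

private:
    std::vector<std::string> _log;
};

// A writer that depends only on the interface, so either an RPC-backed or a
// client-backed implementation could be injected without changing it.
class seq_writer_sketch {
public:
    explicit seq_writer_sketch(transport& t)
      : _t(t) {}
    int64_t write(std::string r) { return _t.produce(std::move(r)); }
    bool caught_up(int64_t loaded) const {
        return loaded >= _t.high_watermark();
    }

private:
    transport& _t;
};
```

The point of the sketch is only that callers such as the sequenced writer see one interface, which is what lets tests swap in a no-op or fake transport.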
Reviewed changes
Copilot reviewed 44 out of 44 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/rptest/tests/tls_version_test.py | Explicitly disables SR RPC transport for TLS version tests. |
| tests/rptest/tests/tls_metrics_test.py | Explicitly disables SR RPC transport for TLS metrics tests. |
| tests/rptest/tests/security_report_test.py | Forces SR RPC transport off for security-report related SR tests. |
| tests/rptest/tests/schema_registry_test.py | Requires explicit schema_registry_use_rpc, adds RPC variants + stress test. |
| tests/rptest/tests/rpk_registry_test.py | Disables SR RPC transport for rpk registry tests. |
| tests/rptest/tests/redpanda_oauth_test.py | Disables SR RPC transport for OAuth test cluster config. |
| tests/rptest/tests/metrics_reporter_test.py | Enables SR RPC transport for metrics reporter SR usage. |
| tests/rptest/tests/crl_test.py | Disables SR RPC transport for CRL-related TLS config. |
| tests/rptest/tests/cluster_linking_topic_syncing_test.py | Enables SR RPC transport for cluster-linking topic syncing test config. |
| tests/rptest/tests/audit_log_test.py | Disables SR RPC transport for audit log test setup. |
| tests/rptest/tests/admin_api_auth_test.py | Disables SR RPC transport for auto-auth admin API test setup. |
| src/v/redpanda/application_runtime.cc | Passes kafka data RPC client into Schema Registry API wiring. |
| src/v/pandaproxy/schema_registry/transport.h | Adds transport interface for SR internal topic I/O. |
| src/v/pandaproxy/schema_registry/test/utils.h | Adds noop_transport for seq_writer-related tests. |
| src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc | Adds unit tests for SR rpc_transport behavior. |
| src/v/pandaproxy/schema_registry/test/consume_to_store.cc | Switches tests from dummy kafka client to noop transport. |
| src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc | Switches 3rd-party compatibility test to noop transport. |
| src/v/pandaproxy/schema_registry/test/BUILD | Adds deps for transport and new rpc_transport gtest. |
| src/v/pandaproxy/schema_registry/service.h | Replaces kafka client dependency with transport pointer; delegates mitigation/ephemeral checks. |
| src/v/pandaproxy/schema_registry/service.cc | Routes internal topic reads/writes via transport; adds startup retry loop for topic metadata race. |
| src/v/pandaproxy/schema_registry/seq_writer.h | Replaces kafka client with transport; adds delete-subject retry cache state. |
| src/v/pandaproxy/schema_registry/seq_writer.cc | Uses transport for HWM/consume/produce; implements delete-subject retry caching behavior. |
| src/v/pandaproxy/schema_registry/rpc_transport.h | Declares RPC-based SR transport using kafka data RPC client. |
| src/v/pandaproxy/schema_registry/rpc_transport.cc | Implements RPC-based SR internal topic produce/consume/HWM. |
| src/v/pandaproxy/schema_registry/kafka_client_transport.h | Introduces kafka::client-backed SR transport (legacy path). |
| src/v/pandaproxy/schema_registry/kafka_client_transport.cc | Implements legacy transport, including ephemeral-credential mitigation logic. |
| src/v/pandaproxy/schema_registry/fwd.h | Adds forward declarations for new transport types. |
| src/v/pandaproxy/schema_registry/BUILD | Adds build targets/deps for transport and implementations. |
| src/v/pandaproxy/schema_registry/api.h | Adds rpc client input + transport variant storage for SR API. |
| src/v/pandaproxy/schema_registry/api.cc | Selects RPC vs Kafka-client transport using config + feature table; wires transport into service/seq_writer. |
| src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc | Adds tests for produce_with_offset returning base/last offsets. |
| src/v/kafka/data/rpc/test/deps.h | Extends fake leader cache with leader-term + not-leader mitigation hooks. |
| src/v/kafka/data/rpc/service.h | Documents produce() return semantics for offsets. |
| src/v/kafka/data/rpc/service.cc | Returns base/last offsets in produce reply; computes base from last+record_count. |
| src/v/kafka/data/rpc/serde.h | Bumps kafka_topic_data_result serde version and adds optional base/last offsets. |
| src/v/kafka/data/rpc/serde.cc | Extends formatting to include optional base/last offsets. |
| src/v/kafka/data/rpc/deps.h | Extends leader cache interface with leader-term lookup and mitigation API. |
| src/v/kafka/data/rpc/deps.cc | Implements leadership-change wait mitigation via partition_leaders_table notifications. |
| src/v/kafka/data/rpc/client.h | Adds produce_with_offset and get_single_partition_offsets; adds retry_mitigating helpers. |
| src/v/kafka/data/rpc/client.cc | Adds leadership-mitigating retry policy and exposes new client methods. |
| src/v/kafka/data/rpc/BUILD | Adds expiring_promise dependency for mitigation implementation. |
| src/v/config/configuration.h | Adds schema_registry_use_rpc tunable property. |
| src/v/config/configuration.cc | Defines schema_registry_use_rpc property and help text (default true). |
| src/v/cluster_link/tests/deps.h | Updates fake leader cache to satisfy new leader-term/mitigation interface. |
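The `service.cc` row above mentions deriving the base offset from the last offset and the record count. A minimal sketch of that arithmetic follows, assuming the usual Kafka batch convention that offsets within a batch are contiguous (so `last = base + record_count - 1`). The `offset_range` type and `offsets_from_last` helper are hypothetical and not taken from the PR.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical result type: first and last offsets assigned to a batch.
struct offset_range {
    int64_t base;
    int64_t last;
};

// Assumption: offsets in a batch are contiguous, so
//   base = last - (record_count - 1).
// Returns nullopt for inputs that cannot describe a valid batch.
inline std::optional<offset_range>
offsets_from_last(int64_t last, int32_t record_count) {
    if (record_count <= 0 || last < record_count - 1) {
        return std::nullopt;
    }
    return offset_range{last - (record_count - 1), last};
}
```

For example, a three-record batch whose last offset is 9 would occupy offsets 7 through 9 under this convention.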
529dcbe to 8852936
/ci-repeat 1
/ci-repeat 1
/ci-repeat 1
8852936 to 5f0fbab
Retry command for Build#82735: please wait until all jobs are finished before running the slash command
5f0fbab to 2bda853
/ci-repeat 3
Reliable version that tries to mitigate not_leader errors Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Rework fake_partition_leader_cache to track per-ntp term and expose clear_leader/bump_term/set_recovery_callback. The recovery callback runs synchronously inside wait_for_term_change(), so tests can drive a "leader lost -> election -> recovered" scenario without timing races. The delegator now forwards wait_for_term_change to the underlying fake instead of no-oping. Add recovery tests for produce_with_leader_mitigation, consume, and get_single_partition_offsets that assert the retry succeeds and that the not_leader branch was taken, by counting wait_for_term_change invocations and verifying stale_term snapshots advance across attempts. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
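The testing idea in the commit message above (a recovery callback that runs synchronously inside `wait_for_term_change()`) can be sketched as follows. This is a simplified, synchronous illustration, not the PR's fake: `fake_leader_cache_sketch`, the string-keyed ntp, and `retry_on_not_leader` are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical synchronous fake; the real one is asynchronous.
class fake_leader_cache_sketch {
public:
    void set_leader(const std::string& ntp, int32_t node) {
        _state[ntp].leader = node;
    }
    void clear_leader(const std::string& ntp) { _state[ntp].leader.reset(); }
    void bump_term(const std::string& ntp) { ++_state[ntp].term; }
    void set_recovery_callback(std::function<void()> cb) {
        _recover = std::move(cb);
    }

    std::optional<int32_t> leader(const std::string& ntp) const {
        auto it = _state.find(ntp);
        return it == _state.end() ? std::nullopt : it->second.leader;
    }
    int64_t term(const std::string& ntp) const {
        auto it = _state.find(ntp);
        return it == _state.end() ? 0 : it->second.term;
    }

    // Runs the recovery callback inline, so a test can model
    // "leader lost -> election -> recovered" with no timing dependence.
    bool wait_for_term_change(const std::string& ntp, int64_t stale_term) {
        ++_waits;
        if (_recover) {
            _recover();
        }
        return term(ntp) > stale_term;
    }
    int wait_calls() const { return _waits; }

private:
    struct entry {
        std::optional<int32_t> leader;
        int64_t term = 0;
    };
    std::map<std::string, entry> _state;
    std::function<void()> _recover;
    int _waits = 0;
};

// Hypothetical retry loop: snapshot the term, attempt the operation only if
// a leader is known, otherwise wait for the term to advance and try again.
template<typename Op>
bool retry_on_not_leader(
  fake_leader_cache_sketch& cache,
  const std::string& ntp,
  int max_attempts,
  Op op) {
    for (int i = 0; i < max_attempts; ++i) {
        const auto stale = cache.term(ntp);
        if (cache.leader(ntp) && op()) {
            return true;
        }
        if (!cache.wait_for_term_change(ntp, stale)) {
            return false;
        }
    }
    return false;
}
```

A test built on this can assert both that the retry succeeded and that the not-leader branch was actually taken, by checking the number of `wait_for_term_change` calls.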
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
do_delete_subject_impermanent called get_versions(include_deleted::no) before is_subject_deleted(). On retry after a write collision where the subject was already soft-deleted by the winning writer, get_versions throws subject_not_found, which propagates as HTTP 404 to the client. Fix by checking is_subject_deleted first and returning the full version list via include_deleted::yes on that branch. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
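The ordering fix described in the commit message above can be sketched with a toy in-memory store. `store_sketch` and the boolean `include_deleted` parameter are simplifications assumed for illustration; only the order of the two checks mirrors the description.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

struct subject_not_found : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Hypothetical store modelling soft deletion of subjects.
class store_sketch {
public:
    void add(const std::string& sub, int version) {
        _subjects[sub].versions.push_back(version);
    }
    void soft_delete(const std::string& sub) {
        _subjects.at(sub).deleted = true;
    }
    bool is_subject_deleted(const std::string& sub) const {
        auto it = _subjects.find(sub);
        return it != _subjects.end() && it->second.deleted;
    }
    // Throws for a soft-deleted subject unless include_deleted is set.
    std::vector<int>
    get_versions(const std::string& sub, bool include_deleted) const {
        auto it = _subjects.find(sub);
        if (
          it == _subjects.end() || (it->second.deleted && !include_deleted)) {
            throw subject_not_found(sub);
        }
        return it->second.versions;
    }

private:
    struct entry {
        std::vector<int> versions;
        bool deleted = false;
    };
    std::map<std::string, entry> _subjects;
};

// Fixed ordering: check for an existing soft delete first, so a retry after
// losing a write collision returns the version list instead of throwing.
std::vector<int>
delete_subject_impermanent(store_sketch& s, const std::string& sub) {
    if (s.is_subject_deleted(sub)) {
        return s.get_versions(sub, /*include_deleted=*/true);
    }
    auto versions = s.get_versions(sub, /*include_deleted=*/false);
    s.soft_delete(sub);
    return versions;
}
```

With the checks in the original order, the second call on an already soft-deleted subject would surface `subject_not_found`, which is the 404 the commit message describes.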
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
6bab74c to 6b916cf
/ci-repeat 1
Some of this might be better placed under k/d/rpc/tests, but this is the specific functionality required by schema registry itself. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Defaults to true. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Pimpl wrapper for kafka_client_transport and rpc_transport. Dispatch to various methods as appropriate. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
And plumb into seq_writer & service. Transport selected at runtime dependent on config & cluster version. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
SchemaRegistryEndpoints now asserts that every subclass explicitly sets schema_registry_use_rpc. RPC-transport-enabled tests:
- SchemaRegistryModeNotMutableTest
- SchemaRegistryModeMutableTest
- SchemaRegistryContextRpcTransportTest
- SchemaRegistryBasicAuthRpcTransportTest
- SchemaRegistryRpcTransportTest
- SchemaRegistryAutoAuthTest
- SchemaRegistryConfluentClient
- SchemaRegistryCompatibilityModes
- SchemaRegistryACLTest
- SchemaRegistryContextAuthzRpcTransportTest
- SchemaRegistryRpcTransportStressTest
- ClusterLinkingSchemaRegistry
- SchemaRegistryContextMetricsTest

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
kafka/data/rpc & pp/schema_registry Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
6b916cf to f542f88
/ci-repeat 1
/ci-repeat 3
/ci-repeat 3
Retry command for Build#83723: please wait until all jobs are finished before running the slash command
Use a similar scheme, but don't bother with the term checks. The longer polling period was all we really cared about; it still falls back to exponential backoff if the error is anything other than not_leader.
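A rough sketch of the delay selection this message describes: a fixed, longer poll on a not-leader error, and capped exponential backoff for everything else. The enum, struct name, and all durations are hypothetical placeholders, not values from the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

// Hypothetical error codes for illustration.
enum class rpc_errc { success, not_leader, timeout };

struct retry_policy_sketch {
    // All durations below are made-up defaults.
    std::chrono::milliseconds base_backoff{100};
    std::chrono::milliseconds max_backoff{5000};
    std::chrono::milliseconds not_leader_poll{1000};

    // Delay to wait before retry number `attempt` (0-based).
    std::chrono::milliseconds next_delay(rpc_errc ec, int attempt) const {
        assert(attempt >= 0);
        if (ec == rpc_errc::not_leader) {
            // Leadership changes take a while to settle, so poll at a fixed,
            // longer period rather than starting from a tiny backoff.
            return not_leader_poll;
        }
        // Any other error: exponential backoff, capped at max_backoff.
        const int shift = std::min(attempt, 20);
        const auto factor
          = static_cast<std::chrono::milliseconds::rep>(1) << shift;
        const std::chrono::milliseconds d{base_backoff.count() * factor};
        return std::min(d, max_backoff);
    }
};
```

Keeping the decision in one pure function makes the policy easy to unit test without any timers.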
7132823 to f9c43f0
/ci-repeat 3
Builds on
Actually add the RPC transport variant, integrate and add to tests.
The transport is selected at startup via `schema_registry_use_rpc` (default: `true`, gated on cluster version >= v26.2.1). The kafka client remains available as a fallback. The RPC path requires no explicit auth config and bypasses the kafka API completely.
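The selection rule described above might look roughly like this. It is a sketch under stated assumptions: `cluster_version`, `transport_kind`, and `select_transport` are hypothetical names, and the gate value is copied from this description rather than from the code.

```cpp
#include <cassert>
#include <tuple>

// Hypothetical version triple; field names are illustrative.
struct cluster_version {
    int major_v;
    int minor_v;
    int patch_v;
};

enum class transport_kind { rpc, kafka_client };

// Gate taken from the description above (v26.2.1); treat it as an assumption.
constexpr cluster_version rpc_gate{26, 2, 1};

// RPC is used only when the config flag is on AND the whole cluster is at or
// past the gate; otherwise the kafka client path remains the fallback.
inline transport_kind
select_transport(bool schema_registry_use_rpc, cluster_version active) {
    const bool supported
      = std::tie(active.major_v, active.minor_v, active.patch_v)
        >= std::tie(rpc_gate.major_v, rpc_gate.minor_v, rpc_gate.patch_v);
    return (schema_registry_use_rpc && supported)
             ? transport_kind::rpc
             : transport_kind::kafka_client;
}
```

Because the property needs a restart, a decision like this would only be evaluated once at startup, which matches the "selected at startup" wording.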
Backports Required
Release Notes
Improvements