# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Kubernetes resources for simulating AWS PrivateLink connectivity in cloudtest.

Toxiproxy sits between Materialize and a backing Kafka (Redpanda) so tests can
cut and restore connectivity on demand.  ``PrivateLinkExternalNameService``
mimics the ExternalName service that the environment-controller creates once a
VpcEndpoint becomes "available", but points it at Toxiproxy instead of a real
VPC endpoint.
"""

from kubernetes.client import (
    V1Container,
    V1ContainerPort,
    V1Deployment,
    V1DeploymentSpec,
    V1LabelSelector,
    V1ObjectMeta,
    V1PodSpec,
    V1PodTemplateSpec,
    V1Service,
    V1ServicePort,
    V1ServiceSpec,
)

from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
from materialize.cloudtest.k8s.api.k8s_deployment import K8sDeployment
from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
from materialize.cloudtest.k8s.api.k8s_service import K8sService
from materialize.mzcompose.services.redpanda import REDPANDA_VERSION

TOXIPROXY_IMAGE = "jauderho/toxiproxy:v2.8.0"


def _node_selector(apply_node_selectors: bool) -> dict[str, str] | None:
    """Return the node selector for supporting-services nodes, or None when disabled."""
    return {"supporting-services": "true"} if apply_node_selectors else None


class ToxiproxyDeployment(K8sDeployment):
    """Kubernetes Deployment for Toxiproxy.

    Args:
        namespace: Kubernetes namespace
        name: Name for this toxiproxy instance (e.g., "toxiproxy" or "toxiproxy-az1")
        apply_node_selectors: Whether to apply node selectors for supporting services
    """

    def __init__(
        self,
        namespace: str,
        name: str = "toxiproxy",
        apply_node_selectors: bool = False,
    ) -> None:
        super().__init__(namespace)
        self._name = name
        app_label = name

        container = V1Container(
            name="toxiproxy",
            image=TOXIPROXY_IMAGE,
            # Bind the admin API on all interfaces so it is reachable via the
            # NodePort service.
            args=["-host=0.0.0.0"],
            ports=[
                V1ContainerPort(name="admin", container_port=8474),
                V1ContainerPort(name="kafka-proxy", container_port=9092),
            ],
        )

        template = V1PodTemplateSpec(
            metadata=V1ObjectMeta(namespace=namespace, labels={"app": app_label}),
            spec=V1PodSpec(
                containers=[container],
                node_selector=_node_selector(apply_node_selectors),
            ),
        )

        self.deployment = V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=V1ObjectMeta(name=name, namespace=namespace),
            spec=V1DeploymentSpec(
                replicas=1,
                template=template,
                selector=V1LabelSelector(match_labels={"app": app_label}),
            ),
        )

    def delete(self) -> None:
        """Delete this deployment from the cluster."""
        self.apps_api().delete_namespaced_deployment(
            name=self._name, namespace=self.namespace()
        )


class ToxiproxyService(K8sService):
    """Kubernetes Service for Toxiproxy.

    Exposed as NodePort so tests can reach the admin API from outside the
    cluster via ``node_port("admin")``.

    Args:
        namespace: Kubernetes namespace
        name: Name for this toxiproxy service (should match deployment name)
    """

    def __init__(self, namespace: str, name: str = "toxiproxy") -> None:
        super().__init__(namespace)
        self._name = name
        app_label = name

        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=V1ObjectMeta(
                name=name, namespace=namespace, labels={"app": app_label}
            ),
            spec=V1ServiceSpec(
                type="NodePort",
                ports=[
                    V1ServicePort(name="admin", port=8474),
                    V1ServicePort(name="kafka-proxy", port=9092),
                ],
                selector={"app": app_label},
            ),
        )

    def delete(self) -> None:
        """Delete this service from the cluster."""
        self.api().delete_namespaced_service(
            name=self._name, namespace=self.namespace()
        )


class PrivateLinkExternalNameService(K8sService):
    """Creates an ExternalName service to simulate VpcEndpoint DNS resolution.

    In production, the environment-controller creates this service when a
    VpcEndpoint becomes "available". The service name follows the pattern:
    - 'connection-{catalog_id}' for the base endpoint
    - 'connection-{catalog_id}-{az}' for AZ-specific endpoints

    For testing, we point it to Toxiproxy which proxies to the actual service.

    Args:
        connection_id: The CatalogItemId of the PrivateLink connection (e.g., "u5")
        target_service: DNS name to resolve to (e.g., "toxiproxy.default.svc.cluster.local")
        namespace: Kubernetes namespace
        availability_zone: Optional AZ for AZ-specific endpoint (e.g., "use1-az1")
    """

    def __init__(
        self,
        connection_id: str,
        target_service: str,
        namespace: str,
        availability_zone: str | None = None,
    ) -> None:
        super().__init__(namespace)
        # Name matches vpc_endpoint_host() in src/cloud-resources/src/vpc_endpoint.rs
        if availability_zone:
            self._name = f"connection-{connection_id}-{availability_zone}"
        else:
            self._name = f"connection-{connection_id}"

        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=V1ObjectMeta(name=self._name, namespace=namespace),
            spec=V1ServiceSpec(
                type="ExternalName",
                external_name=target_service,
            ),
        )

    def delete(self) -> None:
        """Delete this service from the cluster."""
        self.api().delete_namespaced_service(
            name=self._name, namespace=self.namespace()
        )


def toxiproxy_resources(
    namespace: str = DEFAULT_K8S_NAMESPACE,
    name: str = "toxiproxy",
    apply_node_selectors: bool = False,
) -> list[K8sResource]:
    """Create Toxiproxy deployment and service resources.

    Args:
        namespace: Kubernetes namespace
        name: Name for this toxiproxy instance (use different names for multi-AZ)
        apply_node_selectors: Whether to apply node selectors
    """
    return [
        ToxiproxyDeployment(namespace, name, apply_node_selectors),
        ToxiproxyService(namespace, name),
    ]


class PrivateLinkTestRedpandaDeployment(K8sDeployment):
    """Redpanda deployment that advertises an AZ-specific hostname for PrivateLink testing.

    This allows testing pattern-based broker routing where the advertised broker
    address contains an AZ identifier that can be matched by routing rules.

    Args:
        namespace: Kubernetes namespace
        name: Name for this Redpanda instance
        advertise_addr: The address Redpanda advertises to clients (e.g., "broker.use1-az1.internal:9092")
        apply_node_selectors: Whether to apply node selectors
    """

    def __init__(
        self,
        namespace: str,
        name: str = "redpanda-privatelink",
        advertise_addr: str = "broker.use1-az1.internal:9092",
        apply_node_selectors: bool = False,
    ) -> None:
        super().__init__(namespace)
        self._name = name
        app_label = name

        container = V1Container(
            name="redpanda",
            image=f"redpandadata/redpanda:{REDPANDA_VERSION}",
            command=[
                "/usr/bin/rpk",
                "redpanda",
                "start",
                "--overprovisioned",
                "--smp",
                "1",
                "--memory",
                "1G",
                "--reserve-memory",
                "0M",
                "--node-id",
                "0",
                "--check=false",
                "--set",
                "redpanda.enable_transactions=true",
                "--set",
                "redpanda.enable_idempotence=true",
                "--set",
                "redpanda.auto_create_topics_enabled=true",
                "--set",
                "redpanda.topic_memory_per_partition=4096",
                # The advertised address is what metadata responses return to
                # clients; this is what triggers pattern-based broker routing.
                "--advertise-kafka-addr",
                advertise_addr,
            ],
        )

        template = V1PodTemplateSpec(
            metadata=V1ObjectMeta(namespace=namespace, labels={"app": app_label}),
            spec=V1PodSpec(
                containers=[container],
                node_selector=_node_selector(apply_node_selectors),
            ),
        )

        self.deployment = V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=V1ObjectMeta(name=name, namespace=namespace),
            spec=V1DeploymentSpec(
                replicas=1,
                template=template,
                selector=V1LabelSelector(match_labels={"app": app_label}),
            ),
        )

    def delete(self) -> None:
        """Delete this deployment from the cluster."""
        self.apps_api().delete_namespaced_deployment(
            name=self._name, namespace=self.namespace()
        )


class PrivateLinkTestRedpandaService(K8sService):
    """Service for the PrivateLink test Redpanda instance.

    Args:
        namespace: Kubernetes namespace
        name: Name for this service (should match deployment name)
    """

    def __init__(self, namespace: str, name: str = "redpanda-privatelink") -> None:
        super().__init__(namespace)
        self._name = name
        app_label = name

        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=V1ObjectMeta(
                name=name, namespace=namespace, labels={"app": app_label}
            ),
            spec=V1ServiceSpec(
                type="NodePort",
                ports=[
                    V1ServicePort(name="kafka", port=9092),
                    V1ServicePort(name="schema-registry", port=8081),
                ],
                selector={"app": app_label},
            ),
        )

    def delete(self) -> None:
        """Delete this service from the cluster."""
        self.api().delete_namespaced_service(
            name=self._name, namespace=self.namespace()
        )
import json
import random
import subprocess
import time
from textwrap import dedent

import pytest
import requests
from pg8000.dbapi import DatabaseError, ProgrammingError

from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.k8s.toxiproxy import (
    PrivateLinkExternalNameService,
    ToxiproxyDeployment,
    ToxiproxyService,
)
from materialize.cloudtest.util.common import retry
from materialize.cloudtest.util.exists import exists, not_exists
from materialize.cloudtest.util.wait import wait
from materialize.ui import UIError


def _set_kafka_proxy(admin_port: int, namespace: str, enabled: bool) -> None:
    """Create or update the Toxiproxy 'kafka' proxy via the admin API.

    Fails loudly on HTTP errors instead of silently ignoring them, so a
    misconfigured proxy surfaces here rather than as a confusing source stall.
    """
    body = {
        "name": "kafka",
        "listen": "0.0.0.0:9092",
        "upstream": f"redpanda.{namespace}.svc.cluster.local:9092",
        "enabled": enabled,
    }
    # POST /proxies creates a new proxy; Toxiproxy answers 409 if it already
    # exists, in which case POST /proxies/{name} updates it in place.
    resp = requests.post(f"http://localhost:{admin_port}/proxies", json=body)
    if resp.status_code == 409:
        resp = requests.post(f"http://localhost:{admin_port}/proxies/kafka", json=body)
    resp.raise_for_status()


def _enable_privatelink_system_vars(mz: MaterializeApplication) -> None:
    """Enable the system settings the PrivateLink tests rely on."""
    for setting in (
        "max_aws_privatelink_connections = 5",
        "enable_connection_validation_syntax = true",
        "enable_kafka_broker_matching_rules = true",
    ):
        mz.environmentd.sql(
            f"ALTER SYSTEM SET {setting}",
            port="internal",
            user="mz_system",
        )


def _source_status(mz: MaterializeApplication, source_name: str) -> str:
    """Return the current status of the named source from mz_source_statuses."""
    return mz.environmentd.sql_query(
        f"SELECT status FROM mz_internal.mz_source_statuses WHERE name = '{source_name}'"
    )[0][0]


def _best_effort(action) -> None:
    """Run a cleanup action, swallowing failures so cleanup in a ``finally``
    block never masks the original test failure."""
    try:
        action()
    except Exception:
        pass


def test_privatelink_e2e_connectivity(mz: MaterializeApplication) -> None:
    """
    End-to-end test validating PrivateLink connectivity via Toxiproxy.

    Uses a single exact-match rule for bootstrap and routing. Standard Redpanda
    (unpatched) advertises the same address it bootstraps from, so a single
    rule suffices. This test validates:
    1. Connectivity through the PrivateLink simulation (Toxiproxy)
    2. Source goes stalled when the proxy is disabled
    3. Source recovers when the proxy is re-enabled

    Architecture:
        Materialize --> ExternalName (connection-{id}) --> Toxiproxy --> Redpanda
    """
    namespace = DEFAULT_K8S_NAMESPACE

    privatelink_svc: PrivateLinkExternalNameService | None = None

    # Step 1: Deploy a single Toxiproxy instance.
    toxiproxy_deployment = ToxiproxyDeployment(namespace, name="toxiproxy-e2e")
    toxiproxy_service = ToxiproxyService(namespace, name="toxiproxy-e2e")
    toxiproxy_deployment.create()
    toxiproxy_service.create()

    try:
        wait(condition="condition=Available", resource="deployment/toxiproxy-e2e")

        # Step 2: Enable PrivateLink connections and create the connection.
        _enable_privatelink_system_vars(mz)

        mz.environmentd.sql(dedent("""\
            CREATE CONNECTION privatelink_e2e_conn
            TO AWS PRIVATELINK (
                SERVICE NAME 'com.amazonaws.vpce.test.vpce-svc-e2e-test',
                AVAILABILITY ZONES ('use1-az1')
            )
            """))

        connection_id = mz.environmentd.sql_query(
            "SELECT id FROM mz_connections WHERE name = 'privatelink_e2e_conn'"
        )[0][0]

        # Step 3: Verify the VpcEndpoint resource exists.
        exists(resource=f"vpcendpoint/connection-{connection_id}")

        # Step 4: Create the ExternalName service to simulate the VpcEndpoint
        # controller, pointing at Toxiproxy.
        privatelink_svc = PrivateLinkExternalNameService(
            connection_id=connection_id,
            target_service=f"toxiproxy-e2e.{namespace}.svc.cluster.local",
            namespace=namespace,
            availability_zone=None,
        )
        privatelink_svc.create()

        # Step 5: Configure Toxiproxy to proxy to Redpanda.
        admin_port = toxiproxy_service.node_port("admin")
        _set_kafka_proxy(admin_port, namespace, enabled=True)

        # Step 6: Create a Kafka connection with a static broker (for bootstrap)
        # and a catch-all MATCHING rule that routes through the PrivateLink endpoint.
        redpanda_addr = f"redpanda.{namespace}.svc.cluster.local:9092"
        mz.environmentd.sql(dedent(f"""\
            CREATE CONNECTION kafka_via_privatelink_e2e TO KAFKA (
                BROKERS (
                    '{redpanda_addr}' USING AWS PRIVATELINK privatelink_e2e_conn (PORT 9092),
                    MATCHING '*' USING AWS PRIVATELINK privatelink_e2e_conn (PORT 9092)
                ),
                SECURITY PROTOCOL PLAINTEXT
            ) WITH (VALIDATE = false)
            """))

        # Create a topic for testing with an explicit seed for reproducibility.
        topic_base = "privatelink-e2e-test"
        seed = random.randint(0, 2**31 - 1)
        full_topic_name = f"testdrive-{topic_base}-{seed}"

        mz.testdrive.run(
            input=f"$ kafka-create-topic topic={topic_base}\n",
            no_reset=True,
            seed=seed,
        )

        # Step 7: Create the source (the proxy is enabled, so this should succeed).
        mz.environmentd.sql(dedent(f"""\
            CREATE SOURCE privatelink_e2e_source
            FROM KAFKA CONNECTION kafka_via_privatelink_e2e (
                TOPIC '{full_topic_name}'
            )
            """))

        mz.environmentd.sql(dedent(f"""\
            CREATE TABLE privatelink_e2e_tbl
            FROM SOURCE privatelink_e2e_source (
                REFERENCE "{full_topic_name}"
            )
            FORMAT BYTES
            ENVELOPE NONE
            """))

        # Verify data flows initially.
        mz.testdrive.run(
            input=dedent(f"""\
                $ kafka-ingest topic={topic_base} format=bytes
                test_data_via_privatelink

                > SELECT COUNT(*) FROM privatelink_e2e_tbl
                1
                """),
            no_reset=True,
            seed=seed,
        )

        # Step 8: Disable the proxy to test error handling, then wait for the
        # source to detect the connection loss.
        _set_kafka_proxy(admin_port, namespace, enabled=False)

        def check_source_not_running() -> None:
            status = _source_status(mz, "privatelink_e2e_source")
            assert status in (
                "stalled",
                "starting",
            ), f"Expected source to be stalled or starting when proxy is down, got: {status}"

        retry(
            f=check_source_not_running,
            max_attempts=30,
            exception_types=[AssertionError],
        )

        # Step 9: Re-enable the proxy and verify recovery.
        _set_kafka_proxy(admin_port, namespace, enabled=True)

        mz.testdrive.run(
            input=dedent(f"""\
                $ kafka-ingest topic={topic_base} format=bytes
                recovery_data

                > SELECT COUNT(*) FROM privatelink_e2e_tbl
                2
                """),
            no_reset=True,
            seed=seed,
        )

        def check_source_running() -> None:
            status = _source_status(mz, "privatelink_e2e_source")
            assert status == "running", f"Source status is {status}, expected running"

        retry(
            f=check_source_running,
            max_attempts=30,
            exception_types=[AssertionError],
        )

    finally:
        # Cleanup is best-effort so a failure here never hides the test failure.
        _best_effort(lambda: mz.environmentd.sql(
            "DROP TABLE IF EXISTS privatelink_e2e_tbl CASCADE"
        ))
        _best_effort(lambda: mz.environmentd.sql(
            "DROP SOURCE IF EXISTS privatelink_e2e_source CASCADE"
        ))
        _best_effort(lambda: mz.environmentd.sql(
            "DROP CONNECTION IF EXISTS kafka_via_privatelink_e2e CASCADE"
        ))
        _best_effort(lambda: mz.environmentd.sql(
            "DROP CONNECTION IF EXISTS privatelink_e2e_conn CASCADE"
        ))
        if privatelink_svc is not None:
            _best_effort(privatelink_svc.delete)
        _best_effort(toxiproxy_service.delete)
        _best_effort(toxiproxy_deployment.delete)


def test_privatelink_pattern_matching(mz: MaterializeApplication) -> None:
    """
    Test that pattern-based AZ routing works with MATCHING broker rules.

    Patches Redpanda to advertise an AZ-specific broker address, then verifies
    that the MATCHING rule routes post-metadata traffic through the AZ-specific
    toxiproxy, NOT the default one.

    Architecture:
      Bootstrap:
        Static broker 'redpanda.default.svc.cluster.local:9092' in BROKERS
          -> resolve_broker_addr -> connection-{id} (default)
          -> ExternalName -> toxiproxy-default -> Redpanda

      Post-metadata (after Redpanda returns AZ-specific advertised address):
        resolve_broker_addr('redpanda-use1-az1.default.svc.cluster.local', 9092)
          -> MATCHING '*use1-az1*' -> connection-{id}-use1-az1 (AZ endpoint)
          -> ExternalName -> toxiproxy-use1-az1 -> Redpanda

    After bootstrap, toxiproxy-default is DISABLED. If data still flows, it
    proves the MATCHING rule routed through the AZ toxiproxy.
    """
    namespace = DEFAULT_K8S_NAMESPACE
    az = "use1-az1"
    broker_alias = f"redpanda-{az}"
    advertised_addr = f"{broker_alias}.{namespace}.svc.cluster.local:9092"
    # The bootstrap address uses the standard Redpanda service name.
    # This is different from the advertised address, which contains the AZ.
    bootstrap_addr = f"redpanda.{namespace}.svc.cluster.local:9092"

    # Track resources for cleanup.
    toxiproxy_az_deployment = None
    toxiproxy_az_service = None
    toxiproxy_default_deployment = None
    toxiproxy_default_service = None
    privatelink_svc_az = None
    privatelink_svc_default = None
    original_redpanda_args = None
    broker_alias_service_created = False

    try:
        # Step 1: Create a K8s service alias for Redpanda with an AZ-specific
        # name. Redpanda will advertise this address, and it must resolve in
        # the cluster so that testdrive can also reach it for topic creation.
        broker_alias_svc = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {"name": broker_alias, "namespace": namespace},
            "spec": {
                "selector": {"app": "redpanda"},
                "ports": [{"port": 9092, "targetPort": 9092}],
            },
        }
        # NOTE(review): the kube context is hardcoded here while the rest of
        # the test goes through mz.kubectl — confirm whether mz.kubectl can
        # accept stdin so the context can be derived from K8S_CONTEXT instead.
        subprocess.run(
            ["kubectl", "--context=kind-mzcloud", "apply", "-f", "-"],
            input=json.dumps(broker_alias_svc),
            text=True,
            check=True,
        )
        broker_alias_service_created = True

        # Step 2: Save the original Redpanda config for restoration.
        redpanda_deployment = mz.kubectl("get", "deployment", "redpanda", "-o", "json")
        redpanda_config = json.loads(redpanda_deployment)
        original_redpanda_args = redpanda_config["spec"]["template"]["spec"][
            "containers"
        ][0].get("command", [])

        # Step 3: Patch Redpanda to advertise the AZ-specific address.
        # After bootstrap, metadata will return this address instead of the
        # bootstrap address, which is what triggers pattern-based routing.
        patch = {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [
                            {
                                "name": "redpanda",
                                "command": [
                                    "/usr/bin/rpk",
                                    "redpanda",
                                    "start",
                                    "--overprovisioned",
                                    "--smp",
                                    "1",
                                    "--memory",
                                    "1G",
                                    "--reserve-memory",
                                    "0M",
                                    "--node-id",
                                    "0",
                                    "--check=false",
                                    "--set",
                                    "redpanda.enable_transactions=true",
                                    "--set",
                                    "redpanda.enable_idempotence=true",
                                    "--set",
                                    "redpanda.auto_create_topics_enabled=true",
                                    "--advertise-kafka-addr",
                                    advertised_addr,
                                ],
                            }
                        ]
                    }
                }
            }
        }
        mz.kubectl(
            "patch",
            "deployment",
            "redpanda",
            "--type=strategic",
            "-p",
            json.dumps(patch),
        )

        mz.kubectl("rollout", "status", "deployment/redpanda", "--timeout=120s")
        wait(condition="condition=Available", resource="deployment/redpanda")
        time.sleep(5)

        # Step 4: Deploy two toxiproxy instances.
        toxiproxy_az_deployment = ToxiproxyDeployment(namespace, name=f"toxiproxy-{az}")
        toxiproxy_az_service = ToxiproxyService(namespace, name=f"toxiproxy-{az}")
        toxiproxy_az_deployment.create()
        toxiproxy_az_service.create()

        toxiproxy_default_deployment = ToxiproxyDeployment(
            namespace, name="toxiproxy-default"
        )
        toxiproxy_default_service = ToxiproxyService(
            namespace, name="toxiproxy-default"
        )
        toxiproxy_default_deployment.create()
        toxiproxy_default_service.create()

        wait(condition="condition=Available", resource=f"deployment/toxiproxy-{az}")
        wait(condition="condition=Available", resource="deployment/toxiproxy-default")
        time.sleep(5)

        # Configure both toxiproxies to route to Redpanda (both ENABLED initially).
        az_admin_port = toxiproxy_az_service.node_port("admin")
        _set_kafka_proxy(az_admin_port, namespace, enabled=True)

        default_admin_port = toxiproxy_default_service.node_port("admin")
        _set_kafka_proxy(default_admin_port, namespace, enabled=True)

        # Step 5: Create the PrivateLink connection.
        _enable_privatelink_system_vars(mz)

        mz.environmentd.sql(dedent(f"""\
            CREATE CONNECTION privatelink_pattern_conn
            TO AWS PRIVATELINK (
                SERVICE NAME 'com.amazonaws.vpce.test.vpce-svc-pattern-test',
                AVAILABILITY ZONES ('{az}')
            )
            """))

        connection_id = mz.environmentd.sql_query(
            "SELECT id FROM mz_connections WHERE name = 'privatelink_pattern_conn'"
        )[0][0]

        # Step 6: Create the ExternalName services.
        # AZ-specific endpoint -> toxiproxy-az (stays enabled, for post-metadata traffic)
        privatelink_svc_az = PrivateLinkExternalNameService(
            connection_id=connection_id,
            target_service=f"toxiproxy-{az}.{namespace}.svc.cluster.local",
            namespace=namespace,
            availability_zone=az,
        )
        privatelink_svc_az.create()

        # Default endpoint -> toxiproxy-default (enabled for bootstrap, disabled later)
        privatelink_svc_default = PrivateLinkExternalNameService(
            connection_id=connection_id,
            target_service=f"toxiproxy-default.{namespace}.svc.cluster.local",
            namespace=namespace,
            availability_zone=None,
        )
        privatelink_svc_default.create()

        # Step 7: Create the Kafka connection with a static broker (for
        # bootstrap) and a MATCHING rule for AZ-specific routing.
        #   Static broker: bootstrap address -> default endpoint (no AZ)
        #   MATCHING rule: anything containing the AZ -> AZ-specific endpoint
        mz.environmentd.sql(dedent(f"""\
            CREATE CONNECTION kafka_pattern_test TO KAFKA (
                BROKERS (
                    '{bootstrap_addr}' USING AWS PRIVATELINK privatelink_pattern_conn (PORT 9092),
                    MATCHING '*{az}*' USING AWS PRIVATELINK privatelink_pattern_conn (
                        AVAILABILITY ZONE '{az}',
                        PORT 9092
                    )
                ),
                SECURITY PROTOCOL PLAINTEXT
            ) WITH (VALIDATE = false)
            """))

        # Step 8: Create the topic and source.
        topic_base = "privatelink-pattern-test"
        seed = random.randint(0, 2**31 - 1)
        full_topic_name = f"testdrive-{topic_base}-{seed}"

        mz.testdrive.run(
            input=f"$ kafka-create-topic topic={topic_base}\n",
            no_reset=True,
            seed=seed,
        )

        mz.environmentd.sql(dedent(f"""\
            CREATE SOURCE privatelink_pattern_source
            FROM KAFKA CONNECTION kafka_pattern_test (
                TOPIC '{full_topic_name}'
            )
            """))

        mz.environmentd.sql(dedent(f"""\
            CREATE TABLE privatelink_pattern_tbl
            FROM SOURCE privatelink_pattern_source (
                REFERENCE "{full_topic_name}"
            )
            FORMAT BYTES
            ENVELOPE NONE
            """))

        # Step 9: Wait for bootstrap + initial metadata fetch to complete,
        # then DISABLE toxiproxy-default. After this, the only working path
        # to Redpanda is through toxiproxy-use1-az1 (the AZ-specific proxy).
        # If pattern matching fails, traffic falls back to the default endpoint
        # which now hits a dead proxy -> source stalls.
        time.sleep(10)

        _set_kafka_proxy(default_admin_port, namespace, enabled=False)

        # Step 10: Verify data flows through the AZ-specific path.
        # This only succeeds if resolve_broker_addr matched '*use1-az1*'
        # against the advertised address 'redpanda-use1-az1.default...:9092'
        # and routed through the AZ endpoint instead of the (now dead) default.
        mz.testdrive.run(
            input=dedent(f"""\
                $ kafka-ingest topic={topic_base} format=bytes
                pattern_matching_works

                > SELECT COUNT(*) FROM privatelink_pattern_tbl
                1
                """),
            no_reset=True,
            seed=seed,
        )

        def check_source_running() -> None:
            status = _source_status(mz, "privatelink_pattern_source")
            assert (
                status == "running"
            ), f"Source should be running (pattern matching worked!), got: {status}"

        retry(
            f=check_source_running,
            max_attempts=30,
            exception_types=[AssertionError],
        )

    finally:
        # Cleanup SQL objects (best-effort: never mask the original failure).
        _best_effort(lambda: mz.environmentd.sql(
            "DROP TABLE IF EXISTS privatelink_pattern_tbl CASCADE"
        ))
        _best_effort(lambda: mz.environmentd.sql(
            "DROP SOURCE IF EXISTS privatelink_pattern_source CASCADE"
        ))
        _best_effort(lambda: mz.environmentd.sql(
            "DROP CONNECTION IF EXISTS kafka_pattern_test CASCADE"
        ))
        _best_effort(lambda: mz.environmentd.sql(
            "DROP CONNECTION IF EXISTS privatelink_pattern_conn CASCADE"
        ))

        # Cleanup ExternalName services.
        if privatelink_svc_az is not None:
            _best_effort(privatelink_svc_az.delete)
        if privatelink_svc_default is not None:
            _best_effort(privatelink_svc_default.delete)

        # Cleanup toxiproxy.
        if toxiproxy_az_service is not None:
            _best_effort(toxiproxy_az_service.delete)
        if toxiproxy_az_deployment is not None:
            _best_effort(toxiproxy_az_deployment.delete)
        if toxiproxy_default_service is not None:
            _best_effort(toxiproxy_default_service.delete)
        if toxiproxy_default_deployment is not None:
            _best_effort(toxiproxy_default_deployment.delete)

        # Cleanup the broker alias service.
        if broker_alias_service_created:
            _best_effort(lambda: mz.kubectl("delete", "service", broker_alias))

        # Restore the original Redpanda configuration.
        if original_redpanda_args:
            restore_patch = {
                "spec": {
                    "template": {
                        "spec": {
                            "containers": [
                                {"name": "redpanda", "command": original_redpanda_args}
                            ]
                        }
                    }
                }
            }

            def _restore_redpanda() -> None:
                mz.kubectl(
                    "patch",
                    "deployment",
                    "redpanda",
                    "--type=strategic",
                    "-p",
                    json.dumps(restore_patch),
                )
                mz.kubectl("rollout", "status", "deployment/redpanda", "--timeout=120s")

            _best_effort(_restore_redpanda)