From 99d55fafc9bea414b04104e13164921013b5fb27 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Tue, 3 Mar 2026 16:00:19 -0500 Subject: [PATCH 1/8] doc comments for Kafka resolve_broker_addr --- src/kafka-util/src/client.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 0181af23ae8ef..7905beb37b3fb 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -489,6 +489,10 @@ where } } + /// Look up the broker's address in our book of rewrites. + /// If we've already rewritten it before, reuse the existing rewrite. + /// Otherwise, use our "default tunnel" rewriting strategy to attempt to rewrite this broker's address + /// and record it in the book of rewrites. fn resolve_broker_addr(&self, host: &str, port: u16) -> Result, io::Error> { let return_rewrite = |rewrite: &BrokerRewriteHandle| -> Result, io::Error> { let rewrite = match rewrite { @@ -525,8 +529,12 @@ where let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned(); match rewrite { + // No (successful) broker address rewrite exists yet. None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => { + // "Default tunnel" is actually the configured rewriting strategy used for brokers we haven't already rewritten. match &self.default_tunnel { + // This "default tunnel" is actually an SSH tunnel. + // Try connecting so we have a valid rewrite for this broker address. TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh // tunnel will ever be connected, and only one will be inserted into the @@ -543,6 +551,7 @@ where .await }); match ssh_tunnel { + // Use the tunnel we just created, but only if nobody beat us in the race. Ok(ssh_tunnel) => { let mut rewrites = self.rewrites.lock().expect("poisoned"); let rewrite = match rewrites.entry(addr.clone()) { @@ -565,6 +574,7 @@ where return_rewrite(rewrite) } + // We couldn't connect. 
Someone else will have to try again. Err(e) => { warn!( "failed to create ssh tunnel for {:?}: {}", @@ -587,14 +597,17 @@ where } } } + // Our rewrite strategy is to use a specific host, e.g. a PrivateLink endpoint. TunnelConfig::StaticHost(host) => (host.as_str(), port) .to_socket_addrs() .map(|addrs| addrs.collect()), + // We leave the broker's address as it is. TunnelConfig::None => { (host, port).to_socket_addrs().map(|addrs| addrs.collect()) } } } + // This broker's address was already rewritten. Reuse the existing rewrite. Some(rewrite) => return_rewrite(&rewrite), } } From 7f4c5b0fc7b192130b8d401f3cc3f2872f175304 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Thu, 5 Mar 2026 11:42:03 -0500 Subject: [PATCH 2/8] update docs about Kafka + AWS PrivateLink --- doc/user/content/sql/create-connection.md | 37 +++++++++++----- doc/user/data/examples/create_connection.yml | 45 +++++++++++++++++++- 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/doc/user/content/sql/create-connection.md b/doc/user/content/sql/create-connection.md index 4ce7cfc364488..52ae2da9ad7aa 100644 --- a/doc/user/content/sql/create-connection.md +++ b/doc/user/content/sql/create-connection.md @@ -320,8 +320,32 @@ SSH bastion host. {{< include-md file="shared-content/aws-privatelink-cloud-only-note.md" >}} Depending on the hosted service you are connecting to, you might need to specify -a PrivateLink connection [per advertised broker](#kafka-privatelink-syntax) -(e.g. Amazon MSK), or a single [default PrivateLink connection](#kafka-privatelink-default) (e.g. Redpanda Cloud). +a PrivateLink connection and [per-availability-zone routing rules for brokers](#kafka-privatelinks) (e.g. Confluent Cloud), +a PrivateLink connection [per advertised broker](#kafka-privatelink-syntax) (e.g. Amazon MSK), +or a single [default PrivateLink connection](#kafka-privatelink-default) (e.g. Redpanda Cloud). 
+ +##### Dynamic broker discovery {#kafka-privatelinks} + +Confluent Cloud does not require listing every broker individually. +Instead, you should specify a PrivateLink connection and bootstrap server port +along with rules for matching brokers to the correct availability-zone-specific PrivateLink endpoint. + +{{% include-syntax file="examples/create_connection" example="syntax-kafka-aws-privatelinks" %}} + +```mzsql +CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK ( + SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc', + AVAILABILITY ZONES ('use1-az1', 'use1-az4') +); + +CREATE CONNECTION kafka_connection TO KAFKA ( + AWS PRIVATELINKS ( + '*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), + '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4'), + privatelink_svc + ) +); +``` ##### Broker connection syntax {#kafka-privatelink-syntax} @@ -347,7 +371,7 @@ broker that you want to connect to via the tunnel. Field | Value | Required | Description ----------------------------------------|------------------|:--------:|------------------------------- `AWS PRIVATELINK` | object name | ✓ | The name of an [AWS PrivateLink connection](#aws-privatelink) through which network traffic for this broker should be routed. -`AVAILABILITY ZONE` | `text` | | The ID of the availability zone of the AWS PrivateLink service in which the broker is accessible. If unspecified, traffic will be routed to each availability zone declared in the [AWS PrivateLink connection](#aws-privatelink) in sequence until the correct availability zone for the broker is discovered. If specified, Materialize will always route connections via the specified availability zone. +`AVAILABILITY ZONE` | `text` | | The ID of the availability zone of the AWS PrivateLink service in which the broker is accessible. `PORT` | `integer` | | The port of the AWS PrivateLink service to connect to. Defaults to the broker's port. 
##### Example {#kafka-privatelink-example} @@ -388,13 +412,6 @@ PrivateLink connection and the port of the bootstrap server instead. {{% include-syntax file="examples/create_connection" example="syntax-kafka-default-aws-privatelink" %}} -##### Default connection options {#kafka-privatelink-default-options} - -Field | Value | Required | Description -----------------------------------------|------------------|:--------:|------------------------------- -`AWS PRIVATELINK` | object name | ✓ | The name of an [AWS PrivateLink connection](#aws-privatelink) through which network traffic for this broker should be routed. -`PORT` | `integer` | | The port of the AWS PrivateLink service to connect to. Defaults to the broker's port. - ##### Example {#kafka-privatelink-default-example} ```mzsql diff --git a/doc/user/data/examples/create_connection.yml b/doc/user/data/examples/create_connection.yml index 10be06d3a0db7..758bcd5209996 100644 --- a/doc/user/data/examples/create_connection.yml +++ b/doc/user/data/examples/create_connection.yml @@ -242,11 +242,54 @@ syntax_elements: - name: "`AWS PRIVATELINK `" description: | + *Value:* object name. Required. + The name of an AWS PrivateLink connection through which network traffic should be routed. - name: "`PORT`" description: | - The port of the AWS PrivateLink service to connect to. + *Value:* `integer` + + The port of the AWS PrivateLink service to connect to. Defaults to the broker's port. + +- name: "syntax-kafka-aws-privatelinks" + code: | + CREATE CONNECTION TO KAFKA ( + AWS PRIVATELINKS ( + '' TO ( + PORT , + AVAILABILITY ZONE = '' + ), + '' TO , + (PORT ) + ), + ... + ); + syntax_elements: + - name: "`'' TO `" + description: | + Routes brokers whose advertised `host:port` matches `` through + the named AWS PrivateLink connection. + A pattern may begin with `*` to match any prefix. A pattern may end with `*` to match any suffix. + All other characters in the pattern are matched literally. + Rules are evaluated in order. 
The first matching rule wins. + If no rule matches, Materialize will attempt to connect through the default PrivateLink connection listed at the end. + - name: "`AVAILABILITY ZONE`" + description: | + *Value:* `text` + + The ID of the availability zone of the AWS PrivateLink service in which + the broker is accessible. + - name: "`PORT`" + description: | + *Value:* `integer` + + The port of the AWS PrivateLink service to connect to. Defaults to the broker's port. + - name: "``" + description: | + *Value:* object name. Required. + + The AWS PrivateLink connection to use for bootstrapping. - name: "syntax-csr" code: | From fc6c865fc3c118fb0169799ea6e9f65222e31481 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Fri, 6 Mar 2026 16:48:44 -0500 Subject: [PATCH 3/8] AWS PRIVATELINKS with a default privatelink for bootstrapping --- src/kafka-util/src/client.rs | 112 +++++++++ src/sql-lexer/src/keywords.txt | 1 + src/sql-parser/src/ast/defs/ddl.rs | 3 + src/sql-parser/src/ast/defs/statement.rs | 91 ++++++++ src/sql-parser/src/parser.rs | 168 ++++++++------ src/sql-parser/tests/testdata/ddl | 38 +++ src/sql/src/names.rs | 3 + src/sql/src/plan/statement.rs | 48 +--- src/sql/src/plan/statement/ddl/connection.rs | 232 ++++++++++++++----- src/sql/src/plan/with_options.rs | 34 ++- src/storage-types/src/connections.rs | 109 +++++++-- test/testdrive/connection-alter.td | 6 +- test/testdrive/connection-create-drop.td | 14 +- 13 files changed, 666 insertions(+), 193 deletions(-) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 7905beb37b3fb..0d7e68e7a278f 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -288,6 +288,13 @@ pub struct BrokerAddr { pub port: u16, } +impl BrokerAddr { + /// Attempt to resolve this broker address into a list of socket addresses. + pub fn to_socket_addrs(&self) -> Result, io::Error> { + Ok((self.host.as_str(), self.port).to_socket_addrs()?.collect()) + } +} + /// Rewrites a broker address. 
/// /// For use with [`TunnelingClientContext`]. @@ -301,6 +308,16 @@ pub struct BrokerRewrite { pub port: Option, } +impl BrokerRewrite { + /// Apply the rewrite to this broker address. + pub fn rewrite(&self, address: &BrokerAddr) -> BrokerAddr { + BrokerAddr { + host: self.host.clone(), + port: self.port.unwrap_or(address.port), + } + } +} + #[derive(Clone)] enum BrokerRewriteHandle { Simple(BrokerRewrite), @@ -313,6 +330,57 @@ enum BrokerRewriteHandle { FailedDefaultSshTunnel(String), } +#[derive(Clone)] +/// Parsed from a string, with optional leading and trailing '*' wildcards. +pub struct ConnectionRulePattern { + /// If true, allow any combination of characters before the literal match. + pub prefix_wildcard: bool, + /// We expect the broker's host:port to match these characters in their entirety. + pub literal_match: String, + /// If true, allow any combination of characters after the literal match. + pub suffix_wildcard: bool, +} + +impl ConnectionRulePattern { + /// Does this "{host}:{port}" address fit the pattern? + pub fn matches(&self, address: &str) -> bool { + if self.prefix_wildcard { + if self.suffix_wildcard { + address.contains(&self.literal_match) + } else { + address.ends_with(&self.literal_match) + } + } else if self.suffix_wildcard { + address.starts_with(&self.literal_match) + } else { + address == self.literal_match + } + } +} + +#[derive(Clone)] +/// Given a host address, map it to a different host. +pub struct HostMappingRules { + /// Map matching hosts to a different host. First applicable rule wins. + pub rules: Vec<(ConnectionRulePattern, BrokerRewrite)>, + /// If no rules match, use this host. + pub default: BrokerRewrite, +} + +impl HostMappingRules { + /// Rewrite this broker address according to the rules. 
+ pub fn rewrite(&self, src: &BrokerAddr) -> BrokerAddr { + let address = format!("{}:{}", src.host, src.port); + for (pattern, dst) in &self.rules { + if pattern.matches(&address) { + return dst.rewrite(src); + } + } + + self.default.rewrite(src) + } +} + /// Tunneling clients /// used for re-writing ports / hosts #[derive(Clone)] @@ -321,6 +389,8 @@ pub enum TunnelConfig { Ssh(SshTunnelConfig), /// Re-writes internal hosts using the value, used for privatelink StaticHost(String), + /// Re-writes internal hosts according to an ordered list of rules, also used for privatelink + Rules(HostMappingRules), /// Performs no re-writes None, } @@ -601,6 +671,8 @@ where TunnelConfig::StaticHost(host) => (host.as_str(), port) .to_socket_addrs() .map(|addrs| addrs.collect()), + // Rewrite according to the routing rules. + TunnelConfig::Rules(rules) => rules.rewrite(&addr).to_socket_addrs(), // We leave the broker's address as it is. TunnelConfig::None => { (host, port).to_socket_addrs().map(|addrs| addrs.collect()) @@ -971,3 +1043,43 @@ pub fn create_new_client_config( config } + +#[cfg(test)] +mod tests { + use super::*; + + #[mz_ore::test] + fn test_connection_rule_pattern_matches() { + let p = ConnectionRulePattern { + prefix_wildcard: false, + literal_match: "broker:9092".to_string(), + suffix_wildcard: false, + }; + assert!(p.matches("broker:9092")); + assert!(!p.matches("other:9092")); + + let p = ConnectionRulePattern { + prefix_wildcard: true, + literal_match: ":9092".to_string(), + suffix_wildcard: false, + }; + assert!(p.matches("any-host:9092")); + assert!(!p.matches("broker:9093")); + + let p = ConnectionRulePattern { + prefix_wildcard: false, + literal_match: "broker:".to_string(), + suffix_wildcard: true, + }; + assert!(p.matches("broker:9092")); + assert!(!p.matches("other:9092")); + + let p = ConnectionRulePattern { + prefix_wildcard: true, + literal_match: "broker".to_string(), + suffix_wildcard: true, + }; + assert!(p.matches("my-broker-host:1234")); + 
assert!(!p.matches("other:9092")); + } +} diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index e1aac2a6eb128..a6a6f53c3abb1 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -359,6 +359,7 @@ Prepare Primary Prioritize Privatelink +Privatelinks Privileges Progress Projection diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index cb31a0e06f5bf..74a030da79d7a 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -806,6 +806,7 @@ pub enum ConnectionOptionName { AvailabilityZones, AwsConnection, AwsPrivatelink, + AwsPrivatelinks, Broker, Brokers, Credential, @@ -845,6 +846,7 @@ impl AstDisplay for ConnectionOptionName { ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES", ConnectionOptionName::AwsConnection => "AWS CONNECTION", ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK", + ConnectionOptionName::AwsPrivatelinks => "AWS PRIVATELINKS", ConnectionOptionName::Broker => "BROKER", ConnectionOptionName::Brokers => "BROKERS", ConnectionOptionName::Credential => "CREDENTIAL", @@ -896,6 +898,7 @@ impl WithOptionName for ConnectionOptionName { | ConnectionOptionName::AvailabilityZones | ConnectionOptionName::AwsConnection | ConnectionOptionName::AwsPrivatelink + | ConnectionOptionName::AwsPrivatelinks | ConnectionOptionName::Broker | ConnectionOptionName::Brokers | ConnectionOptionName::Credential diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index a4721a7a2d5c1..6be25b39130fd 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -603,6 +603,92 @@ impl AstDisplay for ConnectionDefaultAwsPrivatelink { } impl_display_t!(ConnectionDefaultAwsPrivatelink); +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum ConnectionAwsPrivatelinkRule { + /// Route to brokers through PrivateLink connections according to 
these rules. + AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern), + /// Bootstrap through this PrivateLink connection. + AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +/// Match this pattern against some brokers' host:port. +pub struct ConnectionAwsPrivatelinkPattern { + /// Given a broker's host:port, should we use this route? + pub pattern: ConnectionRulePattern, + /// Route to the broker through this PrivateLink connection. + pub to: KafkaBrokerAwsPrivatelink, +} + +#[derive( + Debug, + Clone, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Serialize, + Deserialize +)] +/// Parsed from a string, with optional leading and trailing '*' wildcards. +pub struct ConnectionRulePattern { + /// If true, allow any combination of characters before the literal match. + pub prefix_wildcard: bool, + /// We expect the broker's host:port to match these characters in their entirety. + pub literal_match: String, + /// If true, allow any combination of characters after the literal match. 
+ pub suffix_wildcard: bool, +} + +impl AstDisplay for ConnectionAwsPrivatelinkRule { + fn fmt(&self, f: &mut AstFormatter) + where + W: fmt::Write, + { + match self { + ConnectionAwsPrivatelinkRule::AwsPrivatelinkRule(x) => f.write_node(x), + ConnectionAwsPrivatelinkRule::AwsPrivatelinkRuleDefault(x) => f.write_node(x), + } + } +} +impl_display_t!(ConnectionAwsPrivatelinkRule); + +impl AstDisplay for ConnectionAwsPrivatelinkPattern { + fn fmt(&self, f: &mut AstFormatter) + where + W: fmt::Write, + { + f.write_node(&self.pattern); + f.write_str(" TO "); + f.write_node(&self.to.connection); + if !self.to.options.is_empty() { + f.write_str(" ("); + f.write_node(&display::comma_separated(&self.to.options)); + f.write_str(")"); + } + } +} +impl_display_t!(ConnectionAwsPrivatelinkPattern); + +impl AstDisplay for ConnectionRulePattern { + fn fmt(&self, f: &mut AstFormatter) + where + W: fmt::Write, + { + f.write_str("'"); + if self.prefix_wildcard { + f.write_str("*"); + } + f.write_node(&display::escape_single_quote_string(&self.literal_match)); + if self.suffix_wildcard { + f.write_str("*"); + } + f.write_str("'"); + } +} +impl_display!(ConnectionRulePattern); + #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct KafkaBroker { pub address: String, @@ -4392,6 +4478,7 @@ pub enum WithOptionValue { ClusterReplicas(Vec>), ConnectionKafkaBroker(KafkaBroker), ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink), + ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkRule), RetainHistoryFor(Value), Refresh(RefreshOptionValue), ClusterScheduleOptionValue(ClusterScheduleOptionValue), @@ -4422,6 +4509,7 @@ impl AstDisplay for WithOptionValue { | WithOptionValue::UnresolvedItemName(_) | WithOptionValue::Ident(_) | WithOptionValue::ConnectionAwsPrivatelink(_) + | WithOptionValue::ConnectionAwsPrivatelinkRule(_) | WithOptionValue::ClusterReplicas(_) | WithOptionValue::ClusterScheduleOptionValue(_) | WithOptionValue::ClusterAlterStrategy(_) @@ -4473,6 
+4561,9 @@ impl AstDisplay for WithOptionValue { WithOptionValue::ConnectionAwsPrivatelink(aws_privatelink) => { f.write_node(aws_privatelink); } + WithOptionValue::ConnectionAwsPrivatelinkRule(rule) => { + f.write_node(rule); + } WithOptionValue::ConnectionKafkaBroker(broker) => { f.write_node(broker); } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 91e78749566aa..f5c25b2c3d557 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1895,6 +1895,22 @@ impl<'a> Parser<'a> { } } + /// Optional '=', then comma-separated list in parens/brackets. + fn parse_list_value(&mut self, f: F) -> Result, ParserError> + where + F: FnMut(&mut Self) -> Result, + { + let _ = self.consume_token(&Token::Eq); + let delimiter = self.expect_one_of_tokens(&[Token::LParen, Token::LBracket])?; + let values = self.parse_comma_separated(f)?; + self.expect_token(&match delimiter { + Token::LParen => Token::RParen, + Token::LBracket => Token::RBracket, + _ => unreachable!(), + })?; + Ok(values) + } + /// Parse a comma-separated list of 1+ items accepted by `F` fn parse_comma_separated(&mut self, mut f: F) -> Result, ParserError> where @@ -2531,6 +2547,14 @@ impl<'a> Parser<'a> { fn parse_default_aws_privatelink(&mut self) -> Result, ParserError> { let _ = self.consume_token(&Token::Eq); + Ok(WithOptionValue::ConnectionAwsPrivatelink( + self.parse_default_aws_privatelink_()?, + )) + } + + fn parse_default_aws_privatelink_( + &mut self, + ) -> Result, ParserError> { let connection = self.parse_raw_name()?; let port = if self.consume_token(&Token::LParen) { self.expect_keyword(PORT)?; @@ -2543,8 +2567,62 @@ impl<'a> Parser<'a> { } else { None }; - Ok(WithOptionValue::ConnectionAwsPrivatelink( - ConnectionDefaultAwsPrivatelink { connection, port }, + Ok(ConnectionDefaultAwsPrivatelink { connection, port }) + } + + /// This is just like 'parse_default_aws_privatelink_' except it supports more PrivateLink options. 
+ fn parse_aws_privatelink(&mut self) -> Result, ParserError> { + let connection = self.parse_raw_name()?; + let options = if self.consume_token(&Token::LParen) { + let options = + self.parse_comma_separated(Parser::parse_kafka_broker_aws_privatelink_option)?; + self.expect_token(&Token::RParen)?; + options + } else { + vec![] + }; + Ok(KafkaBrokerAwsPrivatelink { + connection, + options, + }) + } + + fn parse_connection_rule_pattern(&mut self) -> Result { + let s = self.parse_literal_string()?; + let mut prefix_wildcard = false; + let mut suffix_wildcard = false; + let mut remainder = &s[..]; + + if let Some(stripped) = remainder.strip_prefix('*') { + prefix_wildcard = true; + remainder = stripped; + } + if let Some(stripped) = remainder.strip_suffix('*') { + suffix_wildcard = true; + remainder = stripped; + } + + Ok(ConnectionRulePattern { + prefix_wildcard, + literal_match: remainder.to_owned(), + suffix_wildcard, + }) + } + + fn parse_aws_privatelink_rule(&mut self) -> Result, ParserError> { + Ok(WithOptionValue::ConnectionAwsPrivatelinkRule( + if let Some(Token::String(_)) = self.peek_token() { + let pattern = self.parse_connection_rule_pattern()?; + self.expect_keywords(&[TO])?; + ConnectionAwsPrivatelinkRule::AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { + pattern, + to: self.parse_aws_privatelink()?, + }) + } else { + ConnectionAwsPrivatelinkRule::AwsPrivatelinkRuleDefault( + self.parse_default_aws_privatelink_()?, + ) + }, )) } @@ -2555,20 +2633,7 @@ impl<'a> Parser<'a> { match self.expect_one_of_keywords(&[AWS, SSH])? 
{ AWS => { self.expect_keywords(&[PRIVATELINK])?; - let connection = self.parse_raw_name()?; - let options = if self.consume_token(&Token::LParen) { - let options = self.parse_comma_separated( - Parser::parse_kafka_broker_aws_privatelink_option, - )?; - self.expect_token(&Token::RParen)?; - options - } else { - vec![] - }; - KafkaBrokerTunnel::AwsPrivatelink(KafkaBrokerAwsPrivatelink { - connection, - options, - }) + KafkaBrokerTunnel::AwsPrivatelink(self.parse_aws_privatelink()?) } SSH => { self.expect_keywords(&[TUNNEL])?; @@ -2765,11 +2830,14 @@ impl<'a> Parser<'a> { self.expect_keyword(ZONES)?; ConnectionOptionName::AvailabilityZones } - AWS => match self.expect_one_of_keywords(&[CONNECTION, PRIVATELINK])? { - CONNECTION => ConnectionOptionName::AwsConnection, - PRIVATELINK => ConnectionOptionName::AwsPrivatelink, - _ => unreachable!(), - }, + AWS => { + match self.expect_one_of_keywords(&[CONNECTION, PRIVATELINK, PRIVATELINKS])? { + CONNECTION => ConnectionOptionName::AwsConnection, + PRIVATELINK => ConnectionOptionName::AwsPrivatelink, + PRIVATELINKS => ConnectionOptionName::AwsPrivatelinks, + _ => unreachable!(), + } + } BROKER => ConnectionOptionName::Broker, BROKERS => ConnectionOptionName::Brokers, CATALOG => { @@ -2847,51 +2915,21 @@ impl<'a> Parser<'a> { } fn parse_connection_option_unified(&mut self) -> Result, ParserError> { - let name = match self.parse_connection_option_name()? 
{ - ConnectionOptionName::AwsConnection => { - return Ok(ConnectionOption { - name: ConnectionOptionName::AwsConnection, - value: Some(self.parse_object_option_value()?), - }); - } - ConnectionOptionName::AwsPrivatelink => { - return Ok(ConnectionOption { - name: ConnectionOptionName::AwsPrivatelink, - value: Some(self.parse_default_aws_privatelink()?), - }); - } - ConnectionOptionName::Broker => { - return Ok(ConnectionOption { - name: ConnectionOptionName::Broker, - value: Some(self.parse_kafka_broker()?), - }); - } - ConnectionOptionName::Brokers => { - let _ = self.consume_token(&Token::Eq); - let delimiter = self.expect_one_of_tokens(&[Token::LParen, Token::LBracket])?; - let brokers = self.parse_comma_separated(Parser::parse_kafka_broker)?; - self.expect_token(&match delimiter { - Token::LParen => Token::RParen, - Token::LBracket => Token::RBracket, - _ => unreachable!(), - })?; - return Ok(ConnectionOption { - name: ConnectionOptionName::Brokers, - value: Some(WithOptionValue::Sequence(brokers)), - }); - } - ConnectionOptionName::SshTunnel => { - return Ok(ConnectionOption { - name: ConnectionOptionName::SshTunnel, - value: Some(self.parse_object_option_value()?), - }); - } - name => name, + let name = self.parse_connection_option_name()?; + let value = match name { + ConnectionOptionName::AwsConnection => Some(self.parse_object_option_value()?), + ConnectionOptionName::AwsPrivatelink => Some(self.parse_default_aws_privatelink()?), + ConnectionOptionName::AwsPrivatelinks => Some(WithOptionValue::Sequence( + self.parse_list_value(Parser::parse_aws_privatelink_rule)?, + )), + ConnectionOptionName::Broker => Some(self.parse_kafka_broker()?), + ConnectionOptionName::Brokers => Some(WithOptionValue::Sequence( + self.parse_list_value(Parser::parse_kafka_broker)?, + )), + ConnectionOptionName::SshTunnel => Some(self.parse_object_option_value()?), + _ => self.parse_optional_option_value()?, }; - Ok(ConnectionOption { - name, - value: 
self.parse_optional_option_value()?, - }) + Ok(ConnectionOption { name, value }) } fn parse_create_subsource(&mut self) -> Result, ParserError> { diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 2dd03110b13a1..31a00ab7016be 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -606,6 +606,44 @@ CREATE CONNECTION mysqlconn TO MYSQL (AWS PRIVATELINK = db.schema.item, PORT = 1 => CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("mysqlconn")]), connection_type: MySql, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelink, value: Some(ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("db"), Ident("schema"), Ident("item")])), port: None })) }, ConnectionOption { name: Port, value: Some(Value(Number("1234"))) }, ConnectionOption { name: Host, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("foo")]))) }, ConnectionOption { name: SslCertificate, value: Some(Value(String("cert"))) }, ConnectionOption { name: SslCertificateAuthority, value: Some(Value(String("auth"))) }, ConnectionOption { name: SslKey, value: Some(Value(String("key"))) }], with_options: [] }) +# AWS PRIVATELINKS for Kafka connections + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS (privatelink_svc)) +---- +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = (privatelink_svc)) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), port: None }))])) }], with_options: [] }) + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' 
TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'), '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az4'), privatelink_svc)) +---- +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4'), privatelink_svc)) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } })), ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az4.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az4"))) }] } })), ConnectionAwsPrivatelinkRule(AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), port: None }))])) }], with_options: [] }) + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1', PORT 9092), privatelink_svc)) +---- +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1', PORT = 9092), privatelink_svc)) +=> +CreateConnection(CreateConnectionStatement { 
name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }, KafkaBrokerAwsPrivatelinkOption { name: Port, value: Some(Value(Number("9092"))) }] } })), ConnectionAwsPrivatelinkRule(AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), port: None }))])) }], with_options: [] }) + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ()) +---- +error: Expected identifier, found right parenthesis +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ()) + ^ + +# This expression is not valid, since there's no default rule, but we don't check that in the parser. 
+parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'))) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } }))])) }], with_options: [] }) + parse-statement CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn FOR TABLES (foo, bar as qux, baz into zop); ---- diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index f50451dad26ba..54c5ea46f6452 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -2098,6 +2098,9 @@ impl<'a> Fold for NameResolver<'a> { ConnectionAwsPrivatelink(privatelink) => { ConnectionAwsPrivatelink(self.fold_connection_default_aws_privatelink(privatelink)) } + ConnectionAwsPrivatelinkRule(x) => { + ConnectionAwsPrivatelinkRule(self.fold_connection_aws_privatelink_rule(x)) + } RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)), Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)), ClusterScheduleOptionValue(value) => ClusterScheduleOptionValue(value), diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index 874ffe5cba61d..3be54c1b79278 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -19,12 +19,10 @@ use mz_repr::{ CatalogItemId, ColumnIndex, 
RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType, }; use mz_sql_parser::ast::{ - ColumnDef, ColumnName, ConnectionDefaultAwsPrivatelink, CreateMaterializedViewStatement, - RawItemName, ShowStatement, StatementKind, TableConstraint, UnresolvedDatabaseName, - UnresolvedSchemaName, + ColumnDef, ColumnName, CreateMaterializedViewStatement, RawItemName, ShowStatement, + StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName, }; -use mz_storage_types::connections::inline::ReferencedConnection; -use mz_storage_types::connections::{AwsPrivatelink, Connection, SshTunnel, Tunnel}; +use mz_storage_types::connections::Connection; use crate::ast::{Ident, Statement, UnresolvedItemName}; use crate::catalog::{ @@ -39,7 +37,7 @@ use crate::names::{ }; use crate::normalize; use crate::plan::error::PlanError; -use crate::plan::{Params, Plan, PlanContext, PlanKind, query, with_options}; +use crate::plan::{Params, Plan, PlanContext, PlanKind, query}; use crate::session::vars::FeatureFlag; mod acl; @@ -883,44 +881,6 @@ impl<'a> StatementContext<'a> { self.catalog.humanize_sql_column_type(typ, postgres_compat) } - pub(crate) fn build_tunnel_definition( - &self, - ssh_tunnel: Option, - aws_privatelink: Option>, - ) -> Result, PlanError> { - match (ssh_tunnel, aws_privatelink) { - (None, None) => Ok(Tunnel::Direct), - (Some(ssh_tunnel), None) => { - let id = CatalogItemId::from(ssh_tunnel); - let ssh_tunnel = self.catalog.get_item(&id); - match ssh_tunnel.connection()? { - Connection::Ssh(_connection) => Ok(Tunnel::Ssh(SshTunnel { - connection_id: id, - connection: id, - })), - _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item), - } - } - (None, Some(aws_privatelink)) => { - let id = aws_privatelink.connection.item_id().clone(); - let entry = self.catalog.get_item(&id); - match entry.connection()? 
{ - Connection::AwsPrivatelink(_) => Ok(Tunnel::AwsPrivatelink(AwsPrivatelink { - connection_id: id, - // By default we do not specify an availability zone for the tunnel. - availability_zone: None, - // We always use the port as specified by the top-level connection. - port: aws_privatelink.port, - })), - _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item), - } - } - (Some(_), Some(_)) => { - sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK"); - } - } - } - pub fn relation_desc_into_table_defs( &self, desc: &RelationDesc, diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index 5b94d4e9d3553..65f8c0c5e1d7e 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -21,9 +21,9 @@ use mz_repr::CatalogItemId; use mz_sql_parser::ast::ConnectionOptionName::*; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::{ - ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType, - KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName, - KafkaBrokerTunnel, + ConnectionAwsPrivatelinkRule, ConnectionDefaultAwsPrivatelink, ConnectionOption, + ConnectionOptionName, CreateConnectionType, KafkaBroker, KafkaBrokerAwsPrivatelinkOption, + KafkaBrokerAwsPrivatelinkOptionName, KafkaBrokerTunnel, }; use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::{ @@ -32,11 +32,11 @@ use mz_storage_types::connections::aws::{ use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::string_or_secret::StringOrSecret; use mz_storage_types::connections::{ - AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth, - IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection, - KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode, - 
PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails, - SshConnection, SshTunnel, TlsIdentity, Tunnel, + AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection, + CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, + KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, + MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, + SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel, }; use crate::names::Aug; @@ -53,6 +53,7 @@ generate_extracted_config!( (AvailabilityZones, Vec), (AwsConnection, with_options::Object), (AwsPrivatelink, ConnectionDefaultAwsPrivatelink), + (AwsPrivatelinks, Vec>), // (AwsPrivatelink, with_options::Object), (Broker, Vec>), (Brokers, Vec>), @@ -134,6 +135,7 @@ pub(super) fn validate_options_per_connection_type( ProgressTopic, ProgressTopicReplicationFactor, AwsPrivatelink, + AwsPrivatelinks, SshTunnel, SslKey, SslCertificate, @@ -318,8 +320,12 @@ impl ConnectionOptionExtracted { ConnectionDetails::Kafka(KafkaConnection { brokers: self.get_brokers(scx)?, - default_tunnel: scx - .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?, + default_tunnel: build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + self.aws_privatelinks + )?, progress_topic: self.progress_topic, progress_topic_options: KafkaTopicOptions { // We only allow configuring the progress topic replication factor for now. @@ -377,7 +383,12 @@ impl ConnectionOptionExtracted { ) } } - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for CSR. 
*/ + )?; ConnectionDetails::Csr(CsrConnection { url, @@ -419,7 +430,12 @@ impl ConnectionOptionExtracted { ) } } - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for Postgres. */ + )?; ConnectionDetails::Postgres(PostgresConnection { database: self @@ -520,7 +536,12 @@ impl ConnectionOptionExtracted { ) } } - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for MySQL. */ + )?; ConnectionDetails::MySql(MySqlConnection { password: self.password.map(|password| password.into()), @@ -594,7 +615,12 @@ impl ConnectionOptionExtracted { // // See: let port = self.port.unwrap_or(1433_u16); - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for SQL Server. 
*/ + )?; ConnectionDetails::SqlServer(SqlServerConnectionDetails { host: self @@ -678,15 +704,23 @@ impl ConnectionOptionExtracted { scx: &StatementContext, ) -> Result>, PlanError> { - let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) { - (Some(v), None, None) => v.to_vec(), - (None, Some(v), None) => v.to_vec(), - (None, None, Some(_)) => vec![], - (None, None, None) => { - sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK") + let mut brokers = match ( + &self.broker, + &self.brokers, + &self.aws_privatelink, + &self.aws_privatelinks, + ) { + (Some(v), None, None, None) => v.to_vec(), + (None, Some(v), None, None) => v.to_vec(), + (None, None, Some(_), None) => vec![], + (None, None, None, Some(_)) => vec![], + (None, None, None, None) => { + sql_bail!( + "invalid CONNECTION: must set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS" + ) } _ => sql_bail!( - "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK" + "invalid CONNECTION: can only set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS" ), }; @@ -703,46 +737,7 @@ Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'ka let tunnel = match &broker.tunnel { KafkaBrokerTunnel::Direct => Tunnel::Direct, KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => { - let KafkaBrokerAwsPrivatelinkOptionExtracted { - availability_zone, - port, - seen: _, - } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from( - aws_privatelink.options.clone(), - )?; - - let id = match &aws_privatelink.connection { - ResolvedItemName::Item { id, .. } => id, - _ => sql_bail!( - "internal error: Kafka PrivateLink connection was not resolved" - ), - }; - let entry = scx.catalog.get_item(id); - match entry.connection()? 
{ - Connection::AwsPrivatelink(connection) => { - if let Some(az) = &availability_zone { - if !connection.availability_zones.contains(az) { - sql_bail!( - "AWS PrivateLink availability zone {} does not match any of the \ - availability zones on the AWS PrivateLink connection {}", - az.quoted(), - scx.catalog - .resolve_full_name(entry.name()) - .to_string() - .quoted() - ) - } - } - Tunnel::AwsPrivatelink(AwsPrivatelink { - connection_id: *id, - availability_zone, - port, - }) - } - _ => { - sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item) - } - } + Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?) } KafkaBrokerTunnel::SshTunnel(ssh) => { let id = match &ssh { @@ -927,3 +922,118 @@ fn plan_kafka_security( Ok((tls, sasl)) } +pub fn plan_default_privatelink( + scx: &StatementContext, + pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink, +) -> Result { + let id = pl.connection.item_id().clone(); + let entry = scx.catalog.get_item(&id); + match entry.connection()? { + Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink { + connection_id: id, + // By default we do not specify an availability zone for the tunnel. + availability_zone: None, + // We always use the port as specified by the top-level connection. + port: pl.port, + }), + _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item), + } +} + +pub fn plan_privatelink( + scx: &StatementContext, + pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink, +) -> Result { + let KafkaBrokerAwsPrivatelinkOptionExtracted { + availability_zone, + port, + seen: _, + } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?; + + let id = match &pl.connection { + ResolvedItemName::Item { id, .. } => id, + _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"), + }; + let entry = scx.catalog.get_item(id); + match entry.connection()? 
{ + Connection::AwsPrivatelink(connection) => { + if let Some(az) = &availability_zone { + if !connection.availability_zones.contains(az) { + sql_bail!( + "AWS PrivateLink availability zone {} does not match any of the \ + availability zones on the AWS PrivateLink connection {}", + az.quoted(), + scx.catalog + .resolve_full_name(entry.name()) + .to_string() + .quoted() + ) + } + } + Ok(AwsPrivatelink { + connection_id: *id, + availability_zone, + port, + }) + } + _ => { + sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item) + } + } +} + +pub(crate) fn build_tunnel_definition( + scx: &StatementContext, + ssh_tunnel: Option, + aws_privatelink: Option>, + aws_privatelinks: Option>>, +) -> Result, PlanError> { + Ok(match (ssh_tunnel, aws_privatelink, aws_privatelinks) { + (None, None, None) => Tunnel::Direct, + (Some(ssh_tunnel), None, None) => { + let id = CatalogItemId::from(ssh_tunnel); + let ssh_tunnel = scx.catalog.get_item(&id); + match ssh_tunnel.connection()? { + Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel { + connection_id: id, + connection: id, + }), + _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item), + } + } + (None, Some(aws_privatelink), None) => { + Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?) + } + (None, None, Some(rules)) => { + let Some((default, patterns)) = rules.split_last() else { + sql_bail!("AWS PRIVATELINKS cannot be empty"); + }; + + let patterns = patterns.iter().map(|pattern| { + let ConnectionAwsPrivatelinkRule::AwsPrivatelinkRule(pattern) = pattern else { + sql_bail!("Only the last AWS PRIVATELINKS entry can be a default PrivateLink connection."); + }; + + Ok(AwsPrivatelinkRule{ + pattern: pattern.pattern.clone(), + to: plan_privatelink(scx, &pattern.to)?, + }) + }).collect::, _>>()?; + let ConnectionAwsPrivatelinkRule::AwsPrivatelinkRuleDefault(default) = default else { + sql_bail!( + "The last AWS PRIVATELINKS entry must be a default PrivateLink connection." 
+ ); + }; + let default = plan_default_privatelink(scx, default)?; + Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { + rules: patterns, + default, + }) + } + _ => { + sql_bail!( + "SSH TUNNEL, AWS PRIVATELINK, and AWS PRIVATELINKS are mutually exclusive options" + ); + } + }) +} diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index aa9f6840d3e39..de3b10f17f6c1 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -16,8 +16,9 @@ use mz_repr::adt::interval::Interval; use mz_repr::bytes::ByteSize; use mz_repr::{CatalogItemId, RelationVersionSelector, strconv}; use mz_sql_parser::ast::{ - ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr, - Ident, KafkaBroker, NetworkPolicyRuleDefinition, RefreshOptionValue, ReplicaDefinition, + ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionAwsPrivatelinkRule, + ConnectionDefaultAwsPrivatelink, Expr, Ident, KafkaBroker, NetworkPolicyRuleDefinition, + RefreshOptionValue, ReplicaDefinition, }; use mz_storage_types::connections::IcebergCatalogType; use mz_storage_types::connections::string_or_secret::StringOrSecret; @@ -700,6 +701,7 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue, T: AstInfo + std::fmt::Debug> TryFromValue "exprs", WithOptionValue::ClusterReplicas(_) => "cluster replicas", WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers", - WithOptionValue::ConnectionAwsPrivatelink(_) => "connection kafka brokers", + WithOptionValue::ConnectionAwsPrivatelink(_) => "connection privatelink", + WithOptionValue::ConnectionAwsPrivatelinkRule(_) => + "connection privatelink rule", WithOptionValue::Refresh(_) => "refresh option values", WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule", WithOptionValue::NetworkPolicyRules(_) => "network policy rules", @@ -857,12 +861,36 @@ impl TryFromValue> for ConnectionDefaultAwsPrivatelink } } +impl TryFromValue> 
for ConnectionAwsPrivatelinkRule { + fn try_from_value(v: WithOptionValue) -> Result { + if let WithOptionValue::ConnectionAwsPrivatelinkRule(r) = v { + Ok(r) + } else { + sql_bail!("cannot use value `{}` for a privatelink rule", v) + } + } + + fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option> { + Some(WithOptionValue::ConnectionAwsPrivatelinkRule(self)) + } + + fn name() -> String { + "privatelink rule option value".to_string() + } +} + impl ImpliedValue for ConnectionDefaultAwsPrivatelink { fn implied_value() -> Result { sql_bail!("must provide a value") } } +impl ImpliedValue for ConnectionAwsPrivatelinkRule { + fn implied_value() -> Result { + sql_bail!("must provide a value") + } +} + impl ImpliedValue for ClusterScheduleOptionValue { fn implied_value() -> Result { sql_bail!("must provide a cluster schedule option value") diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 0ae008b7209ba..377ba49922842 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -31,7 +31,8 @@ use mz_ccsr::tls::{Certificate, Identity}; use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host}; use mz_dyncfg::ConfigSet; use mz_kafka_util::client::{ - BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext, + BrokerAddr, BrokerRewrite, HostMappingRules, MzClientContext, MzKafkaError, TunnelConfig, + TunnelingClientContext, }; use mz_mysql_util::{MySqlConn, MySqlError}; use mz_ore::assert_none; @@ -41,6 +42,7 @@ use mz_ore::netio::resolve_address; use mz_ore::num::NonNeg; use mz_repr::{CatalogItemId, GlobalId}; use mz_secrets::SecretsReader; +use mz_sql_parser::ast::ConnectionRulePattern; use mz_ssh_util::keys::SshKeyPair; use mz_ssh_util::tunnel::SshTunnelConfig; use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager}; @@ -939,7 +941,8 @@ impl KafkaConnection { 
options.insert("allow.auto.create.topics".into(), "false".into()); let brokers = match &self.default_tunnel { - Tunnel::AwsPrivatelink(t) => { + Tunnel::AwsPrivatelink(t) + | Tunnel::AwsPrivatelinks(AwsPrivatelinks { default: t, .. }) => { assert!(&self.brokers.is_empty()); let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM @@ -1067,10 +1070,15 @@ impl KafkaConnection { // By default, don't offer a default override for broker address lookup. } Tunnel::AwsPrivatelink(pl) => { - context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host( - pl.connection_id, - None, // Default tunnel does not support availability zones. - ))); + context.set_default_tunnel(TunnelConfig::StaticHost( + // Possible bug: We have been ignoring the configured port. + KafkaConnection::from_default_aws_privatelink(pl).host, + )); + } + Tunnel::AwsPrivatelinks(pl) => { + context.set_default_tunnel(TunnelConfig::Rules( + KafkaConnection::from_aws_privatelinks(pl), + )); } Tunnel::Ssh(ssh_tunnel) => { let secret = storage_configuration @@ -1098,6 +1106,8 @@ impl KafkaConnection { } } + // Here, we preemptively rewrite broker addresses. + // In concept, this overlaps with 'TunnelingClientContext::resolve_broker_addr'. for broker in &self.brokers { let mut addr_parts = broker.address.splitn(2, ':'); let addr = BrokerAddr { @@ -1124,19 +1134,14 @@ impl KafkaConnection { // in the `TunnelingClientContext`. } Tunnel::AwsPrivatelink(aws_privatelink) => { - let host = mz_cloud_resources::vpc_endpoint_host( - aws_privatelink.connection_id, - aws_privatelink.availability_zone.as_deref(), - ); - let port = aws_privatelink.port; context.add_broker_rewrite( addr, - BrokerRewrite { - host: host.clone(), - port, - }, + KafkaConnection::from_aws_privatelink(aws_privatelink), ); } + Tunnel::AwsPrivatelinks(_) => unreachable!( + "Individually predefined brokers do not use rule-based PrivateLinks routing." 
+ ), Tunnel::Ssh(ssh_tunnel) => { // Ensure any SSH bastion address we connect to is resolved to an external address. let ssh_host_resolved = resolve_address( @@ -1236,6 +1241,49 @@ impl KafkaConnection { } } } + + /// The "default" PrivateLink connection is used for bootstrapping Kafka. + fn from_default_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite { + BrokerRewrite { + host: vpc_endpoint_host( + pl.connection_id, + None, // Default tunnel does not support availability zones. + ), + port: pl.port, + } + } + + /// The "not default" PrivateLink connections are used for routing to specific Kafka brokers. + fn from_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite { + BrokerRewrite { + host: vpc_endpoint_host(pl.connection_id, pl.availability_zone.as_deref()), + port: pl.port, + } + } + + fn from_aws_privatelink_rule( + AwsPrivatelinkRule { pattern, to }: &AwsPrivatelinkRule, + ) -> (mz_kafka_util::client::ConnectionRulePattern, BrokerRewrite) { + ( + mz_kafka_util::client::ConnectionRulePattern { + prefix_wildcard: pattern.prefix_wildcard, + literal_match: pattern.literal_match.clone(), + suffix_wildcard: pattern.suffix_wildcard, + }, + KafkaConnection::from_aws_privatelink(to), + ) + } + + fn from_aws_privatelinks(pl: &AwsPrivatelinks) -> HostMappingRules { + HostMappingRules { + rules: pl + .rules + .iter() + .map(KafkaConnection::from_aws_privatelink_rule) + .collect_vec(), + default: KafkaConnection::from_default_aws_privatelink(&pl.default), + } + } } impl AlterCompatible for KafkaConnection { @@ -1454,6 +1502,9 @@ impl CsrConnection { .collect(); client_config = client_config.resolve_to_addrs(host, &addrs) } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("AWS PRIVATELINKS syntax is only available for Kafka connections."); + } } Ok(client_config.build()?) 
@@ -1712,6 +1763,9 @@ impl PostgresConnection { connection_id: connection.connection_id, } } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("AWS PRIVATELINKS syntax is only available for Kafka connections."); + } }; Ok(mz_postgres_util::Config::new( @@ -1847,6 +1901,7 @@ pub enum Tunnel { Ssh(SshTunnel), /// Via the specified AWS PrivateLink connection. AwsPrivatelink(AwsPrivatelink), + AwsPrivatelinks(AwsPrivatelinks), } impl IntoInlineConnection for Tunnel { @@ -1855,6 +1910,7 @@ impl IntoInlineConnection for Tunnel Tunnel::Direct, Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)), Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl), + Tunnel::AwsPrivatelinks(x) => Tunnel::AwsPrivatelinks(x), } } } @@ -2064,6 +2120,9 @@ impl MySqlConnection { connection_id: connection.connection_id, } } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("AWS PRIVATELINKS syntax is only available for Kafka connections."); + } }; let aws_config = match self.aws_connection.as_ref() { @@ -2390,6 +2449,9 @@ impl SqlServerConnectionDetails { port: self.port, } } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("AWS PRIVATELINKS syntax is only available for Kafka connections."); + } }; Ok(mz_sql_server_util::Config::new( @@ -2553,6 +2615,23 @@ impl AlterCompatible for AwsPrivatelink { } } +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct AwsPrivatelinks { + /// Route to brokers through PrivateLink connections according to these rules. + /// First applicable rule wins. + pub rules: Vec, + /// Bootstrap through this PrivateLink connection. + pub default: AwsPrivatelink, +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct AwsPrivatelinkRule { + /// Given a broker's host:port, should we use this route? + pub pattern: ConnectionRulePattern, + /// Route to the broker through this PrivateLink connection. + pub to: AwsPrivatelink, +} + /// Specifies an SSH tunnel connection. 
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct SshTunnel { diff --git a/test/testdrive/connection-alter.td b/test/testdrive/connection-alter.td index 90f9899a5e6c7..d5cab685fd4fd 100644 --- a/test/testdrive/connection-alter.td +++ b/test/testdrive/connection-alter.td @@ -50,7 +50,7 @@ first second 1 2 ! ALTER CONNECTION conn RESET (broker); -contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS ! ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = true); contains:Failed to resolve hostname @@ -111,7 +111,7 @@ contains:BROKER specified more than once contains:BROKER specified more than once ! ALTER CONNECTION conn SET (BROKER '${testdrive.kafka-addr}'), SET (BROKERS ['${testdrive.kafka-addr}']) -contains:invalid ALTER CONNECTION: invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS ! ALTER CONNECTION conn SET (BROKER '${testdrive.kafka-addr}'), DROP (BROKERS) contains:cannot both SET and DROP/RESET mutually exclusive KAFKA options BROKER, BROKERS @@ -121,7 +121,7 @@ contains:cannot both SET and DROP/RESET mutually exclusive KAFKA options BROKER, # We permit resetting both of these options, and the error occurs later in planning ! 
ALTER CONNECTION conn RESET (BROKER), RESET (BROKERS); -contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS > ALTER CONNECTION conn SET (BROKER = '${testdrive.kafka-addr}'); diff --git a/test/testdrive/connection-create-drop.td b/test/testdrive/connection-create-drop.td index 059895f18b8ca..49301f27eb608 100644 --- a/test/testdrive/connection-create-drop.td +++ b/test/testdrive/connection-create-drop.td @@ -387,14 +387,14 @@ contains:BROKER specified more than once ! CREATE CONNECTION no_broker TO KAFKA ( SECURITY PROTOCOL PLAINTEXT ); -contains:must set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:must set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS ! CREATE CONNECTION ssl_underspeced TO KAFKA ( BROKER 'kafka:9092', BROKERS ['kafka:9092', 'kafka:9093'], SECURITY PROTOCOL PLAINTEXT ); -contains:can only set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:can only set one of BROKER, BROKERS, AWS PRIVATELINK, or AWS PRIVATELINKS ! CREATE CONNECTION ssl_underspeced TO KAFKA ( BROKER 'kafka:9092', @@ -479,6 +479,16 @@ contains: HOST option is required ! CREATE CONNECTION conn1 TO KAFKA (BROKER '${testdrive.kafka-addr}' USING AWS PRIVATELINK foo (PORT 9093), SECURITY PROTOCOL PLAINTEXT); contains: unknown catalog item 'foo' +! CREATE CONNECTION conn1 TO KAFKA ( + AWS PRIVATELINKS ( + '*.use1-az1.*' TO foo (AVAILABILITY ZONE = 'use1-az1'), + '*.use1-az4.*' TO foo (PORT 9093, AVAILABILITY ZONE = 'use1-az4'), + foo + ), + SECURITY PROTOCOL PLAINTEXT + ); +contains: unknown catalog item 'foo' + ! 
CREATE CONNECTION conn1 TO CONFLUENT SCHEMA REGISTRY (AWS PRIVATELINK foo, PORT 8080) contains: unknown catalog item 'foo' From b62d000cbd3b248d889ac9a205f728e368a96b8f Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Tue, 24 Mar 2026 13:33:58 -0400 Subject: [PATCH 4/8] pull hardcoded bootstrap addresses from AWS PRIVATELINKS rules --- PROMPT.md | 96 ++++++++++++++++++++ doc/user/content/sql/create-connection.md | 9 +- doc/user/data/examples/create_connection.yml | 12 +-- src/kafka-util/src/client.rs | 16 ++-- src/sql-parser/src/ast/defs/statement.rs | 24 +---- src/sql-parser/src/parser.rs | 16 +--- src/sql-parser/tests/testdata/ddl | 35 ++++--- src/sql/src/names.rs | 2 +- src/sql/src/plan/statement/ddl/connection.rs | 38 +++----- src/sql/src/plan/with_options.rs | 6 +- src/storage-types/src/connections.rs | 33 +++++-- 11 files changed, 192 insertions(+), 95 deletions(-) create mode 100644 PROMPT.md diff --git a/PROMPT.md b/PROMPT.md new file mode 100644 index 0000000000000..e9fa36deb8d39 --- /dev/null +++ b/PROMPT.md @@ -0,0 +1,96 @@ +# AWS PRIVATELINKS syntax changes + +## Problem + +When using the top-level `AWS PRIVATELINK` or `AWS PRIVATELINKS` syntax (without `BROKERS`), `bootstrap.servers` is set to the VPC endpoint hostname (e.g. `vpce-xxx...amazonaws.com:9092`). librdkafka uses this hostname for TLS SNI. Confluent Cloud rejects TLS handshakes with an unrecognized SNI, so connections fail with "SSL handshake failed: Disconnected". + +The `BROKERS` syntax works because `bootstrap.servers` is set to the original Kafka hostname (e.g. `lkc-xxx...confluent.cloud:9092`), and `resolve_broker_addr` routes the TCP connection through the VPC endpoint without changing the hostname librdkafka uses for TLS. + +## Current code paths + +Both paths are in `KafkaConnection::create_with_context` in `src/storage-types/src/connections.rs`. + +**Top-level path** (lines 944-962): Sets `bootstrap.servers` to `vpc_endpoint_host(connection_id, None)`. 
Sets `default_tunnel` to `TunnelConfig::StaticHost` (singular `AWS PRIVATELINK`) or `TunnelConfig::Rules` (plural `AWS PRIVATELINKS`). The `HostMappingRules` struct in `src/kafka-util/src/client.rs` always has a `default: BrokerRewrite` field — this is the default rule that catches any broker address not matched by a pattern. + +**BROKERS path** (line 964): Sets `bootstrap.servers` to the user-provided broker addresses. Per-broker `AWS PRIVATELINK` tunnels are registered as individual `BrokerRewrite` entries via `add_broker_rewrite`. + +## Changes + +Two changes to the `AWS PRIVATELINKS` syntax: + +### 1. Remove the default (pattern-less) rule + +The `AwsPrivatelinks` struct currently requires a `default: AwsPrivatelink` field — the catch-all rule for brokers that don't match any pattern. Remove this. `AWS PRIVATELINKS` should only contain pattern rules. Every rule must have a pattern. + +This affects: +- `AwsPrivatelinks` struct in `src/storage-types/src/connections.rs` — remove `default` field +- `HostMappingRules` struct in `src/kafka-util/src/client.rs` — remove `default` field; `rewrite()` should return `None` when no pattern matches (instead of applying a default) +- `from_aws_privatelinks` in `src/storage-types/src/connections.rs` — stop building a default +- SQL parser in `src/sql-parser/src/parser.rs` — reject pattern-less entries in `AWS PRIVATELINKS (...)` +- AST types in `src/sql-parser/src/ast/defs/statement.rs` — `ConnectionAwsPrivatelinkRule` no longer needs a default variant +- Planning in `src/sql/src/plan/statement/ddl/connection.rs` — update accordingly + +### 2. Exact-match patterns become bootstrap brokers + +Host patterns without wildcards (no leading or trailing `*`) are exact matches — they match a single known broker address. 
These should be treated like `BROKERS` entries: their addresses should be included in `bootstrap.servers`, and the VPC endpoint routing should happen via `resolve_broker_addr` (same as per-broker `USING AWS PRIVATELINK` in the `BROKERS` syntax). + +This means in `create_with_context`: +- Collect exact-match patterns from `AWS PRIVATELINKS` rules +- Include their addresses in `bootstrap.servers` (comma-separated, like the `BROKERS` path) +- Register their VPC endpoint routing via `add_broker_rewrite` (like the `BROKERS` path does at lines 1137-1141) +- Wildcard patterns still go into `TunnelConfig::Rules` for dynamic matching in `resolve_broker_addr` + +The `assert!(self.brokers.is_empty())` on line 947 stays — users still don't specify `BROKERS` alongside `AWS PRIVATELINKS`. The bootstrap broker addresses come from the exact-match patterns instead. + +## Expected SQL after changes + +```sql +-- Exact-match patterns: addresses go into bootstrap.servers, +-- routed through their respective PrivateLink endpoints. +-- Wildcard patterns: applied dynamically to discovered brokers. +CREATE CONNECTION kafka_conn TO KAFKA ( + SECURITY PROTOCOL = SASL_SSL, + SASL MECHANISMS = 'PLAIN', + SASL USERNAME = '...', + SASL PASSWORD = SECRET pw, + AWS PRIVATELINKS ( + 'lkc-xxx.domyyy.us-east-1.aws.confluent.cloud:9092' TO privatelink_svc (PORT 9092), + '*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'), + '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az4') + ) +); +``` + +With the old syntax, you'd also need a pattern-less default at the end: +```sql + privatelink_svc -- default, catch-all: REMOVED +``` + +## Documentation + +Update both documentation files to reflect the syntax changes. + +### `doc/user/content/sql/create-connection.md` + +**"Dynamic broker discovery" section** (lines 326-347, anchor `#kafka-privatelinks`): +- Update the description: exact-match (no-wildcard) patterns are used as bootstrap brokers. 
Wildcard patterns route dynamically discovered brokers. +- Update the example SQL: remove the bare `privatelink_svc` default line, add an exact-match pattern for the bootstrap broker address. + +**"Default connections" section** (lines 404-429, anchor `#kafka-privatelink-default`): +- This section documents the singular `AWS PRIVATELINK` (default connection) syntax for Redpanda Cloud. It is unchanged by this work, but verify the description still makes sense in context after the `AWS PRIVATELINKS` changes above. + +### `doc/user/data/examples/create_connection.yml` + +**`syntax-kafka-aws-privatelinks` example** (lines 255-293): +- Remove ` (PORT )` from the syntax template. +- Add an exact-match pattern line to the template (no wildcards). +- Update the `syntax_elements` descriptions: + - Remove the "``" element. + - Remove the sentence "If no rule matches, Materialize will attempt to connect through the default PrivateLink connection listed at the end." + - Add a note that patterns without wildcards are used as bootstrap broker addresses. + +## What NOT to change + +- The singular `AWS PRIVATELINK` syntax (top-level, non-Kafka connections like Postgres) is unchanged. +- The `BROKERS (...) USING AWS PRIVATELINK` syntax is unchanged. +- The `ConnectionRulePattern` struct and its `matches()` method are unchanged. diff --git a/doc/user/content/sql/create-connection.md b/doc/user/content/sql/create-connection.md index 52ae2da9ad7aa..331341faeef7f 100644 --- a/doc/user/content/sql/create-connection.md +++ b/doc/user/content/sql/create-connection.md @@ -327,8 +327,9 @@ or a single [default PrivateLink connection](#kafka-privatelink-default) (e.g. R ##### Dynamic broker discovery {#kafka-privatelinks} Confluent Cloud does not require listing every broker individually. -Instead, you should specify a PrivateLink connection and bootstrap server port -along with rules for matching brokers to the correct availability-zone-specific PrivateLink endpoint. 
+Instead, specify wildcard patterns for routing dynamically discovered brokers +to the correct availability-zone-specific PrivateLink endpoint. +For bootstrap brokers, use exact-match patterns without wildcards. {{% include-syntax file="examples/create_connection" example="syntax-kafka-aws-privatelinks" %}} @@ -340,9 +341,9 @@ CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK ( CREATE CONNECTION kafka_connection TO KAFKA ( AWS PRIVATELINKS ( + 'lkc-xxx.domyyy.us-east-1.aws.confluent.cloud:9092' TO privatelink_svc (PORT 9092), '*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), - '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4'), - privatelink_svc + '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4') ) ); ``` diff --git a/doc/user/data/examples/create_connection.yml b/doc/user/data/examples/create_connection.yml index 758bcd5209996..218b890e79edd 100644 --- a/doc/user/data/examples/create_connection.yml +++ b/doc/user/data/examples/create_connection.yml @@ -256,12 +256,12 @@ code: | CREATE CONNECTION TO KAFKA ( AWS PRIVATELINKS ( + '' TO (PORT ), '' TO ( PORT , AVAILABILITY ZONE = '' ), - '' TO , - (PORT ) + '' TO ), ... ); @@ -272,8 +272,9 @@ the named AWS PrivateLink connection. A pattern may begin with `*` to match any prefix. A pattern may end with `*` to match any suffix. All other characters in the pattern are matched literally. + Patterns without wildcards are used as bootstrap broker addresses. Rules are evaluated in order. The first matching rule wins. - If no rule matches, Materialize will attempt to connect through the default PrivateLink connection listed at the end. + If no rule matches, Materialize will attempt to connect to the broker directly, without tunneling. - name: "`AVAILABILITY ZONE`" description: | *Value:* `text` @@ -285,11 +286,6 @@ *Value:* `integer` The port of the AWS PrivateLink service to connect to. Defaults to the broker's port. - - name: "``" - description: | - *Value:* object name. 
Required. - - The AWS PrivateLink connection to use for bootstrapping. - name: "syntax-csr" code: | diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 0d7e68e7a278f..6a4eb8926820d 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -363,21 +363,20 @@ impl ConnectionRulePattern { pub struct HostMappingRules { /// Map matching hosts to a different host. First applicable rule wins. pub rules: Vec<(ConnectionRulePattern, BrokerRewrite)>, - /// If no rules match, use this host. - pub default: BrokerRewrite, } impl HostMappingRules { - /// Rewrite this broker address according to the rules. - pub fn rewrite(&self, src: &BrokerAddr) -> BrokerAddr { + /// Rewrite this broker address according to the rules. Returns `None` when + /// no rule matches. + pub fn rewrite(&self, src: &BrokerAddr) -> Option { let address = format!("{}:{}", src.host, src.port); for (pattern, dst) in &self.rules { if pattern.matches(&address) { - return dst.rewrite(src); + return Some(dst.rewrite(src)); } } - self.default.rewrite(src) + None } } @@ -672,7 +671,10 @@ where .to_socket_addrs() .map(|addrs| addrs.collect()), // Rewrite according to the routing rules. - TunnelConfig::Rules(rules) => rules.rewrite(&addr).to_socket_addrs(), + TunnelConfig::Rules(rules) => { + // If no rules match, just use the address as-is. + rules.rewrite(&addr).unwrap_or(addr).to_socket_addrs() + } // We leave the broker's address as it is. 
TunnelConfig::None => { (host, port).to_socket_addrs().map(|addrs| addrs.collect()) diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 6be25b39130fd..b11028fb9f070 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -603,14 +603,6 @@ impl AstDisplay for ConnectionDefaultAwsPrivatelink { } impl_display_t!(ConnectionDefaultAwsPrivatelink); -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub enum ConnectionAwsPrivatelinkRule { - /// Route to brokers through PrivateLink connections according to these rules. - AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern), - /// Bootstrap through this PrivateLink connection. - AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink), -} - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] /// Match this pattern against some brokers' host:port. pub struct ConnectionAwsPrivatelinkPattern { @@ -641,18 +633,12 @@ pub struct ConnectionRulePattern { pub suffix_wildcard: bool, } -impl AstDisplay for ConnectionAwsPrivatelinkRule { - fn fmt(&self, f: &mut AstFormatter) - where - W: fmt::Write, - { - match self { - ConnectionAwsPrivatelinkRule::AwsPrivatelinkRule(x) => f.write_node(x), - ConnectionAwsPrivatelinkRule::AwsPrivatelinkRuleDefault(x) => f.write_node(x), - } +impl ConnectionRulePattern { + /// True when the pattern has no wildcards — it matches a single exact broker address. 
+ pub fn is_exact(&self) -> bool { + !self.prefix_wildcard && !self.suffix_wildcard } } -impl_display_t!(ConnectionAwsPrivatelinkRule); impl AstDisplay for ConnectionAwsPrivatelinkPattern { fn fmt(&self, f: &mut AstFormatter) @@ -4478,7 +4464,7 @@ pub enum WithOptionValue { ClusterReplicas(Vec>), ConnectionKafkaBroker(KafkaBroker), ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink), - ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkRule), + ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern), RetainHistoryFor(Value), Refresh(RefreshOptionValue), ClusterScheduleOptionValue(ClusterScheduleOptionValue), diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index f5c25b2c3d557..99aee2ef7b011 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2610,18 +2610,12 @@ impl<'a> Parser<'a> { } fn parse_aws_privatelink_rule(&mut self) -> Result, ParserError> { + let pattern = self.parse_connection_rule_pattern()?; + self.expect_keywords(&[TO])?; Ok(WithOptionValue::ConnectionAwsPrivatelinkRule( - if let Some(Token::String(_)) = self.peek_token() { - let pattern = self.parse_connection_rule_pattern()?; - self.expect_keywords(&[TO])?; - ConnectionAwsPrivatelinkRule::AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { - pattern, - to: self.parse_aws_privatelink()?, - }) - } else { - ConnectionAwsPrivatelinkRule::AwsPrivatelinkRuleDefault( - self.parse_default_aws_privatelink_()?, - ) + ConnectionAwsPrivatelinkPattern { + pattern, + to: self.parse_aws_privatelink()?, }, )) } diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 31a00ab7016be..3c6ad391bfa87 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -608,41 +608,52 @@ CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("my # AWS PRIVATELINKS for Kafka connections +# Every entry requires a pattern string. 
parse-statement CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS (privatelink_svc)) ---- -CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = (privatelink_svc)) -=> -CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), port: None }))])) }], with_options: [] }) +error: Expected literal string, found identifier "privatelink_svc" +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS (privatelink_svc)) + ^ +# Wildcard patterns with availability zones. parse-statement -CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'), '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az4'), privatelink_svc)) +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'), '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az4'))) ---- -CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4'), privatelink_svc)) +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4'))) => -CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { 
prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } })), ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az4.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az4"))) }] } })), ConnectionAwsPrivatelinkRule(AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), port: None }))])) }], with_options: [] }) +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } }), ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az4.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az4"))) }] } })])) }], with_options: [] }) +# Wildcard pattern with availability zone and port. 
parse-statement -CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1', PORT 9092), privatelink_svc)) +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1', PORT 9092))) ---- -CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1', PORT = 9092), privatelink_svc)) +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1', PORT = 9092))) => -CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }, KafkaBrokerAwsPrivatelinkOption { name: Port, value: Some(Value(Number("9092"))) }] } })), ConnectionAwsPrivatelinkRule(AwsPrivatelinkRuleDefault(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), port: None }))])) }], with_options: [] }) +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: 
Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }, KafkaBrokerAwsPrivatelinkOption { name: Port, value: Some(Value(Number("9092"))) }] } })])) }], with_options: [] }) parse-statement CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ()) ---- -error: Expected identifier, found right parenthesis +error: Expected literal string, found right parenthesis CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ()) ^ -# This expression is not valid, since there's no default rule, but we don't check that in the parser. +# Single wildcard pattern. parse-statement CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) ---- CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'))) => -CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(AwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } }))])) }], with_options: [] }) +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", 
suffix_wildcard: true }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } })])) }], with_options: [] }) + +# Exact-match pattern (no wildcards) — used as a bootstrap broker address. +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS ('broker1:9092' TO privatelink_svc (PORT 9092))) +---- +CREATE CONNECTION kafka_connection TO KAFKA (AWS PRIVATELINKS = ('broker1:9092' TO privatelink_svc (PORT = 9092))) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelinks, value: Some(Sequence([ConnectionAwsPrivatelinkRule(ConnectionAwsPrivatelinkPattern { pattern: ConnectionRulePattern { prefix_wildcard: false, literal_match: "broker1:9092", suffix_wildcard: false }, to: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: Port, value: Some(Value(Number("9092"))) }] } })])) }], with_options: [] }) parse-statement CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn FOR TABLES (foo, bar as qux, baz into zop); diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 54c5ea46f6452..de053bf9fe0de 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -2099,7 +2099,7 @@ impl<'a> Fold for NameResolver<'a> { ConnectionAwsPrivatelink(self.fold_connection_default_aws_privatelink(privatelink)) } ConnectionAwsPrivatelinkRule(x) => { - ConnectionAwsPrivatelinkRule(self.fold_connection_aws_privatelink_rule(x)) + ConnectionAwsPrivatelinkRule(self.fold_connection_aws_privatelink_pattern(x)) } RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)), Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)), diff --git 
a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index 65f8c0c5e1d7e..bda87da3b8565 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -21,7 +21,7 @@ use mz_repr::CatalogItemId; use mz_sql_parser::ast::ConnectionOptionName::*; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::{ - ConnectionAwsPrivatelinkRule, ConnectionDefaultAwsPrivatelink, ConnectionOption, + ConnectionAwsPrivatelinkPattern, ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType, KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName, KafkaBrokerTunnel, }; @@ -53,7 +53,7 @@ generate_extracted_config!( (AvailabilityZones, Vec), (AwsConnection, with_options::Object), (AwsPrivatelink, ConnectionDefaultAwsPrivatelink), - (AwsPrivatelinks, Vec>), + (AwsPrivatelinks, Vec>), // (AwsPrivatelink, with_options::Object), (Broker, Vec>), (Brokers, Vec>), @@ -986,7 +986,7 @@ pub(crate) fn build_tunnel_definition( scx: &StatementContext, ssh_tunnel: Option, aws_privatelink: Option>, - aws_privatelinks: Option>>, + aws_privatelinks: Option>>, ) -> Result, PlanError> { Ok(match (ssh_tunnel, aws_privatelink, aws_privatelinks) { (None, None, None) => Tunnel::Direct, @@ -1005,30 +1005,20 @@ pub(crate) fn build_tunnel_definition( Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?) 
} (None, None, Some(rules)) => { - let Some((default, patterns)) = rules.split_last() else { + if rules.is_empty() { sql_bail!("AWS PRIVATELINKS cannot be empty"); - }; - - let patterns = patterns.iter().map(|pattern| { - let ConnectionAwsPrivatelinkRule::AwsPrivatelinkRule(pattern) = pattern else { - sql_bail!("Only the last AWS PRIVATELINKS entry can be a default PrivateLink connection."); - }; + } - Ok(AwsPrivatelinkRule{ - pattern: pattern.pattern.clone(), - to: plan_privatelink(scx, &pattern.to)?, + let rules = rules + .iter() + .map(|rule| { + Ok(AwsPrivatelinkRule { + pattern: rule.pattern.clone(), + to: plan_privatelink(scx, &rule.to)?, }) - }).collect::, _>>()?; - let ConnectionAwsPrivatelinkRule::AwsPrivatelinkRuleDefault(default) = default else { - sql_bail!( - "The last AWS PRIVATELINKS entry must be a default PrivateLink connection." - ); - }; - let default = plan_default_privatelink(scx, default)?; - Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { - rules: patterns, - default, - }) + }) + .collect::, PlanError>>()?; + Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules }) } _ => { sql_bail!( diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index de3b10f17f6c1..78a07cd3915bb 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -16,7 +16,7 @@ use mz_repr::adt::interval::Interval; use mz_repr::bytes::ByteSize; use mz_repr::{CatalogItemId, RelationVersionSelector, strconv}; use mz_sql_parser::ast::{ - ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionAwsPrivatelinkRule, + ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionAwsPrivatelinkPattern, ConnectionDefaultAwsPrivatelink, Expr, Ident, KafkaBroker, NetworkPolicyRuleDefinition, RefreshOptionValue, ReplicaDefinition, }; @@ -861,7 +861,7 @@ impl TryFromValue> for ConnectionDefaultAwsPrivatelink } } -impl TryFromValue> for ConnectionAwsPrivatelinkRule { +impl 
TryFromValue> for ConnectionAwsPrivatelinkPattern { fn try_from_value(v: WithOptionValue) -> Result { if let WithOptionValue::ConnectionAwsPrivatelinkRule(r) = v { Ok(r) @@ -885,7 +885,7 @@ impl ImpliedValue for ConnectionDefaultAwsPrivatelink { } } -impl ImpliedValue for ConnectionAwsPrivatelinkRule { +impl ImpliedValue for ConnectionAwsPrivatelinkPattern { fn implied_value() -> Result { sql_bail!("must provide a value") } diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 377ba49922842..1d91145b2984f 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -941,8 +941,7 @@ impl KafkaConnection { options.insert("allow.auto.create.topics".into(), "false".into()); let brokers = match &self.default_tunnel { - Tunnel::AwsPrivatelink(t) - | Tunnel::AwsPrivatelinks(AwsPrivatelinks { default: t, .. }) => { + Tunnel::AwsPrivatelink(t) => { assert!(&self.brokers.is_empty()); let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM @@ -960,6 +959,28 @@ impl KafkaConnection { t.port.unwrap_or(9092) ) } + Tunnel::AwsPrivatelinks(pl) => { + assert!(&self.brokers.is_empty()); + + let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM + .get(storage_configuration.config_set()); + options.insert("ssl.endpoint.identification.algorithm".into(), algo.into()); + + // Exact-match patterns (no wildcards) provide bootstrap broker addresses. + // Their original hostnames are used for bootstrap.servers so that TLS SNI is correct. 
+ let brokers = pl + .rules + .iter() + .filter(|r| r.pattern.is_exact()) + .map(|r| &r.pattern.literal_match) + .join(","); + if brokers.is_empty() { + return Err(ContextCreationError::Other(anyhow::anyhow!( + "AWS PRIVATELINKS option has no exact-match rules to use as bootstrap brokers" + ))); + } + brokers + } _ => self.brokers.iter().map(|b| &b.address).join(","), }; options.insert("bootstrap.servers".into(), brokers.into()); @@ -1076,6 +1097,8 @@ impl KafkaConnection { )); } Tunnel::AwsPrivatelinks(pl) => { + // All rules (both exact-match and wildcard) go through dynamic routing. + // Exact-match rules also provide bootstrap broker addresses (set above). context.set_default_tunnel(TunnelConfig::Rules( KafkaConnection::from_aws_privatelinks(pl), )); @@ -1281,7 +1304,6 @@ impl KafkaConnection { .iter() .map(KafkaConnection::from_aws_privatelink_rule) .collect_vec(), - default: KafkaConnection::from_default_aws_privatelink(&pl.default), } } } @@ -2618,10 +2640,9 @@ impl AlterCompatible for AwsPrivatelink { #[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct AwsPrivatelinks { /// Route to brokers through PrivateLink connections according to these rules. - /// First applicable rule wins. + /// Exact-match rules (no wildcards) are used as bootstrap brokers. + /// Wildcard rules are applied dynamically to discovered brokers. pub rules: Vec, - /// Bootstrap through this PrivateLink connection. 
- pub default: AwsPrivatelink, } #[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] From 3b8847843f62dec739f0ebd62b19bfe163f6f496 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Wed, 8 Apr 2026 10:24:39 -0400 Subject: [PATCH 5/8] code changes to facilitate manual testing --- src/kafka-util/src/client.rs | 24 +++++++++++++++++++++++- src/storage/src/source/kafka.rs | 30 ++++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 6a4eb8926820d..e3fe17d2dd799 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -341,6 +341,19 @@ pub struct ConnectionRulePattern { pub suffix_wildcard: bool, } +impl std::fmt::Display for ConnectionRulePattern { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.prefix_wildcard { + f.write_str("*")?; + } + f.write_str(&self.literal_match)?; + if self.suffix_wildcard { + f.write_str("*")?; + } + Ok(()) + } +} + impl ConnectionRulePattern { /// Does this "{host}:{port}" address fit the pattern? 
pub fn matches(&self, address: &str) -> bool { @@ -372,10 +385,19 @@ impl HostMappingRules { let address = format!("{}:{}", src.host, src.port); for (pattern, dst) in &self.rules { if pattern.matches(&address) { - return Some(dst.rewrite(src)); + let result = dst.rewrite(src); + info!( + "HostMappingRules: broker {}:{} matched pattern '{}' -> rewriting to {}:{}", + src.host, src.port, pattern, result.host, result.port, + ); + return Some(result); } } + warn!( + "HostMappingRules: broker {}:{} matched no rules, using original address", + src.host, src.port, + ); None } } diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 95cdc35cd1097..3d18d7ff686ac 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -14,7 +14,6 @@ use std::sync::Arc; use std::thread; use std::time::Duration; -use anyhow::anyhow; use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Hashable}; use futures::StreamExt; @@ -62,7 +61,7 @@ use timely::dataflow::{Scope, StreamVec}; use timely::progress::Antichain; use timely::progress::Timestamp; use tokio::sync::{Notify, mpsc}; -use tracing::{error, info, trace}; +use tracing::{error, info, trace, warn}; use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; use crate::metrics::source::kafka::KafkaSourceMetrics; @@ -1468,17 +1467,36 @@ fn fetch_partition_info( let pids = get_partitions(consumer.client(), topic, fetch_timeout)?; let mut offset_requests = TopicPartitionList::with_capacity(pids.len()); - for pid in pids { - offset_requests.add_partition_offset(topic, pid, Offset::End)?; + for pid in &pids { + offset_requests.add_partition_offset(topic, *pid, Offset::End)?; } - let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?; + let offset_responses = match consumer.offsets_for_times(offset_requests, fetch_timeout) { + Ok(r) => r, + Err(e) => { + warn!( + "offsets_for_times failed ({e}), using watermark 
0 for all {} partitions", + pids.len() + ); + let mut result = BTreeMap::new(); + for pid in pids { + result.insert(pid, 0); + } + return Ok(result); + } + }; let mut result = BTreeMap::new(); for entry in offset_responses.elements() { let offset = match entry.offset() { Offset::Offset(offset) => offset, - offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?, + offset => { + warn!( + "partition {} has unexpected watermark {offset:?}, using 0", + entry.partition() + ); + 0 + } }; let pid = entry.partition(); From 7bae91b6efff06118a1a61a782daff8380157fbf Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Mon, 16 Mar 2026 12:27:15 -0400 Subject: [PATCH 6/8] Claude: script for waiting and deploying --- bin/staging-deploy | 142 ++++++++++++++++++++++++++++++++ doc/developer/staging-deploy.md | 31 +++++++ staging-test-workflow.md | 20 +++++ 3 files changed, 193 insertions(+) create mode 100755 bin/staging-deploy create mode 100644 doc/developer/staging-deploy.md create mode 100644 staging-test-workflow.md diff --git a/bin/staging-deploy b/bin/staging-deploy new file mode 100755 index 0000000000000..64758e66956fb --- /dev/null +++ b/bin/staging-deploy @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Waits for a Buildkite build to produce a Docker image, then deploys it +to the staging environment and waits for the region to accept connections. + +See doc/developer/staging-deploy.md for additional detail. + +Prerequisites: push your changes to the PR branch first (e.g. with jj). 
+ +Usage: + bin/staging-deploy # wait for HEAD build, deploy + bin/staging-deploy --commit # wait for a specific commit +""" + +import argparse +import json +import re +import subprocess +import sys +import time +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parent.parent +BK = ["bk", "--org", "materialize"] +BK_PIPELINE = "test" +MZ = [str(REPO_ROOT / "bin" / "mz"), "--profile", "staging"] + + +def notify(msg: str) -> None: + print(f"==> {msg}") + try: + subprocess.run( + ["osascript", "-e", f'display notification "{msg}" with title "Staging Test"'], + capture_output=True, + ) + except FileNotFoundError: + pass + + +def get_commit_sha() -> str: + return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() + + +def wait_for_build(commit_sha: str) -> str: + """Wait for the Buildkite build and return the docker_tag.""" + print(f"==> Waiting for Buildkite build for {commit_sha}...") + + # Wait for build to appear + build_number = None + while build_number is None: + result = subprocess.run( + [*BK, "build", "list", "--commit", commit_sha, "--pipeline", BK_PIPELINE, "--limit", "1", "--json"], + capture_output=True, text=True, + ) + if result.returncode == 0: + builds = json.loads(result.stdout) + if builds: + build_number = builds[0]["number"] + break + print(" Build not yet created, waiting 30s...") + time.sleep(30) + + print(f" Found build #{build_number}") + + # Watch build until it finishes + print(f"==> Watching build #{build_number}...") + subprocess.run( + [*BK, "build", "watch", str(build_number), "--pipeline", BK_PIPELINE], + ) + + # Check final state + result = subprocess.run( + ["bk", "build", "list", "--commit", commit_sha, "--pipeline", BK_PIPELINE, "--limit", "1", "--json"], + capture_output=True, text=True, check=True, + ) + state = json.loads(result.stdout)[0]["state"] + print(f" Build state: {state}") + + # Get Docker tag from annotations + print("==> Fetching Docker tag from annotations...") + result = 
subprocess.run( + [*BK, "api", f"/pipelines/{BK_PIPELINE}/builds/{build_number}/annotations"], + capture_output=True, text=True, check=True, + ) + annotations = json.loads(result.stdout) + for annotation in annotations: + m = re.match(r"build-tags-(v.+)", annotation.get("context", "")) + if m: + return m.group(1) + + notify("ERROR: No Docker tag found in build annotations") + print(f" Build may have failed before pushing images. State: {state}") + sys.exit(1) + + +def cycle_staging(docker_tag: str) -> None: + print("==> Disabling staging region...") + subprocess.run([*MZ, "region", "disable"]) + + print(f"==> Enabling staging region with version {docker_tag}...") + subprocess.run([*MZ, "region", "enable", "--version", docker_tag], check=True) + + +def wait_for_staging_ready() -> None: + """Wait for the staging region to accept connections.""" + print("==> Waiting for staging to accept connections...") + cmd = [*MZ, "sql", "--", "-c", "SELECT 1"] + max_attempts = 10 + for attempt in range(1, max_attempts + 1): + result = subprocess.run(cmd, capture_output=True) + if result.returncode == 0: + print(" Connected.") + return + if attempt < max_attempts: + print(f" Not ready, retrying in 15s (attempt {attempt}/{max_attempts})...") + time.sleep(15) + print("ERROR: Staging region did not become ready.", file=sys.stderr) + sys.exit(1) + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--commit", help="Commit SHA to look up (default: HEAD)") + args = parser.parse_args() + + commit_sha = args.commit or get_commit_sha() + print(f"==> Commit: {commit_sha}") + + # Step 1: Wait for build and get Docker tag + docker_tag = wait_for_build(commit_sha) + print(f"==> Docker tag: {docker_tag}") + + # Step 2: Deploy to staging + cycle_staging(docker_tag) + + # Step 3: Wait for staging to be reachable + wait_for_staging_ready() + notify("Staging is ready") + + +if __name__ == 
"__main__": + main() diff --git a/doc/developer/staging-deploy.md b/doc/developer/staging-deploy.md new file mode 100644 index 0000000000000..9d9ec45fd07f6 --- /dev/null +++ b/doc/developer/staging-deploy.md @@ -0,0 +1,31 @@ +# Staging Deploy + +## Introduction + +`bin/staging-deploy` automates deploying a commit to a staging environment. It replaces the manual loop of watching Buildkite for a Docker tag, cycling the staging region, and waiting for it to come back online. + +## Prerequisites + +- **`bk` CLI**: Install with `brew install buildkite/buildkite/bk` and authenticate with `bk auth login`. +- **`bin/mz` staging profile**: Ensure you have a `staging` profile in your `mz` config. + +## Usage + +Push your commit to a PR branch first — the script monitors the Buildkite `test` pipeline, which only runs on PR builds. + +```shell +# Wait for HEAD's build to finish, then deploy to staging +bin/staging-deploy + +# Wait for a specific commit's build to finish, then deploy to staging +bin/staging-deploy --commit +``` + +The script sends a macOS desktop notification when staging is ready. + +## How it works + +1. **Build lookup**: Uses `bk build list --commit --pipeline test` to find the build, then `bk build watch` to wait for it to complete. +2. **Docker tag extraction**: Parses the `build-tags-*` annotation context set by `ci/test/build.py` during the image push step. +3. **Region cycling**: Runs `bin/mz --profile staging region disable` followed by `bin/mz --profile staging region enable --version `. +4. **Readiness check**: Polls `bin/mz --profile staging sql -- -c 'SELECT 1'` until the region accepts connections. diff --git a/staging-test-workflow.md b/staging-test-workflow.md new file mode 100644 index 0000000000000..4235eb4cd46d5 --- /dev/null +++ b/staging-test-workflow.md @@ -0,0 +1,20 @@ +## Workflow + +The user runs iterative staging tests using this loop: + +1. 
**Deploy**: Run `bin/staging-deploy` — waits for the HEAD commit's Buildkite build to produce a Docker image, deploys it to staging, and waits for staging to accept connections. +2. **Test**: The user provides queries as a single `.sql` file. Split it into individual statements on `;` and run each separately via `bin/mz --profile staging sql -- -c "<QUERY>"`. + - **Transient errors** (connection errors, timeouts, "not ready" style messages): retry with backoff, up to 2 minutes total per query. + - **Definitive errors** (syntax errors, object not found, permission denied, etc.): stop immediately and report. + - **Psql variables**: If a query contains `:'VAR_NAME'`-style substitutions, pass the corresponding environment variable via `-v`: `bin/mz --profile staging sql -- -v VAR_NAME="$VAR_NAME" -c "<QUERY>"`. Multiple `-v` flags can be chained. The `--` args are forwarded directly to `psql`. +3. **Fix**: On a definitive failure, diagnose the root cause, attempt a code fix, and notify the user for review. +4. **Loop**: Wait for the user to push the fix, then return to step 1. + +## Relevant context + +- `bin/staging-deploy`: Python script that uses `bk` CLI to find the Buildkite build for HEAD, watches it, extracts the Docker tag from build annotations, disables/re-enables the staging region via `bin/mz --profile staging region enable --version <TAG>`, then polls until `SELECT 1` succeeds. +- `bin/mz`: thin wrapper — runs `cargo run --bin mz -- "$@"`. +- SQL is run against staging with: `bin/mz --profile staging sql -- -c "<QUERY>"`. +- For queries with psql variable substitution (`:'VAR'`): `bin/mz --profile staging sql -- -v VAR="$VAR" -c "<QUERY>"`. +- `mz sql` uses `trailing_var_arg`, forwarding everything after `--` to `psql` unchanged. +- Queries like `VALIDATE CONNECTION` may transiently fail after `CREATE CONNECTION` while the connection is being established; these should be retried. 
From 1eaf1d9e57d4493845ffbdfb2f370ae6c2f27de1 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Wed, 8 Apr 2026 15:13:21 -0400 Subject: [PATCH 7/8] more logging --- src/kafka-util/src/client.rs | 27 +++++++++++++++++++++++++-- src/storage-types/src/connections.rs | 22 ++++++++++++++++++++-- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index e3fe17d2dd799..3eb0f9b262313 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -585,6 +585,7 @@ where /// Otherwise, use our "default tunnel" rewriting strategy to attempt to rewrite this broker's address /// and record it in the book of rewrites. fn resolve_broker_addr(&self, host: &str, port: u16) -> Result, io::Error> { + info!("kafka: resolve_broker_addr called for {}:{}", host, port); let return_rewrite = |rewrite: &BrokerRewriteHandle| -> Result, io::Error> { let rewrite = match rewrite { BrokerRewriteHandle::Simple(rewrite) => rewrite.clone(), @@ -695,7 +696,23 @@ where // Rewrite according to the routing rules. TunnelConfig::Rules(rules) => { // If no rules match, just use the address as-is. - rules.rewrite(&addr).unwrap_or(addr).to_socket_addrs() + let resolved = rules.rewrite(&addr).unwrap_or_else(|| addr.clone()); + match resolved.to_socket_addrs() { + Ok(addrs) => { + info!( + "kafka: resolve_broker_addr {}:{} -> {}:{} resolved to {:?}", + host, port, resolved.host, resolved.port, addrs, + ); + Ok(addrs) + } + Err(e) => { + warn!( + "kafka: resolve_broker_addr {}:{} -> {}:{} DNS resolution FAILED: {e}", + host, port, resolved.host, resolved.port, + ); + Err(e) + } + } } // We leave the broker's address as it is. TunnelConfig::None => { @@ -704,7 +721,13 @@ where } } // This broker's address was already rewritten. Reuse the existing rewrite. 
- Some(rewrite) => return_rewrite(&rewrite), + Some(rewrite) => { + info!( + "kafka: resolve_broker_addr {}:{} using cached rewrite", + host, port + ); + return_rewrite(&rewrite) + } } } diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 1d91145b2984f..2d2c5c9977bcd 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -55,7 +55,7 @@ use serde::{Deserialize, Deserializer, Serialize}; use tokio::net; use tokio::runtime::Handle; use tokio_postgres::config::SslMode; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use url::Url; use crate::AlterCompatible; @@ -983,13 +983,16 @@ impl KafkaConnection { } _ => self.brokers.iter().map(|b| &b.address).join(","), }; - options.insert("bootstrap.servers".into(), brokers.into()); + options.insert("bootstrap.servers".into(), brokers.clone().into()); let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) { (false, false) => "PLAINTEXT", (true, false) => "SSL", (false, true) => "SASL_PLAINTEXT", (true, true) => "SASL_SSL", }; + info!( + "kafka: create_with_context bootstrap.servers={brokers}, security_protocol={security_protocol}" + ); options.insert("security.protocol".into(), security_protocol.into()); if let Some(tls) = &self.tls { if let Some(root_cert) = &tls.root_cert { @@ -1128,6 +1131,16 @@ impl KafkaConnection { })); } } + info!( + "kafka: tunnel config set to {}", + match &self.default_tunnel { + Tunnel::Direct => "Direct".to_string(), + Tunnel::AwsPrivatelink(_) => "AwsPrivatelink (static host)".to_string(), + Tunnel::AwsPrivatelinks(pl) => + format!("AwsPrivatelinks ({} rules)", pl.rules.len()), + Tunnel::Ssh(_) => "Ssh".to_string(), + } + ); // Here, we preemptively rewrite broker addresses. // In concept, this overlaps with 'TunnelingClientContext::resolve_broker_addr'. 
@@ -1232,11 +1245,16 @@ impl KafkaConnection { // The downside of this approach is it produces a generic error message like // "metadata fetch error" with no additional details. The real networking // error is buried in the librdkafka logs, which are not visible to users. + info!("kafka: starting connection validation via fetch_metadata (timeout={timeout:?})"); let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", { let consumer = Arc::clone(&consumer); move || consumer.fetch_metadata(None, timeout) }) .await; + info!( + "kafka: connection validation result: {}", + if result.is_ok() { "success" } else { "failed" }, + ); match result { Ok(_) => Ok(()), // The error returned by `fetch_metadata` does not provide any details which makes for From 559cbc079652bd1e9b055b2e7098ef8ae32a8f6e Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Wed, 8 Apr 2026 16:01:46 -0400 Subject: [PATCH 8/8] force librdkafka_log_level = DEBUG --- src/storage-types/src/connections.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 2d2c5c9977bcd..48041e955b515 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -1218,6 +1218,14 @@ impl KafkaConnection { _id: CatalogItemId, storage_configuration: &StorageConfiguration, ) -> Result<(), anyhow::Error> { + // TEMPORARY: Force debug-level librdkafka logging during validation so + // we can see transport errors in environmentd logs. + let mut storage_configuration = storage_configuration.clone(); + storage_configuration + .connection_context + .librdkafka_log_level = tracing::Level::DEBUG; + let storage_configuration = &storage_configuration; + let (context, error_rx) = MzClientContext::with_errors(); let consumer: BaseConsumer<_> = self .create_with_context(