96 changes: 96 additions & 0 deletions PROMPT.md
@@ -0,0 +1,96 @@
# AWS PRIVATELINKS syntax changes

## Problem

When using the top-level `AWS PRIVATELINK` or `AWS PRIVATELINKS` syntax (without `BROKERS`), `bootstrap.servers` is set to the VPC endpoint hostname (e.g. `vpce-xxx...amazonaws.com:9092`). librdkafka uses this hostname for TLS SNI. Confluent Cloud rejects TLS handshakes with an unrecognized SNI, so connections fail with "SSL handshake failed: Disconnected".

The `BROKERS` syntax works because `bootstrap.servers` is set to the original Kafka hostname (e.g. `lkc-xxx...confluent.cloud:9092`), and `resolve_broker_addr` routes the TCP connection through the VPC endpoint without changing the hostname librdkafka uses for TLS.
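The distinction can be sketched in Python (hypothetical helper name — the real routing lives in librdkafka and `resolve_broker_addr`): the transport target and the TLS identity must be tracked separately.

```python
def connect_via_privatelink(broker_host: str, broker_port: int, vpce_host: str):
    """Sketch of the BROKERS-style routing: the TCP connection goes to the
    VPC endpoint, but TLS keeps the original broker hostname for SNI and
    certificate verification, so Confluent Cloud accepts the handshake."""
    transport_target = (vpce_host, broker_port)  # where the socket actually connects
    tls_identity = broker_host                   # what SNI / hostname verification uses
    return transport_target, tls_identity


# The failing top-level path is equivalent to setting tls_identity = vpce_host:
# Confluent Cloud then sees an unrecognized SNI and drops the handshake.
```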

## Current code paths

Both paths are in `KafkaConnection::create_with_context` in `src/storage-types/src/connections.rs`.

**Top-level path** (lines 944-962): Sets `bootstrap.servers` to `vpc_endpoint_host(connection_id, None)`. Sets `default_tunnel` to `TunnelConfig::StaticHost` (singular `AWS PRIVATELINK`) or `TunnelConfig::Rules` (plural `AWS PRIVATELINKS`). The `HostMappingRules` struct in `src/kafka-util/src/client.rs` always has a `default: BrokerRewrite` field — this is the default rule that catches any broker address not matched by a pattern.

**BROKERS path** (line 964): Sets `bootstrap.servers` to the user-provided broker addresses. Per-broker `AWS PRIVATELINK` tunnels are registered as individual `BrokerRewrite` entries via `add_broker_rewrite`.

## Changes

Two changes to the `AWS PRIVATELINKS` syntax:

### 1. Remove the default (pattern-less) rule

The `AwsPrivatelinks` struct currently requires a `default: AwsPrivatelink` field — the catch-all rule for brokers that don't match any pattern. Remove this. `AWS PRIVATELINKS` should only contain pattern rules. Every rule must have a pattern.

This affects:
- `AwsPrivatelinks` struct in `src/storage-types/src/connections.rs` — remove `default` field
- `HostMappingRules` struct in `src/kafka-util/src/client.rs` — remove `default` field; `rewrite()` should return `None` when no pattern matches (instead of applying a default)
- `from_aws_privatelinks` in `src/storage-types/src/connections.rs` — stop building a default
- SQL parser in `src/sql-parser/src/parser.rs` — reject pattern-less entries in `AWS PRIVATELINKS (...)`
- AST types in `src/sql-parser/src/ast/defs/statement.rs` — `ConnectionAwsPrivatelinkRule` no longer needs a default variant
- Planning in `src/sql/src/plan/statement/ddl/connection.rs` — update accordingly
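For illustration only (the real implementation is Rust in `src/kafka-util/src/client.rs`), the intended semantics of the pattern-only `HostMappingRules` might look like this Python sketch:

```python
def pattern_matches(pattern: str, addr: str) -> bool:
    """Leading/trailing '*' wildcards; every other character is literal."""
    core = pattern.strip("*")
    if pattern.startswith("*") and pattern.endswith("*"):
        return core in addr
    if pattern.startswith("*"):
        return addr.endswith(core)
    if pattern.endswith("*"):
        return addr.startswith(core)
    return addr == pattern  # no wildcard: exact match


class HostMappingRules:
    """Pattern rules only -- no catch-all default."""

    def __init__(self, rules):
        self.rules = rules  # ordered list of (pattern, rewrite) pairs

    def rewrite(self, addr):
        for pattern, rw in self.rules:
            if pattern_matches(pattern, addr):
                return rw  # first matching rule wins
        return None  # no match: caller connects to `addr` directly
```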

### 2. Exact-match patterns become bootstrap brokers

Host patterns without wildcards (no leading or trailing `*`) are exact matches — they match a single known broker address. These should be treated like `BROKERS` entries: their addresses should be included in `bootstrap.servers`, and the VPC endpoint routing should happen via `resolve_broker_addr` (same as per-broker `USING AWS PRIVATELINK` in the `BROKERS` syntax).

This means in `create_with_context`:
- Collect exact-match patterns from `AWS PRIVATELINKS` rules
- Include their addresses in `bootstrap.servers` (comma-separated, like the `BROKERS` path)
- Register their VPC endpoint routing via `add_broker_rewrite` (like the `BROKERS` path does at lines 1137-1141)
- Wildcard patterns still go into `TunnelConfig::Rules` for dynamic matching in `resolve_broker_addr`

The `assert!(self.brokers.is_empty())` on line 947 stays — users still don't specify `BROKERS` alongside `AWS PRIVATELINKS`. The bootstrap broker addresses come from the exact-match patterns instead.
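The partitioning step can be sketched in Python (function and rule shapes are hypothetical; the real change lives in `create_with_context`):

```python
def split_privatelinks_rules(rules):
    """Partition AWS PRIVATELINKS rules: exact-match patterns become
    bootstrap brokers; wildcard patterns stay dynamic rules."""
    bootstrap, dynamic = [], []
    for pattern, tunnel in rules:
        if "*" in pattern:
            dynamic.append((pattern, tunnel))  # matched later in resolve_broker_addr
        else:
            # Exact address: dial it directly; the real code would also
            # register its VPC-endpoint route via add_broker_rewrite here.
            bootstrap.append(pattern)
    return ",".join(bootstrap), dynamic
```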

## Expected SQL after changes

```sql
-- Exact-match patterns: addresses go into bootstrap.servers,
-- routed through their respective PrivateLink endpoints.
-- Wildcard patterns: applied dynamically to discovered brokers.
CREATE CONNECTION kafka_conn TO KAFKA (
    SECURITY PROTOCOL = SASL_SSL,
    SASL MECHANISMS = 'PLAIN',
    SASL USERNAME = '...',
    SASL PASSWORD = SECRET pw,
    AWS PRIVATELINKS (
        'lkc-xxx.domyyy.us-east-1.aws.confluent.cloud:9092' TO privatelink_svc (PORT 9092),
        '*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az1'),
        '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE 'use1-az4')
    )
);
```

With the old syntax, you'd also need a pattern-less default at the end:
```sql
privatelink_svc -- default, catch-all: REMOVED
```

## Documentation

Update both documentation files to reflect the syntax changes.

### `doc/user/content/sql/create-connection.md`

**"Dynamic broker discovery" section** (lines 326-347, anchor `#kafka-privatelinks`):
- Update the description: exact-match (no-wildcard) patterns are used as bootstrap brokers. Wildcard patterns route dynamically discovered brokers.
- Update the example SQL: remove the bare `privatelink_svc` default line, add an exact-match pattern for the bootstrap broker address.

**"Default connections" section** (lines 404-429, anchor `#kafka-privatelink-default`):
- This section documents the singular `AWS PRIVATELINK` (default connection) syntax for Redpanda Cloud. It is unchanged by this work, but verify the description still makes sense in context after the `AWS PRIVATELINKS` changes above.

### `doc/user/data/examples/create_connection.yml`

**`syntax-kafka-aws-privatelinks` example** (lines 255-293):
- Remove `<default_privatelink_connection> (PORT <default_port>)` from the syntax template.
- Add an exact-match pattern line to the template (no wildcards).
- Update the `syntax_elements` descriptions:
- Remove the "`<default_connection_name>`" element.
- Remove the sentence "If no rule matches, Materialize will attempt to connect through the default PrivateLink connection listed at the end."
- Add a note that patterns without wildcards are used as bootstrap broker addresses.

## What NOT to change

- The singular `AWS PRIVATELINK` syntax (top-level, non-Kafka connections like Postgres) is unchanged.
- The `BROKERS (...) USING AWS PRIVATELINK` syntax is unchanged.
- The `ConnectionRulePattern` struct and its `matches()` method are unchanged.
142 changes: 142 additions & 0 deletions bin/staging-deploy
@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Waits for a Buildkite build to produce a Docker image, then deploys it
to the staging environment and waits for the region to accept connections.

See doc/developer/staging-deploy.md for additional detail.

Prerequisites: push your changes to the PR branch first (e.g. with jj).

Usage:
    bin/staging-deploy                  # wait for HEAD build, deploy
    bin/staging-deploy --commit <sha>   # wait for a specific commit
"""

import argparse
import json
import re
import subprocess
import sys
import time
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
BK = ["bk", "--org", "materialize"]
BK_PIPELINE = "test"
MZ = [str(REPO_ROOT / "bin" / "mz"), "--profile", "staging"]


def notify(msg: str) -> None:
    print(f"==> {msg}")
    try:
        subprocess.run(
            ["osascript", "-e", f'display notification "{msg}" with title "Staging Test"'],
            capture_output=True,
        )
    except FileNotFoundError:
        pass


def get_commit_sha() -> str:
    return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip()


def wait_for_build(commit_sha: str) -> str:
    """Wait for the Buildkite build and return the docker_tag."""
    print(f"==> Waiting for Buildkite build for {commit_sha}...")

    # Wait for the build to appear.
    build_number = None
    while build_number is None:
        result = subprocess.run(
            [*BK, "build", "list", "--commit", commit_sha, "--pipeline", BK_PIPELINE, "--limit", "1", "--json"],
            capture_output=True,
            text=True,
        )
        if result.returncode == 0:
            builds = json.loads(result.stdout)
            if builds:
                build_number = builds[0]["number"]
                break
        print("  Build not yet created, waiting 30s...")
        time.sleep(30)

    print(f"  Found build #{build_number}")

    # Watch the build until it finishes.
    print(f"==> Watching build #{build_number}...")
    subprocess.run(
        [*BK, "build", "watch", str(build_number), "--pipeline", BK_PIPELINE],
    )

    # Check the final state.
    result = subprocess.run(
        [*BK, "build", "list", "--commit", commit_sha, "--pipeline", BK_PIPELINE, "--limit", "1", "--json"],
        capture_output=True,
        text=True,
        check=True,
    )
    state = json.loads(result.stdout)[0]["state"]
    print(f"  Build state: {state}")

    # Get the Docker tag from the build annotations.
    print("==> Fetching Docker tag from annotations...")
    result = subprocess.run(
        [*BK, "api", f"/pipelines/{BK_PIPELINE}/builds/{build_number}/annotations"],
        capture_output=True,
        text=True,
        check=True,
    )
    annotations = json.loads(result.stdout)
    for annotation in annotations:
        m = re.match(r"build-tags-(v.+)", annotation.get("context", ""))
        if m:
            return m.group(1)

    notify("ERROR: No Docker tag found in build annotations")
    print(f"  Build may have failed before pushing images. State: {state}")
    sys.exit(1)


def cycle_staging(docker_tag: str) -> None:
    print("==> Disabling staging region...")
    # Best-effort: no `check=True`, since the region may already be disabled.
    subprocess.run([*MZ, "region", "disable"])

    print(f"==> Enabling staging region with version {docker_tag}...")
    subprocess.run([*MZ, "region", "enable", "--version", docker_tag], check=True)


def wait_for_staging_ready() -> None:
    """Wait for the staging region to accept connections."""
    print("==> Waiting for staging to accept connections...")
    cmd = [*MZ, "sql", "--", "-c", "SELECT 1"]
    max_attempts = 10
    for attempt in range(1, max_attempts + 1):
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode == 0:
            print("  Connected.")
            return
        if attempt < max_attempts:
            print(f"  Not ready, retrying in 15s (attempt {attempt}/{max_attempts})...")
            time.sleep(15)
    print("ERROR: Staging region did not become ready.", file=sys.stderr)
    sys.exit(1)


def main() -> None:
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--commit", help="Commit SHA to look up (default: HEAD)")
    args = parser.parse_args()

    commit_sha = args.commit or get_commit_sha()
    print(f"==> Commit: {commit_sha}")

    # Step 1: Wait for the build and get the Docker tag.
    docker_tag = wait_for_build(commit_sha)
    print(f"==> Docker tag: {docker_tag}")

    # Step 2: Deploy to staging.
    cycle_staging(docker_tag)

    # Step 3: Wait for staging to be reachable.
    wait_for_staging_ready()
    notify("Staging is ready")


if __name__ == "__main__":
    main()
31 changes: 31 additions & 0 deletions doc/developer/staging-deploy.md
@@ -0,0 +1,31 @@
# Staging Deploy

## Introduction

`bin/staging-deploy` automates deploying a commit to a staging environment. It replaces the manual loop of watching Buildkite for a Docker tag, cycling the staging region, and waiting for it to come back online.

## Prerequisites

- **`bk` CLI**: Install with `brew install buildkite/buildkite/bk` and authenticate with `bk auth login`.
- **`bin/mz` staging profile**: Ensure you have a `staging` profile in your `mz` config.

## Usage

Push your commit to a PR branch first — the script monitors the Buildkite `test` pipeline, which only runs on PR builds.

```shell
# Wait for HEAD's build to finish, then deploy to staging
bin/staging-deploy

# Wait for a specific commit's build to finish, then deploy to staging
bin/staging-deploy --commit <full-sha>
```

The script sends a macOS desktop notification when staging is ready.

## How it works

1. **Build lookup**: Uses `bk build list --commit <sha> --pipeline test` to find the build, then `bk build watch` to wait for it to complete.
2. **Docker tag extraction**: Parses the `build-tags-*` annotation context set by `ci/test/build.py` during the image push step.
3. **Region cycling**: Runs `bin/mz --profile staging region disable` followed by `bin/mz --profile staging region enable --version <tag>`.
4. **Readiness check**: Polls `bin/mz --profile staging sql -- -c 'SELECT 1'` until the region accepts connections.
38 changes: 28 additions & 10 deletions doc/user/content/sql/create-connection.md
@@ -320,8 +320,33 @@ SSH bastion host.
{{< include-md file="shared-content/aws-privatelink-cloud-only-note.md" >}}

Depending on the hosted service you are connecting to, you might need to specify
a PrivateLink connection and [per-availability-zone routing rules for brokers](#kafka-privatelinks) (e.g. Confluent Cloud),
a PrivateLink connection [per advertised broker](#kafka-privatelink-syntax) (e.g. Amazon MSK),
or a single [default PrivateLink connection](#kafka-privatelink-default) (e.g. Redpanda Cloud).

##### Dynamic broker discovery {#kafka-privatelinks}

Confluent Cloud does not require listing every broker individually.
Instead, specify wildcard patterns for routing dynamically discovered brokers
to the correct availability-zone-specific PrivateLink endpoint.
For bootstrap brokers, use exact-match patterns without wildcards.

{{% include-syntax file="examples/create_connection" example="syntax-kafka-aws-privatelinks" %}}

```mzsql
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE CONNECTION kafka_connection TO KAFKA (
    AWS PRIVATELINKS (
        'lkc-xxx.domyyy.us-east-1.aws.confluent.cloud:9092' TO privatelink_svc (PORT 9092),
        '*.use1-az1.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az1'),
        '*.use1-az4.*' TO privatelink_svc (AVAILABILITY ZONE = 'use1-az4')
    )
);
```

##### Broker connection syntax {#kafka-privatelink-syntax}

@@ -347,7 +347,7 @@ broker that you want to connect to via the tunnel.
Field | Value | Required | Description
----------------------------------------|------------------|:--------:|-------------------------------
`AWS PRIVATELINK` | object name | ✓ | The name of an [AWS PrivateLink connection](#aws-privatelink) through which network traffic for this broker should be routed.
`AVAILABILITY ZONE` | `text` | | The ID of the availability zone of the AWS PrivateLink service in which the broker is accessible.
`PORT` | `integer` | | The port of the AWS PrivateLink service to connect to. Defaults to the broker's port.

##### Example {#kafka-privatelink-example}
@@ -388,13 +413,6 @@ PrivateLink connection and the port of the bootstrap server instead.

{{% include-syntax file="examples/create_connection" example="syntax-kafka-default-aws-privatelink" %}}

##### Example {#kafka-privatelink-default-example}

```mzsql
41 changes: 40 additions & 1 deletion doc/user/data/examples/create_connection.yml
@@ -242,11 +242,50 @@
  syntax_elements:
    - name: "`AWS PRIVATELINK <privatelink_connection_name>`"
      description: |
        *Value:* object name. Required.

        The name of an AWS PrivateLink connection through which network traffic
        should be routed.
    - name: "`PORT`"
      description: |
        *Value:* `integer`

        The port of the AWS PrivateLink service to connect to. Defaults to the broker's port.

- name: "syntax-kafka-aws-privatelinks"
  code: |
    CREATE CONNECTION <connection_name> TO KAFKA (
      AWS PRIVATELINKS (
        '<broker_address>' TO <privatelink_connection> (PORT <port>),
        '<pattern1>' TO <privatelink_connection1> (
          PORT <port1>,
          AVAILABILITY ZONE = '<az_id1>'
        ),
        '<pattern2>' TO <privatelink_connection2>
      ),
      ...
    );
  syntax_elements:
    - name: "`'<pattern>' TO <connection_name>`"
      description: |
        Routes brokers whose advertised `host:port` matches `<pattern>` through
        the named AWS PrivateLink connection.
        A pattern may begin with `*` to match any prefix. A pattern may end with `*` to match any suffix.
        All other characters in the pattern are matched literally.
        Patterns without wildcards are used as bootstrap broker addresses.
        Rules are evaluated in order. The first matching rule wins.
        If no rule matches, Materialize will attempt to connect to the broker directly, without tunneling.
    - name: "`AVAILABILITY ZONE`"
      description: |
        *Value:* `text`

        The ID of the availability zone of the AWS PrivateLink service in which
        the broker is accessible.
    - name: "`PORT`"
      description: |
        *Value:* `integer`

        The port of the AWS PrivateLink service to connect to. Defaults to the broker's port.

- name: "syntax-csr"
  code: |