Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions misc/python/materialize/checks/all_checks/catalog_views_parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check


class CatalogViewsParse(Check):
"""Test that mz_connections and mz_secrets views work correctly across
restarts/upgrades. These views use parse_catalog_create_sql() in their
WHERE clause, which is evaluated against ALL catalog items. If the parser
can't handle any item's create_sql, the entire view errors out."""

def initialize(self) -> Testdrive:
return Testdrive(dedent(f"""
> CREATE CONNECTION cvp_kafka_conn1 FOR KAFKA {self._kafka_broker()}

> CREATE CONNECTION cvp_csr_conn1 FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'

> CREATE SECRET cvp_secret1 AS 'secret_value_1'
> CREATE SECRET cvp_secret2 AS 'secret_value_2'

> CREATE TYPE cvp_type1 AS LIST (ELEMENT TYPE = text)
> CREATE TABLE cvp_table1 (a int, b text)
> CREATE INDEX cvp_idx1 ON cvp_table1 (a)
> CREATE VIEW cvp_view1 AS SELECT a, b FROM cvp_table1
> CREATE MATERIALIZED VIEW cvp_mv1 AS SELECT count(*) AS cnt FROM cvp_table1

> SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name
cvp_csr_conn1 confluent-schema-registry
cvp_kafka_conn1 kafka

> SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name
cvp_secret1
cvp_secret2
"""))

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(f"""
> CREATE CONNECTION cvp_kafka_conn2 FOR KAFKA {self._kafka_broker()}

> CREATE SECRET cvp_secret3 AS 'secret_value_3'

> CREATE TABLE cvp_table2 (x int)
> CREATE VIEW cvp_view2 AS SELECT * FROM cvp_table2
> CREATE MATERIALIZED VIEW cvp_mv2 AS SELECT sum(x) AS total FROM cvp_table2

> SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name
cvp_csr_conn1 confluent-schema-registry
cvp_kafka_conn1 kafka
cvp_kafka_conn2 kafka

> SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name
cvp_secret1
cvp_secret2
cvp_secret3
""")),
Testdrive(dedent("""
> CREATE CONNECTION cvp_csr_conn2 FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}'

> CREATE SECRET cvp_secret4 AS 'secret_value_4'

> CREATE TABLE cvp_table3 (y text)
> CREATE TYPE cvp_type2 AS MAP (KEY TYPE = text, VALUE TYPE = int4)
> CREATE INDEX cvp_idx2 ON cvp_table3 (y)

> SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name
cvp_csr_conn1 confluent-schema-registry
cvp_csr_conn2 confluent-schema-registry
cvp_kafka_conn1 kafka
cvp_kafka_conn2 kafka

> SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name
cvp_secret1
cvp_secret2
cvp_secret3
cvp_secret4
""")),
]

def validate(self) -> Testdrive:
return Testdrive(dedent("""
> SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name
cvp_csr_conn1 confluent-schema-registry
cvp_csr_conn2 confluent-schema-registry
cvp_kafka_conn1 kafka
cvp_kafka_conn2 kafka

> SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name
cvp_secret1
cvp_secret2
cvp_secret3
cvp_secret4
"""))
117 changes: 117 additions & 0 deletions misc/python/materialize/checks/all_checks/ssrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent

# Alternative encodings of 169.254.169.254 (AWS instance metadata service).
# All of these resolve to the same address via different URL/IP encoding tricks:
# decimal integer, hex integer, octal, mixed notation, IPv6-mapped, and ULA.
# IPv6 forms need brackets in URLs.
EVIL_METADATA_IPS = [
"169.254.169.254",
"2852039166",
"0xa9fea9fe",
"025177524776",
"0251.0376.0251.0376",
"0xa9.0xfe.0xa9.0xfe",
"169.16689662",
"169.254.0xa9fe",
"[::ffff:a9fe:a9fe]",
"[::ffff:169.254.169.254]",
"[fd00:ec2::254]",
]


@externally_idempotent(False)
class SsrfCopyFrom(Check):
"""Verify that COPY FROM rejects evil IP encodings across restarts."""

def initialize(self) -> Testdrive:
cmds = """
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_copy_from_remote = true
ALTER SYSTEM SET storage_enforce_external_addresses = true

> CREATE TABLE ssrf_target (a text)
"""
for ip in EVIL_METADATA_IPS:
cmds += f"""
! COPY INTO ssrf_target FROM 'http://{ip}/' (FORMAT CSV);
contains:Address resolved to a private IP
"""
return Testdrive(dedent(cmds))

def manipulate(self) -> list[Testdrive]:
results = []
half = len(EVIL_METADATA_IPS) // 2
for batch in [EVIL_METADATA_IPS[:half], EVIL_METADATA_IPS[half:]]:
cmds = ""
for ip in batch:
cmds += f"""
! COPY INTO ssrf_target FROM 'http://{ip}/' (FORMAT CSV);
contains:Address resolved to a private IP
"""
results.append(Testdrive(dedent(cmds)))
return results

def validate(self) -> Testdrive:
cmds = ""
for ip in EVIL_METADATA_IPS:
cmds += f"""
! COPY INTO ssrf_target FROM 'http://{ip}/' (FORMAT CSV);
contains:Address resolved to a private IP
"""
cmds += """
> SELECT count(*) FROM ssrf_target;
0
"""
return Testdrive(dedent(cmds))


@externally_idempotent(False)
class SsrfAwsConnection(Check):
"""Verify that CREATE CONNECTION TO AWS rejects evil IP endpoints across restarts."""

def _reject_cmds(self, ips: list[str], prefix: str) -> str:
cmds = ""
for i, ip in enumerate(ips):
cmds += f"""
! CREATE CONNECTION {prefix}_{i} TO AWS (
ACCESS KEY ID = 'unused',
SECRET ACCESS KEY = SECRET ssrf_aws_secret,
ENDPOINT = 'http://{ip}/',
REGION = 'us-east-1'
) WITH (VALIDATE = true);
contains:Address resolved to a private IP
"""
return cmds

def initialize(self) -> Testdrive:
cmds = """
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_connection_validation_syntax = true
ALTER SYSTEM SET storage_enforce_external_addresses = true

> CREATE SECRET IF NOT EXISTS ssrf_aws_secret AS 'unused'
"""
cmds += self._reject_cmds(EVIL_METADATA_IPS, "ssrf_aws_init")
return Testdrive(dedent(cmds))

def manipulate(self) -> list[Testdrive]:
half = len(EVIL_METADATA_IPS) // 2
batches = [EVIL_METADATA_IPS[:half], EVIL_METADATA_IPS[half:]]
return [
Testdrive(dedent(self._reject_cmds(batch, f"ssrf_aws_m{i}")))
for i, batch in enumerate(batches)
]

def validate(self) -> Testdrive:
return Testdrive(dedent(self._reject_cmds(EVIL_METADATA_IPS, "ssrf_aws_val")))
90 changes: 90 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,94 @@ def run(self, exe: Executor) -> bool:
return True


EVIL_METADATA_IPS = [
"169.254.169.254",
"2852039166",
"0xa9fea9fe",
"025177524776",
"0251.0376.0251.0376",
"0xa9.0xfe.0xa9.0xfe",
"169.16689662",
"169.254.0xa9fe",
"[::ffff:a9fe:a9fe]",
"[::ffff:169.254.169.254]",
"[fd00:ec2::254]",
]


class CopyFromEvilIpAction(Action):
"""Try COPY FROM with evil IP encodings of the metadata service address."""

def errors_to_ignore(self, exe: Executor) -> list[str]:
result = super().errors_to_ignore(exe)
result.extend(
[
"Address resolved to a private IP",
"Connection refused",
"Connection timed out",
"timeout",
"Network is unreachable",
"No route to host",
"only 'http://' and 'https://' urls are supported",
]
)
if exe.db.complexity == Complexity.DDL:
result.extend(
[
"does not exist",
"COPY FROM's target table",
]
)
return result

def run(self, exe: Executor) -> bool:
tables = [t for t in exe.db.tables if t.num_rows < MAX_ROWS]
if not tables:
return False
table = self.rng.choice(tables)
ip = self.rng.choice(EVIL_METADATA_IPS)
query = f"COPY INTO {table} FROM 'http://{ip}/latest/meta-data/' (FORMAT CSV)"
exe.execute(query, explainable=False, http=Http.NO, fetch=False)
return True


class CreateEvilIpAwsConnectionAction(Action):
"""Try CREATE CONNECTION TO AWS with evil IP endpoints."""

def errors_to_ignore(self, exe: Executor) -> list[str]:
result = super().errors_to_ignore(exe)
result.extend(
[
"Address resolved to a private IP",
"Connection refused",
"Connection timed out",
"timeout",
"Network is unreachable",
"No route to host",
"already exists",
]
)
return result

def run(self, exe: Executor) -> bool:
ip = self.rng.choice(EVIL_METADATA_IPS)
conn_name = f"evil_ip_conn_{self.rng.randint(0, 999)}"
query = f"""CREATE CONNECTION {conn_name} TO AWS (
ACCESS KEY ID = 'unused',
SECRET ACCESS KEY = SECRET minio,
ENDPOINT = 'http://{ip}/',
REGION = 'us-east-1'
) WITH (VALIDATE = true)"""
exe.execute(query, explainable=False, http=Http.NO, fetch=False)
exe.execute(
f"DROP CONNECTION IF EXISTS {conn_name}",
explainable=False,
http=Http.NO,
fetch=False,
)
return True


class InsertAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
result = super().errors_to_ignore(exe)
Expand Down Expand Up @@ -3188,6 +3276,7 @@ def __init__(
100,
),
(CopyFromS3Action, 100),
(CopyFromEvilIpAction, 10),
(SetClusterAction, 1),
(CommitRollbackAction, 30),
(ReconnectAction, 1),
Expand Down Expand Up @@ -3281,6 +3370,7 @@ def __init__(
(RenameIcebergSinkAction, 10),
(SwapSchemaAction, 10),
(ReplaceMaterializedViewAction, 20),
(CreateEvilIpAwsConnectionAction, 2),
(FlipFlagsAction, 2),
# TODO: Reenable when database-issues#8813 is fixed.
# (AlterTableAddColumnAction, 10),
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/parallel_workload/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ def _missing_(cls, value):
# it on outside that scenario is harmless: no Parallel Workload codegen
# emits `repeat_row` unless the scenario is active.
"enable_repeat_row": "true",
# Needed for CopyFromEvilIpAction / SsrfCopyFrom platform check
"enable_copy_from_remote": "true",
"enable_connection_validation_syntax": "true",
}
31 changes: 31 additions & 0 deletions src/ore/src/netio/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,35 @@ mod tests {
.expect("host:port form should parse");
assert!(ips.contains(&PUBLIC_V4.parse::<IpAddr>().unwrap()));
}

#[crate::test]
fn ensure_url_ip_global_rejects_metadata_encodings() {
// Alternative encodings of 169.254.169.254 (AWS instance metadata).
// The WHATWG URL Standard (implemented by the `url` crate) normalizes
// decimal-integer, hex, octal, and mixed-notation IPv4 literals to
// dotted-decimal, so they all resolve to the same link-local address.
let evil_urls = [
"http://169.254.169.254/",
"http://2852039166/",
"http://0xa9fea9fe/",
"http://025177524776/",
"http://0251.0376.0251.0376/",
"http://0xa9.0xfe.0xa9.0xfe/",
"http://169.16689662/",
"http://169.254.0xa9fe/",
// IPv6 forms
"http://[::ffff:a9fe:a9fe]/",
"http://[::ffff:169.254.169.254]/",
// EC2 metadata alternate IPv6 (ULA range)
"http://[fd00:ec2::254]/",
];
for raw in &evil_urls {
let url = url::Url::parse(raw).unwrap_or_else(|e| panic!("{raw} should parse: {e}"));
let result = ensure_url_ip_global(&url);
assert!(
matches!(result, Err(DnsResolutionError::PrivateAddress)),
"{raw} should be rejected as private, got {result:?}"
);
}
}
}
Loading
Loading