diff --git a/misc/python/materialize/checks/all_checks/catalog_views_parse.py b/misc/python/materialize/checks/all_checks/catalog_views_parse.py new file mode 100644 index 0000000000000..ccb85dbdfd0bc --- /dev/null +++ b/misc/python/materialize/checks/all_checks/catalog_views_parse.py @@ -0,0 +1,103 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +from textwrap import dedent + +from materialize.checks.actions import Testdrive +from materialize.checks.checks import Check + + +class CatalogViewsParse(Check): + """Test that mz_connections and mz_secrets views work correctly across + restarts/upgrades. These views use parse_catalog_create_sql() in their + WHERE clause, which is evaluated against ALL catalog items. If the parser + can't handle any item's create_sql, the entire view errors out.""" + + def initialize(self) -> Testdrive: + return Testdrive(dedent(f""" + > CREATE CONNECTION cvp_kafka_conn1 FOR KAFKA {self._kafka_broker()} + + > CREATE CONNECTION cvp_csr_conn1 FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}' + + > CREATE SECRET cvp_secret1 AS 'secret_value_1' + > CREATE SECRET cvp_secret2 AS 'secret_value_2' + + > CREATE TYPE cvp_type1 AS LIST (ELEMENT TYPE = text) + > CREATE TABLE cvp_table1 (a int, b text) + > CREATE INDEX cvp_idx1 ON cvp_table1 (a) + > CREATE VIEW cvp_view1 AS SELECT a, b FROM cvp_table1 + > CREATE MATERIALIZED VIEW cvp_mv1 AS SELECT count(*) AS cnt FROM cvp_table1 + + > SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name + cvp_csr_conn1 confluent-schema-registry + cvp_kafka_conn1 kafka + + > SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name + cvp_secret1 + cvp_secret2 + """)) + + def manipulate(self) -> list[Testdrive]: + return [ + Testdrive(dedent(f""" + > CREATE CONNECTION cvp_kafka_conn2 FOR KAFKA {self._kafka_broker()} + + > CREATE SECRET cvp_secret3 AS 'secret_value_3' + + > CREATE TABLE cvp_table2 (x int) + > CREATE VIEW cvp_view2 AS SELECT * FROM cvp_table2 + > CREATE MATERIALIZED VIEW cvp_mv2 AS SELECT sum(x) AS total FROM cvp_table2 + + > SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name + cvp_csr_conn1 confluent-schema-registry + cvp_kafka_conn1 kafka + cvp_kafka_conn2 kafka + + > SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name + cvp_secret1 + cvp_secret2 + cvp_secret3 + """)), + Testdrive(dedent(""" + > CREATE CONNECTION cvp_csr_conn2 FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}' + + > CREATE SECRET cvp_secret4 AS 'secret_value_4' + + > CREATE TABLE cvp_table3 (y text) + > CREATE TYPE cvp_type2 AS MAP (KEY TYPE = text, VALUE TYPE = int4) + > CREATE INDEX cvp_idx2 ON cvp_table3 (y) + + > SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name + cvp_csr_conn1 confluent-schema-registry + cvp_csr_conn2 confluent-schema-registry + cvp_kafka_conn1 kafka + cvp_kafka_conn2 kafka + + > SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name + cvp_secret1 + cvp_secret2 + cvp_secret3 + cvp_secret4 + """)), + ] + + def validate(self) -> Testdrive: + return Testdrive(dedent(""" + > SELECT name, type FROM mz_connections WHERE name LIKE 'cvp_%' ORDER BY name + cvp_csr_conn1 confluent-schema-registry + cvp_csr_conn2 confluent-schema-registry + cvp_kafka_conn1 kafka + cvp_kafka_conn2 kafka + + > SELECT name FROM mz_secrets WHERE name LIKE 'cvp_%' ORDER BY name + cvp_secret1 + cvp_secret2 + cvp_secret3 + cvp_secret4 + """)) diff --git a/misc/python/materialize/checks/all_checks/ssrf.py b/misc/python/materialize/checks/all_checks/ssrf.py new file mode 100644 index 0000000000000..dc45de2802549 --- /dev/null +++ b/misc/python/materialize/checks/all_checks/ssrf.py @@ -0,0 +1,117 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +from textwrap import dedent + +from materialize.checks.actions import Testdrive +from materialize.checks.checks import Check, externally_idempotent + +# Alternative encodings of 169.254.169.254 (AWS instance metadata service). +# All of these resolve to the same address via different URL/IP encoding tricks: +# decimal integer, hex integer, octal, mixed notation, IPv6-mapped, and ULA. +# IPv6 forms need brackets in URLs. +EVIL_METADATA_IPS = [ + "169.254.169.254", + "2852039166", + "0xa9fea9fe", + "025177524776", + "0251.0376.0251.0376", + "0xa9.0xfe.0xa9.0xfe", + "169.16689662", + "169.254.0xa9fe", + "[::ffff:a9fe:a9fe]", + "[::ffff:169.254.169.254]", + "[fd00:ec2::254]", +] + + +@externally_idempotent(False) +class SsrfCopyFrom(Check): + """Verify that COPY FROM rejects evil IP encodings across restarts.""" + + def initialize(self) -> Testdrive: + cmds = """ + $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} + ALTER SYSTEM SET enable_copy_from_remote = true + ALTER SYSTEM SET storage_enforce_external_addresses = true + + > CREATE TABLE ssrf_target (a text) + """ + for ip in EVIL_METADATA_IPS: + cmds += f""" + ! COPY INTO ssrf_target FROM 'http://{ip}/' (FORMAT CSV); + contains:Address resolved to a private IP + """ + return Testdrive(dedent(cmds)) + + def manipulate(self) -> list[Testdrive]: + results = [] + half = len(EVIL_METADATA_IPS) // 2 + for batch in [EVIL_METADATA_IPS[:half], EVIL_METADATA_IPS[half:]]: + cmds = "" + for ip in batch: + cmds += f""" + ! COPY INTO ssrf_target FROM 'http://{ip}/' (FORMAT CSV); + contains:Address resolved to a private IP + """ + results.append(Testdrive(dedent(cmds))) + return results + + def validate(self) -> Testdrive: + cmds = "" + for ip in EVIL_METADATA_IPS: + cmds += f""" + ! COPY INTO ssrf_target FROM 'http://{ip}/' (FORMAT CSV); + contains:Address resolved to a private IP + """ + cmds += """ + > SELECT count(*) FROM ssrf_target; + 0 + """ + return Testdrive(dedent(cmds)) + + +@externally_idempotent(False) +class SsrfAwsConnection(Check): + """Verify that CREATE CONNECTION TO AWS rejects evil IP endpoints across restarts.""" + + def _reject_cmds(self, ips: list[str], prefix: str) -> str: + cmds = "" + for i, ip in enumerate(ips): + cmds += f""" + ! CREATE CONNECTION {prefix}_{i} TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET ssrf_aws_secret, + ENDPOINT = 'http://{ip}/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true); + contains:Address resolved to a private IP + """ + return cmds + + def initialize(self) -> Testdrive: + cmds = """ + $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} + ALTER SYSTEM SET enable_connection_validation_syntax = true + ALTER SYSTEM SET storage_enforce_external_addresses = true + + > CREATE SECRET IF NOT EXISTS ssrf_aws_secret AS 'unused' + """ + cmds += self._reject_cmds(EVIL_METADATA_IPS, "ssrf_aws_init") + return Testdrive(dedent(cmds)) + + def manipulate(self) -> list[Testdrive]: + half = len(EVIL_METADATA_IPS) // 2 + batches = [EVIL_METADATA_IPS[:half], EVIL_METADATA_IPS[half:]] + return [ + Testdrive(dedent(self._reject_cmds(batch, f"ssrf_aws_m{i}"))) + for i, batch in enumerate(batches) + ] + + def validate(self) -> Testdrive: + return Testdrive(dedent(self._reject_cmds(EVIL_METADATA_IPS, "ssrf_aws_val"))) diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 5709dae568719..c94190d57f738 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -607,6 +607,94 @@ def run(self, exe: Executor) -> bool: return True +EVIL_METADATA_IPS = [ + "169.254.169.254", + "2852039166", + "0xa9fea9fe", + "025177524776", + "0251.0376.0251.0376", + "0xa9.0xfe.0xa9.0xfe", + "169.16689662", + "169.254.0xa9fe", + "[::ffff:a9fe:a9fe]", + "[::ffff:169.254.169.254]", + "[fd00:ec2::254]", +] + + +class CopyFromEvilIpAction(Action): + """Try COPY FROM with evil IP encodings of the metadata service address.""" + + def errors_to_ignore(self, exe: Executor) -> list[str]: + result = super().errors_to_ignore(exe) + result.extend( + [ + "Address resolved to a private IP", + "Connection refused", + "Connection timed out", + "timeout", + "Network is unreachable", + "No route to host", + "only 'http://' and 'https://' urls are supported", + ] + ) + if exe.db.complexity == Complexity.DDL: + result.extend( + [ + "does not exist", + "COPY FROM's target table", + ] + ) + return result + + def run(self, exe: Executor) -> bool: + tables = [t for t in exe.db.tables if t.num_rows < MAX_ROWS] + if not tables: + return False + table = self.rng.choice(tables) + ip = self.rng.choice(EVIL_METADATA_IPS) + query = f"COPY INTO {table} FROM 'http://{ip}/latest/meta-data/' (FORMAT CSV)" + exe.execute(query, explainable=False, http=Http.NO, fetch=False) + return True + + +class CreateEvilIpAwsConnectionAction(Action): + """Try CREATE CONNECTION TO AWS with evil IP endpoints.""" + + def errors_to_ignore(self, exe: Executor) -> list[str]: + result = super().errors_to_ignore(exe) + result.extend( + [ + "Address resolved to a private IP", + "Connection refused", + "Connection timed out", + "timeout", + "Network is unreachable", + "No route to host", + "already exists", + ] + ) + return result + + def run(self, exe: Executor) -> bool: + ip = self.rng.choice(EVIL_METADATA_IPS) + conn_name = f"evil_ip_conn_{self.rng.randint(0, 999)}" + query = f"""CREATE CONNECTION {conn_name} TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET minio, + ENDPOINT = 'http://{ip}/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true)""" + exe.execute(query, explainable=False, http=Http.NO, fetch=False) + exe.execute( + f"DROP CONNECTION IF EXISTS {conn_name}", + explainable=False, + http=Http.NO, + fetch=False, + ) + return True + + class InsertAction(Action): def errors_to_ignore(self, exe: Executor) -> list[str]: result = super().errors_to_ignore(exe) @@ -3188,6 +3276,7 @@ def __init__( 100, ), (CopyFromS3Action, 100), + (CopyFromEvilIpAction, 10), (SetClusterAction, 1), (CommitRollbackAction, 30), (ReconnectAction, 1), @@ -3281,6 +3370,7 @@ def __init__( (RenameIcebergSinkAction, 10), (SwapSchemaAction, 10), (ReplaceMaterializedViewAction, 20), + (CreateEvilIpAwsConnectionAction, 2), (FlipFlagsAction, 2), # TODO: Reenable when database-issues#8813 is fixed. # (AlterTableAddColumnAction, 10), diff --git a/misc/python/materialize/parallel_workload/settings.py b/misc/python/materialize/parallel_workload/settings.py index 39894cd0dc536..e142458bbab0f 100644 --- a/misc/python/materialize/parallel_workload/settings.py +++ b/misc/python/materialize/parallel_workload/settings.py @@ -49,4 +49,7 @@ def _missing_(cls, value): # it on outside that scenario is harmless: no Parallel Workload codegen # emits `repeat_row` unless the scenario is active. "enable_repeat_row": "true", + # Needed for CopyFromEvilIpAction / SsrfCopyFrom platform check + "enable_copy_from_remote": "true", + "enable_connection_validation_syntax": "true", } diff --git a/src/ore/src/netio/dns.rs b/src/ore/src/netio/dns.rs index 40d4216f938a8..7d9b70e5584b7 100644 --- a/src/ore/src/netio/dns.rs +++ b/src/ore/src/netio/dns.rs @@ -261,4 +261,35 @@ mod tests { .expect("host:port form should parse"); assert!(ips.contains(&PUBLIC_V4.parse::().unwrap())); } + + #[crate::test] + fn ensure_url_ip_global_rejects_metadata_encodings() { + // Alternative encodings of 169.254.169.254 (AWS instance metadata). + // The WHATWG URL Standard (implemented by the `url` crate) normalizes + // decimal-integer, hex, octal, and mixed-notation IPv4 literals to + // dotted-decimal, so they all resolve to the same link-local address. + let evil_urls = [ + "http://169.254.169.254/", + "http://2852039166/", + "http://0xa9fea9fe/", + "http://025177524776/", + "http://0251.0376.0251.0376/", + "http://0xa9.0xfe.0xa9.0xfe/", + "http://169.16689662/", + "http://169.254.0xa9fe/", + // IPv6 forms + "http://[::ffff:a9fe:a9fe]/", + "http://[::ffff:169.254.169.254]/", + // EC2 metadata alternate IPv6 (ULA range) + "http://[fd00:ec2::254]/", + ]; + for raw in &evil_urls { + let url = url::Url::parse(raw).unwrap_or_else(|e| panic!("{raw} should parse: {e}")); + let result = ensure_url_ip_global(&url); + assert!( + matches!(result, Err(DnsResolutionError::PrivateAddress)), + "{raw} should be rejected as private, got {result:?}" + ); + } + } } diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td index cbe04d2bea934..f10f45e115e94 100644 --- a/test/testdrive/connection-create-external.td +++ b/test/testdrive/connection-create-external.td @@ -100,6 +100,41 @@ contains:Address resolved to a private IP ! COPY INTO copy_ssrf_target FROM 'http://169.254.169.254/latest/meta-data/' (FORMAT CSV); contains:Address resolved to a private IP +# Alternative encodings of 169.254.169.254. The WHATWG URL Standard normalizes +# decimal-integer, hex, octal, and mixed-notation IPv4 literals, so they all +# resolve to the same link-local address and must be blocked. +! COPY INTO copy_ssrf_target FROM 'http://2852039166/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://0xa9fea9fe/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://025177524776/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://0251.0376.0251.0376/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://0xa9.0xfe.0xa9.0xfe/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://169.16689662/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://169.254.0xa9fe/' (FORMAT CSV); +contains:Address resolved to a private IP + +# IPv6-mapped forms of the metadata IP +! COPY INTO copy_ssrf_target FROM 'http://[::ffff:a9fe:a9fe]/' (FORMAT CSV); +contains:Address resolved to a private IP + +! COPY INTO copy_ssrf_target FROM 'http://[::ffff:169.254.169.254]/' (FORMAT CSV); +contains:Address resolved to a private IP + +# EC2 metadata alternate IPv6 address (falls in fc00::/7 ULA range) +! COPY INTO copy_ssrf_target FROM 'http://[fd00:ec2::254]/' (FORMAT CSV); +contains:Address resolved to a private IP + # Hostname resolving to loopback exercises the reqwest custom DNS resolver # path (as opposed to the upfront IP-literal check above). ! COPY INTO copy_ssrf_target FROM 'http://localhost:1/file.csv' (FORMAT CSV); @@ -139,6 +174,47 @@ contains:Address resolved to a private IP ) WITH (VALIDATE = true); contains:Address resolved to a private IP +# Alternative metadata IP encodings via AWS endpoint +! CREATE CONNECTION aws_ssrf_endpoint_decimal TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET aws_ssrf_secret, + ENDPOINT = 'http://2852039166/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true); +contains:Address resolved to a private IP + +! CREATE CONNECTION aws_ssrf_endpoint_hex TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET aws_ssrf_secret, + ENDPOINT = 'http://0xa9fea9fe/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true); +contains:Address resolved to a private IP + +! CREATE CONNECTION aws_ssrf_endpoint_octal TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET aws_ssrf_secret, + ENDPOINT = 'http://025177524776/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true); +contains:Address resolved to a private IP + +! CREATE CONNECTION aws_ssrf_endpoint_ipv6_mapped TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET aws_ssrf_secret, + ENDPOINT = 'http://[::ffff:a9fe:a9fe]/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true); +contains:Address resolved to a private IP + +! CREATE CONNECTION aws_ssrf_endpoint_ec2_v6 TO AWS ( + ACCESS KEY ID = 'unused', + SECRET ACCESS KEY = SECRET aws_ssrf_secret, + ENDPOINT = 'http://[fd00:ec2::254]/', + REGION = 'us-east-1' + ) WITH (VALIDATE = true); +contains:Address resolved to a private IP + $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} ALTER SYSTEM SET storage_enforce_external_addresses = false