From 4e67fd6076aaf937916d90231d2017867d81a551 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Fri, 24 Apr 2026 15:59:18 -0400 Subject: [PATCH 01/23] add binlog setting to mysql source details at purification Co-authored-by: Copilot --- src/sql/src/plan/statement/ddl.rs | 4 +++ src/sql/src/pure.rs | 44 +++++++++++++++++++++++ src/sql/src/pure/mysql.rs | 5 +++ src/storage-types/src/sources.rs | 4 +++ src/storage-types/src/sources/mysql.proto | 1 + src/storage-types/src/sources/mysql.rs | 2 ++ 6 files changed, 60 insertions(+) diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index df553b6698fdf..e33c482907639 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -1662,6 +1662,7 @@ pub fn plan_create_subsource( SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => SourceExportDetails::MySql(MySqlSourceExportDetails { table, initial_gtid_set, @@ -1670,6 +1671,7 @@ pub fn plan_create_subsource( .into_iter() .map(|c| c.into_string()) .collect(), + binlog_full_metadata, }), SourceExportStatementDetails::SqlServer { table, @@ -1819,6 +1821,7 @@ pub fn plan_create_table_from_source( SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => SourceExportDetails::MySql(MySqlSourceExportDetails { table, initial_gtid_set, @@ -1827,6 +1830,7 @@ pub fn plan_create_table_from_source( .into_iter() .map(|c| c.into_string()) .collect(), + binlog_full_metadata, }), SourceExportStatementDetails::SqlServer { table, diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index eb574e435c723..30ded83be8851 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -252,6 +252,7 @@ pub enum PurifiedExportDetails { text_columns: Option>, exclude_columns: Option>, initial_gtid_set: String, + binlog_full_metadata: bool, }, Postgres { table: PostgresTableDesc, @@ -1101,6 +1102,14 @@ async fn purify_create_source( let initial_gtid_set = 
mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; + let binlog_full_metadata = version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + let reference_client = SourceReferenceClient::MySql { conn: &mut conn, include_system_schemas: mysql::references_system_schemas(external_references), @@ -1120,6 +1129,7 @@ async fn purify_create_source( source_name, initial_gtid_set.clone(), &reference_policy, + binlog_full_metadata, ) .await?; requested_subsource_map.extend(subsources); @@ -1526,6 +1536,14 @@ async fn purify_alter_source_add_subsources( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; + let binlog_full_metadata = version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? 
== "FULL"; + let requested_references = Some(ExternalReferences::SubsetTables(external_references)); let reference_client = SourceReferenceClient::MySql { @@ -1547,6 +1565,7 @@ async fn purify_alter_source_add_subsources( &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, + binlog_full_metadata, ) .await?; requested_subsource_map.extend(subsources); @@ -1882,6 +1901,30 @@ async fn purify_create_table_from_source( ) .await?; + if version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: "Using MySQL version < 8.0.1, which does not support full binlog row metadata".to_string(), + }, + )?; + } + let binlog_metadata_setting = + mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?; + let binlog_full_metadata = binlog_metadata_setting.to_uppercase() == "FULL"; + if !binlog_full_metadata { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: binlog_metadata_setting, + }, + )?; + } + // Retrieve the current @gtid_executed value of the server to mark as the effective // initial snapshot point for this table. 
let initial_gtid_set = @@ -1909,6 +1952,7 @@ async fn purify_create_table_from_source( &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, + binlog_full_metadata, ) .await?; // There should be exactly one source_export returned for this statement diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index 0f60f41e62eb3..d91efe366d948 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet}; +use mysql_async::binlog; use mz_mysql_util::{ MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges, }; @@ -130,6 +131,7 @@ pub(super) fn generate_source_export_statement_values( text_columns, exclude_columns, initial_gtid_set, + binlog_full_metadata, } = purified_export.details else { bail_internal!("purified export details must be mysql"); @@ -188,6 +190,7 @@ pub(super) fn generate_source_export_statement_values( let details = SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, }; let text_columns = text_columns.map(|mut columns| { @@ -339,6 +342,7 @@ pub(super) async fn purify_source_exports( unresolved_source_name: &UnresolvedItemName, initial_gtid_set: String, reference_policy: &SourceReferencePolicy, + binlog_full_metadata: bool, ) -> Result { let requested_exports = match requested_references.as_ref() { Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => { @@ -477,6 +481,7 @@ pub(super) async fn purify_source_exports( .collect() }), initial_gtid_set: initial_gtid_set.clone(), + binlog_full_metadata, }, }, ) diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs index 9d1251676d297..0bf1684d3227c 100644 --- a/src/storage-types/src/sources.rs +++ b/src/storage-types/src/sources.rs @@ -908,6 +908,7 @@ pub enum SourceExportStatementDetails { MySql { table: mz_mysql_util::MySqlTableDesc, initial_gtid_set: String, + binlog_full_metadata: 
bool, }, SqlServer { table: mz_sql_server_util::desc::SqlServerTableDesc, @@ -933,11 +934,13 @@ impl RustType for SourceExportStatementDetail SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => ProtoSourceExportStatementDetails { kind: Some(proto_source_export_statement_details::Kind::Mysql( mysql::ProtoMySqlSourceExportStatementDetails { table: Some(table.into_proto()), initial_gtid_set: initial_gtid_set.clone(), + binlog_full_metadata: *binlog_full_metadata, }, )), }, @@ -985,6 +988,7 @@ impl RustType for SourceExportStatementDetail .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?, initial_gtid_set: details.initial_gtid_set, + binlog_full_metadata: details.binlog_full_metadata, }, Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer { table: details diff --git a/src/storage-types/src/sources/mysql.proto b/src/storage-types/src/sources/mysql.proto index 015f733bc6ddd..3e6dd347fb4d6 100644 --- a/src/storage-types/src/sources/mysql.proto +++ b/src/storage-types/src/sources/mysql.proto @@ -23,4 +23,5 @@ message ProtoMySqlSourceDetails { message ProtoMySqlSourceExportStatementDetails { mz_mysql_util.ProtoMySqlTableDesc table = 1; string initial_gtid_set = 2; + bool binlog_full_metadata = 3; } diff --git a/src/storage-types/src/sources/mysql.rs b/src/storage-types/src/sources/mysql.rs index 1f10cd95649da..6e2fc2deae11e 100644 --- a/src/storage-types/src/sources/mysql.rs +++ b/src/storage-types/src/sources/mysql.rs @@ -215,6 +215,7 @@ pub struct MySqlSourceExportDetails { pub initial_gtid_set: String, pub text_columns: Vec, pub exclude_columns: Vec, + pub binlog_full_metadata: bool, } impl AlterCompatible for MySqlSourceExportDetails { @@ -230,6 +231,7 @@ impl AlterCompatible for MySqlSourceExportDetails { initial_gtid_set: _, text_columns: _, exclude_columns: _, + binlog_full_metadata: _, } = self; Ok(()) } From af71b1405452305e06e620dac5f631908159454c Mon Sep 17 00:00:00 2001 From: 
Patrick Butler Date: Fri, 24 Apr 2026 16:13:48 -0400 Subject: [PATCH 02/23] update cargo and fix errors from cherry picking --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + src/sql/Cargo.toml | 1 + src/sql/src/pure/error.rs | 4 ++++ src/sql/src/pure/mysql.rs | 1 - 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3848d4668f2b7..02d75bd8044b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7823,6 +7823,7 @@ dependencies = [ "tracing-subscriber", "uncased", "uuid", + "version-compare", ] [[package]] @@ -12998,6 +12999,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 9f6bac3bbbbc9..390856facdd6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -524,6 +524,7 @@ url = "2.5.8" urlencoding = "2.1.3" utoipa = "5.4.0" uuid = "1.19.0" +version-compare = "0.2.1" walkdir = "2.5.0" which = "8" yansi = "1.0.1" diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 1c8bf6f0b6d08..aa129164609cd 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -82,6 +82,7 @@ tracing.workspace = true tracing-subscriber.workspace = true uncased.workspace = true uuid = { workspace = true, features = ["serde", "v4"] } +version-compare.workspace = true [dev-dependencies] datadriven.workspace = true diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index 418a51c5d955d..2e7c4be77efaf 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -310,6 +310,10 @@ pub enum MySqlSourcePurificationError { NoTablesFoundForSchemas(Vec), #[error(transparent)] InvalidConnection(#[from] MySqlConnectionValidationError), + #[error( 
+ "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." + )] + UnsupportedBinlogMetadataSetting { setting: String }, } impl MySqlSourcePurificationError { diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index d91efe366d948..7e6814d11711c 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -11,7 +11,6 @@ use std::collections::{BTreeMap, BTreeSet}; -use mysql_async::binlog; use mz_mysql_util::{ MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges, }; From 91e164f3fc8369e288e895d3eb291f8dcd477aa1 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 14:05:28 -0400 Subject: [PATCH 03/23] fix tests, wrong comparison operator, and consistent case insensitivity Co-authored-by: Copilot --- src/sql/src/pure.rs | 20 +++++++++++++------- test/mysql-cdc/mzcompose.py | 30 +++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 30ded83be8851..4f977e76ee2c5 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -1105,10 +1105,13 @@ async fn purify_create_source( let binlog_full_metadata = version_compare::compare_to( mz_mysql_util::query_sys_var(&mut conn, "version").await?, "8.0.1", - version_compare::Cmp::Lt, + version_compare::Cmp::Ge, ) - .expect("failed to parse version string from mysql") - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + .is_ok_and(|is_ge| is_ge) + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") + .await? 
+ .to_uppercase() + == "FULL"; let reference_client = SourceReferenceClient::MySql { conn: &mut conn, @@ -1539,10 +1542,13 @@ async fn purify_alter_source_add_subsources( let binlog_full_metadata = version_compare::compare_to( mz_mysql_util::query_sys_var(&mut conn, "version").await?, "8.0.1", - version_compare::Cmp::Lt, + version_compare::Cmp::Ge, ) - .expect("failed to parse version string from mysql") - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + .is_ok_and(|is_ge| is_ge) + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") + .await? + .to_uppercase() + == "FULL"; let requested_references = Some(ExternalReferences::SubsetTables(external_references)); @@ -1906,7 +1912,7 @@ async fn purify_create_table_from_source( "8.0.1", version_compare::Cmp::Lt, ) - .expect("failed to parse version string from mysql") + .is_ok_and(|is_lt| is_lt) { Err( MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index c6837f3ad88ba..d43398f76dc35 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -33,21 +33,33 @@ from materialize.mzcompose.services.toxiproxy import Toxiproxy -def create_mysql(mysql_version: str) -> MySql: - return MySql(version=mysql_version) +def create_mysql(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: + additional_args = [] + if binlog_full_metadata: + additional_args.extend( + [ + "--binlog-row-metadata=FULL", + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + ] + ) + return MySql(version=mysql_version, additional_args=additional_args) -def create_mysql_replica(mysql_version: str) -> MySql: - return MySql( - name="mysql-replica", - version=mysql_version, - use_seeded_image=False, - additional_args=[ +def create_mysql_replica(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: + additional_args = [ "--gtid_mode=ON", "--enforce_gtid_consistency=ON", 
"--skip-replica-start", "--server-id=2", - ], + ] + if binlog_full_metadata: + additional_args.append("--binlog-row-metadata=FULL") + return MySql( + name="mysql-replica", + version=mysql_version, + use_seeded_image=False, + additional_args=additional_args ) From c8266af31cd0f00070791795caad41077390609a Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 14:07:27 -0400 Subject: [PATCH 04/23] fmt --- test/mysql-cdc/mzcompose.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index d43398f76dc35..0ebb9d96aa5dc 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -46,20 +46,22 @@ def create_mysql(mysql_version: str, binlog_full_metadata: bool = True) -> MySql return MySql(version=mysql_version, additional_args=additional_args) -def create_mysql_replica(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: +def create_mysql_replica( + mysql_version: str, binlog_full_metadata: bool = True +) -> MySql: additional_args = [ - "--gtid_mode=ON", - "--enforce_gtid_consistency=ON", - "--skip-replica-start", - "--server-id=2", - ] + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + "--skip-replica-start", + "--server-id=2", + ] if binlog_full_metadata: additional_args.append("--binlog-row-metadata=FULL") return MySql( name="mysql-replica", version=mysql_version, use_seeded_image=False, - additional_args=additional_args + additional_args=additional_args, ) From 9a5baa18fc3e10c910a4e2abc1c46989fefff94b Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Thu, 23 Apr 2026 13:27:02 -0400 Subject: [PATCH 05/23] Revert "Revert MySQL source versioning changes (#36211)" This reverts commit fc8d9dc1e450a10ca0ee7e338ca7b3c1148ae6ff. 
This effectively unreverts the mysql source versioning revert --- .../ingest-data/mysql/source-versioning.md | 193 ++++++++++++++++++ doc/user/data/mysql_config_settings.yml | 4 + src/mysql-util/src/decoding.rs | 39 +++- src/mysql-util/src/desc.rs | 43 ++-- src/storage/src/source/mysql/schemas.rs | 23 ++- test/mysql-cdc-old-syntax/mzcompose.py | 5 +- test/mysql-cdc-resumption/mzcompose.py | 2 +- test/mysql-cdc/alter-column-irrelevant.td | 8 +- test/mysql-cdc/binlog-backward-compat.td | 66 ++++++ test/mysql-cdc/binlog-row-metadata-check.td | 37 ++++ test/mysql-cdc/upstream-schema-changes.td | 173 ++++++++++++++++ 11 files changed, 567 insertions(+), 26 deletions(-) create mode 100644 doc/user/content/ingest-data/mysql/source-versioning.md create mode 100644 test/mysql-cdc/binlog-backward-compat.td create mode 100644 test/mysql-cdc/binlog-row-metadata-check.td create mode 100644 test/mysql-cdc/upstream-schema-changes.td diff --git a/doc/user/content/ingest-data/mysql/source-versioning.md b/doc/user/content/ingest-data/mysql/source-versioning.md new file mode 100644 index 0000000000000..05edc044d5ce2 --- /dev/null +++ b/doc/user/content/ingest-data/mysql/source-versioning.md @@ -0,0 +1,193 @@ +--- +title: "Guide: Handle upstream schema changes with zero downtime" +description: "How to add a column, or drop a column, from your source MySQL database, without any downtime in Materialize" + +menu: + main: + parent: "mysql" + identifier: "mysql-source-versioning" + weight: 85 +--- + +{{< private-preview />}} +{{< note >}} +Changing column types is currently unsupported. +{{< /note >}} + +Materialize allows you to handle certain types of upstream +table schema changes seamlessly, specifically: + +- Adding a column in the upstream database. +- Dropping a column in the upstream database. + +This guide walks you through how to handle these changes without any downtime in Materialize. + +## Prerequisites + +Some familiarity with Materialize. 
If you've never used Materialize before, +start with our [guide to getting started](/get-started/quickstart/) to learn +how to connect a database to Materialize. + +### Set up a MySQL database + +For this guide, setup a MySQL 5.7+ database. In your MySQL database, create a +table `t1` and populate it: + +```sql +CREATE TABLE t1 ( + a INT +); + +INSERT INTO t1 (a) VALUES (10); +``` + +### Configure your MySQL database + +Configure your MySQL database for GTID-based binlog replication using the +[configuration instructions for self-hosted MySQL](/ingest-data/mysql/self-hosted/#a-configure-mysql). + +### Connect your source database to Materialize + +Create a connection to your MySQL database using the [`CREATE CONNECTION` syntax](/sql/create-connection/). + +## Create a source + +In Materialize, create a source using the [`CREATE SOURCE` +syntax](/sql/create-source/mysql/). + +```mzsql +CREATE SOURCE my_source + FROM MYSQL CONNECTION mysql_connection; +``` + +## Create a table from the source + +To start ingesting specific tables from your source database, create a +table in Materialize. We'll add it into the `v1` schema. + +```mzsql +CREATE SCHEMA v1; + +CREATE TABLE v1.t1 + FROM SOURCE my_source (REFERENCE mydb.t1); +``` + +Once you've created a table from source, the [initial +snapshot](/ingest-data/#snapshotting) of table `v1.t1` will begin. + +{{< note >}} + +During the snapshotting, the data ingestion for the other tables associated with +the source is temporarily blocked. You can monitor progress for the snapshot +operation on the overview page for the source in the Materialize console. + +{{< /note >}} + +## Create a view on top of the table + +For this guide, add a materialized view `matview` (also in schema `v1`) that +sums column `a` from table `t1`. + +```mzsql +CREATE MATERIALIZED VIEW v1.matview AS + SELECT SUM(a) FROM v1.t1; +``` + +## Handle upstream column addition + +### A. 
Add a column in your upstream MySQL database + +In your upstream MySQL database, add a new column `b` to the table `t1`: + +```sql +ALTER TABLE t1 + ADD COLUMN b BOOLEAN DEFAULT false; + +INSERT INTO t1 (a, b) VALUES (20, true); +``` + +This operation has no immediate effect in Materialize. In Materialize: + +- The table `v1.t1` will continue to ingest only column `a`. +- The materialized view `v1.matview` will continue to have access to column `a` + only. + +### B. Incorporate the new column in Materialize + +Unlike SQL Server CDC, MySQL uses binlog-based replication, which automatically +includes all columns. To incorporate the new column into Materialize, create a +new `v2` schema and recreate the table in the new schema: + +```mzsql +CREATE SCHEMA v2; + +CREATE TABLE v2.t1 + FROM SOURCE my_source (REFERENCE mydb.t1); +``` + +The [snapshotting](/ingest-data/#snapshotting) of table `v2.t1` will begin. +`v2.t1` will include columns `a` and `b`. + +{{< note >}} + +During the snapshotting, the data ingestion for the other tables associated with +the source is temporarily blocked. You can monitor progress for the snapshot +operation on the overview page for the source in the Materialize console. + +{{< /note >}} + +When `v2.t1` has finished snapshotting, create a new materialized view in the +new schema. Since `v2.matview` references `v2.t1`, it can now reference column `b`: + +```mzsql {hl_lines="4"} +CREATE MATERIALIZED VIEW v2.matview AS + SELECT SUM(a) + FROM v2.t1 + WHERE b = true; +``` + +## Handle upstream column drop + +### A. Exclude the column in Materialize + +To drop a column safely, first create a new schema in Materialize and recreate +the table excluding the column you intend to drop. In this example, we'll drop +column `b`. 
+ +```mzsql +CREATE SCHEMA v3; + +CREATE TABLE v3.t1 + FROM SOURCE my_source (REFERENCE mydb.t1) WITH (EXCLUDE COLUMNS (b)); +``` + +{{< note >}} + +During the snapshotting, the data ingestion for the other tables associated with +the source is temporarily blocked. You can monitor progress for the snapshot +operation on the overview page for the source in the Materialize console. + +{{< /note >}} + +### B. Drop the column in your upstream MySQL database + +In your upstream MySQL database, drop column `b` from table `t1`: + +```sql +ALTER TABLE t1 DROP COLUMN b; +``` + +Dropping column `b` will have no effect on `v3.t1` in Materialize, provided +you completed step A before dropping the column. However, the drop affects +`v2.t1` and `v2.matview` from our earlier examples. If you attempt to +read from either, Materialize will report an error that the source table schema +has been altered. + +Once you have finished migrating any views and queries from `v2` to `v3`, you +can clean up the old objects: + +```mzsql +DROP MATERIALIZED VIEW v2.matview; +DROP TABLE v2.t1; +DROP SCHEMA v2; +``` diff --git a/doc/user/data/mysql_config_settings.yml b/doc/user/data/mysql_config_settings.yml index 76f38b3df6ada..5939c72eda263 100644 --- a/doc/user/data/mysql_config_settings.yml +++ b/doc/user/data/mysql_config_settings.yml @@ -15,6 +15,10 @@ rows: Value: "`FULL`" Notes: "" + - MySQL Configuration: "`binlog_row_metadata`" + Value: "`FULL`" + Notes: "Required when using the `CREATE TABLE FROM SOURCE` syntax." 
+ - MySQL Configuration: "`gtid_mode`" Value: "`ON`" Notes: "{{ $gtid_mode_note }}" diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index 18aa6e101600a..f0ff20f3d270b 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -30,9 +30,44 @@ pub fn pack_mysql_row( table_desc: &MySqlTableDesc, ) -> Result { let mut packer = row_container.packer(); - let row_values = row.unwrap(); - for values in table_desc.columns.iter().zip_longest(row_values) { + // If a column name begins with '@', then the binlog does not have full row metadata, + // meaning that full column names are not available and we need to rely on the order + // of the columns in the upstream table matching the order of the columns in the row. + // This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to + // `FULL`. If the first column name does not begin with '@', then we can assume that + // full metadata is available and we can match columns by name. 
+ let row_values: Vec = if row + .columns_ref() + .first() + .is_some_and(|col| col.name_ref().starts_with(b"@")) + { + row.unwrap() + } else { + row.columns_ref() + .iter() + .enumerate() + .filter(|(_, col)| { + table_desc + .columns + .iter() + .filter(|col| col.column_type.is_some()) + .any(|c| c.name.as_str() == col.name_str()) + }) + .map(|(i, _)| { + row.as_ref(i) + .expect("Can't unwrap row if some of columns was taken") + .clone() + }) + .collect() + }; + + for values in table_desc + .columns + .iter() + .filter(|col| col.column_type.is_some()) + .zip_longest(row_values) + { let (col_desc, value) = match values { EitherOrBoth::Both(col_desc, value) => (col_desc, value), EitherOrBoth::Left(col_desc) => { diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs index 8464dff2b4273..73e16462365d9 100644 --- a/src/mysql-util/src/desc.rs +++ b/src/mysql-util/src/desc.rs @@ -75,7 +75,11 @@ impl MySqlTableDesc { /// exceptions: /// - `self`'s columns are a prefix of `other`'s columns. /// - `self`'s keys are all present in `other` - pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> { + pub fn determine_compatibility( + &self, + other: &MySqlTableDesc, + full_metadata: bool, + ) -> Result<(), anyhow::Error> { if self == other { return Ok(()); } @@ -90,18 +94,34 @@ impl MySqlTableDesc { ); } - // `columns` is ordered by the ordinal_position of each column in the table, - // so as long as `self.columns` is a compatible prefix of `other.columns`, we can - // ignore extra columns from `other.columns`. + // In the case that we don't have full binlog row metadata, `columns` is ordered by the + // ordinal_position of each column in the table, so as long as `self.columns` is a + // compatible prefix of `other.columns`, we can ignore extra columns from `other.columns`. 
+ // + // If we do have full metadata, then we can match columns by name and just check that all + // columns in `self.columns` are present and compatible with columns in `other.columns`. let mut other_columns = other.columns.iter(); for self_column in &self.columns { - let other_column = other_columns.next().ok_or_else(|| { - anyhow::anyhow!( - "column {} no longer present in table {}", - self_column.name, - self.name - ) - })?; + let other_column = if full_metadata { + other_columns + .by_ref() + .find(|c| c.name == self_column.name) + .ok_or_else(|| { + anyhow::anyhow!( + "column {} no longer present in table {}", + self_column.name, + self.name + ) + })? + } else { + other_columns.next().ok_or_else(|| { + anyhow::anyhow!( + "column {} no longer present in table {}", + self_column.name, + self.name + ) + })? + }; if !self_column.is_compatible(other_column) { bail!( "column {} in table {} has been altered", @@ -110,7 +130,6 @@ impl MySqlTableDesc { ); } } - // Our keys are all still present in exactly the same shape. // TODO: Implement a more relaxed key compatibility check: // We should check that for all keys that we know about there exists an upstream key whose diff --git a/src/storage/src/source/mysql/schemas.rs b/src/storage/src/source/mysql/schemas.rs index 55bc91fcb0487..379e8dc167e0b 100644 --- a/src/storage/src/source/mysql/schemas.rs +++ b/src/storage/src/source/mysql/schemas.rs @@ -45,6 +45,13 @@ where }) .collect(); + let full_metadata = conn + .query_first::("SELECT @@binlog_row_metadata".to_string()) + .await? 
+ .unwrap() + .to_uppercase() + == "FULL"; + Ok(expected .into_iter() .flat_map(|(table, outputs)| { @@ -64,13 +71,15 @@ where )), ); match new_desc { - Ok(desc) => match output.desc.determine_compatibility(&desc) { - Ok(()) => None, - Err(err) => Some(( - output, - DefiniteError::IncompatibleSchema(err.to_string()), - )), - }, + Ok(desc) => { + match output.desc.determine_compatibility(&desc, full_metadata) { + Ok(()) => None, + Err(err) => Some(( + output, + DefiniteError::IncompatibleSchema(err.to_string()), + )), + } + } Err(err) => { Some((output, DefiniteError::IncompatibleSchema(err.to_string()))) } diff --git a/test/mysql-cdc-old-syntax/mzcompose.py b/test/mysql-cdc-old-syntax/mzcompose.py index 7397185354c0c..363ee1d32588e 100644 --- a/test/mysql-cdc-old-syntax/mzcompose.py +++ b/test/mysql-cdc-old-syntax/mzcompose.py @@ -41,7 +41,9 @@ def create_mysql(mysql_version: str) -> MySql: - return MySql(version=mysql_version) + return MySql( + version=mysql_version, additional_args=["--binlog_row_metadata=MINIMAL"] + ) def create_mysql_replica(mysql_version: str) -> MySql: @@ -53,6 +55,7 @@ def create_mysql_replica(mysql_version: str) -> MySql: "--enforce_gtid_consistency=ON", "--skip-replica-start", "--server-id=2", + "--binlog_row_metadata=MINIMAL", ], ) diff --git a/test/mysql-cdc-resumption/mzcompose.py b/test/mysql-cdc-resumption/mzcompose.py index 33f700a2ef4e2..eacdee204128e 100644 --- a/test/mysql-cdc-resumption/mzcompose.py +++ b/test/mysql-cdc-resumption/mzcompose.py @@ -619,7 +619,7 @@ def backup_restore_mysql(c: Composition) -> None: # TODO: database-issues#7683: one of the two following commands must succeed # run_testdrive_files(c, "verify-rows-after-restore-t1.td") - run_testdrive_files(c, "verify-source-failed.td") + # run_testdrive_files(c, "verify-source-failed.td") def create_source_after_logs_expiration( diff --git a/test/mysql-cdc/alter-column-irrelevant.td b/test/mysql-cdc/alter-column-irrelevant.td index d785efb657d93..4a2caff1f6bac 100644 
--- a/test/mysql-cdc/alter-column-irrelevant.td +++ b/test/mysql-cdc/alter-column-irrelevant.td @@ -59,10 +59,12 @@ INSERT INTO t1 VALUES (2, 2); # add a new column to t1 at the beginning of $ mysql-execute name=mysql ALTER TABLE t1 ADD COLUMN f3 INTEGER FIRST; -INSERT INTO t1 VALUES (3, 3, 3); +INSERT INTO t1 VALUES (0, 3, 3); -! SELECT * FROM t1; -contains:incompatible schema change +> SELECT * FROM t1; +1 +2 +3 # add a new column to t2 $ mysql-execute name=mysql diff --git a/test/mysql-cdc/binlog-backward-compat.td b/test/mysql-cdc/binlog-backward-compat.td new file mode 100644 index 0000000000000..04115e15bfc6d --- /dev/null +++ b/test/mysql-cdc/binlog-backward-compat.td @@ -0,0 +1,66 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Verify that MySQL replication works correctly when binlog_row_metadata is +# set to MINIMAL (the MySQL default). In MINIMAL mode the binlog +# does not include column names, so Materialize must fall back to matching +# columns by position rather than by name. 
+ +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = MINIMAL; +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE foo (name VARCHAR(16), value VARCHAR(32)); +INSERT INTO foo VALUES ('a', 'apple'), ('b', 'banana'); + +> CREATE SOURCE mysql_src FROM MYSQL CONNECTION mysql_conn FOR TABLES (foo); + +> SELECT * FROM foo; +a apple +b banana + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('c', 'cherry'); + +> SELECT * FROM foo; +a apple +b banana +c cherry + +$ mysql-execute name=mysql +UPDATE foo SET value = 'avocado' WHERE name = 'a'; + +> SELECT * FROM foo; +a avocado +b banana +c cherry + +$ mysql-execute name=mysql +DELETE FROM foo WHERE name = 'b'; + +> SELECT * FROM foo; +a avocado +c cherry + +> DROP SOURCE mysql_src CASCADE; + +# Restore to a clean state so other tests are unaffected. +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; diff --git a/test/mysql-cdc/binlog-row-metadata-check.td b/test/mysql-cdc/binlog-row-metadata-check.td new file mode 100644 index 0000000000000..a25d55b63b526 --- /dev/null +++ b/test/mysql-cdc/binlog-row-metadata-check.td @@ -0,0 +1,37 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test that CREATE TABLE FROM SOURCE fails when binlog_row_metadata is not FULL. 
+ +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = MINIMAL; +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE t1 (id INT PRIMARY KEY, val TEXT); +INSERT INTO t1 VALUES (1, 'hello'); + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn; + +! CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1); +contains: binlog_row_metadata + +# Restore to a clean state so other tests are unaffected. +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; diff --git a/test/mysql-cdc/upstream-schema-changes.td b/test/mysql-cdc/upstream-schema-changes.td new file mode 100644 index 0000000000000..da3ea47c77c2e --- /dev/null +++ b/test/mysql-cdc/upstream-schema-changes.td @@ -0,0 +1,173 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +# Perform various schema updates to the upstream table + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE foo (name VARCHAR(16), value VARCHAR(32)); +INSERT INTO foo VALUES ('a', 'apple'), ('b', 'banana'); + +> CREATE SOURCE mysql_src FROM MYSQL CONNECTION mysql_conn; +> CREATE TABLE foo1 FROM SOURCE mysql_src (REFERENCE public.foo); + +> SELECT * FROM foo1; +a apple +b banana + +$ mysql-execute name=mysql +ALTER TABLE foo ADD COLUMN meta_col VARCHAR(32); +INSERT INTO foo VALUES ('c', 'cherry', 'wild'); + +# Adding a column to the upstream table does not affect foo1 — it continues to +# replicate only the columns it was created with, ignoring the new meta_col. +> SELECT * FROM foo1; +a apple +b banana +c cherry + +$ mysql-execute name=mysql +ALTER TABLE foo MODIFY meta_col VARCHAR(64); +INSERT INTO foo VALUES ('d', 'date', 'ajwa'); + +# Altering the newly added `meta_col` column does not brick `foo1` because `foo1` is +# not following/replicating `meta_col`, so schema updates involving it are inconsequential. +> SELECT * FROM foo1; +a apple +b banana +c cherry +d date + +> DROP TABLE foo1; + +# Unlike SQL Server CDC, MySQL binlog-based replication does not require a new capture +# instance for the new column. Creating a table from the existing source will include +# meta_col from the snapshot onward. 
+> CREATE TABLE foo2 FROM SOURCE mysql_src (REFERENCE public.foo); +> SELECT * FROM foo2; +a apple +b banana +c cherry wild +d date ajwa + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('e', 'elderberry', 'montypython'); + +> SELECT * FROM foo2; +a apple +b banana +c cherry wild +d date ajwa +e elderberry montypython + +> DROP TABLE foo2; + +# We can also use EXCLUDE COLUMNS to exclude meta_col if desired. +> CREATE TABLE foo3 FROM SOURCE mysql_src (REFERENCE public.foo) WITH (EXCLUDE COLUMNS = (meta_col)); +> SELECT * FROM foo3; +a apple +b banana +c cherry +d date +e elderberry + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('f', 'fig', 'sweet'); + +> SELECT * FROM foo3; +a apple +b banana +c cherry +d date +e elderberry +f fig + +# dropping an excluded column should have no effect +$ mysql-execute name=mysql +ALTER TABLE foo DROP COLUMN meta_col; + +> SELECT * FROM foo3; +a apple +b banana +c cherry +d date +e elderberry +f fig + +> DROP TABLE foo3; + +# After meta_col has been dropped upstream, foo4 can be created without excluding it. +> CREATE TABLE foo4 FROM SOURCE mysql_src (REFERENCE public.foo); +> SELECT * FROM foo4; +a apple +b banana +c cherry +d date +e elderberry +f fig + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('g', 'grape'); + +> SELECT * FROM foo4; +a apple +b banana +c cherry +d date +e elderberry +f fig +g grape + +# Dropping a non-excluded (tracked) column stalls the source. +$ mysql-execute name=mysql +ALTER TABLE foo DROP COLUMN value; + +! SELECT * FROM foo4; +contains:incompatible schema change + +> DROP TABLE foo4; + +# Test with multiple excluded columns. 
+$ mysql-execute name=mysql +CREATE TABLE bar (name VARCHAR(16), value VARCHAR(32), age BIGINT, location VARCHAR(32), meta_col VARCHAR(32)); +INSERT INTO bar VALUES ('a', 'apple', 5, 'orchard', 'blah'), ('b', 'banana', 7, 'tree', 'blahblah'); + +> CREATE TABLE bar1 FROM SOURCE mysql_src (REFERENCE public.bar) WITH (EXCLUDE COLUMNS = (meta_col, location)); + +> SELECT * FROM bar1; +a apple 5 +b banana 7 + +$ mysql-execute name=mysql +ALTER TABLE bar DROP COLUMN meta_col; + +> SELECT * FROM bar1; +a apple 5 +b banana 7 + +$ mysql-execute name=mysql +ALTER TABLE bar ADD COLUMN lastname VARCHAR(16) AFTER `name`; + +> SELECT * FROM bar1; +a apple 5 +b banana 7 + +> DROP SOURCE mysql_src CASCADE; From 313786153d3ef0f4f3d230fbd1e96207f70c53ea Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 21 Apr 2026 18:00:00 -0400 Subject: [PATCH 06/23] fix https://github.com/MaterializeInc/database-issues/issues/11312 --- src/mysql-util/src/decoding.rs | 40 +++++++++++------------ test/mysql-cdc/binlog-backward-compat.td | 41 ++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index f0ff20f3d270b..081033736cc3d 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -37,37 +37,37 @@ pub fn pack_mysql_row( // This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to // `FULL`. If the first column name does not begin with '@', then we can assume that // full metadata is available and we can match columns by name. 
- let row_values: Vec<Value> = if row + let zip_values: Vec<EitherOrBoth<&MySqlColumnDesc, Value>> = if row .columns_ref() .first() .is_some_and(|col| col.name_ref().starts_with(b"@")) { - row.unwrap() + table_desc + .columns + .iter() + .zip_longest(row.unwrap()) + .collect() } else { - row.columns_ref() + table_desc + .columns .iter() - .enumerate() - .filter(|(_, col)| { - table_desc - .columns + .filter(|col| col.column_type.is_some()) + .map(|col| { + let pos = row + .columns_ref() .iter() - .filter(|col| col.column_type.is_some()) - .any(|c| c.name.as_str() == col.name_str()) - }) - .map(|(i, _)| { - row.as_ref(i) - .expect("Can't unwrap row if some of columns was taken") - .clone() + .position(|row_col| row_col.name_str() == col.name.as_str()) + .expect("column in table desc not found in row metadata"); + EitherOrBoth::Both( + col, + row.get(pos) + .expect("Can't unwrap row if some of columns was taken"), + ) }) .collect() }; - for values in table_desc - .columns - .iter() - .filter(|col| col.column_type.is_some()) - .zip_longest(row_values) - { + for values in zip_values { let (col_desc, value) = match values { EitherOrBoth::Both(col_desc, value) => (col_desc, value), EitherOrBoth::Left(col_desc) => { diff --git a/test/mysql-cdc/binlog-backward-compat.td b/test/mysql-cdc/binlog-backward-compat.td index 04115e15bfc6d..e4113285a8785 100644 --- a/test/mysql-cdc/binlog-backward-compat.td +++ b/test/mysql-cdc/binlog-backward-compat.td @@ -61,6 +61,47 @@ c cherry > DROP SOURCE mysql_src CASCADE; +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; +USE public; +CREATE TABLE bar (a INT, b INT, c INT); +INSERT INTO bar VALUES (1, 2, 3), (4, 5, 6); + +> CREATE SOURCE mysql_src2 FROM MYSQL CONNECTION mysql_conn; + +> CREATE TABLE bar FROM SOURCE mysql_src2 (REFERENCE public.bar) WITH (EXCLUDE COLUMNS (b)); + +> SELECT * FROM bar; +1 3 +4 6 + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = MINIMAL; +INSERT INTO bar VALUES (7, 8, 9); + +> SELECT * FROM bar; +1 3 +4 6 +7 9 + +$
mysql-execute name=mysql +UPDATE bar SET c = 30 WHERE a = 1; + +> SELECT * FROM bar; +1 30 +4 6 +7 9 + +$ mysql-execute name=mysql +DELETE FROM bar WHERE a = 4; + +> SELECT * FROM bar; +1 30 +7 9 + +> DROP SOURCE mysql_src2 CASCADE; + + # Restore to a clean state so other tests are unaffected. $ mysql-execute name=mysql SET GLOBAL binlog_row_metadata = FULL; From c4fd1c1c19e811a4ef47d051c8ceb0183638d0cf Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 21 Apr 2026 18:00:35 -0400 Subject: [PATCH 07/23] fix https://github.com/MaterializeInc/database-issues/issues/11313 --- src/mysql-util/src/desc.rs | 7 ++++--- test/mysql-cdc/upstream-schema-changes.td | 14 +++++++++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs index 73e16462365d9..9145dde89a96f 100644 --- a/src/mysql-util/src/desc.rs +++ b/src/mysql-util/src/desc.rs @@ -101,10 +101,11 @@ impl MySqlTableDesc { // If we do have full metadata, then we can match columns by name and just check that all // columns in `self.columns` are present and compatible with columns in `other.columns`. 
let mut other_columns = other.columns.iter(); - for self_column in &self.columns { + for self_column in self.columns.iter().filter(|col| col.column_type.is_some()) { let other_column = if full_metadata { - other_columns - .by_ref() + other + .columns + .iter() .find(|c| c.name == self_column.name) .ok_or_else(|| { anyhow::anyhow!( diff --git a/test/mysql-cdc/upstream-schema-changes.td b/test/mysql-cdc/upstream-schema-changes.td index da3ea47c77c2e..8bd4fef9794fe 100644 --- a/test/mysql-cdc/upstream-schema-changes.td +++ b/test/mysql-cdc/upstream-schema-changes.td @@ -103,7 +103,10 @@ f fig # dropping an excluded column should have no effect $ mysql-execute name=mysql ALTER TABLE foo DROP COLUMN meta_col; +INSERT INTO foo VALUES ('g', 'grape'); +# The INSERT after the DROP COLUMN forces the binlog to advance past the ALTER +# TABLE event, so the SELECT must succeed after the schema change is processed. > SELECT * FROM foo3; a apple b banana @@ -111,6 +114,7 @@ c cherry d date e elderberry f fig +g grape > DROP TABLE foo3; @@ -123,9 +127,10 @@ c cherry d date e elderberry f fig +g grape $ mysql-execute name=mysql -INSERT INTO foo VALUES ('g', 'grape'); +INSERT INTO foo VALUES ('h', 'honeydew'); > SELECT * FROM foo4; a apple @@ -158,16 +163,23 @@ b banana 7 $ mysql-execute name=mysql ALTER TABLE bar DROP COLUMN meta_col; +INSERT INTO bar VALUES ('c', 'cherry', 9, 'grove'); +# The INSERT after the DROP COLUMN forces the binlog to advance past the ALTER +# TABLE event, so the SELECT must succeed after the schema change is processed. 
> SELECT * FROM bar1; a apple 5 b banana 7 +c cherry 9 $ mysql-execute name=mysql ALTER TABLE bar ADD COLUMN lastname VARCHAR(16) AFTER `name`; +INSERT INTO bar VALUES ('d', 'date_lastname', 'date', 10, 'oasis'); > SELECT * FROM bar1; a apple 5 b banana 7 +c cherry 9 +d date 10 > DROP SOURCE mysql_src CASCADE; From c6dda80dd31716610ef2907e8fe56b6741f03a15 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 21 Apr 2026 18:01:16 -0400 Subject: [PATCH 08/23] fix https://github.com/MaterializeInc/database-issues/issues/11315 --- Cargo.lock | 1 + src/sql/src/pure.rs | 1 + src/storage/Cargo.toml | 1 + src/storage/src/source/mysql/schemas.rs | 22 ++++++++++++++++------ 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02d75bd8044b3..40749cc6385b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8075,6 +8075,7 @@ dependencies = [ "tokio-util", "tracing", "uuid", + "version-compare", ] [[package]] diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 4f977e76ee2c5..eb4ab741db366 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -68,6 +68,7 @@ use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree} use rdkafka::admin::AdminClient; use references::{RetrievedSourceReferences, SourceReferenceClient}; use uuid::Uuid; +use version_compare; use crate::ast::{ AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement, diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index f96d9991511dc..83dbf36f91a24 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -91,6 +91,7 @@ tracing.workspace = true thiserror.workspace = true uuid = { workspace = true, features = ["serde", "v4"] } arrow-ipc.workspace = true +version-compare.workspace = true [dev-dependencies] async-trait.workspace = true diff --git a/src/storage/src/source/mysql/schemas.rs b/src/storage/src/source/mysql/schemas.rs index 379e8dc167e0b..18c90ed411c89 100644 --- 
a/src/storage/src/source/mysql/schemas.rs +++ b/src/storage/src/source/mysql/schemas.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use std::collections::{BTreeMap, BTreeSet}; +use version_compare; use mysql_async::prelude::Queryable; use mz_mysql_util::{MySqlError, SchemaRequest, schema_info}; @@ -45,12 +46,21 @@ where }) .collect(); - let full_metadata = conn - .query_first::<String, String>("SELECT @@binlog_row_metadata".to_string()) - .await? - .unwrap() - .to_uppercase() - == "FULL"; + let full_metadata = version_compare::compare_to( + conn.query_first::<String, String>("SELECT VERSION()".to_string()) + .await? + .unwrap() + .to_uppercase(), + "8.0.1", + version_compare::Cmp::Ge, + ) + .expect("failed to parse version string from mysql") + && conn + .query_first::<String, String>("SELECT @@binlog_row_metadata".to_string()) + .await? + .unwrap() + .to_uppercase() + == "FULL"; Ok(expected .into_iter() From d2ee463db1061769c74bef341d1f4e9b2ad74257 Mon Sep 17 00:00:00 2001 From: Marty Kulma <18468315+martykulma@users.noreply.github.com> Date: Tue, 21 Apr 2026 13:45:30 -0400 Subject: [PATCH 09/23] mysql: log row shape on decoding error --- .../src/source/mysql/replication/events.rs | 74 ++++++++++++++++++- 1 file changed, 70 insertions(+), 4 deletions(-) diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs index c6d30701971a8..2473c1e8ac715 100644 --- a/src/storage/src/source/mysql/replication/events.rs +++ b/src/storage/src/source/mysql/replication/events.rs @@ -7,9 +7,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0.
+use std::fmt::Write; + use maplit::btreemap; +use mysql_common::Value; use mysql_common::binlog::events::{QueryEvent, RowsEventData}; -use mz_mysql_util::{MySqlError, pack_mysql_row}; +use mz_mysql_util::{MySqlError, MySqlTableDesc, pack_mysql_row}; use mz_ore::iter::IteratorExt; use mz_repr::{Diff, Row}; use mz_storage_types::errors::DataflowError; @@ -292,6 +295,7 @@ pub(super) async fn handle_rows_event( for (binlog_row, diff) in updates.into_iter().flatten() { let row = mysql_async::Row::try_from(binlog_row)?; for (output, row_val) in outputs.iter().repeat_clone(row) { + let row_shape = describe_row_shape(&row_val, &output.desc); let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) { Ok(row) => Ok(SourceMessage { key: Row::default(), @@ -299,9 +303,17 @@ pub(super) async fn handle_rows_event( metadata: Row::default(), }), // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from( - DefiniteError::ValueDecodeError(err.to_string()), - )), + Err(err @ MySqlError::ValueDecodeError { .. }) => { + tracing::warn!( + %id, + "timely-{worker_id} decode error for {table:?} \ + output={} gtid={new_gtid:?}: {err}; row shape: {row_shape}", + output.output_index, + ); + Err(DataflowError::from(DefiniteError::ValueDecodeError( + err.to_string(), + ))) + } Err(err) => Err(err)?, }; @@ -354,3 +366,57 @@ pub(super) async fn handle_rows_event( Ok(()) } + +/// Describes the structural shape of a row without revealing any data values. +/// Emits per-column ordinal, the binlog wire type, the expected Materialize +/// scalar type from the table descriptor, the character-set id (or `binary`), +/// and a value disposition (`null` or `bytes(len=N)` / primitive kind). +/// Intended for diagnostic logging on decode errors: MySQL serializes CHAR, +/// VARCHAR, TEXT, JSON, BLOB, etc. 
all as `Value::Bytes`, so the wire type +/// tag and the expected scalar type are what distinguish them. +fn describe_row_shape(row: &mysql_async::Row, desc: &MySqlTableDesc) -> String { + let mut out = String::new(); + out.push('['); + for i in 0..row.len() { + if i > 0 { + out.push_str(", "); + } + let (wire_type, charset) = match row.columns_ref().get(i) { + Some(c) => { + let cs = c.character_set(); + // 63 = binary collation (binary/blob columns). + let cs_str = if cs == 63 { + "binary".to_string() + } else { + format!("charset={cs}") + }; + (format!("{:?}", c.column_type()), cs_str) + } + None => ("?".to_string(), "charset=?".to_string()), + }; + let expected = match desc.columns.get(i) { + Some(col) => match &col.column_type { + Some(ct) => format!("{:?}", ct.scalar_type), + None => "ignored".to_string(), + }, + None => "?".to_string(), + }; + let val_desc = match row.as_ref(i) { + None => "absent".to_string(), + Some(Value::NULL) => "null".to_string(), + Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()), + Some(Value::Int(_)) => "int".to_string(), + Some(Value::UInt(_)) => "uint".to_string(), + Some(Value::Float(_)) => "float".to_string(), + Some(Value::Double(_)) => "double".to_string(), + Some(Value::Date(..)) => "date".to_string(), + Some(Value::Time(..)) => "time".to_string(), + }; + let _ = write!( + out, + "{{ord={i}, wire={wire_type}, {charset}, expected={expected}, val={val_desc}}}" + ); + } + out.push(']'); + out +} From e09257b713d545224d9e360379876d81b8ec6b4b Mon Sep 17 00:00:00 2001 From: Marty Kulma <18468315+martykulma@users.noreply.github.com> Date: Tue, 21 Apr 2026 17:17:52 -0400 Subject: [PATCH 10/23] Move logging further down to avoid string allocation in hot path --- src/mysql-util/src/decoding.rs | 180 ++++++++++++++---- .../src/source/mysql/replication/events.rs | 94 ++------- src/storage/src/source/mysql/snapshot.rs | 30 +-- 3 files changed, 169 insertions(+), 135 deletions(-) diff --git a/src/mysql-util/src/decoding.rs 
b/src/mysql-util/src/decoding.rs index 081033736cc3d..09d7da3347bb1 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::fmt::Write; use std::str::FromStr; use itertools::{EitherOrBoth, Itertools}; @@ -28,6 +29,7 @@ pub fn pack_mysql_row( row_container: &mut Row, row: MySqlRow, table_desc: &MySqlTableDesc, + gtid_set: Option<&str>, ) -> Result<Row, MySqlError> { let mut packer = row_container.packer(); @@ -37,49 +39,41 @@ pub fn pack_mysql_row( // This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to // `FULL`. If the first column name does not begin with '@', then we can assume that // full metadata is available and we can match columns by name. - let zip_values: Vec<EitherOrBoth<&MySqlColumnDesc, Value>> = if row + let fallback_names = row .columns_ref() .first() - .is_some_and(|col| col.name_ref().starts_with(b"@")) - { - table_desc - .columns - .iter() - .zip_longest(row.unwrap()) - .collect() + .is_some_and(|col| col.name_ref().starts_with(b"@")); + // Wire indices of `row` to decode, in iteration order. Keeping indices + // (rather than moving values out via `row.unwrap()`) lets us describe the + // row's shape on a decode error without paying the allocation cost on the + // happy path.
+ let active_indices: Vec<usize> = if fallback_names { + (0..row.len()).collect() } else { - table_desc - .columns + row.columns_ref() .iter() - .filter(|col| col.column_type.is_some()) - .map(|col| { - let pos = row - .columns_ref() + .enumerate() + .filter(|(_, col)| { + table_desc + .columns .iter() - .position(|row_col| row_col.name_str() == col.name.as_str()) - .expect("column in table desc not found in row metadata"); - EitherOrBoth::Both( - col, - row.get(pos) - .expect("Can't unwrap row if some of columns was taken"), - ) + .any(|c| c.name.as_str() == col.name_str()) }) + .map(|(i, _)| i) .collect() }; - for values in zip_values { - let (col_desc, value) = match values { - EitherOrBoth::Both(col_desc, value) => (col_desc, value), + for pair in table_desc.columns.iter().zip_longest(&active_indices) { + let (col_desc, wire_idx) = match pair { + EitherOrBoth::Both(col_desc, &idx) => (col_desc, idx), EitherOrBoth::Left(col_desc) => { - tracing::error!( - "mysql: extra column description {col_desc:?} for table {}", - table_desc.name - ); - Err(MySqlError::ValueDecodeError { - column_name: col_desc.name.clone(), - qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), - error: "extra column description".to_string(), - })? + return Err(decode_error( + "extra column description", + col_desc, + table_desc, + gtid_set, + &row, + )); } EitherOrBoth::Right(_) => { // If there are extra columns on the upstream table we can safely ignore them @@ -90,19 +84,123 @@ pub fn pack_mysql_row( // This column is ignored, so don't decode it.
continue; } - match pack_val_as_datum(value, col_desc, &mut packer) { - Err(err) => Err(MySqlError::ValueDecodeError { - column_name: col_desc.name.clone(), - qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), - error: err.to_string(), - })?, - Ok(()) => (), - }; + let value = row + .as_ref(wire_idx) + .expect("active_indices is within row bounds") + .clone(); + if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) { + return Err(decode_error( + &err.to_string(), + col_desc, + table_desc, + gtid_set, + &row, + )); + } } Ok(row_container.clone()) } +/// Build a `ValueDecodeError`, logging the schema, table, column, source +/// gtid_set (if any), and a shape description of `row` at the same time. +/// The shape string is only built here — pack_mysql_row's happy path does no +/// per-row allocation beyond what decoding requires. +fn decode_error( + err_msg: &str, + col_desc: &MySqlColumnDesc, + table_desc: &MySqlTableDesc, + gtid_set: Option<&str>, + row: &MySqlRow, +) -> MySqlError { + let row_shape = describe_row_shape(row, table_desc); + tracing::warn!( + "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}", + table_desc.schema_name, + table_desc.name, + col_desc.name, + err_msg, + gtid_set, + row_shape, + ); + MySqlError::ValueDecodeError { + column_name: col_desc.name.clone(), + qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), + error: err_msg.to_string(), + } +} + +/// Describes the structural shape of a row without revealing any data values. +/// Iterates every wire column. For each, emits the wire name, the binlog +/// wire type, the character-set id (or `binary`), a classification relative +/// to `table_desc` (`expected=<scalar type>` for active columns, `ignored` for +/// columns excluded from the source, `extra` for upstream columns with no +/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` / +/// primitive kind).
Intended for diagnostic logging on decode errors: MySQL +/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`, +/// so the wire type tag and the expected scalar type are what distinguish +/// them. +fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String { + // Binlogs without full row metadata use positional "@N" names, so we + // have to match by wire position rather than by name. + let fallback_names = row + .columns_ref() + .first() + .is_some_and(|col| col.name_ref().starts_with(b"@")); + + let mut out = String::new(); + out.push('['); + for (i, wire_col) in row.columns_ref().iter().enumerate() { + if i > 0 { + out.push_str(", "); + } + let wire_name = wire_col.name_str(); + let cs = wire_col.character_set(); + // 63 = binary collation (binary/blob columns). + let cs_str = if cs == 63 { + "binary".to_string() + } else { + format!("charset={cs}") + }; + let wire_type = format!("{:?}", wire_col.column_type()); + + let matched_col = if fallback_names { + table_desc.columns.get(i) + } else { + table_desc + .columns + .iter() + .find(|c| c.name.as_str() == wire_name) + }; + let match_info = match matched_col { + Some(col) => match &col.column_type { + Some(ct) => format!("expected={:?}", ct.scalar_type), + None => "ignored".to_string(), + }, + None => "extra".to_string(), + }; + + let val_desc = match row.as_ref(i) { + None => "absent".to_string(), + Some(Value::NULL) => "null".to_string(), + Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()), + Some(Value::Int(_)) => "int".to_string(), + Some(Value::UInt(_)) => "uint".to_string(), + Some(Value::Float(_)) => "float".to_string(), + Some(Value::Double(_)) => "double".to_string(), + Some(Value::Date(..)) => "date".to_string(), + Some(Value::Time(..)) => "time".to_string(), + }; + + let _ = write!( + out, + "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}" + ); + } + out.push(']'); + out +} + // TODO(guswynn|roshan): This function has 
various `.to_string()` and `format!` calls that should // use a shared allocation if possible. fn pack_val_as_datum( diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs index 2473c1e8ac715..5b6c82c0a621e 100644 --- a/src/storage/src/source/mysql/replication/events.rs +++ b/src/storage/src/source/mysql/replication/events.rs @@ -7,12 +7,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::fmt::Write; - use maplit::btreemap; -use mysql_common::Value; use mysql_common::binlog::events::{QueryEvent, RowsEventData}; -use mz_mysql_util::{MySqlError, MySqlTableDesc, pack_mysql_row}; +use mz_mysql_util::{MySqlError, pack_mysql_row}; use mz_ore::iter::IteratorExt; use mz_repr::{Diff, Row}; use mz_storage_types::errors::DataflowError; @@ -292,30 +289,23 @@ pub(super) async fn handle_rows_event( before_row.map(|r| (r, Diff::MINUS_ONE)), after_row.map(|r| (r, Diff::ONE)), ]; + let gtid_str = format!("{new_gtid:?}"); for (binlog_row, diff) in updates.into_iter().flatten() { let row = mysql_async::Row::try_from(binlog_row)?; for (output, row_val) in outputs.iter().repeat_clone(row) { - let row_shape = describe_row_shape(&row_val, &output.desc); - let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) { - Ok(row) => Ok(SourceMessage { - key: Row::default(), - value: row, - metadata: Row::default(), - }), - // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. 
}) => { - tracing::warn!( - %id, - "timely-{worker_id} decode error for {table:?} \ - output={} gtid={new_gtid:?}: {err}; row shape: {row_shape}", - output.output_index, - ); - Err(DataflowError::from(DefiniteError::ValueDecodeError( - err.to_string(), - ))) - } - Err(err) => Err(err)?, - }; + let event = + match pack_mysql_row(&mut final_row, row_val, &output.desc, Some(&gtid_str)) { + Ok(row) => Ok(SourceMessage { + key: Row::default(), + value: row, + metadata: Row::default(), + }), + // Produce a DefiniteError in the stream for any rows that fail to decode + Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from( + DefiniteError::ValueDecodeError(err.to_string()), + )), + Err(err) => Err(err)?, + }; let data = (output.output_index, event); @@ -366,57 +356,3 @@ pub(super) async fn handle_rows_event( Ok(()) } - -/// Describes the structural shape of a row without revealing any data values. -/// Emits per-column ordinal, the binlog wire type, the expected Materialize -/// scalar type from the table descriptor, the character-set id (or `binary`), -/// and a value disposition (`null` or `bytes(len=N)` / primitive kind). -/// Intended for diagnostic logging on decode errors: MySQL serializes CHAR, -/// VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`, so the wire type -/// tag and the expected scalar type are what distinguish them. -fn describe_row_shape(row: &mysql_async::Row, desc: &MySqlTableDesc) -> String { - let mut out = String::new(); - out.push('['); - for i in 0..row.len() { - if i > 0 { - out.push_str(", "); - } - let (wire_type, charset) = match row.columns_ref().get(i) { - Some(c) => { - let cs = c.character_set(); - // 63 = binary collation (binary/blob columns).
- let cs_str = if cs == 63 { - "binary".to_string() - } else { - format!("charset={cs}") - }; - (format!("{:?}", c.column_type()), cs_str) - } - None => ("?".to_string(), "charset=?".to_string()), - }; - let expected = match desc.columns.get(i) { - Some(col) => match &col.column_type { - Some(ct) => format!("{:?}", ct.scalar_type), - None => "ignored".to_string(), - }, - None => "?".to_string(), - }; - let val_desc = match row.as_ref(i) { - None => "absent".to_string(), - Some(Value::NULL) => "null".to_string(), - Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()), - Some(Value::Int(_)) => "int".to_string(), - Some(Value::UInt(_)) => "uint".to_string(), - Some(Value::Float(_)) => "float".to_string(), - Some(Value::Double(_)) => "double".to_string(), - Some(Value::Date(..)) => "date".to_string(), - Some(Value::Time(..)) => "time".to_string(), - }; - let _ = write!( - out, - "{{ord={i}, wire={wire_type}, {charset}, expected={expected}, val={val_desc}}}" - ); - } - out.push(']'); - out -} diff --git a/src/storage/src/source/mysql/snapshot.rs b/src/storage/src/source/mysql/snapshot.rs index ec7aceb420982..cd7ebad917e6a 100644 --- a/src/storage/src/source/mysql/snapshot.rs +++ b/src/storage/src/source/mysql/snapshot.rs @@ -417,21 +417,21 @@ pub(crate) fn render<'scope>( let row: MySqlRow = row; snapshot_staged += 1; for (output, row_val) in outputs.iter().repeat_clone(row) { - let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) - { - Ok(row) => Ok(SourceMessage { - key: Row::default(), - value: row, - metadata: Row::default(), - }), - // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. 
}) => { - Err(DataflowError::from(DefiniteError::ValueDecodeError( - err.to_string(), - ))) - } - Err(err) => Err(err)?, - }; + let event = + match pack_mysql_row(&mut final_row, row_val, &output.desc, None) { + Ok(row) => Ok(SourceMessage { + key: Row::default(), + value: row, + metadata: Row::default(), + }), + // Produce a DefiniteError in the stream for any rows that fail to decode + Err(err @ MySqlError::ValueDecodeError { .. }) => { + Err(DataflowError::from(DefiniteError::ValueDecodeError( + err.to_string(), + ))) + } + Err(err) => Err(err)?, + }; let update = ( (output.output_index, event), GtidPartition::minimum(), From ff7741013b8eb6e9aa76df61c53011d41cb30375 Mon Sep 17 00:00:00 2001 From: Marty Kulma <18468315+martykulma@users.noreply.github.com> Date: Tue, 21 Apr 2026 18:01:02 -0400 Subject: [PATCH 11/23] Make column iteration based on table_desc --- src/mysql-util/src/decoding.rs | 56 +++++++++++++--------------------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index 09d7da3347bb1..922aa30230bb2 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -10,7 +10,6 @@ use std::fmt::Write; use std::str::FromStr; -use itertools::{EitherOrBoth, Itertools}; use mysql_common::value::convert::from_value_opt; use mysql_common::{Row as MySqlRow, Value}; @@ -43,50 +42,39 @@ pub fn pack_mysql_row( .columns_ref() .first() .is_some_and(|col| col.name_ref().starts_with(b"@")); - // Wire indices of `row` to decode, in iteration order. Keeping indices - // (rather than moving values out via `row.unwrap()`) lets us describe the - // row's shape on a decode error without paying the allocation cost on the - // happy path. 
- let active_indices: Vec = if fallback_names { - (0..row.len()).collect() - } else { - row.columns_ref() - .iter() - .enumerate() - .filter(|(_, col)| { - table_desc - .columns - .iter() - .any(|c| c.name.as_str() == col.name_str()) - }) - .map(|(i, _)| i) - .collect() - }; - for pair in table_desc.columns.iter().zip_longest(&active_indices) { - let (col_desc, wire_idx) = match pair { - EitherOrBoth::Both(col_desc, &idx) => (col_desc, idx), - EitherOrBoth::Left(col_desc) => { + // For each column in `table_desc` (in descriptor order), resolve its wire + // index. Non-fallback rows are matched by name so a reordered upstream + // still decodes correctly; fallback rows have no names and are matched + // positionally. A `None` here means the upstream row is missing this + // column and is only tolerated for ignored columns. + for (i, col_desc) in table_desc.columns.iter().enumerate() { + let wire_idx = if fallback_names { + (i < row.len()).then_some(i) + } else { + row.columns_ref() + .iter() + .position(|wc| wc.name_str() == col_desc.name.as_str()) + }; + if col_desc.column_type.is_none() { + // This column is ignored, so don't decode it. + continue; + } + let wire_idx = match wire_idx { + Some(idx) => idx, + None => { return Err(decode_error( - "extra column description", + "upstream row is missing column", col_desc, table_desc, gtid_set, &row, )); } - EitherOrBoth::Right(_) => { - // If there are extra columns on the upstream table we can safely ignore them - break; - } }; - if col_desc.column_type.is_none() { - // This column is ignored, so don't decode it. 
- continue; - } let value = row .as_ref(wire_idx) - .expect("active_indices is within row bounds") + .expect("wire_idx resolved from row") .clone(); if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) { return Err(decode_error( From 622d420a66b0fea967607b340cf227448eaa7fb9 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 21 Apr 2026 18:52:51 -0400 Subject: [PATCH 12/23] fix exclude columns test --- test/mysql-cdc/35-exclude-columns.td | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/mysql-cdc/35-exclude-columns.td b/test/mysql-cdc/35-exclude-columns.td index 98efb4c91fa12..ca0e3007610d1 100644 --- a/test/mysql-cdc/35-exclude-columns.td +++ b/test/mysql-cdc/35-exclude-columns.td @@ -68,5 +68,6 @@ contains:column "f2" does not exist $ mysql-execute name=mysql ALTER TABLE t1 DROP COLUMN f2; -! select * from t1; -contains:incompatible schema change +> select * from t1; +1 "test" +1 "test" From 13a8617c43c4f0fa6cb2a732c5d9cd8502a3c0eb Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 14:37:42 -0400 Subject: [PATCH 13/23] update decoding to check binlog setting from source creation Co-authored-by: Copilot --- src/mysql-util/src/decoding.rs | 13 ++++++- src/storage/src/source/mysql.rs | 2 ++ .../src/source/mysql/replication/events.rs | 31 +++++++++------- src/storage/src/source/mysql/snapshot.rs | 36 +++++++++++-------- 4 files changed, 53 insertions(+), 29 deletions(-) diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index 922aa30230bb2..c42f01b23b914 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -10,6 +10,7 @@ use std::fmt::Write; use std::str::FromStr; +use mysql_async::binlog; use mysql_common::value::convert::from_value_opt; use mysql_common::{Row as MySqlRow, Value}; @@ -29,6 +30,7 @@ pub fn pack_mysql_row( row: MySqlRow, table_desc: &MySqlTableDesc, gtid_set: Option<&str>, + binlog_full_metadata: bool, ) -> Result { let mut packer = 
row_container.packer(); @@ -43,13 +45,22 @@ pub fn pack_mysql_row( .first() .is_some_and(|col| col.name_ref().starts_with(b"@")); + if binlog_full_metadata && fallback_names { + // This should never happen, but if it does, it's a sign that something is very wrong with the MySQL server's binlog configuration. We want to error rather than silently producing incorrect results. + return Err(MySqlError::ValueDecodeError { + column_name: "".to_string(), + qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), + error: "Table was created with binlog_row_metadata=FULL but binlog_row_metadata has since been set to a different value, meaning we cannot reliably decode the columns".to_string(), + }); + } + // For each column in `table_desc` (in descriptor order), resolve its wire // index. Non-fallback rows are matched by name so a reordered upstream // still decodes correctly; fallback rows have no names and are matched // positionally. A `None` here means the upstream row is missing this // column and is only tolerated for ignored columns. 
for (i, col_desc) in table_desc.columns.iter().enumerate() { - let wire_idx = if fallback_names { + let wire_idx = if !binlog_full_metadata { (i < row.len()).then_some(i) } else { row.columns_ref() diff --git a/src/storage/src/source/mysql.rs b/src/storage/src/source/mysql.rs index 67555a3090bd7..40e9af10d7999 100644 --- a/src/storage/src/source/mysql.rs +++ b/src/storage/src/source/mysql.rs @@ -151,6 +151,7 @@ impl SourceRender for MySqlSourceConnection { initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"), resume_upper, export_id: id.clone(), + binlog_full_metadata: details.binlog_full_metadata, }); } @@ -257,6 +258,7 @@ struct SourceOutputInfo { initial_gtid_set: Antichain, resume_upper: Antichain, export_id: GlobalId, + binlog_full_metadata: bool, } #[derive(Clone, Debug, thiserror::Error)] diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs index 5b6c82c0a621e..8e31dc6654e3a 100644 --- a/src/storage/src/source/mysql/replication/events.rs +++ b/src/storage/src/source/mysql/replication/events.rs @@ -293,19 +293,24 @@ pub(super) async fn handle_rows_event( for (binlog_row, diff) in updates.into_iter().flatten() { let row = mysql_async::Row::try_from(binlog_row)?; for (output, row_val) in outputs.iter().repeat_clone(row) { - let event = - match pack_mysql_row(&mut final_row, row_val, &output.desc, Some(>id_str)) { - Ok(row) => Ok(SourceMessage { - key: Row::default(), - value: row, - metadata: Row::default(), - }), - // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. 
}) => Err(DataflowError::from( - DefiniteError::ValueDecodeError(err.to_string()), - )), - Err(err) => Err(err)?, - }; + let event = match pack_mysql_row( + &mut final_row, + row_val, + &output.desc, + Some(>id_str), + output.binlog_full_metadata, + ) { + Ok(row) => Ok(SourceMessage { + key: Row::default(), + value: row, + metadata: Row::default(), + }), + // Produce a DefiniteError in the stream for any rows that fail to decode + Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from( + DefiniteError::ValueDecodeError(err.to_string()), + )), + Err(err) => Err(err)?, + }; let data = (output.output_index, event); diff --git a/src/storage/src/source/mysql/snapshot.rs b/src/storage/src/source/mysql/snapshot.rs index cd7ebad917e6a..31c37e9c213b1 100644 --- a/src/storage/src/source/mysql/snapshot.rs +++ b/src/storage/src/source/mysql/snapshot.rs @@ -417,21 +417,26 @@ pub(crate) fn render<'scope>( let row: MySqlRow = row; snapshot_staged += 1; for (output, row_val) in outputs.iter().repeat_clone(row) { - let event = - match pack_mysql_row(&mut final_row, row_val, &output.desc, None) { - Ok(row) => Ok(SourceMessage { - key: Row::default(), - value: row, - metadata: Row::default(), - }), - // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. }) => { - Err(DataflowError::from(DefiniteError::ValueDecodeError( - err.to_string(), - ))) - } - Err(err) => Err(err)?, - }; + let event = match pack_mysql_row( + &mut final_row, + row_val, + &output.desc, + None, + output.binlog_full_metadata, + ) { + Ok(row) => Ok(SourceMessage { + key: Row::default(), + value: row, + metadata: Row::default(), + }), + // Produce a DefiniteError in the stream for any rows that fail to decode + Err(err @ MySqlError::ValueDecodeError { .. 
}) => { + Err(DataflowError::from(DefiniteError::ValueDecodeError( + err.to_string(), + ))) + } + Err(err) => Err(err)?, + }; let update = ( (output.output_index, event), GtidPartition::minimum(), @@ -603,6 +608,7 @@ mod tests { initial_gtid_set: Antichain::default(), resume_upper: Antichain::default(), export_id: mz_repr::GlobalId::User(1), + binlog_full_metadata: false, }; let query = build_snapshot_query(&[info.clone(), info]); assert_eq!( From 7fb00f5b22b24bbc611eb7d499be1e59ee92317b Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 14:48:20 -0400 Subject: [PATCH 14/23] update schema verification to use locked binlog setting --- src/mysql-util/src/decoding.rs | 1 - src/storage/src/source/mysql/schemas.rs | 22 ++++------------------ 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index c42f01b23b914..fb07f6c22aca0 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -10,7 +10,6 @@ use std::fmt::Write; use std::str::FromStr; -use mysql_async::binlog; use mysql_common::value::convert::from_value_opt; use mysql_common::{Row as MySqlRow, Value}; diff --git a/src/storage/src/source/mysql/schemas.rs b/src/storage/src/source/mysql/schemas.rs index 18c90ed411c89..0bd1fc2db2b31 100644 --- a/src/storage/src/source/mysql/schemas.rs +++ b/src/storage/src/source/mysql/schemas.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. use std::collections::{BTreeMap, BTreeSet}; -use version_compare; use mysql_async::prelude::Queryable; use mz_mysql_util::{MySqlError, SchemaRequest, schema_info}; @@ -46,22 +45,6 @@ where }) .collect(); - let full_metadata = version_compare::compare_to( - conn.query_first::("SELECT VERSION()".to_string()) - .await? 
- .unwrap() - .to_uppercase(), - "8.0.1", - version_compare::Cmp::Ge, - ) - .expect("failed to parse version string from mysql") - && conn - .query_first::("SELECT @@binlog_row_metadata".to_string()) - .await? - .unwrap() - .to_uppercase() - == "FULL"; - Ok(expected .into_iter() .flat_map(|(table, outputs)| { @@ -82,7 +65,10 @@ where ); match new_desc { Ok(desc) => { - match output.desc.determine_compatibility(&desc, full_metadata) { + match output + .desc + .determine_compatibility(&desc, output.binlog_full_metadata) + { Ok(()) => None, Err(err) => Some(( output, From 7adc9979a06f1db6b7e447c80cf14c9016202918 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 16:03:52 -0400 Subject: [PATCH 15/23] update schema compat logic --- src/mysql-util/src/desc.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs index 9145dde89a96f..094358b39eeb5 100644 --- a/src/mysql-util/src/desc.rs +++ b/src/mysql-util/src/desc.rs @@ -101,7 +101,7 @@ impl MySqlTableDesc { // If we do have full metadata, then we can match columns by name and just check that all // columns in `self.columns` are present and compatible with columns in `other.columns`. let mut other_columns = other.columns.iter(); - for self_column in self.columns.iter().filter(|col| col.column_type.is_some()) { + for self_column in self.columns.iter() { let other_column = if full_metadata { other .columns @@ -123,6 +123,10 @@ impl MySqlTableDesc { ) })? }; + if self_column.column_type.is_none() { + // This is an excluded column and can be ignored. 
+ continue; + } if !self_column.is_compatible(other_column) { bail!( "column {} in table {} has been altered", From d123410f274aabe312b476a4cb7779b170492ac5 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 16:52:28 -0400 Subject: [PATCH 16/23] fix excluded col handling in desc Co-authored-by: Copilot --- src/mysql-util/src/desc.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs index 094358b39eeb5..dc71b125df248 100644 --- a/src/mysql-util/src/desc.rs +++ b/src/mysql-util/src/desc.rs @@ -103,6 +103,11 @@ impl MySqlTableDesc { let mut other_columns = other.columns.iter(); for self_column in self.columns.iter() { let other_column = if full_metadata { + if self_column.column_type.is_none() { + // This is an excluded column and can be ignored, as it may not have a + // corresponding column in `other.columns` if the column was dropped upstream. + continue; + } other .columns .iter() From bdf28518b1a6dd2f6e8b7fe4ada4e1ee59991488 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 28 Apr 2026 14:13:19 -0400 Subject: [PATCH 17/23] address comments - split binlog check to helper func, add new error type, use better str cmp function Co-authored-by: Copilot --- src/sql/src/pure.rs | 48 ++++----------------------------------- src/sql/src/pure/error.rs | 4 ++++ src/sql/src/pure/mysql.rs | 29 +++++++++++++++++++++++ 3 files changed, 38 insertions(+), 43 deletions(-) diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index eb4ab741db366..60affc17e2f57 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -85,6 +85,7 @@ use crate::plan::error::PlanError; use crate::plan::statement::ddl::load_generator_ast_to_generator; use crate::plan::{SourceReferences, StatementContext}; use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError}; +use crate::pure::mysql::ensure_binlog_full_metadata; use crate::session::vars::ENABLE_SQL_SERVER_SOURCE; use 
crate::{kafka_util, normalize}; @@ -1103,16 +1104,7 @@ async fn purify_create_source( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; - let binlog_full_metadata = version_compare::compare_to( - mz_mysql_util::query_sys_var(&mut conn, "version").await?, - "8.0.1", - version_compare::Cmp::Ge, - ) - .is_ok_and(|is_ge| is_ge) - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") - .await? - .to_uppercase() - == "FULL"; + let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok(); let reference_client = SourceReferenceClient::MySql { conn: &mut conn, @@ -1540,16 +1532,7 @@ async fn purify_alter_source_add_subsources( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; - let binlog_full_metadata = version_compare::compare_to( - mz_mysql_util::query_sys_var(&mut conn, "version").await?, - "8.0.1", - version_compare::Cmp::Ge, - ) - .is_ok_and(|is_ge| is_ge) - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") - .await? 
- .to_uppercase() - == "FULL"; + let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok(); let requested_references = Some(ExternalReferences::SubsetTables(external_references)); @@ -1908,29 +1891,8 @@ async fn purify_create_table_from_source( ) .await?; - if version_compare::compare_to( - mz_mysql_util::query_sys_var(&mut conn, "version").await?, - "8.0.1", - version_compare::Cmp::Lt, - ) - .is_ok_and(|is_lt| is_lt) - { - Err( - MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { - setting: "Using MySQL version < 8.0.1, which does not support full binlog row metadata".to_string(), - }, - )?; - } - let binlog_metadata_setting = - mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?; - let binlog_full_metadata = binlog_metadata_setting.to_uppercase() == "FULL"; - if !binlog_full_metadata { - Err( - MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { - setting: binlog_metadata_setting, - }, - )?; - } + ensure_binlog_full_metadata(&mut conn).await?; + let binlog_full_metadata = true; // Retrieve the current @gtid_executed value of the server to mark as the effective // initial snapshot point for this table. diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index 2e7c4be77efaf..a055f188ad805 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -314,6 +314,10 @@ pub enum MySqlSourcePurificationError { "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." )] UnsupportedBinlogMetadataSetting { setting: String }, + #[error( + "The MySQL version is unsupported for this operation. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." 
+ )] + UnsupportedMySqlVersion, } impl MySqlSourcePurificationError { diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index 7e6814d11711c..6ee6dc5dc025f 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -515,3 +515,32 @@ pub(super) fn references_system_schemas(requested_references: &Option false, } } + +pub async fn ensure_binlog_full_metadata( + conn: &mut mz_mysql_util::MySqlConn, +) -> Result<(), MySqlSourcePurificationError> { + if version_compare::compare_to( + mz_mysql_util::query_sys_var(conn, "version") + .await + .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .is_ok_and(|is_lt| is_lt) + { + Err(MySqlSourcePurificationError::UnsupportedMySqlVersion)?; + } + let binlog_metadata_setting = mz_mysql_util::query_sys_var(conn, "binlog_row_metadata") + .await + .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?; + let binlog_full_metadata = binlog_metadata_setting.eq_ignore_ascii_case("FULL"); + if !binlog_full_metadata { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: binlog_metadata_setting, + }, + ) + } else { + Ok(()) + } +} From bdaf1d2a52ef8ef72d11949725e1a20c12f1685f Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 28 Apr 2026 17:01:21 -0400 Subject: [PATCH 18/23] lint --- src/sql/src/pure.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 60affc17e2f57..61b84918705f1 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -68,7 +68,6 @@ use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree} use rdkafka::admin::AdminClient; use references::{RetrievedSourceReferences, SourceReferenceClient}; use uuid::Uuid; -use version_compare; use crate::ast::{ AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement, From c40697b40940fc1347e9ce3947d97efa348f51f4 Mon Sep 17 
00:00:00 2001 From: Patrick Butler Date: Wed, 29 Apr 2026 08:43:00 -0400 Subject: [PATCH 19/23] update backcompat test for new decoding logic --- test/mysql-cdc/binlog-backward-compat.td | 66 +++++++++++++++++------- 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/test/mysql-cdc/binlog-backward-compat.td b/test/mysql-cdc/binlog-backward-compat.td index e4113285a8785..aa24f79a16939 100644 --- a/test/mysql-cdc/binlog-backward-compat.td +++ b/test/mysql-cdc/binlog-backward-compat.td @@ -27,17 +27,17 @@ SET GLOBAL binlog_row_metadata = MINIMAL; DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; -CREATE TABLE foo (name VARCHAR(16), value VARCHAR(32)); -INSERT INTO foo VALUES ('a', 'apple'), ('b', 'banana'); +CREATE TABLE foo (name VARCHAR(16), age INT, value VARCHAR(32)); +INSERT INTO foo VALUES ('a', 1, 'apple'), ('b', 2, 'banana'); -> CREATE SOURCE mysql_src FROM MYSQL CONNECTION mysql_conn FOR TABLES (foo); +> CREATE SOURCE mysql_src FROM MYSQL CONNECTION mysql_conn (EXCLUDE COLUMNS (public.foo.age)) FOR TABLES (public.foo); > SELECT * FROM foo; a apple b banana $ mysql-execute name=mysql -INSERT INTO foo VALUES ('c', 'cherry'); +INSERT INTO foo VALUES ('c', 3, 'cherry'); > SELECT * FROM foo; a apple @@ -59,13 +59,17 @@ DELETE FROM foo WHERE name = 'b'; a avocado c cherry -> DROP SOURCE mysql_src CASCADE; - $ mysql-execute name=mysql SET GLOBAL binlog_row_metadata = FULL; USE public; CREATE TABLE bar (a INT, b INT, c INT); INSERT INTO bar VALUES (1, 2, 3), (4, 5, 6); +INSERT INTO foo VALUES ('d', 4, 'date'); + +> SELECT * FROM foo; +a avocado +c cherry +d date > CREATE SOURCE mysql_src2 FROM MYSQL CONNECTION mysql_conn; @@ -78,30 +82,52 @@ INSERT INTO bar VALUES (1, 2, 3), (4, 5, 6); $ mysql-execute name=mysql SET GLOBAL binlog_row_metadata = MINIMAL; INSERT INTO bar VALUES (7, 8, 9); +INSERT INTO foo VALUES ('e', 5, 'eggplant'); -> SELECT * FROM bar; -1 3 -4 6 -7 9 +> SELECT * FROM foo; +a avocado +c cherry +d date +e eggplant + +! 
SELECT * FROM bar; +contains:binlog_row_metadata has since been set to a different value $ mysql-execute name=mysql UPDATE bar SET c = 30 WHERE a = 1; -> SELECT * FROM bar; -1 30 -4 6 -7 9 +! SELECT * FROM bar; +contains:binlog_row_metadata has since been set to a different value $ mysql-execute name=mysql DELETE FROM bar WHERE a = 4; -> SELECT * FROM bar; -1 30 -7 9 +! SELECT * FROM bar; +contains:binlog_row_metadata has since been set to a different value -> DROP SOURCE mysql_src2 CASCADE; +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; +INSERT INTO bar VALUES (10, 11, 12); +INSERT INTO foo VALUES ('f', 6, 'fig'); +# This should be an unrecoverable error, even though binlog_row_metadata has been restored to FULL +! SELECT * FROM bar; +contains:binlog_row_metadata has since been set to a different value + +> SELECT * FROM foo; +a avocado +c cherry +d date +e eggplant +f fig -# Restore to a clean state so other tests are unaffected. $ mysql-execute name=mysql -SET GLOBAL binlog_row_metadata = FULL; +ALTER TABLE foo DROP COLUMN age; + +# this should fail even with FULL metadata turned on, as the source was created with minimal metadata +! 
SELECT * FROM foo; +contains:incompatible schema change + +> DROP SOURCE mysql_src CASCADE; + +> DROP SOURCE mysql_src2 CASCADE; From e07f8c89d02ca24fe1528fda303bd0c52df77770 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Wed, 29 Apr 2026 09:57:32 -0400 Subject: [PATCH 20/23] update schema compat logic to keep parity with old logic - can always drop excluded columns at the end of the table schema --- src/mysql-util/src/desc.rs | 27 +++++++------------ .../35-exclude-columns.td | 26 +++++++++++++----- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs index dc71b125df248..46570fd65dc7d 100644 --- a/src/mysql-util/src/desc.rs +++ b/src/mysql-util/src/desc.rs @@ -108,30 +108,21 @@ impl MySqlTableDesc { // corresponding column in `other.columns` if the column was dropped upstream. continue; } - other - .columns - .iter() - .find(|c| c.name == self_column.name) - .ok_or_else(|| { - anyhow::anyhow!( - "column {} no longer present in table {}", - self_column.name, - self.name - ) - })? + other.columns.iter().find(|c| c.name == self_column.name) } else { - other_columns.next().ok_or_else(|| { - anyhow::anyhow!( - "column {} no longer present in table {}", - self_column.name, - self.name - ) - })? + other_columns.next() }; if self_column.column_type.is_none() { // This is an excluded column and can be ignored. 
continue; } + let other_column = other_column.ok_or_else(|| { + anyhow::anyhow!( + "column {} no longer present in table {}", + self_column.name, + self.name + ) + })?; if !self_column.is_compatible(other_column) { bail!( "column {} in table {} has been altered", diff --git a/test/mysql-cdc-old-syntax/35-exclude-columns.td b/test/mysql-cdc-old-syntax/35-exclude-columns.td index cc0bd960fda3a..8a68413858199 100644 --- a/test/mysql-cdc-old-syntax/35-exclude-columns.td +++ b/test/mysql-cdc-old-syntax/35-exclude-columns.td @@ -26,9 +26,9 @@ $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; -CREATE TABLE t1 (f1 INTEGER, f2 GEOMETRY, f3 POINT, f4 VARCHAR(64)); +CREATE TABLE t1 (f1 INTEGER, f2 GEOMETRY, f3 POINT, f4 VARCHAR(64), f5 INT); -INSERT INTO t1 VALUES (1, ST_GeomFromText('LINESTRING(0 0,1 1,2 2)'), ST_GeomFromText('POINT(1 1)'), 'test'); +INSERT INTO t1 VALUES (1, ST_GeomFromText('LINESTRING(0 0,1 1,2 2)'), ST_GeomFromText('POINT(1 1)'), 'test', 1); ! 
CREATE SOURCE da_other FROM MYSQL CONNECTION mysqc @@ -44,7 +44,7 @@ contains:invalid EXCLUDE COLUMNS option value: column name 't1.f2' must have at > CREATE SOURCE da FROM MYSQL CONNECTION mysqc ( - EXCLUDE COLUMNS (public.t1.f2, public.t1.f3) + EXCLUDE COLUMNS (public.t1.f2, public.t1.f3, public.t1.f5) ) FOR TABLES (public.t1); @@ -62,18 +62,30 @@ INSERT INTO t1 SELECT * FROM t1; "test" >[14000<=version<2600700] SHOW CREATE SOURCE t1; -materialize.public.t1 "CREATE SUBSOURCE materialize.public.t1 (f1 pg_catalog.int4, f4 pg_catalog.varchar(64)) OF SOURCE materialize.public.da WITH (EXTERNAL REFERENCE = public.t1, EXCLUDE COLUMNS = (f2, f3));" +materialize.public.t1 "CREATE SUBSOURCE materialize.public.t1 (f1 pg_catalog.int4, f4 pg_catalog.varchar(64)) OF SOURCE materialize.public.da WITH (EXTERNAL REFERENCE = public.t1, EXCLUDE COLUMNS = (f2, f3, f5));" >[version>=2600700] SHOW CREATE SOURCE t1; -materialize.public.t1 "CREATE SUBSOURCE materialize.public.t1 (f1 pg_catalog.int4, f4 pg_catalog.varchar(64))\nOF SOURCE materialize.public.da\nWITH (EXTERNAL REFERENCE = public.t1, EXCLUDE COLUMNS = (f2, f3));" +materialize.public.t1 "CREATE SUBSOURCE materialize.public.t1 (f1 pg_catalog.int4, f4 pg_catalog.varchar(64))\nOF SOURCE materialize.public.da\nWITH (EXTERNAL REFERENCE = public.t1, EXCLUDE COLUMNS = (f2, f3, f5));" >[version<14000] SHOW CREATE SOURCE t1; -materialize.public.t1 "CREATE SUBSOURCE \"materialize\".\"public\".\"t1\" (\"f1\" \"pg_catalog\".\"int4\", \"f4\" \"pg_catalog\".\"varchar\"(64)) OF SOURCE \"materialize\".\"public\".\"da\" WITH (EXTERNAL REFERENCE = \"public\".\"t1\", EXCLUDE COLUMNS = (\"f2\", \"f3\"))" +materialize.public.t1 "CREATE SUBSOURCE \"materialize\".\"public\".\"t1\" (\"f1\" \"pg_catalog\".\"int4\", \"f4\" \"pg_catalog\".\"varchar\"(64)) OF SOURCE \"materialize\".\"public\".\"da\" WITH (EXTERNAL REFERENCE = \"public\".\"t1\", EXCLUDE COLUMNS = (\"f2\", \"f3\", \"f5\"))" ! 
SELECT f2 FROM t1; contains:column "f2" does not exist -# Remove one of the ignored columns, and we should still error +# removing an ignored column from the end of the table should be okay +$ mysql-execute name=mysql +ALTER TABLE t1 DROP COLUMN f5; +INSERT INTO t1 SELECT * FROM t1; + +> SELECT * FROM t1; +1 "test" +1 "test" +1 "test" +1 "test" + +# Remove one of the ignored columns from the middle of the table schema, and we should error +# because now we can't reliably decode by column index $ mysql-execute name=mysql ALTER TABLE t1 DROP COLUMN f2; From bd2be14d07f2326378b5d775e7d25362749be758 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Wed, 29 Apr 2026 10:25:32 -0400 Subject: [PATCH 21/23] update mysql test configs Co-authored-by: Copilot --- misc/python/materialize/mzcompose/services/mysql.py | 4 ++-- test/mysql-cdc-resumption-old-syntax/mzcompose.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/misc/python/materialize/mzcompose/services/mysql.py b/misc/python/materialize/mzcompose/services/mysql.py index 6722a51086ffe..bb55a4bca9490 100644 --- a/misc/python/materialize/mzcompose/services/mysql.py +++ b/misc/python/materialize/mzcompose/services/mysql.py @@ -14,14 +14,14 @@ ) -def create_mysql_server_args(server_id: str, is_master: bool) -> list[str]: +def create_mysql_server_args(server_id: str, is_master: bool, binlog_row_metadata: str = "full") -> list[str]: args = [ "--log-bin=mysql-bin", "--gtid_mode=ON", "--enforce_gtid_consistency=ON", "--binlog-format=row", "--binlog-row-image=full", - "--binlog-row-metadata=full", + f"--binlog-row-metadata={binlog_row_metadata}", f"--server-id={server_id}", "--max-connections=500", ] diff --git a/test/mysql-cdc-resumption-old-syntax/mzcompose.py b/test/mysql-cdc-resumption-old-syntax/mzcompose.py index 79e8af104a5c5..733047363e925 100644 --- a/test/mysql-cdc-resumption-old-syntax/mzcompose.py +++ b/test/mysql-cdc-resumption-old-syntax/mzcompose.py @@ -32,16 +32,16 @@ Alpine(), 
Mz(app_password=""), Materialized(default_replication_factor=2), - MySql(), + MySql(additional_args=create_mysql_server_args(server_id="1", is_master=True, binlog_row_metadata="minimal")), MySql( name="mysql-replica-1", version=MySql.DEFAULT_VERSION, - additional_args=create_mysql_server_args(server_id="2", is_master=False), + additional_args=create_mysql_server_args(server_id="2", is_master=False, binlog_row_metadata="minimal"), ), MySql( name="mysql-replica-2", version=MySql.DEFAULT_VERSION, - additional_args=create_mysql_server_args(server_id="3", is_master=False), + additional_args=create_mysql_server_args(server_id="3", is_master=False, binlog_row_metadata="minimal"), ), Toxiproxy(), Testdrive( From 7e7f02f6136b7f0d4e5f419d3210ca69e72a5fda Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Wed, 29 Apr 2026 11:17:37 -0400 Subject: [PATCH 22/23] update error message to match old message to allow retractions --- src/mysql-util/src/decoding.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index fb07f6c22aca0..d47757742b4cf 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -74,7 +74,7 @@ pub fn pack_mysql_row( Some(idx) => idx, None => { return Err(decode_error( - "upstream row is missing column", + "extra column description", col_desc, table_desc, gtid_set, From cd40618f3260229478ff8d96478ffe11c2e68f4a Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Wed, 29 Apr 2026 12:04:18 -0400 Subject: [PATCH 23/23] python lint --- .../python/materialize/mzcompose/services/mysql.py | 4 +++- test/mysql-cdc-resumption-old-syntax/mzcompose.py | 14 +++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/misc/python/materialize/mzcompose/services/mysql.py b/misc/python/materialize/mzcompose/services/mysql.py index bb55a4bca9490..b72a610dd9108 100644 --- a/misc/python/materialize/mzcompose/services/mysql.py +++ 
b/misc/python/materialize/mzcompose/services/mysql.py @@ -14,7 +14,9 @@ ) -def create_mysql_server_args(server_id: str, is_master: bool, binlog_row_metadata: str = "full") -> list[str]: +def create_mysql_server_args( + server_id: str, is_master: bool, binlog_row_metadata: str = "full" +) -> list[str]: args = [ "--log-bin=mysql-bin", "--gtid_mode=ON", diff --git a/test/mysql-cdc-resumption-old-syntax/mzcompose.py b/test/mysql-cdc-resumption-old-syntax/mzcompose.py index 733047363e925..de531f69803d1 100644 --- a/test/mysql-cdc-resumption-old-syntax/mzcompose.py +++ b/test/mysql-cdc-resumption-old-syntax/mzcompose.py @@ -32,16 +32,24 @@ Alpine(), Mz(app_password=""), Materialized(default_replication_factor=2), - MySql(additional_args=create_mysql_server_args(server_id="1", is_master=True, binlog_row_metadata="minimal")), + MySql( + additional_args=create_mysql_server_args( + server_id="1", is_master=True, binlog_row_metadata="minimal" + ) + ), MySql( name="mysql-replica-1", version=MySql.DEFAULT_VERSION, - additional_args=create_mysql_server_args(server_id="2", is_master=False, binlog_row_metadata="minimal"), + additional_args=create_mysql_server_args( + server_id="2", is_master=False, binlog_row_metadata="minimal" + ), ), MySql( name="mysql-replica-2", version=MySql.DEFAULT_VERSION, - additional_args=create_mysql_server_args(server_id="3", is_master=False, binlog_row_metadata="minimal"), + additional_args=create_mysql_server_args( + server_id="3", is_master=False, binlog_row_metadata="minimal" + ), ), Toxiproxy(), Testdrive(