From ec827a759b5148cb9f0927f393be95edb12346ac Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Fri, 24 Apr 2026 15:59:18 -0400 Subject: [PATCH 1/5] add binlog setting to mysql source details at purification Co-authored-by: Copilot --- src/sql/src/plan/statement/ddl.rs | 4 +++ src/sql/src/pure.rs | 44 +++++++++++++++++++++++ src/sql/src/pure/mysql.rs | 5 +++ src/storage-types/src/sources.rs | 4 +++ src/storage-types/src/sources/mysql.proto | 1 + src/storage-types/src/sources/mysql.rs | 2 ++ 6 files changed, 60 insertions(+) diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 700b372f9c67e..68e93672fd6b2 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -1659,6 +1659,7 @@ pub fn plan_create_subsource( SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => SourceExportDetails::MySql(MySqlSourceExportDetails { table, initial_gtid_set, @@ -1667,6 +1668,7 @@ pub fn plan_create_subsource( .into_iter() .map(|c| c.into_string()) .collect(), + binlog_full_metadata, }), SourceExportStatementDetails::SqlServer { table, @@ -1816,6 +1818,7 @@ pub fn plan_create_table_from_source( SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => SourceExportDetails::MySql(MySqlSourceExportDetails { table, initial_gtid_set, @@ -1824,6 +1827,7 @@ pub fn plan_create_table_from_source( .into_iter() .map(|c| c.into_string()) .collect(), + binlog_full_metadata, }), SourceExportStatementDetails::SqlServer { table, diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 43ddba044e008..0dd60ad94ac59 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -252,6 +252,7 @@ pub enum PurifiedExportDetails { text_columns: Option>, exclude_columns: Option>, initial_gtid_set: String, + binlog_full_metadata: bool, }, Postgres { table: PostgresTableDesc, @@ -1095,6 +1096,14 @@ async fn purify_create_source( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; + let binlog_full_metadata = version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + let reference_client = SourceReferenceClient::MySql { conn: &mut conn, include_system_schemas: mysql::references_system_schemas(external_references), @@ -1114,6 +1123,7 @@ async fn purify_create_source( source_name, initial_gtid_set.clone(), &reference_policy, + binlog_full_metadata, ) .await?; requested_subsource_map.extend(subsources); @@ -1504,6 +1514,14 @@ async fn purify_alter_source_add_subsources( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; + let binlog_full_metadata = version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + let requested_references = Some(ExternalReferences::SubsetTables(external_references)); let reference_client = SourceReferenceClient::MySql { @@ -1525,6 +1543,7 @@ async fn purify_alter_source_add_subsources( &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, + binlog_full_metadata, ) .await?; requested_subsource_map.extend(subsources); @@ -1858,6 +1877,30 @@ async fn purify_create_table_from_source( ) .await?; + if version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: "Using MySQL version < 8.0.1, which does not support full binlog row metadata".to_string(), + }, + )?; + } + let binlog_metadata_setting = + mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?; + let binlog_full_metadata = binlog_metadata_setting.to_uppercase() == "FULL"; + if !binlog_full_metadata { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: binlog_metadata_setting, + }, + )?; + } + // Retrieve the current @gtid_executed value of the server to mark as the effective // initial snapshot point for this table. let initial_gtid_set = @@ -1885,6 +1928,7 @@ async fn purify_create_table_from_source( &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, + binlog_full_metadata, ) .await?; // There should be exactly one source_export returned for this statement diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index 8a19efd6c87a6..75087507c31dd 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet}; +use mysql_async::binlog; use mz_mysql_util::{ MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges, }; @@ -130,6 +131,7 @@ pub(super) fn generate_source_export_statement_values( text_columns, exclude_columns, initial_gtid_set, + binlog_full_metadata, } = purified_export.details else { unreachable!("purified export details must be mysql") @@ -188,6 +190,7 @@ pub(super) fn generate_source_export_statement_values( let details = SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, }; let text_columns = text_columns.map(|mut columns| { @@ -339,6 +342,7 @@ pub(super) async fn purify_source_exports( unresolved_source_name: &UnresolvedItemName, initial_gtid_set: String, reference_policy: &SourceReferencePolicy, + binlog_full_metadata: bool, ) -> Result { let requested_exports = match requested_references.as_ref() { Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => { @@ -477,6 +481,7 @@ pub(super) async fn purify_source_exports( .collect() }), initial_gtid_set: initial_gtid_set.clone(), + binlog_full_metadata, }, }, ) diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs index 9d1251676d297..0bf1684d3227c 100644 --- a/src/storage-types/src/sources.rs +++ b/src/storage-types/src/sources.rs @@ -908,6 +908,7 @@ pub enum SourceExportStatementDetails { MySql { table: mz_mysql_util::MySqlTableDesc, initial_gtid_set: String, + binlog_full_metadata: bool, }, SqlServer { table: mz_sql_server_util::desc::SqlServerTableDesc, @@ -933,11 +934,13 @@ impl RustType for SourceExportStatementDetail SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => ProtoSourceExportStatementDetails { kind: Some(proto_source_export_statement_details::Kind::Mysql( mysql::ProtoMySqlSourceExportStatementDetails { table: Some(table.into_proto()), initial_gtid_set: initial_gtid_set.clone(), + binlog_full_metadata: *binlog_full_metadata, }, )), }, @@ -985,6 +988,7 @@ impl RustType for SourceExportStatementDetail .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?, initial_gtid_set: details.initial_gtid_set, + binlog_full_metadata: details.binlog_full_metadata, }, Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer { table: details diff --git a/src/storage-types/src/sources/mysql.proto b/src/storage-types/src/sources/mysql.proto index 015f733bc6ddd..3e6dd347fb4d6 100644 --- a/src/storage-types/src/sources/mysql.proto +++ b/src/storage-types/src/sources/mysql.proto @@ -23,4 +23,5 @@ message ProtoMySqlSourceDetails { message ProtoMySqlSourceExportStatementDetails { mz_mysql_util.ProtoMySqlTableDesc table = 1; string initial_gtid_set = 2; + bool binlog_full_metadata = 3; } diff --git a/src/storage-types/src/sources/mysql.rs b/src/storage-types/src/sources/mysql.rs index 1f10cd95649da..6e2fc2deae11e 100644 --- a/src/storage-types/src/sources/mysql.rs +++ b/src/storage-types/src/sources/mysql.rs @@ -215,6 +215,7 @@ pub struct MySqlSourceExportDetails { pub initial_gtid_set: String, pub text_columns: Vec, pub exclude_columns: Vec, + pub binlog_full_metadata: bool, } impl AlterCompatible for MySqlSourceExportDetails { @@ -230,6 +231,7 @@ impl AlterCompatible for MySqlSourceExportDetails { initial_gtid_set: _, text_columns: _, exclude_columns: _, + binlog_full_metadata: _, } = self; Ok(()) } From 60cd39755ab75f5c265bc91bfb1d9a59757f467f Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Fri, 24 Apr 2026 16:13:48 -0400 Subject: [PATCH 2/5] update cargo and fix errors from cherry picking --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + src/sql/Cargo.toml | 1 + src/sql/src/pure/error.rs | 4 ++++ src/sql/src/pure/mysql.rs | 1 - 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 7487a2dd847d4..ea4cd69d7cacc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7874,6 +7874,7 @@ dependencies = [ "tracing-subscriber", "uncased", "uuid", + "version-compare", ] [[package]] @@ -13043,6 +13044,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 786754b7f2064..c3b6a1e711227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -525,6 +525,7 @@ url = "2.5.8" urlencoding = "2.1.3" utoipa = "5.4.0" uuid = "1.19.0" +version-compare = "0.2.1" walkdir = "2.5.0" which = "8" yansi = "1.0.1" diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 1c8bf6f0b6d08..aa129164609cd 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -82,6 +82,7 @@ tracing.workspace = true tracing-subscriber.workspace = true uncased.workspace = true uuid = { workspace = true, features = ["serde", "v4"] } +version-compare.workspace = true [dev-dependencies] datadriven.workspace = true diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index 418a51c5d955d..2e7c4be77efaf 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -310,6 +310,10 @@ pub enum MySqlSourcePurificationError { NoTablesFoundForSchemas(Vec), #[error(transparent)] InvalidConnection(#[from] MySqlConnectionValidationError), + #[error( + "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." + )] + UnsupportedBinlogMetadataSetting { setting: String }, } impl MySqlSourcePurificationError { diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index 75087507c31dd..a068d786f222f 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -11,7 +11,6 @@ use std::collections::{BTreeMap, BTreeSet}; -use mysql_async::binlog; use mz_mysql_util::{ MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges, }; From 294cae333e3e4b4304335dace7c3205077540e77 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 14:05:28 -0400 Subject: [PATCH 3/5] fix tests, wrong comparison operator, and consistent case insensitivity Co-authored-by: Copilot --- src/sql/src/pure.rs | 20 +++++++++++++------- test/mysql-cdc/mzcompose.py | 30 +++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 0dd60ad94ac59..d295082649a96 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -1099,10 +1099,13 @@ async fn purify_create_source( let binlog_full_metadata = version_compare::compare_to( mz_mysql_util::query_sys_var(&mut conn, "version").await?, "8.0.1", - version_compare::Cmp::Lt, + version_compare::Cmp::Ge, ) - .expect("failed to parse version string from mysql") - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + .is_ok_and(|is_ge| is_ge) + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") + .await? + .to_uppercase() + == "FULL"; let reference_client = SourceReferenceClient::MySql { conn: &mut conn, @@ -1517,10 +1520,13 @@ async fn purify_alter_source_add_subsources( let binlog_full_metadata = version_compare::compare_to( mz_mysql_util::query_sys_var(&mut conn, "version").await?, "8.0.1", - version_compare::Cmp::Lt, + version_compare::Cmp::Ge, ) - .expect("failed to parse version string from mysql") - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await? == "FULL"; + .is_ok_and(|is_ge| is_ge) + && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") + .await? + .to_uppercase() + == "FULL"; let requested_references = Some(ExternalReferences::SubsetTables(external_references)); @@ -1882,7 +1888,7 @@ async fn purify_create_table_from_source( "8.0.1", version_compare::Cmp::Lt, ) - .expect("failed to parse version string from mysql") + .is_ok_and(|is_lt| is_lt) { Err( MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index c6837f3ad88ba..d43398f76dc35 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -33,21 +33,33 @@ from materialize.mzcompose.services.toxiproxy import Toxiproxy -def create_mysql(mysql_version: str) -> MySql: - return MySql(version=mysql_version) +def create_mysql(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: + additional_args = [] + if binlog_full_metadata: + additional_args.extend( + [ + "--binlog-row-metadata=FULL", + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + ] + ) + return MySql(version=mysql_version, additional_args=additional_args) -def create_mysql_replica(mysql_version: str) -> MySql: - return MySql( - name="mysql-replica", - version=mysql_version, - use_seeded_image=False, - additional_args=[ +def create_mysql_replica(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: + additional_args = [ "--gtid_mode=ON", "--enforce_gtid_consistency=ON", "--skip-replica-start", "--server-id=2", - ], + ] + if binlog_full_metadata: + additional_args.append("--binlog-row-metadata=FULL") + return MySql( + name="mysql-replica", + version=mysql_version, + use_seeded_image=False, + additional_args=additional_args ) From c933a4cb400ff5e5bdd8cae81c18f2a806c286f9 Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Mon, 27 Apr 2026 14:07:27 -0400 Subject: [PATCH 4/5] fmt --- test/mysql-cdc/mzcompose.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index d43398f76dc35..0ebb9d96aa5dc 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -46,20 +46,22 @@ def create_mysql(mysql_version: str, binlog_full_metadata: bool = True) -> MySql return MySql(version=mysql_version, additional_args=additional_args) -def create_mysql_replica(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: +def create_mysql_replica( + mysql_version: str, binlog_full_metadata: bool = True +) -> MySql: additional_args = [ - "--gtid_mode=ON", - "--enforce_gtid_consistency=ON", - "--skip-replica-start", - "--server-id=2", - ] + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + "--skip-replica-start", + "--server-id=2", + ] if binlog_full_metadata: additional_args.append("--binlog-row-metadata=FULL") return MySql( name="mysql-replica", version=mysql_version, use_seeded_image=False, - additional_args=additional_args + additional_args=additional_args, ) From 5d15ebae7169f10a75fa20fa97a4e65222589b9c Mon Sep 17 00:00:00 2001 From: Patrick Butler Date: Tue, 28 Apr 2026 14:13:19 -0400 Subject: [PATCH 5/5] address comments - split binlog check to helper func, add new error type, use better str cmp function Co-authored-by: Copilot --- src/sql/src/pure.rs | 48 ++++----------------------------------- src/sql/src/pure/error.rs | 4 ++++ src/sql/src/pure/mysql.rs | 29 +++++++++++++++++++++++ 3 files changed, 38 insertions(+), 43 deletions(-) diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index d295082649a96..325907872ca0c 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -84,6 +84,7 @@ use crate::plan::error::PlanError; use crate::plan::statement::ddl::load_generator_ast_to_generator; use crate::plan::{SourceReferences, StatementContext}; use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError}; +use crate::pure::mysql::ensure_binlog_full_metadata; use crate::session::vars::ENABLE_SQL_SERVER_SOURCE; use crate::{kafka_util, normalize}; @@ -1096,16 +1097,7 @@ async fn purify_create_source( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; - let binlog_full_metadata = version_compare::compare_to( - mz_mysql_util::query_sys_var(&mut conn, "version").await?, - "8.0.1", - version_compare::Cmp::Ge, - ) - .is_ok_and(|is_ge| is_ge) - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") - .await? - .to_uppercase() - == "FULL"; + let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok(); let reference_client = SourceReferenceClient::MySql { conn: &mut conn, @@ -1517,16 +1509,7 @@ async fn purify_alter_source_add_subsources( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; - let binlog_full_metadata = version_compare::compare_to( - mz_mysql_util::query_sys_var(&mut conn, "version").await?, - "8.0.1", - version_compare::Cmp::Ge, - ) - .is_ok_and(|is_ge| is_ge) - && mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata") - .await? - .to_uppercase() - == "FULL"; + let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok(); let requested_references = Some(ExternalReferences::SubsetTables(external_references)); @@ -1883,29 +1866,8 @@ async fn purify_create_table_from_source( ) .await?; - if version_compare::compare_to( - mz_mysql_util::query_sys_var(&mut conn, "version").await?, - "8.0.1", - version_compare::Cmp::Lt, - ) - .is_ok_and(|is_lt| is_lt) - { - Err( - MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { - setting: "Using MySQL version < 8.0.1, which does not support full binlog row metadata".to_string(), - }, - )?; - } - let binlog_metadata_setting = - mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?; - let binlog_full_metadata = binlog_metadata_setting.to_uppercase() == "FULL"; - if !binlog_full_metadata { - Err( - MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { - setting: binlog_metadata_setting, - }, - )?; - } + ensure_binlog_full_metadata(&mut conn).await?; + let binlog_full_metadata = true; // Retrieve the current @gtid_executed value of the server to mark as the effective // initial snapshot point for this table. diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index 2e7c4be77efaf..a055f188ad805 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -314,6 +314,10 @@ pub enum MySqlSourcePurificationError { "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." )] UnsupportedBinlogMetadataSetting { setting: String }, + #[error( + "The MySQL version is unsupported for this operation. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." + )] + UnsupportedMySqlVersion, } impl MySqlSourcePurificationError { diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index a068d786f222f..88a1248a56e82 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -515,3 +515,32 @@ pub(super) fn references_system_schemas(requested_references: &Option false, } } + +pub async fn ensure_binlog_full_metadata( + conn: &mut mz_mysql_util::MySqlConn, +) -> Result<(), MySqlSourcePurificationError> { + if version_compare::compare_to( + mz_mysql_util::query_sys_var(conn, "version") + .await + .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .is_ok_and(|is_lt| is_lt) + { + Err(MySqlSourcePurificationError::UnsupportedMySqlVersion)?; + } + let binlog_metadata_setting = mz_mysql_util::query_sys_var(conn, "binlog_row_metadata") + .await + .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?; + let binlog_full_metadata = binlog_metadata_setting.eq_ignore_ascii_case("FULL"); + if !binlog_full_metadata { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: binlog_metadata_setting, + }, + ) + } else { + Ok(()) + } +}