diff --git a/Cargo.lock b/Cargo.lock index 7487a2dd847d4..ea4cd69d7cacc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7874,6 +7874,7 @@ dependencies = [ "tracing-subscriber", "uncased", "uuid", + "version-compare", ] [[package]] @@ -13043,6 +13044,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 786754b7f2064..c3b6a1e711227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -525,6 +525,7 @@ url = "2.5.8" urlencoding = "2.1.3" utoipa = "5.4.0" uuid = "1.19.0" +version-compare = "0.2.1" walkdir = "2.5.0" which = "8" yansi = "1.0.1" diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 1c8bf6f0b6d08..aa129164609cd 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -82,6 +82,7 @@ tracing.workspace = true tracing-subscriber.workspace = true uncased.workspace = true uuid = { workspace = true, features = ["serde", "v4"] } +version-compare.workspace = true [dev-dependencies] datadriven.workspace = true diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 700b372f9c67e..68e93672fd6b2 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -1659,6 +1659,7 @@ pub fn plan_create_subsource( SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => SourceExportDetails::MySql(MySqlSourceExportDetails { table, initial_gtid_set, @@ -1667,6 +1668,7 @@ pub fn plan_create_subsource( .into_iter() .map(|c| c.into_string()) .collect(), + binlog_full_metadata, }), SourceExportStatementDetails::SqlServer { table, @@ -1816,6 +1818,7 @@ pub fn plan_create_table_from_source( SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => SourceExportDetails::MySql(MySqlSourceExportDetails { table, initial_gtid_set, @@ -1824,6 +1827,7 @@ pub fn plan_create_table_from_source( .into_iter() .map(|c| c.into_string()) .collect(), + binlog_full_metadata, }), SourceExportStatementDetails::SqlServer { table, diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 43ddba044e008..325907872ca0c 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -84,6 +84,7 @@ use crate::plan::error::PlanError; use crate::plan::statement::ddl::load_generator_ast_to_generator; use crate::plan::{SourceReferences, StatementContext}; use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError}; +use crate::pure::mysql::ensure_binlog_full_metadata; use crate::session::vars::ENABLE_SQL_SERVER_SOURCE; use crate::{kafka_util, normalize}; @@ -252,6 +253,7 @@ pub enum PurifiedExportDetails { text_columns: Option>, exclude_columns: Option>, initial_gtid_set: String, + binlog_full_metadata: bool, }, Postgres { table: PostgresTableDesc, @@ -1095,6 +1097,8 @@ async fn purify_create_source( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; + let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok(); + let reference_client = SourceReferenceClient::MySql { conn: &mut conn, include_system_schemas: mysql::references_system_schemas(external_references), @@ -1114,6 +1118,7 @@ async fn purify_create_source( source_name, initial_gtid_set.clone(), &reference_policy, + binlog_full_metadata, ) .await?; requested_subsource_map.extend(subsources); @@ -1504,6 +1509,8 @@ async fn purify_alter_source_add_subsources( let initial_gtid_set = mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?; + let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok(); + let requested_references = Some(ExternalReferences::SubsetTables(external_references)); let reference_client = SourceReferenceClient::MySql { @@ -1525,6 +1532,7 @@ async fn purify_alter_source_add_subsources( &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, + binlog_full_metadata, ) .await?; requested_subsource_map.extend(subsources); @@ -1858,6 +1866,9 @@ async fn purify_create_table_from_source( ) .await?; + ensure_binlog_full_metadata(&mut conn).await?; + let binlog_full_metadata = true; + // Retrieve the current @gtid_executed value of the server to mark as the effective // initial snapshot point for this table. let initial_gtid_set = @@ -1885,6 +1896,7 @@ async fn purify_create_table_from_source( &unresolved_source_name, initial_gtid_set, &SourceReferencePolicy::Required, + binlog_full_metadata, ) .await?; // There should be exactly one source_export returned for this statement diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index 418a51c5d955d..a055f188ad805 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -310,6 +310,14 @@ pub enum MySqlSourcePurificationError { NoTablesFoundForSchemas(Vec), #[error(transparent)] InvalidConnection(#[from] MySqlConnectionValidationError), + #[error( + "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." + )] + UnsupportedBinlogMetadataSetting { setting: String }, + #[error( + "The MySQL version is unsupported for this operation. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." + )] + UnsupportedMySqlVersion, } impl MySqlSourcePurificationError { diff --git a/src/sql/src/pure/mysql.rs b/src/sql/src/pure/mysql.rs index 8a19efd6c87a6..88a1248a56e82 100644 --- a/src/sql/src/pure/mysql.rs +++ b/src/sql/src/pure/mysql.rs @@ -130,6 +130,7 @@ pub(super) fn generate_source_export_statement_values( text_columns, exclude_columns, initial_gtid_set, + binlog_full_metadata, } = purified_export.details else { unreachable!("purified export details must be mysql") @@ -188,6 +189,7 @@ pub(super) fn generate_source_export_statement_values( let details = SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, }; let text_columns = text_columns.map(|mut columns| { @@ -339,6 +341,7 @@ pub(super) async fn purify_source_exports( unresolved_source_name: &UnresolvedItemName, initial_gtid_set: String, reference_policy: &SourceReferencePolicy, + binlog_full_metadata: bool, ) -> Result { let requested_exports = match requested_references.as_ref() { Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => { @@ -477,6 +480,7 @@ pub(super) async fn purify_source_exports( .collect() }), initial_gtid_set: initial_gtid_set.clone(), + binlog_full_metadata, }, }, ) @@ -511,3 +515,32 @@ pub(super) fn references_system_schemas(requested_references: &Option false, } } + +pub async fn ensure_binlog_full_metadata( + conn: &mut mz_mysql_util::MySqlConn, +) -> Result<(), MySqlSourcePurificationError> { + if version_compare::compare_to( + mz_mysql_util::query_sys_var(conn, "version") + .await + .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .is_ok_and(|is_lt| is_lt) + { + Err(MySqlSourcePurificationError::UnsupportedMySqlVersion)?; + } + let binlog_metadata_setting = mz_mysql_util::query_sys_var(conn, "binlog_row_metadata") + .await + .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?; + let binlog_full_metadata = binlog_metadata_setting.eq_ignore_ascii_case("FULL"); + if !binlog_full_metadata { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: binlog_metadata_setting, + }, + ) + } else { + Ok(()) + } +} diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs index 9d1251676d297..0bf1684d3227c 100644 --- a/src/storage-types/src/sources.rs +++ b/src/storage-types/src/sources.rs @@ -908,6 +908,7 @@ pub enum SourceExportStatementDetails { MySql { table: mz_mysql_util::MySqlTableDesc, initial_gtid_set: String, + binlog_full_metadata: bool, }, SqlServer { table: mz_sql_server_util::desc::SqlServerTableDesc, @@ -933,11 +934,13 @@ impl RustType for SourceExportStatementDetail SourceExportStatementDetails::MySql { table, initial_gtid_set, + binlog_full_metadata, } => ProtoSourceExportStatementDetails { kind: Some(proto_source_export_statement_details::Kind::Mysql( mysql::ProtoMySqlSourceExportStatementDetails { table: Some(table.into_proto()), initial_gtid_set: initial_gtid_set.clone(), + binlog_full_metadata: *binlog_full_metadata, }, )), }, @@ -985,6 +988,7 @@ impl RustType for SourceExportStatementDetail .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?, initial_gtid_set: details.initial_gtid_set, + binlog_full_metadata: details.binlog_full_metadata, }, Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer { table: details diff --git a/src/storage-types/src/sources/mysql.proto b/src/storage-types/src/sources/mysql.proto index 015f733bc6ddd..3e6dd347fb4d6 100644 --- a/src/storage-types/src/sources/mysql.proto +++ b/src/storage-types/src/sources/mysql.proto @@ -23,4 +23,5 @@ message ProtoMySqlSourceDetails { message ProtoMySqlSourceExportStatementDetails { mz_mysql_util.ProtoMySqlTableDesc table = 1; string initial_gtid_set = 2; + bool binlog_full_metadata = 3; } diff --git a/src/storage-types/src/sources/mysql.rs b/src/storage-types/src/sources/mysql.rs index 1f10cd95649da..6e2fc2deae11e 100644 --- a/src/storage-types/src/sources/mysql.rs +++ b/src/storage-types/src/sources/mysql.rs @@ -215,6 +215,7 @@ pub struct MySqlSourceExportDetails { pub initial_gtid_set: String, pub text_columns: Vec, pub exclude_columns: Vec, + pub binlog_full_metadata: bool, } impl AlterCompatible for MySqlSourceExportDetails { @@ -230,6 +231,7 @@ impl AlterCompatible for MySqlSourceExportDetails { initial_gtid_set: _, text_columns: _, exclude_columns: _, + binlog_full_metadata: _, } = self; Ok(()) } diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index c6837f3ad88ba..0ebb9d96aa5dc 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -33,21 +33,35 @@ from materialize.mzcompose.services.toxiproxy import Toxiproxy -def create_mysql(mysql_version: str) -> MySql: - return MySql(version=mysql_version) - - -def create_mysql_replica(mysql_version: str) -> MySql: +def create_mysql(mysql_version: str, binlog_full_metadata: bool = True) -> MySql: + additional_args = [] + if binlog_full_metadata: + additional_args.extend( + [ + "--binlog-row-metadata=FULL", + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + ] + ) + return MySql(version=mysql_version, additional_args=additional_args) + + +def create_mysql_replica( + mysql_version: str, binlog_full_metadata: bool = True +) -> MySql: + additional_args = [ + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + "--skip-replica-start", + "--server-id=2", + ] + if binlog_full_metadata: + additional_args.append("--binlog-row-metadata=FULL") return MySql( name="mysql-replica", version=mysql_version, use_seeded_image=False, - additional_args=[ - "--gtid_mode=ON", - "--enforce_gtid_consistency=ON", - "--skip-replica-start", - "--server-id=2", - ], + additional_args=additional_args, )