Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ url = "2.5.8"
urlencoding = "2.1.3"
utoipa = "5.4.0"
uuid = "1.19.0"
version-compare = "0.2.1"
walkdir = "2.5.0"
which = "8"
yansi = "1.0.1"
Expand Down
1 change: 1 addition & 0 deletions src/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ tracing.workspace = true
tracing-subscriber.workspace = true
uncased.workspace = true
uuid = { workspace = true, features = ["serde", "v4"] }
version-compare.workspace = true

[dev-dependencies]
datadriven.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,7 @@ pub fn plan_create_subsource(
SourceExportStatementDetails::MySql {
table,
initial_gtid_set,
binlog_full_metadata,
} => SourceExportDetails::MySql(MySqlSourceExportDetails {
table,
initial_gtid_set,
Expand All @@ -1667,6 +1668,7 @@ pub fn plan_create_subsource(
.into_iter()
.map(|c| c.into_string())
.collect(),
binlog_full_metadata,
}),
SourceExportStatementDetails::SqlServer {
table,
Expand Down Expand Up @@ -1816,6 +1818,7 @@ pub fn plan_create_table_from_source(
SourceExportStatementDetails::MySql {
table,
initial_gtid_set,
binlog_full_metadata,
} => SourceExportDetails::MySql(MySqlSourceExportDetails {
table,
initial_gtid_set,
Expand All @@ -1824,6 +1827,7 @@ pub fn plan_create_table_from_source(
.into_iter()
.map(|c| c.into_string())
.collect(),
binlog_full_metadata,
}),
SourceExportStatementDetails::SqlServer {
table,
Expand Down
12 changes: 12 additions & 0 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::pure::mysql::ensure_binlog_full_metadata;
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

Expand Down Expand Up @@ -252,6 +253,7 @@ pub enum PurifiedExportDetails {
text_columns: Option<Vec<Ident>>,
exclude_columns: Option<Vec<Ident>>,
initial_gtid_set: String,
binlog_full_metadata: bool,
},
Postgres {
table: PostgresTableDesc,
Expand Down Expand Up @@ -1095,6 +1097,8 @@ async fn purify_create_source(
let initial_gtid_set =
mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok();

let reference_client = SourceReferenceClient::MySql {
conn: &mut conn,
include_system_schemas: mysql::references_system_schemas(external_references),
Expand All @@ -1114,6 +1118,7 @@ async fn purify_create_source(
source_name,
initial_gtid_set.clone(),
&reference_policy,
binlog_full_metadata,
)
.await?;
requested_subsource_map.extend(subsources);
Expand Down Expand Up @@ -1504,6 +1509,8 @@ async fn purify_alter_source_add_subsources(
let initial_gtid_set =
mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

let binlog_full_metadata = ensure_binlog_full_metadata(&mut conn).await.is_ok();

let requested_references = Some(ExternalReferences::SubsetTables(external_references));

let reference_client = SourceReferenceClient::MySql {
Expand All @@ -1525,6 +1532,7 @@ async fn purify_alter_source_add_subsources(
&unresolved_source_name,
initial_gtid_set,
&SourceReferencePolicy::Required,
binlog_full_metadata,
)
.await?;
requested_subsource_map.extend(subsources);
Expand Down Expand Up @@ -1858,6 +1866,9 @@ async fn purify_create_table_from_source(
)
.await?;

ensure_binlog_full_metadata(&mut conn).await?;
let binlog_full_metadata = true;

// Retrieve the current @gtid_executed value of the server to mark as the effective
// initial snapshot point for this table.
let initial_gtid_set =
Expand Down Expand Up @@ -1885,6 +1896,7 @@ async fn purify_create_table_from_source(
&unresolved_source_name,
initial_gtid_set,
&SourceReferencePolicy::Required,
binlog_full_metadata,
)
.await?;
// There should be exactly one source_export returned for this statement
Expand Down
8 changes: 8 additions & 0 deletions src/sql/src/pure/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,14 @@ pub enum MySqlSourcePurificationError {
NoTablesFoundForSchemas(Vec<String>),
#[error(transparent)]
InvalidConnection(#[from] MySqlConnectionValidationError),
#[error(
"The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
)]
UnsupportedBinlogMetadataSetting { setting: String },
#[error(
"The MySQL version is unsupported for this operation. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
)]
UnsupportedMySqlVersion,
}

impl MySqlSourcePurificationError {
Expand Down
33 changes: 33 additions & 0 deletions src/sql/src/pure/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub(super) fn generate_source_export_statement_values(
text_columns,
exclude_columns,
initial_gtid_set,
binlog_full_metadata,
} = purified_export.details
else {
unreachable!("purified export details must be mysql")
Expand Down Expand Up @@ -188,6 +189,7 @@ pub(super) fn generate_source_export_statement_values(
let details = SourceExportStatementDetails::MySql {
table,
initial_gtid_set,
binlog_full_metadata,
};

let text_columns = text_columns.map(|mut columns| {
Expand Down Expand Up @@ -339,6 +341,7 @@ pub(super) async fn purify_source_exports(
unresolved_source_name: &UnresolvedItemName,
initial_gtid_set: String,
reference_policy: &SourceReferencePolicy,
binlog_full_metadata: bool,
) -> Result<PurifiedSourceExports, PlanError> {
let requested_exports = match requested_references.as_ref() {
Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
Expand Down Expand Up @@ -477,6 +480,7 @@ pub(super) async fn purify_source_exports(
.collect()
}),
initial_gtid_set: initial_gtid_set.clone(),
binlog_full_metadata,
},
},
)
Expand Down Expand Up @@ -511,3 +515,32 @@ pub(super) fn references_system_schemas(requested_references: &Option<ExternalRe
None => false,
}
}

pub async fn ensure_binlog_full_metadata(
conn: &mut mz_mysql_util::MySqlConn,
) -> Result<(), MySqlSourcePurificationError> {
if version_compare::compare_to(
mz_mysql_util::query_sys_var(conn, "version")
.await
.map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?,
"8.0.1",
version_compare::Cmp::Lt,
)
.is_ok_and(|is_lt| is_lt)
{
Err(MySqlSourcePurificationError::UnsupportedMySqlVersion)?;
}
let binlog_metadata_setting = mz_mysql_util::query_sys_var(conn, "binlog_row_metadata")
.await
.map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?;
let binlog_full_metadata = binlog_metadata_setting.eq_ignore_ascii_case("FULL");
if !binlog_full_metadata {
Err(
MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
setting: binlog_metadata_setting,
},
)
} else {
Ok(())
}
}
4 changes: 4 additions & 0 deletions src/storage-types/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ pub enum SourceExportStatementDetails {
MySql {
table: mz_mysql_util::MySqlTableDesc,
initial_gtid_set: String,
binlog_full_metadata: bool,
},
SqlServer {
table: mz_sql_server_util::desc::SqlServerTableDesc,
Expand All @@ -933,11 +934,13 @@ impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetail
SourceExportStatementDetails::MySql {
table,
initial_gtid_set,
binlog_full_metadata,
} => ProtoSourceExportStatementDetails {
kind: Some(proto_source_export_statement_details::Kind::Mysql(
mysql::ProtoMySqlSourceExportStatementDetails {
table: Some(table.into_proto()),
initial_gtid_set: initial_gtid_set.clone(),
binlog_full_metadata: *binlog_full_metadata,
},
)),
},
Expand Down Expand Up @@ -985,6 +988,7 @@ impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetail
.into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

initial_gtid_set: details.initial_gtid_set,
binlog_full_metadata: details.binlog_full_metadata,
},
Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
table: details
Expand Down
1 change: 1 addition & 0 deletions src/storage-types/src/sources/mysql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ message ProtoMySqlSourceDetails {
message ProtoMySqlSourceExportStatementDetails {
mz_mysql_util.ProtoMySqlTableDesc table = 1;
string initial_gtid_set = 2;
bool binlog_full_metadata = 3;
}
2 changes: 2 additions & 0 deletions src/storage-types/src/sources/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ pub struct MySqlSourceExportDetails {
pub initial_gtid_set: String,
pub text_columns: Vec<String>,
pub exclude_columns: Vec<String>,
pub binlog_full_metadata: bool,
}

impl AlterCompatible for MySqlSourceExportDetails {
Expand All @@ -230,6 +231,7 @@ impl AlterCompatible for MySqlSourceExportDetails {
initial_gtid_set: _,
text_columns: _,
exclude_columns: _,
binlog_full_metadata: _,
} = self;
Ok(())
}
Expand Down
36 changes: 25 additions & 11 deletions test/mysql-cdc/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,35 @@
from materialize.mzcompose.services.toxiproxy import Toxiproxy


def create_mysql(mysql_version: str) -> MySql:
return MySql(version=mysql_version)


def create_mysql_replica(mysql_version: str) -> MySql:
def create_mysql(mysql_version: str, binlog_full_metadata: bool = True) -> MySql:
additional_args = []
if binlog_full_metadata:
additional_args.extend(
[
"--binlog-row-metadata=FULL",
"--gtid_mode=ON",
"--enforce_gtid_consistency=ON",
]
)
return MySql(version=mysql_version, additional_args=additional_args)


def create_mysql_replica(
mysql_version: str, binlog_full_metadata: bool = True
) -> MySql:
additional_args = [
"--gtid_mode=ON",
"--enforce_gtid_consistency=ON",
"--skip-replica-start",
"--server-id=2",
]
if binlog_full_metadata:
additional_args.append("--binlog-row-metadata=FULL")
return MySql(
name="mysql-replica",
version=mysql_version,
use_seeded_image=False,
additional_args=[
"--gtid_mode=ON",
"--enforce_gtid_consistency=ON",
"--skip-replica-start",
"--server-id=2",
],
additional_args=additional_args,
)


Expand Down
Loading