From 8e1aa2738c86d6a0c95535a09ddae602c9b202c0 Mon Sep 17 00:00:00 2001 From: nhsmw Date: Thu, 26 Feb 2026 16:58:10 +0800 Subject: [PATCH 1/4] Update test.sql --- tests/integration_tests/common_1/data/test.sql | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/integration_tests/common_1/data/test.sql b/tests/integration_tests/common_1/data/test.sql index a38cd5c182..47ae223f7e 100644 --- a/tests/integration_tests/common_1/data/test.sql +++ b/tests/integration_tests/common_1/data/test.sql @@ -175,3 +175,13 @@ VALUES (1), UPDATE `column_is_null` SET t = NULL WHERE id = 1; + +CREATE TABLE time_is_pk +( + id time(3) NOT NULL, + t datetime DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +INSERT INTO `time_is_pk`(id) VALUES ('517:51:04.777'),('-733:00:00.0011'); +DELETE FROM `time_is_pk` WHERE id = '517:51:04.777'; From a7b5db43bf37543b41f2288f719cae23447a8ef4 Mon Sep 17 00:00:00 2001 From: nhsmw Date: Tue, 3 Mar 2026 01:12:12 +0800 Subject: [PATCH 2/4] Update decoder.go --- pkg/sink/codec/open/decoder.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/sink/codec/open/decoder.go b/pkg/sink/codec/open/decoder.go index b6eeae3ee6..d17770eada 100644 --- a/pkg/sink/codec/open/decoder.go +++ b/pkg/sink/codec/open/decoder.go @@ -414,10 +414,6 @@ func newTiColumns(rawColumns map[string]column) []*timodel.ColumnInfo { col.SetCharset("utf8mb4") col.SetCollate("utf8mb4_bin") } - case mysql.TypeDuration: - // todo: how to find the correct decimal for the duration type ? - _, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(col.GetType()) - col.SetDecimal(defaultDecimal) case mysql.TypeEnum, mysql.TypeSet: col.SetCharset("utf8mb4") col.SetCollate("utf8mb4_bin") @@ -640,7 +636,7 @@ func formatColumn(c column, ft types.FieldType) column { default: log.Panic("invalid column value for date / datetime / timestamp", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v)) } - c.Value, err = tiTypes.ParseTime(tiTypes.DefaultStmtNoWarningContext, data, ft.GetType(), ft.GetDecimal()) + c.Value, err = tiTypes.ParseTime(tiTypes.DefaultStmtNoWarningContext, data, ft.GetType(), tiTypes.MaxFsp) if err != nil { log.Panic("invalid column value for date / datetime / timestamp", zap.String("value", util.RedactAny(c.Value)), zap.Error(err)) } @@ -661,7 +657,7 @@ func formatColumn(c column, ft types.FieldType) column { default: log.Panic("invalid column value for duration", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v)) } - c.Value, _, err = tiTypes.ParseDuration(tiTypes.DefaultStmtNoWarningContext, data, ft.GetDecimal()) + c.Value, _, err = tiTypes.ParseDuration(tiTypes.DefaultStmtNoWarningContext, data, tiTypes.MaxFsp) if err != nil { log.Panic("invalid column value for duration", zap.String("value", util.RedactAny(c.Value)), zap.Error(err)) } From 8a1b7198adb22ec75ce1015ab0269bdd40cfba6d Mon Sep 17 00:00:00 2001 From: nhsmw Date: Tue, 3 Mar 2026 01:12:43 +0800 Subject: [PATCH 3/4] Update csv_decoder.go --- pkg/sink/codec/csv/csv_decoder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sink/codec/csv/csv_decoder.go b/pkg/sink/codec/csv/csv_decoder.go index 03429a54c2..05109c687a 100644 --- a/pkg/sink/codec/csv/csv_decoder.go +++ b/pkg/sink/codec/csv/csv_decoder.go @@ -189,9 +189,9 @@ func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldTy case mysql.TypeYear: val, err = strconv.ParseInt(str, 10, 64) case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp: - val, err = types.ParseTime(types.DefaultStmtNoWarningContext, str, ft.GetType(), ft.GetDecimal()) + val, err = types.ParseTime(types.DefaultStmtNoWarningContext, str, ft.GetType(), types.MaxFsp) case mysql.TypeDuration: - val, _, err = types.ParseDuration(types.DefaultStmtNoWarningContext, str, ft.GetDecimal()) + val, _, err = types.ParseDuration(types.DefaultStmtNoWarningContext, str, types.MaxFsp) case mysql.TypeBit: var v uint64 v, err = strconv.ParseUint(str, 10, 64) From 581c0b2a5da9c2ea642d890f552a1190a80d8f9b Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 3 Mar 2026 05:59:06 +0000 Subject: [PATCH 4/4] fix Signed-off-by: wk989898 --- pkg/sink/codec/open/decoder.go | 2 ++ tests/integration_tests/common_1/run.sh | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/sink/codec/open/decoder.go b/pkg/sink/codec/open/decoder.go index d17770eada..0b3a4ff1e7 100644 --- a/pkg/sink/codec/open/decoder.go +++ b/pkg/sink/codec/open/decoder.go @@ -414,6 +414,8 @@ func newTiColumns(rawColumns map[string]column) []*timodel.ColumnInfo { col.SetCharset("utf8mb4") col.SetCollate("utf8mb4_bin") } + case mysql.TypeDuration: + col.SetDecimal(tiTypes.MaxFsp) case mysql.TypeEnum, mysql.TypeSet: col.SetCharset("utf8mb4") col.SetCollate("utf8mb4_bin") diff --git a/tests/integration_tests/common_1/run.sh b/tests/integration_tests/common_1/run.sh index 736070fd41..4271a3e1b7 100755 --- a/tests/integration_tests/common_1/run.sh +++ b/tests/integration_tests/common_1/run.sh @@ -54,7 +54,7 @@ EOF fi case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) run_kafka_consumer $WORK_DIR $SINK_URI ;; storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --ca "${WORK_DIR}/ca.cert.pem" --auth-tls-private-key-path "${WORK_DIR}/broker_client.key-pk8.pem" --auth-tls-certificate-path="${WORK_DIR}/broker_client.cert.pem" ;; esac