diff --git a/pkg/sink/codec/csv/csv_decoder.go b/pkg/sink/codec/csv/csv_decoder.go index 03429a54c2..05109c687a 100644 --- a/pkg/sink/codec/csv/csv_decoder.go +++ b/pkg/sink/codec/csv/csv_decoder.go @@ -189,9 +189,9 @@ func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldTy case mysql.TypeYear: val, err = strconv.ParseInt(str, 10, 64) case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp: - val, err = types.ParseTime(types.DefaultStmtNoWarningContext, str, ft.GetType(), ft.GetDecimal()) + val, err = types.ParseTime(types.DefaultStmtNoWarningContext, str, ft.GetType(), types.MaxFsp) case mysql.TypeDuration: - val, _, err = types.ParseDuration(types.DefaultStmtNoWarningContext, str, ft.GetDecimal()) + val, _, err = types.ParseDuration(types.DefaultStmtNoWarningContext, str, types.MaxFsp) case mysql.TypeBit: var v uint64 v, err = strconv.ParseUint(str, 10, 64) diff --git a/pkg/sink/codec/open/decoder.go b/pkg/sink/codec/open/decoder.go index b6eeae3ee6..0b3a4ff1e7 100644 --- a/pkg/sink/codec/open/decoder.go +++ b/pkg/sink/codec/open/decoder.go @@ -415,9 +415,7 @@ func newTiColumns(rawColumns map[string]column) []*timodel.ColumnInfo { col.SetCollate("utf8mb4_bin") } case mysql.TypeDuration: - // todo: how to find the correct decimal for the duration type ? - _, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(col.GetType()) - col.SetDecimal(defaultDecimal) + col.SetDecimal(tiTypes.MaxFsp) case mysql.TypeEnum, mysql.TypeSet: col.SetCharset("utf8mb4") col.SetCollate("utf8mb4_bin") @@ -640,7 +638,7 @@ func formatColumn(c column, ft types.FieldType) column { default: log.Panic("invalid column value for date / datetime / timestamp", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v)) } - c.Value, err = tiTypes.ParseTime(tiTypes.DefaultStmtNoWarningContext, data, ft.GetType(), ft.GetDecimal()) + c.Value, err = tiTypes.ParseTime(tiTypes.DefaultStmtNoWarningContext, data, ft.GetType(), tiTypes.MaxFsp) if err != nil { log.Panic("invalid column value for date / datetime / timestamp", zap.String("value", util.RedactAny(c.Value)), zap.Error(err)) } @@ -661,7 +659,7 @@ func formatColumn(c column, ft types.FieldType) column { default: log.Panic("invalid column value for duration", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v)) } - c.Value, _, err = tiTypes.ParseDuration(tiTypes.DefaultStmtNoWarningContext, data, ft.GetDecimal()) + c.Value, _, err = tiTypes.ParseDuration(tiTypes.DefaultStmtNoWarningContext, data, tiTypes.MaxFsp) if err != nil { log.Panic("invalid column value for duration", zap.String("value", util.RedactAny(c.Value)), zap.Error(err)) } diff --git a/tests/integration_tests/common_1/data/test.sql b/tests/integration_tests/common_1/data/test.sql index a38cd5c182..47ae223f7e 100644 --- a/tests/integration_tests/common_1/data/test.sql +++ b/tests/integration_tests/common_1/data/test.sql @@ -175,3 +175,13 @@ VALUES (1), UPDATE `column_is_null` SET t = NULL WHERE id = 1; + +CREATE TABLE time_is_pk +( + id time(3) NOT NULL, + t datetime DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +INSERT INTO `time_is_pk`(id) VALUES ('517:51:04.777'),('-733:00:00.0011'); +DELETE FROM `time_is_pk` WHERE id = '517:51:04.777'; diff --git a/tests/integration_tests/common_1/run.sh b/tests/integration_tests/common_1/run.sh index 736070fd41..4271a3e1b7 100755 --- a/tests/integration_tests/common_1/run.sh +++ b/tests/integration_tests/common_1/run.sh @@ -54,7 +54,7 @@ EOF fi case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + kafka) run_kafka_consumer $WORK_DIR $SINK_URI ;; storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --ca "${WORK_DIR}/ca.cert.pem" --auth-tls-private-key-path "${WORK_DIR}/broker_client.key-pk8.pem" --auth-tls-certificate-path="${WORK_DIR}/broker_client.cert.pem" ;; esac