Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldTy
case mysql.TypeYear:
val, err = strconv.ParseInt(str, 10, 64)
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
val, err = types.ParseTime(types.DefaultStmtNoWarningContext, str, ft.GetType(), ft.GetDecimal())
val, err = types.ParseTime(types.DefaultStmtNoWarningContext, str, ft.GetType(), types.MaxFsp)
case mysql.TypeDuration:
val, _, err = types.ParseDuration(types.DefaultStmtNoWarningContext, str, ft.GetDecimal())
val, _, err = types.ParseDuration(types.DefaultStmtNoWarningContext, str, types.MaxFsp)
case mysql.TypeBit:
var v uint64
v, err = strconv.ParseUint(str, 10, 64)
Expand Down
8 changes: 3 additions & 5 deletions pkg/sink/codec/open/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,7 @@ func newTiColumns(rawColumns map[string]column) []*timodel.ColumnInfo {
col.SetCollate("utf8mb4_bin")
}
case mysql.TypeDuration:
// todo: how to find the correct decimal for the duration type ?
_, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(col.GetType())
col.SetDecimal(defaultDecimal)
col.SetDecimal(tiTypes.MaxFsp)
case mysql.TypeEnum, mysql.TypeSet:
col.SetCharset("utf8mb4")
col.SetCollate("utf8mb4_bin")
Expand Down Expand Up @@ -640,7 +638,7 @@ func formatColumn(c column, ft types.FieldType) column {
default:
log.Panic("invalid column value for date / datetime / timestamp", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v))
}
c.Value, err = tiTypes.ParseTime(tiTypes.DefaultStmtNoWarningContext, data, ft.GetType(), ft.GetDecimal())
c.Value, err = tiTypes.ParseTime(tiTypes.DefaultStmtNoWarningContext, data, ft.GetType(), tiTypes.MaxFsp)
if err != nil {
log.Panic("invalid column value for date / datetime / timestamp", zap.String("value", util.RedactAny(c.Value)), zap.Error(err))
}
Expand All @@ -661,7 +659,7 @@ func formatColumn(c column, ft types.FieldType) column {
default:
log.Panic("invalid column value for duration", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v))
}
c.Value, _, err = tiTypes.ParseDuration(tiTypes.DefaultStmtNoWarningContext, data, ft.GetDecimal())
c.Value, _, err = tiTypes.ParseDuration(tiTypes.DefaultStmtNoWarningContext, data, tiTypes.MaxFsp)
if err != nil {
log.Panic("invalid column value for duration", zap.String("value", util.RedactAny(c.Value)), zap.Error(err))
}
Expand Down
10 changes: 10 additions & 0 deletions tests/integration_tests/common_1/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,13 @@ VALUES (1),
UPDATE `column_is_null`
SET t = NULL
WHERE id = 1;

CREATE TABLE time_is_pk
(
id time(3) NOT NULL,
t datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);

INSERT INTO `time_is_pk`(id) VALUES ('517:51:04.777'),('-733:00:00.0011');
DELETE FROM `time_is_pk` WHERE id = '517:51:04.777';
Comment on lines +178 to +187

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The added test case is a good start, but it may not fully cover the scenario described in issue #4283. The issue mentions that a panic can occur during DELETE or UPDATE operations on a row where the time primary key has lost precision. This test only inserts a row with a value that will be rounded (-733:00:00.0011) and then deletes a different, unrelated row.

To make the test more robust and ensure it covers the problematic case, it should perform UPDATE and DELETE operations on the row with the rounded key. This will better simulate the conditions that could lead to the panic.

CREATE TABLE time_is_pk
(
    id time(3) NOT NULL,
    t  int,
    PRIMARY KEY (id)
);
INSERT INTO `time_is_pk`(id, t) VALUES ('-733:00:00.0011', 1);
UPDATE `time_is_pk` SET t = 2 WHERE id = '-733:00:00.0011';
DELETE FROM `time_is_pk` WHERE id = '-733:00:00.0011';

2 changes: 1 addition & 1 deletion tests/integration_tests/common_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ EOF
fi

case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
kafka) run_kafka_consumer $WORK_DIR $SINK_URI ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --ca "${WORK_DIR}/ca.cert.pem" --auth-tls-private-key-path "${WORK_DIR}/broker_client.key-pk8.pem" --auth-tls-certificate-path="${WORK_DIR}/broker_client.cert.pem" ;;
esac
Expand Down