From a306be850949d66b1a5997f8728c14fff58422f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E8=8A=8A=E8=94=9A?= Date: Thu, 26 Feb 2026 11:51:54 +0800 Subject: [PATCH 1/5] move redo s3 check from cli to server --- cmd/cdc/cli/cli_changefeed_create.go | 12 +++++------ cmd/cdc/cli/cli_changefeed_create_test.go | 26 +++++++++++++++++++++++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index b2651603cb..d868bed330 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -151,15 +151,13 @@ func (o *createChangefeedOptions) completeReplicaCfg() error { } } - uri, err := url.Parse(o.commonChangefeedOptions.sinkURI) - if err != nil { - return err - } - - err = cfg.ValidateAndAdjust(uri) - if err != nil { + if _, err := url.Parse(o.commonChangefeedOptions.sinkURI); err != nil { return err } + // NOTE: Do not call cfg.ValidateAndAdjust here. + // It may try to initialize redo external storage (for example, S3) and perform a connectivity + // check locally, which breaks environments where only TiCDC server nodes can access S3. + // The server will validate and adjust the replica config during changefeed initialization. if o.commonChangefeedOptions.schemaRegistry != "" { cfg.Sink.SchemaRegistry = putil.AddressOf(o.commonChangefeedOptions.schemaRegistry) diff --git a/cmd/cdc/cli/cli_changefeed_create_test.go b/cmd/cdc/cli/cli_changefeed_create_test.go index 8f5bd0e433..0f8dad40a3 100644 --- a/cmd/cdc/cli/cli_changefeed_create_test.go +++ b/cmd/cdc/cli/cli_changefeed_create_test.go @@ -173,3 +173,29 @@ func TestChangefeedCreateCli(t *testing.T) { require.NoError(t, o.complete(f)) require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`") } + +func TestCompleteReplicaCfgDoesNotValidateRedoStorage(t *testing.T) { + t.Parallel() + + // The CLI should not try to initialize/verify redo external storage locally. + // It should be verified by TiCDC server nodes during changefeed initialization. + o := newCreateChangefeedOptions(newChangefeedCommonOptions()) + o.commonChangefeedOptions.sinkURI = "blackhole://" + + dir := t.TempDir() + configPath := filepath.Join(dir, "cf.toml") + content := ` +[consistent] +level = "eventual" +storage = "s3:///test/prefix" +` + require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644)) + o.commonChangefeedOptions.configFile = configPath + + // If the CLI calls ReplicaConfig.ValidateAndAdjust here, it would fail because the + // s3 URI is intentionally invalid (missing bucket). We only want to make sure the + // CLI doesn't perform such validation locally. + require.NoError(t, o.completeReplicaCfg()) + require.Equal(t, "eventual", *o.cfg.Consistent.Level) + require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage) +} From 6d0c53ba022b137b0a3286faa8e9795b7ec601fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E8=8A=8A=E8=94=9A?= Date: Tue, 3 Mar 2026 11:30:16 +0800 Subject: [PATCH 2/5] move redo s3 check from cli to server add option --- cmd/cdc/cli/cli_changefeed_create.go | 13 ++++++---- cmd/cdc/cli/cli_changefeed_create_test.go | 31 ++++++++++++++++++----- pkg/config/consistent.go | 6 ++++- pkg/config/replica_config.go | 19 +++++++++++++- pkg/redo/config.go | 16 ++++++++++++ pkg/redo/config_test.go | 13 ++++++++++ 6 files changed, 85 insertions(+), 13 deletions(-) diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index d868bed330..22c612ce37 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -151,13 +151,16 @@ func (o *createChangefeedOptions) completeReplicaCfg() error { } } - if _, err := url.Parse(o.commonChangefeedOptions.sinkURI); err != nil { + uri, err := url.Parse(o.commonChangefeedOptions.sinkURI) + if err != nil { + return err + } + err = cfg.ValidateAndAdjustWithOptions(uri, config.ReplicaConfigValidateAndAdjustOptions{ + EnableConsistentStorageIOCheck: false, + }) + if err != nil { return err } - // NOTE: Do not call cfg.ValidateAndAdjust here. - // It may try to initialize redo external storage (for example, S3) and perform a connectivity - // check locally, which breaks environments where only TiCDC server nodes can access S3. - // The server will validate and adjust the replica config during changefeed initialization. if o.commonChangefeedOptions.schemaRegistry != "" { cfg.Sink.SchemaRegistry = putil.AddressOf(o.commonChangefeedOptions.schemaRegistry) diff --git a/cmd/cdc/cli/cli_changefeed_create_test.go b/cmd/cdc/cli/cli_changefeed_create_test.go index 0f8dad40a3..c390d2c731 100644 --- a/cmd/cdc/cli/cli_changefeed_create_test.go +++ b/cmd/cdc/cli/cli_changefeed_create_test.go @@ -174,11 +174,9 @@ func TestChangefeedCreateCli(t *testing.T) { require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`") } -func TestCompleteReplicaCfgDoesNotValidateRedoStorage(t *testing.T) { +func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) { t.Parallel() - // The CLI should not try to initialize/verify redo external storage locally. - // It should be verified by TiCDC server nodes during changefeed initialization. o := newCreateChangefeedOptions(newChangefeedCommonOptions()) o.commonChangefeedOptions.sinkURI = "blackhole://" @@ -192,10 +190,31 @@ storage = "s3:///test/prefix" require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644)) o.commonChangefeedOptions.configFile = configPath - // If the CLI calls ReplicaConfig.ValidateAndAdjust here, it would fail because the - // s3 URI is intentionally invalid (missing bucket). We only want to make sure the - // CLI doesn't perform such validation locally. + // The CLI still validates replica config, but it skips storage I/O check. + // Therefore this should pass even if the S3 URI misses bucket info. require.NoError(t, o.completeReplicaCfg()) require.Equal(t, "eventual", *o.cfg.Consistent.Level) require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage) } + +func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) { + t.Parallel() + + o := newCreateChangefeedOptions(newChangefeedCommonOptions()) + o.commonChangefeedOptions.sinkURI = "blackhole://" + + dir := t.TempDir() + configPath := filepath.Join(dir, "cf.toml") + content := ` +[consistent] +level = "eventual" +storage = "nfs:///ticdc-cli-should-not-io-check" +compression = "snappy" +` + require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644)) + o.commonChangefeedOptions.configFile = configPath + + err := o.completeReplicaCfg() + require.Error(t, err) + require.Contains(t, err.Error(), "consistent.compression") +} diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 4d2ece4990..d2e2a6ce81 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -73,6 +73,10 @@ type ConsistentMemoryUsage struct { // ValidateAndAdjust validates the consistency config and adjusts it if necessary. func (c *ConsistentConfig) ValidateAndAdjust() error { + return c.validateAndAdjust(true) +} + +func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error { if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) { return nil } @@ -116,7 +120,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs( fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage))) } - return redo.ValidateStorage(uri) + return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck}) } // MaskSensitiveData masks sensitive data in ConsistentConfig diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index e4e58c921d..9af7626e94 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -265,8 +265,25 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } } +// ReplicaConfigValidateAndAdjustOptions provides optional controls for +// (*ReplicaConfig).ValidateAndAdjustWithOptions. +type ReplicaConfigValidateAndAdjustOptions struct { + EnableConsistentStorageIOCheck bool +} + // ValidateAndAdjust verifies and adjusts the replica configuration. func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri + return c.ValidateAndAdjustWithOptions( + sinkURI, + ReplicaConfigValidateAndAdjustOptions{EnableConsistentStorageIOCheck: true}, + ) +} + +// ValidateAndAdjustWithOptions verifies and adjusts the replica configuration +// with extra controls. +func (c *ReplicaConfig) ValidateAndAdjustWithOptions( + sinkURI *url.URL, opts ReplicaConfigValidateAndAdjustOptions, +) error { if c.Sink != nil { err := c.Sink.validateAndAdjust(sinkURI) if err != nil { @@ -275,7 +292,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin } if c.Consistent != nil { - err := c.Consistent.ValidateAndAdjust() + err := c.Consistent.validateAndAdjust(opts.EnableConsistentStorageIOCheck) if err != nil { return err } diff --git a/pkg/redo/config.go b/pkg/redo/config.go index 0b4ea12d85..8b7ec392bd 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -214,8 +214,21 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter return s, nil } +// StorageValidationOptions controls whether ValidateStorage performs +// an I/O based accessibility check. +type StorageValidationOptions struct { + EnableIOCheck bool +} + // ValidateStorage validates the storage used by redo. func ValidateStorage(uri *url.URL) error { + return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true}) +} + +// ValidateStorageWithOptions validates the storage used by redo with options. +// +// When EnableIOCheck is false, only basic scheme validation is performed. +func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error { scheme := uri.Scheme if !IsValidConsistentStorage(scheme) { return errors.ErrConsistentStorage.GenWithStackByArgs(scheme) @@ -223,6 +236,9 @@ func ValidateStorage(uri *url.URL) error { if IsBlackholeStorage(scheme) { return nil } + if !opts.EnableIOCheck { + return nil + } if IsExternalStorage(scheme) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) diff --git a/pkg/redo/config_test.go b/pkg/redo/config_test.go index 33aa9de486..1dd0e8c6e7 100644 --- a/pkg/redo/config_test.go +++ b/pkg/redo/config_test.go @@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) { require.NoError(t, err) } } + +func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) { + t.Parallel() + + uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket") + require.NoError(t, err) + + err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false}) + require.NoError(t, err) + + err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true}) + require.Error(t, err) +} From eea604a63b3b70fa72599458205261578e51c698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E8=8A=8A=E8=94=9A?= Date: Tue, 3 Mar 2026 15:02:24 +0800 Subject: [PATCH 3/5] simplified var name --- cmd/cdc/cli/cli_changefeed_create.go | 4 ++-- pkg/config/replica_config.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index 22c612ce37..c96b78d94f 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -155,8 +155,8 @@ func (o *createChangefeedOptions) completeReplicaCfg() error { if err != nil { return err } - err = cfg.ValidateAndAdjustWithOptions(uri, config.ReplicaConfigValidateAndAdjustOptions{ - EnableConsistentStorageIOCheck: false, + err = cfg.ValidateAndAdjustWithOptions(uri, config.ValidateOptions{ + EnableRedoIOCheck: false, }) if err != nil { return err diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 9af7626e94..4bc40baf8e 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -265,24 +265,24 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } } -// ReplicaConfigValidateAndAdjustOptions provides optional controls for +// ValidateOptions provides optional controls for // (*ReplicaConfig).ValidateAndAdjustWithOptions. -type ReplicaConfigValidateAndAdjustOptions struct { - EnableConsistentStorageIOCheck bool +type ValidateOptions struct { + EnableRedoIOCheck bool } // ValidateAndAdjust verifies and adjusts the replica configuration. func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri return c.ValidateAndAdjustWithOptions( sinkURI, - ReplicaConfigValidateAndAdjustOptions{EnableConsistentStorageIOCheck: true}, + ValidateOptions{EnableRedoIOCheck: true}, ) } // ValidateAndAdjustWithOptions verifies and adjusts the replica configuration // with extra controls. func (c *ReplicaConfig) ValidateAndAdjustWithOptions( - sinkURI *url.URL, opts ReplicaConfigValidateAndAdjustOptions, + sinkURI *url.URL, opts ValidateOptions, ) error { if c.Sink != nil { err := c.Sink.validateAndAdjust(sinkURI) @@ -292,7 +292,7 @@ func (c *ReplicaConfig) ValidateAndAdjustWithOptions( } if c.Consistent != nil { - err := c.Consistent.validateAndAdjust(opts.EnableConsistentStorageIOCheck) + err := c.Consistent.validateAndAdjust(opts.EnableRedoIOCheck) if err != nil { return err } From f036dffdce1b555c298596d4592f3cea1f15b969 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E8=8A=8A=E8=94=9A?= Date: Tue, 3 Mar 2026 17:46:46 +0800 Subject: [PATCH 4/5] fix test --- pkg/redo/writer/file/file.go | 39 ++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/redo/writer/file/file.go b/pkg/redo/writer/file/file.go index 6ec60dfe62..b5d7553265 100644 --- a/pkg/redo/writer/file/file.go +++ b/pkg/redo/writer/file/file.go @@ -303,31 +303,32 @@ func (w *Writer) encode(ctx context.Context) error { cacheEventPostFlush = cacheEventPostFlush[:0] return nil } - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker.C: - err := flush() - if err != nil { - return errors.Trace(err) - } - case e := <-w.inputCh: - err := w.write(e) - if err != nil { - return err - } - num++ - if num > redo.DefaultFlushBatchSize { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: err := flush() if err != nil { return errors.Trace(err) } - e.PostFlush() - } else { - cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush) + case e := <-w.inputCh: + err := w.write(e) + if err != nil { + return err + } + num++ + if num > redo.DefaultFlushBatchSize { + err := flush() + if err != nil { + return errors.Trace(err) + } + e.PostFlush() + } else { + cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush) + } } } - return nil } func (w *Writer) close(ctx context.Context) error { From 4b41482aee10d649be121aef021825d8a6abcd0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E8=8A=8A=E8=94=9A?= Date: Tue, 3 Mar 2026 18:27:44 +0800 Subject: [PATCH 5/5] fix test --- pkg/redo/writer/file/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/redo/writer/file/file.go b/pkg/redo/writer/file/file.go index b5d7553265..10064ecf3c 100644 --- a/pkg/redo/writer/file/file.go +++ b/pkg/redo/writer/file/file.go @@ -318,7 +318,7 @@ func (w *Writer) encode(ctx context.Context) error { return err } num++ - if num > redo.DefaultFlushBatchSize { + if num >= redo.DefaultFlushBatchSize { err := flush() if err != nil { return errors.Trace(err)