diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go
index b2651603cb..c96b78d94f 100644
--- a/cmd/cdc/cli/cli_changefeed_create.go
+++ b/cmd/cdc/cli/cli_changefeed_create.go
@@ -155,8 +155,9 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
 	if err != nil {
 		return err
 	}
-
-	err = cfg.ValidateAndAdjust(uri)
+	err = cfg.ValidateAndAdjustWithOptions(uri, config.ValidateOptions{
+		EnableRedoIOCheck: false,
+	})
 	if err != nil {
 		return err
 	}
diff --git a/cmd/cdc/cli/cli_changefeed_create_test.go b/cmd/cdc/cli/cli_changefeed_create_test.go
index 8f5bd0e433..c390d2c731 100644
--- a/cmd/cdc/cli/cli_changefeed_create_test.go
+++ b/cmd/cdc/cli/cli_changefeed_create_test.go
@@ -173,3 +173,48 @@ func TestChangefeedCreateCli(t *testing.T) {
 	require.NoError(t, o.complete(f))
 	require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`")
 }
+
+func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) {
+	t.Parallel()
+
+	o := newCreateChangefeedOptions(newChangefeedCommonOptions())
+	o.commonChangefeedOptions.sinkURI = "blackhole://"
+
+	dir := t.TempDir()
+	configPath := filepath.Join(dir, "cf.toml")
+	content := `
+[consistent]
+level = "eventual"
+storage = "s3:///test/prefix"
+`
+	require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
+	o.commonChangefeedOptions.configFile = configPath
+
+	// The CLI still validates replica config, but it skips storage I/O check.
+	// Therefore this should pass even if the S3 URI misses bucket info.
+	require.NoError(t, o.completeReplicaCfg())
+	require.Equal(t, "eventual", *o.cfg.Consistent.Level)
+	require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage)
+}
+
+func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) {
+	t.Parallel()
+
+	o := newCreateChangefeedOptions(newChangefeedCommonOptions())
+	o.commonChangefeedOptions.sinkURI = "blackhole://"
+
+	dir := t.TempDir()
+	configPath := filepath.Join(dir, "cf.toml")
+	content := `
+[consistent]
+level = "eventual"
+storage = "nfs:///ticdc-cli-should-not-io-check"
+compression = "snappy"
+`
+	require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
+	o.commonChangefeedOptions.configFile = configPath
+
+	err := o.completeReplicaCfg()
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "consistent.compression")
+}
diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go
index 4d2ece4990..d2e2a6ce81 100644
--- a/pkg/config/consistent.go
+++ b/pkg/config/consistent.go
@@ -73,6 +73,10 @@ type ConsistentMemoryUsage struct {
 
 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
 func (c *ConsistentConfig) ValidateAndAdjust() error {
+	return c.validateAndAdjust(true)
+}
+
+func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error {
 	if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) {
 		return nil
 	}
@@ -116,7 +120,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
 		return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
 			fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage)))
 	}
-	return redo.ValidateStorage(uri)
+	return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck})
 }
 
 // MaskSensitiveData masks sensitive data in ConsistentConfig
diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go
index e4e58c921d..4bc40baf8e 100644
--- a/pkg/config/replica_config.go
+++ b/pkg/config/replica_config.go
@@ -265,8 +265,25 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
 	}
 }
 
+// ValidateOptions provides optional controls for
+// (*ReplicaConfig).ValidateAndAdjustWithOptions.
+type ValidateOptions struct {
+	EnableRedoIOCheck bool
+}
+
 // ValidateAndAdjust verifies and adjusts the replica configuration.
 func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri
+	return c.ValidateAndAdjustWithOptions(
+		sinkURI,
+		ValidateOptions{EnableRedoIOCheck: true},
+	)
+}
+
+// ValidateAndAdjustWithOptions verifies and adjusts the replica configuration
+// with extra controls.
+func (c *ReplicaConfig) ValidateAndAdjustWithOptions(
+	sinkURI *url.URL, opts ValidateOptions,
+) error {
 	if c.Sink != nil {
 		err := c.Sink.validateAndAdjust(sinkURI)
 		if err != nil {
@@ -275,7 +292,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
 	}
 
 	if c.Consistent != nil {
-		err := c.Consistent.ValidateAndAdjust()
+		err := c.Consistent.validateAndAdjust(opts.EnableRedoIOCheck)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/redo/config.go b/pkg/redo/config.go
index 0b4ea12d85..8b7ec392bd 100644
--- a/pkg/redo/config.go
+++ b/pkg/redo/config.go
@@ -214,8 +214,21 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter
 	return s, nil
 }
+// StorageValidationOptions controls whether ValidateStorage performs
+// an I/O based accessibility check.
+type StorageValidationOptions struct {
+	EnableIOCheck bool
+}
+
 // ValidateStorage validates the storage used by redo.
 func ValidateStorage(uri *url.URL) error {
+	return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
+}
+
+// ValidateStorageWithOptions validates the storage used by redo with options.
+//
+// When EnableIOCheck is false, only basic scheme validation is performed.
+func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error {
 	scheme := uri.Scheme
 	if !IsValidConsistentStorage(scheme) {
 		return errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
 	}
@@ -223,6 +236,9 @@ func ValidateStorage(uri *url.URL) error {
 	if IsBlackholeStorage(scheme) {
 		return nil
 	}
+	if !opts.EnableIOCheck {
+		return nil
+	}
 
 	if IsExternalStorage(scheme) {
 		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
diff --git a/pkg/redo/config_test.go b/pkg/redo/config_test.go
index 33aa9de486..1dd0e8c6e7 100644
--- a/pkg/redo/config_test.go
+++ b/pkg/redo/config_test.go
@@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) {
 		require.NoError(t, err)
 	}
 }
+
+func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) {
+	t.Parallel()
+
+	uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket")
+	require.NoError(t, err)
+
+	err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false})
+	require.NoError(t, err)
+
+	err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
+	require.Error(t, err)
+}
diff --git a/pkg/redo/writer/file/file.go b/pkg/redo/writer/file/file.go
index 6ec60dfe62..10064ecf3c 100644
--- a/pkg/redo/writer/file/file.go
+++ b/pkg/redo/writer/file/file.go
@@ -303,31 +303,32 @@ func (w *Writer) encode(ctx context.Context) error {
 		cacheEventPostFlush = cacheEventPostFlush[:0]
 		return nil
 	}
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-ticker.C:
-		err := flush()
-		if err != nil {
-			return errors.Trace(err)
-		}
-	case e := <-w.inputCh:
-		err := w.write(e)
-		if err != nil {
-			return err
-		}
-		num++
-		if num > redo.DefaultFlushBatchSize {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
 			err := flush()
 			if err != nil {
 				return errors.Trace(err)
 			}
-			e.PostFlush()
-		} else {
-			cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
+		case e := <-w.inputCh:
+			err := w.write(e)
+			if err != nil {
+				return err
+			}
+			num++
+			if num >= redo.DefaultFlushBatchSize {
+				err := flush()
+				if err != nil {
+					return errors.Trace(err)
+				}
+				e.PostFlush()
+			} else {
+				cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
+			}
 		}
 	}
-	return nil
 }
 
 func (w *Writer) close(ctx context.Context) error {