Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/cdc/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
if err != nil {
return err
}

err = cfg.ValidateAndAdjust(uri)
err = cfg.ValidateAndAdjustWithOptions(uri, config.ValidateOptions{
EnableRedoIOCheck: false,
})
if err != nil {
return err
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/cdc/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,48 @@ func TestChangefeedCreateCli(t *testing.T) {
require.NoError(t, o.complete(f))
require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`")
}

func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) {
t.Parallel()

o := newCreateChangefeedOptions(newChangefeedCommonOptions())
o.commonChangefeedOptions.sinkURI = "blackhole://"

dir := t.TempDir()
configPath := filepath.Join(dir, "cf.toml")
content := `
[consistent]
level = "eventual"
storage = "s3:///test/prefix"
`
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
o.commonChangefeedOptions.configFile = configPath

// The CLI still validates replica config, but it skips storage I/O check.
// Therefore this should pass even if the S3 URI misses bucket info.
require.NoError(t, o.completeReplicaCfg())
require.Equal(t, "eventual", *o.cfg.Consistent.Level)
require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage)
}

func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) {
t.Parallel()

o := newCreateChangefeedOptions(newChangefeedCommonOptions())
o.commonChangefeedOptions.sinkURI = "blackhole://"

dir := t.TempDir()
configPath := filepath.Join(dir, "cf.toml")
content := `
[consistent]
level = "eventual"
storage = "nfs:///ticdc-cli-should-not-io-check"
compression = "snappy"
`
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
o.commonChangefeedOptions.configFile = configPath

err := o.completeReplicaCfg()
require.Error(t, err)
require.Contains(t, err.Error(), "consistent.compression")
}
6 changes: 5 additions & 1 deletion pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type ConsistentMemoryUsage struct {

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
func (c *ConsistentConfig) ValidateAndAdjust() error {
return c.validateAndAdjust(true)
}

func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error {
if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) {
return nil
}
Expand Down Expand Up @@ -116,7 +120,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage)))
}
return redo.ValidateStorage(uri)
return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck})
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
Expand Down
19 changes: 18 additions & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,25 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
}
}

// ValidateOptions provides optional controls for
// (*ReplicaConfig).ValidateAndAdjustWithOptions.
type ValidateOptions struct {
EnableRedoIOCheck bool
}

// ValidateAndAdjust verifies and adjusts the replica configuration.
func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri
return c.ValidateAndAdjustWithOptions(
sinkURI,
ValidateOptions{EnableRedoIOCheck: true},
)
}

// ValidateAndAdjustWithOptions verifies and adjusts the replica configuration
// with extra controls.
func (c *ReplicaConfig) ValidateAndAdjustWithOptions(
sinkURI *url.URL, opts ValidateOptions,
) error {
if c.Sink != nil {
err := c.Sink.validateAndAdjust(sinkURI)
if err != nil {
Expand All @@ -275,7 +292,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
}

if c.Consistent != nil {
err := c.Consistent.ValidateAndAdjust()
err := c.Consistent.validateAndAdjust(opts.EnableRedoIOCheck)
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,31 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter
return s, nil
}

// StorageValidationOptions controls whether ValidateStorage performs
// an I/O based accessibility check.
type StorageValidationOptions struct {
EnableIOCheck bool
}

// ValidateStorage validates the storage used by redo.
func ValidateStorage(uri *url.URL) error {
return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
}

// ValidateStorageWithOptions validates the storage used by redo with options.
//
// When EnableIOCheck is false, only basic scheme validation is performed.
func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error {
scheme := uri.Scheme
if !IsValidConsistentStorage(scheme) {
return errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if IsBlackholeStorage(scheme) {
return nil
}
if !opts.EnableIOCheck {
return nil
}

if IsExternalStorage(scheme) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions pkg/redo/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) {
require.NoError(t, err)
}
}

func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) {
t.Parallel()

uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket")
require.NoError(t, err)

err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false})
require.NoError(t, err)

err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
require.Error(t, err)
}