diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index 7682cce433..a594e45b8c 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -98,11 +98,30 @@ func NewChangefeed(cfID common.ChangeFeedID, return res } +// GetInfo returns the latest ChangeFeedInfo stored in memory. +// +// It may return nil if the changefeed hasn't been fully initialized. func (c *Changefeed) GetInfo() *config.ChangeFeedInfo { + if c == nil || c.info == nil { + return nil + } return c.info.Load() } +// SetInfo updates the in-memory ChangeFeedInfo for the changefeed. +// +// It lazily initializes the internal pointer for uninitialized changefeeds +// (primarily used by unit tests). +// +// If the receiver is nil, it does nothing. func (c *Changefeed) SetInfo(info *config.ChangeFeedInfo) { + if c == nil { + return + } + if c.info == nil { + c.info = atomic.NewPointer(info) + return + } c.info.Store(info) } diff --git a/coordinator/changefeed/changefeed_db.go b/coordinator/changefeed/changefeed_db.go index 4ddc0b454c..4caefb4d31 100644 --- a/coordinator/changefeed/changefeed_db.go +++ b/coordinator/changefeed/changefeed_db.go @@ -135,6 +135,16 @@ func (db *ChangefeedDB) StopByChangefeedID(cfID common.ChangeFeedID, remove bool delete(db.changefeeds, cfID) delete(db.changefeedDisplayNames, cf.ID.DisplayName) delete(db.stopped, cfID) + + info := cf.GetInfo() + if info != nil { + downstreamType := metrics.DownstreamTypeFromSinkURI(info.SinkURI) + metrics.ChangefeedDownstreamInfoGauge.DeleteLabelValues( + cfID.Keyspace(), + cfID.Name(), + downstreamType, + ) + } } else { log.Info("stop changefeed", zap.String("changefeed", cfID.String())) db.stopped[cfID] = cf diff --git a/coordinator/controller.go b/coordinator/controller.go index 4285e3624d..d5b8b10a5b 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -186,6 +186,10 @@ func NewController( func (c *Controller) collectMetrics(ctx context.Context) error { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() + + // changefeedDownstreamTypeCache is used to cleanup the previous downstream type + // label value when a changefeed's sink-uri is updated. + changefeedDownstreamTypeCache := make(map[common.ChangeFeedDisplayName]string) for { select { case <-ctx.Done(): @@ -197,10 +201,25 @@ func (c *Controller) collectMetrics(ctx context.Context) error { metrics.ChangefeedStateGauge.WithLabelValues("Absent").Set(float64(c.changefeedDB.GetAbsentSize())) metrics.ChangefeedStateGauge.WithLabelValues("Stopped").Set(float64(c.changefeedDB.GetStoppedSize())) + changefeedDownstreamTypes := make(map[common.ChangeFeedDisplayName]struct{}) c.changefeedDB.Foreach(func(cf *changefeed.Changefeed) { info := cf.GetInfo() - keyspace := info.ChangefeedID.Keyspace() - name := info.ChangefeedID.Name() + if info == nil { + return + } + + displayName := info.ChangefeedID.DisplayName + changefeedDownstreamTypes[displayName] = struct{}{} + keyspace := displayName.Keyspace + name := displayName.Name + + downstreamType := metrics.DownstreamTypeFromSinkURI(info.SinkURI) + if oldType, ok := changefeedDownstreamTypeCache[displayName]; ok && oldType != downstreamType { + metrics.ChangefeedDownstreamInfoGauge.DeleteLabelValues(keyspace, name, oldType) + } + changefeedDownstreamTypeCache[displayName] = downstreamType + metrics.ChangefeedDownstreamInfoGauge.WithLabelValues(keyspace, name, downstreamType).Set(1) + metrics.ChangefeedStatusGauge.WithLabelValues(keyspace, name).Set(float64(info.State.ToInt())) // don't update checkpoint ts and checkpoint ts lag for stopped changefeed @@ -214,6 +233,19 @@ func (c *Controller) collectMetrics(ctx context.Context) error { metrics.ChangefeedCheckpointTsGauge.WithLabelValues(keyspace, name).Set(float64(phyCkpTs)) metrics.ChangefeedCheckpointTsLagGauge.WithLabelValues(keyspace, name).Set(lag) }) + + // Cleanup removed changefeeds, so dashboards won't show stale label values. + for displayName, downstreamType := range changefeedDownstreamTypeCache { + if _, ok := changefeedDownstreamTypes[displayName]; ok { + continue + } + metrics.ChangefeedDownstreamInfoGauge.DeleteLabelValues( + displayName.Keyspace, + displayName.Name, + downstreamType, + ) + delete(changefeedDownstreamTypeCache, displayName) + } } } } diff --git a/downstreamadapter/sink/mysql/sink.go b/downstreamadapter/sink/mysql/sink.go index b4a6d0f3d5..7a8318b52c 100644 --- a/downstreamadapter/sink/mysql/sink.go +++ b/downstreamadapter/sink/mysql/sink.go @@ -95,6 +95,18 @@ func New( if err != nil { return nil, err } + + // Expose whether the MySQL-compatible downstream is confirmed to be TiDB, so + // dashboards can display "tidb" when we can prove it. Otherwise, the + // scheme-based label remains "mysql/tidb". + keyspace := changefeedID.Keyspace() + name := changefeedID.Name() + if cfg.IsTiDB { + metrics.ChangefeedDownstreamIsTiDBGauge.WithLabelValues(keyspace, name).Set(1) + } else { + metrics.ChangefeedDownstreamIsTiDBGauge.DeleteLabelValues(keyspace, name) + } + return NewMySQLSink(ctx, changefeedID, cfg, db, config.BDRMode, config.EnableActiveActive, config.ActiveActiveProgressInterval), nil } @@ -409,4 +421,6 @@ func (s *Sink) Close(removeChangefeed bool) { s.activeActiveSyncStatsCollector.Close() } s.statistics.Close() + + metrics.ChangefeedDownstreamIsTiDBGauge.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) } diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index 41412f8a31..d8310cba74 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -4591,6 +4591,78 @@ "align": false, "alignLevel": null } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Downstream type of each changefeed. For MySQL-compatible sinks, it shows \"mysql/tidb\" unless the downstream is confirmed to be TiDB.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": true + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 60029, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "label_replace(max by (namespace, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}), \"downstream_type\", \"tidb\", \"__name__\", \".*\") or (max by (namespace, changefeed, downstream_type) (ticdc_owner_changefeed_downstream_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}) unless on (namespace, changefeed) max by (namespace, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}))", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Downstream Type", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "changefeed": 1, + "downstream_type": 2, + "namespace": 0 + }, + "renameByName": {} + } + } + ], + "type": "table" } ], "title": "Changefeed", @@ -25515,4 +25587,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 38 -} \ No newline at end of file +} diff --git a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json index 6c342519c1..f67e28f0c5 100644 --- a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json +++ b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json @@ -4591,6 +4591,78 @@ "align": false, "alignLevel": null } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Downstream type of each changefeed. For MySQL-compatible sinks, it shows \"mysql/tidb\" unless the downstream is confirmed to be TiDB.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": true + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 60029, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "label_replace(max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}), \"downstream_type\", \"tidb\", \"__name__\", \".*\") or (max by (keyspace_name, changefeed, downstream_type) (ticdc_owner_changefeed_downstream_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}) unless on (keyspace_name, changefeed) max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}))", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Downstream Type", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "changefeed": 1, + "downstream_type": 2, + "keyspace_name": 0 + }, + "renameByName": {} + } + } + ], + "type": "table" } ], "title": "Changefeed", @@ -25515,4 +25587,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 38 -} \ No newline at end of file +} diff --git a/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json b/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json index 74428dfad1..95a55fabea 100644 --- a/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json +++ b/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json @@ -2334,6 +2334,78 @@ "align": false, "alignLevel": null } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Downstream type of each changefeed. For MySQL-compatible sinks, it shows \"mysql/tidb\" unless the downstream is confirmed to be TiDB.", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": true + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 60029, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "targets": [ + { + "expr": "label_replace(max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}), \"downstream_type\", \"tidb\", \"__name__\", \".*\") or (max by (keyspace_name, changefeed, downstream_type) (ticdc_owner_changefeed_downstream_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}) unless on (keyspace_name, changefeed) max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}))", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Changefeed Downstream Type", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "changefeed": 1, + "downstream_type": 2, + "keyspace_name": 0 + }, + "renameByName": {} + } + } + ], + "type": "table" } ], "title": "Changefeed", diff --git a/pkg/metrics/changefeed.go b/pkg/metrics/changefeed.go index 03e9514a9c..29a63cc4b3 100644 --- a/pkg/metrics/changefeed.go +++ b/pkg/metrics/changefeed.go @@ -89,6 +89,34 @@ var ( Name: "checkpoint_ts", Help: "checkpoint ts of changefeeds", }, []string{getKeyspaceLabel(), "changefeed"}) + + // ChangefeedDownstreamInfoGauge is a metric with a constant '1' value, + // labeled by the downstream type of each changefeed. + // + // It is used by dashboards to show a changefeed -> downstream type mapping + // without leaking sensitive information in sink-uri. + ChangefeedDownstreamInfoGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "changefeed_downstream_info", + Help: "Downstream type information of changefeeds exposed as labels.", + }, []string{getKeyspaceLabel(), "changefeed", "downstream_type"}) + + // ChangefeedDownstreamIsTiDBGauge indicates whether the downstream of a + // MySQL-compatible sink is confirmed to be TiDB (1 means yes). + // + // This metric is only set when the sink can positively identify TiDB (for + // example by executing `SELECT tidb_version()`), and is intentionally absent + // for MySQL or unknown downstreams. Dashboards can use it to override the + // generic "mysql/tidb" label value with "tidb" for running changefeeds. + ChangefeedDownstreamIsTiDBGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "changefeed_downstream_is_tidb", + Help: "Whether the downstream of a changefeed is confirmed to be TiDB (1 means yes).", + }, []string{getKeyspaceLabel(), "changefeed"}) ) func initChangefeedMetrics(registry *prometheus.Registry) { @@ -101,4 +129,6 @@ func initChangefeedMetrics(registry *prometheus.Registry) { registry.MustRegister(ChangefeedStatusGauge) registry.MustRegister(ChangefeedCheckpointTsLagGauge) registry.MustRegister(ChangefeedCheckpointTsGauge) + registry.MustRegister(ChangefeedDownstreamInfoGauge) + registry.MustRegister(ChangefeedDownstreamIsTiDBGauge) } diff --git a/pkg/metrics/downstream_type.go b/pkg/metrics/downstream_type.go new file mode 100644 index 0000000000..399337cbe6 --- /dev/null +++ b/pkg/metrics/downstream_type.go @@ -0,0 +1,69 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "net/url" + + "github.com/pingcap/ticdc/pkg/config" +) + +// DownstreamTypeFromSinkURI returns a stable downstream type label value for a +// changefeed sink-uri. +// +// The returned value is derived from the URI scheme and normalized to: +// - keep the number of label values small (e.g., "mysql+ssl" -> "mysql") +// - keep compatibility with future sink schemes (unknown schemes are returned as-is) +// - avoid leaking sensitive information (only the scheme is used) +// +// It returns "unknown" when the sink-uri cannot be parsed or has an empty scheme. +func DownstreamTypeFromSinkURI(sinkURI string) string { + parsed, err := url.Parse(sinkURI) + if err != nil { + return "unknown" + } + return downstreamTypeFromScheme(config.GetScheme(parsed)) +} + +func downstreamTypeFromScheme(scheme string) string { + switch scheme { + case config.MySQLScheme, config.MySQLSSLScheme: + // MySQL-compatible downstreams may be either MySQL or TiDB. We only + // expose a confirmed "tidb" label when the sink reports it is TiDB. + // Otherwise, use a combined label to avoid misleading users. + return "mysql/tidb" + case config.TiDBScheme, config.TiDBSSLScheme: + return config.TiDBScheme + case config.KafkaScheme, config.KafkaSSLScheme: + return config.KafkaScheme + case config.PulsarScheme, config.PulsarSSLScheme, config.PulsarHTTPScheme, config.PulsarHTTPSScheme: + return config.PulsarScheme + case config.FileScheme: + return config.FileScheme + case config.S3Scheme: + return config.S3Scheme + case config.GCSScheme, config.GSScheme: + return config.GCSScheme + case config.AzblobScheme, config.AzureScheme: + return config.AzblobScheme + case config.CloudStorageNoopScheme: + return config.CloudStorageNoopScheme + case config.BlackHoleScheme: + return config.BlackHoleScheme + case "": + return "unknown" + default: + return scheme + } +} diff --git a/pkg/metrics/downstream_type_test.go b/pkg/metrics/downstream_type_test.go new file mode 100644 index 0000000000..60372cafc4 --- /dev/null +++ b/pkg/metrics/downstream_type_test.go @@ -0,0 +1,64 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDownstreamTypeFromSinkURI(t *testing.T) { + // Scenario: Dashboards need a stable changefeed -> downstream type mapping, but + // should not depend on or expose the full sink-uri (which can contain secrets). + // Steps: Feed representative sink-uri values and verify the normalized type. + testCases := []struct { + sinkURI string + expectedLabel string + }{ + {sinkURI: "mysql://127.0.0.1:3306/", expectedLabel: "mysql/tidb"}, + {sinkURI: "mysql+ssl://127.0.0.1:3306/", expectedLabel: "mysql/tidb"}, + {sinkURI: "tidb://127.0.0.1:4000/", expectedLabel: "tidb"}, + {sinkURI: "tidb+ssl://127.0.0.1:4000/", expectedLabel: "tidb"}, + {sinkURI: "kafka://127.0.0.1:9092/topic", expectedLabel: "kafka"}, + {sinkURI: "kafka+ssl://127.0.0.1:9092/topic", expectedLabel: "kafka"}, + {sinkURI: "pulsar://127.0.0.1:6650/topic", expectedLabel: "pulsar"}, + {sinkURI: "pulsar+ssl://127.0.0.1:6651/topic", expectedLabel: "pulsar"}, + {sinkURI: "pulsar+http://127.0.0.1:8080/topic", expectedLabel: "pulsar"}, + {sinkURI: "pulsar+https://127.0.0.1:8443/topic", expectedLabel: "pulsar"}, + {sinkURI: "file:///tmp/ticdc", expectedLabel: "file"}, + {sinkURI: "s3://bucket/prefix", expectedLabel: "s3"}, + {sinkURI: "gs://bucket/prefix", expectedLabel: "gcs"}, + {sinkURI: "gcs://bucket/prefix", expectedLabel: "gcs"}, + {sinkURI: "azure://bucket/prefix", expectedLabel: "azblob"}, + {sinkURI: "azblob://bucket/prefix", expectedLabel: "azblob"}, + {sinkURI: "noop://bucket/prefix", expectedLabel: "noop"}, + {sinkURI: "blackhole:///", expectedLabel: "blackhole"}, + {sinkURI: "127.0.0.1:3306", expectedLabel: "unknown"}, + {sinkURI: "mysql://[::1", expectedLabel: "unknown"}, + {sinkURI: "", expectedLabel: "unknown"}, + } + + for _, tc := range testCases { + tc := tc + testName := tc.sinkURI + if testName == "" { + testName = "" + } + t.Run(testName, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.expectedLabel, DownstreamTypeFromSinkURI(tc.sinkURI)) + }) + } +}