Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,30 @@ func NewChangefeed(cfID common.ChangeFeedID,
return res
}

// GetInfo returns the latest ChangeFeedInfo stored in memory.
//
// It may return nil if the changefeed hasn't been fully initialized.
func (c *Changefeed) GetInfo() *config.ChangeFeedInfo {
if c == nil || c.info == nil {
return nil
}
return c.info.Load()
}

// SetInfo updates the in-memory ChangeFeedInfo for the changefeed.
//
// It lazily initializes the internal pointer for uninitialized changefeeds
// (primarily used by unit tests).
//
// If the receiver is nil, it does nothing.
func (c *Changefeed) SetInfo(info *config.ChangeFeedInfo) {
if c == nil {
return
}
if c.info == nil {
c.info = atomic.NewPointer(info)
return
}
c.info.Store(info)
}

Expand Down
10 changes: 10 additions & 0 deletions coordinator/changefeed/changefeed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ func (db *ChangefeedDB) StopByChangefeedID(cfID common.ChangeFeedID, remove bool
delete(db.changefeeds, cfID)
delete(db.changefeedDisplayNames, cf.ID.DisplayName)
delete(db.stopped, cfID)

info := cf.GetInfo()
if info != nil {
downstreamType := metrics.DownstreamTypeFromSinkURI(info.SinkURI)
metrics.ChangefeedDownstreamInfoGauge.DeleteLabelValues(
cfID.Keyspace(),
cfID.Name(),
downstreamType,
)
}
} else {
log.Info("stop changefeed", zap.String("changefeed", cfID.String()))
db.stopped[cfID] = cf
Expand Down
36 changes: 34 additions & 2 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ func NewController(
func (c *Controller) collectMetrics(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

// changefeedDownstreamTypeCache is used to cleanup the previous downstream type
// label value when a changefeed's sink-uri is updated.
changefeedDownstreamTypeCache := make(map[common.ChangeFeedDisplayName]string)
for {
select {
case <-ctx.Done():
Expand All @@ -197,10 +201,25 @@ func (c *Controller) collectMetrics(ctx context.Context) error {
metrics.ChangefeedStateGauge.WithLabelValues("Absent").Set(float64(c.changefeedDB.GetAbsentSize()))
metrics.ChangefeedStateGauge.WithLabelValues("Stopped").Set(float64(c.changefeedDB.GetStoppedSize()))

changefeedDownstreamTypes := make(map[common.ChangeFeedDisplayName]struct{})
c.changefeedDB.Foreach(func(cf *changefeed.Changefeed) {
info := cf.GetInfo()
keyspace := info.ChangefeedID.Keyspace()
name := info.ChangefeedID.Name()
if info == nil {
return
}

displayName := info.ChangefeedID.DisplayName
changefeedDownstreamTypes[displayName] = struct{}{}
keyspace := displayName.Keyspace
name := displayName.Name

downstreamType := metrics.DownstreamTypeFromSinkURI(info.SinkURI)
if oldType, ok := changefeedDownstreamTypeCache[displayName]; ok && oldType != downstreamType {
metrics.ChangefeedDownstreamInfoGauge.DeleteLabelValues(keyspace, name, oldType)
}
changefeedDownstreamTypeCache[displayName] = downstreamType
metrics.ChangefeedDownstreamInfoGauge.WithLabelValues(keyspace, name, downstreamType).Set(1)

metrics.ChangefeedStatusGauge.WithLabelValues(keyspace, name).Set(float64(info.State.ToInt()))

// don't update checkpoint ts and checkpoint ts lag for stopped changefeed
Expand All @@ -214,6 +233,19 @@ func (c *Controller) collectMetrics(ctx context.Context) error {
metrics.ChangefeedCheckpointTsGauge.WithLabelValues(keyspace, name).Set(float64(phyCkpTs))
metrics.ChangefeedCheckpointTsLagGauge.WithLabelValues(keyspace, name).Set(lag)
})

// Cleanup removed changefeeds, so dashboards won't show stale label values.
for displayName, downstreamType := range changefeedDownstreamTypeCache {
if _, ok := changefeedDownstreamTypes[displayName]; ok {
continue
}
metrics.ChangefeedDownstreamInfoGauge.DeleteLabelValues(
displayName.Keyspace,
displayName.Name,
downstreamType,
)
delete(changefeedDownstreamTypeCache, displayName)
}
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions downstreamadapter/sink/mysql/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ func New(
if err != nil {
return nil, err
}

// Expose whether the MySQL-compatible downstream is confirmed to be TiDB, so
// dashboards can display "tidb" when we can prove it. Otherwise, the
// scheme-based label remains "mysql/tidb".
keyspace := changefeedID.Keyspace()
name := changefeedID.Name()
if cfg.IsTiDB {
metrics.ChangefeedDownstreamIsTiDBGauge.WithLabelValues(keyspace, name).Set(1)
} else {
metrics.ChangefeedDownstreamIsTiDBGauge.DeleteLabelValues(keyspace, name)
}

return NewMySQLSink(ctx, changefeedID, cfg, db, config.BDRMode, config.EnableActiveActive, config.ActiveActiveProgressInterval), nil
}

Expand Down Expand Up @@ -409,4 +421,6 @@ func (s *Sink) Close(removeChangefeed bool) {
s.activeActiveSyncStatsCollector.Close()
}
s.statistics.Close()

metrics.ChangefeedDownstreamIsTiDBGauge.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())
}
74 changes: 73 additions & 1 deletion metrics/grafana/ticdc_new_arch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4591,6 +4591,78 @@
"align": false,
"alignLevel": null
}
},
{
"datasource": "${DS_TEST-CLUSTER}",
"description": "Downstream type of each changefeed. For MySQL-compatible sinks, it shows \"mysql/tidb\" unless the downstream is confirmed to be TiDB.",
"fieldConfig": {
"defaults": {
"custom": {
"align": null,
"filterable": true
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 26
},
"id": 60029,
"options": {
"showHeader": true,
"sortBy": []
},
"pluginVersion": "7.5.17",
"targets": [
{
"expr": "label_replace(max by (namespace, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}), \"downstream_type\", \"tidb\", \"__name__\", \".*\") or (max by (namespace, changefeed, downstream_type) (ticdc_owner_changefeed_downstream_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}) unless on (namespace, changefeed) max by (namespace, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}))",
"format": "time_series",
"instant": true,
"refId": "A"
}
],
"title": "Changefeed Downstream Type",
"transformations": [
{
"id": "labelsToFields",
"options": {}
},
{
"id": "organize",
"options": {
"excludeByName": {
"Metric": true,
"Time": true,
"Value": true,
"__name__": true
},
"indexByName": {
"changefeed": 1,
"downstream_type": 2,
"namespace": 0
},
"renameByName": {}
}
}
],
"type": "table"
}
],
"title": "Changefeed",
Expand Down Expand Up @@ -25515,4 +25587,4 @@
"title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch",
"uid": "YiGL8hBZ0aac",
"version": 38
}
}
74 changes: 73 additions & 1 deletion metrics/nextgengrafana/ticdc_new_arch_next_gen.json
Original file line number Diff line number Diff line change
Expand Up @@ -4591,6 +4591,78 @@
"align": false,
"alignLevel": null
}
},
{
"datasource": "${DS_TEST-CLUSTER}",
"description": "Downstream type of each changefeed. For MySQL-compatible sinks, it shows \"mysql/tidb\" unless the downstream is confirmed to be TiDB.",
"fieldConfig": {
"defaults": {
"custom": {
"align": null,
"filterable": true
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 26
},
"id": 60029,
"options": {
"showHeader": true,
"sortBy": []
},
"pluginVersion": "7.5.17",
"targets": [
{
"expr": "label_replace(max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}), \"downstream_type\", \"tidb\", \"__name__\", \".*\") or (max by (keyspace_name, changefeed, downstream_type) (ticdc_owner_changefeed_downstream_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}) unless on (keyspace_name, changefeed) max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}))",
"format": "time_series",
"instant": true,
"refId": "A"
}
],
"title": "Changefeed Downstream Type",
"transformations": [
{
"id": "labelsToFields",
"options": {}
},
{
"id": "organize",
"options": {
"excludeByName": {
"Metric": true,
"Time": true,
"Value": true,
"__name__": true
},
"indexByName": {
"changefeed": 1,
"downstream_type": 2,
"keyspace_name": 0
},
"renameByName": {}
}
}
],
"type": "table"
}
],
"title": "Changefeed",
Expand Down Expand Up @@ -25515,4 +25587,4 @@
"title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch",
"uid": "YiGL8hBZ0aac",
"version": 38
}
}
72 changes: 72 additions & 0 deletions metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json
Original file line number Diff line number Diff line change
Expand Up @@ -2334,6 +2334,78 @@
"align": false,
"alignLevel": null
}
},
{
"datasource": "${DS_TEST-CLUSTER}",
"description": "Downstream type of each changefeed. For MySQL-compatible sinks, it shows \"mysql/tidb\" unless the downstream is confirmed to be TiDB.",
"fieldConfig": {
"defaults": {
"custom": {
"align": null,
"filterable": true
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 26
},
"id": 60029,
"options": {
"showHeader": true,
"sortBy": []
},
"pluginVersion": "7.5.17",
"targets": [
{
"expr": "label_replace(max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}), \"downstream_type\", \"tidb\", \"__name__\", \".*\") or (max by (keyspace_name, changefeed, downstream_type) (ticdc_owner_changefeed_downstream_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}) unless on (keyspace_name, changefeed) max by (keyspace_name, changefeed) (ticdc_sink_changefeed_downstream_is_tidb{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\"}))",
"format": "time_series",
"instant": true,
"refId": "A"
}
],
"title": "Changefeed Downstream Type",
"transformations": [
{
"id": "labelsToFields",
"options": {}
},
{
"id": "organize",
"options": {
"excludeByName": {
"Metric": true,
"Time": true,
"Value": true,
"__name__": true
},
"indexByName": {
"changefeed": 1,
"downstream_type": 2,
"keyspace_name": 0
},
"renameByName": {}
}
}
],
"type": "table"
}
],
"title": "Changefeed",
Expand Down
Loading