Skip to content

Add Kafka topic pattern matching and metadata-based dynamic index naming#85

Draft
Copilot wants to merge 5 commits into
masterfrom
copilot/add-topics-pattern-support
Draft

Add Kafka topic pattern matching and metadata-based dynamic index naming#85
Copilot wants to merge 5 commits into
masterfrom
copilot/add-topics-pattern-support

Conversation

Copilot AI commented Nov 6, 2025

Copy link
Copy Markdown
Contributor

Implements regex-based topic discovery and dynamic index naming using Kafka metadata, similar to Logstash's topics_pattern and %{[@metadata][kafka][topic]} functionality.

Changes

Configuration (stash/config/)

  • Added TopicsPattern field to KafkaConf for regex-based topic matching
  • Implemented FetchMatchingTopics() with broker failover to discover topics dynamically

Metadata Injection (stash/handler/)

  • Enhanced MessageHandler to inject Kafka topic name into @metadata.kafka.topic
  • Extracted ensureMetadataStructure() helper for cleaner metadata initialization

Index Formatting (stash/es/)

  • Added getNestedValue() to support dot-notation field access (e.g., @metadata.kafka.topic)
  • Updated buildIndexFormatter() to resolve nested fields in index templates

Main (stash/stash.go)

  • Modified toKqConf() to fetch topics via pattern matching
  • Updated handler initialization to pass topic context per consumer

Usage

Topic pattern matching:

Input:
  Kafka:
    TopicsPattern: "^app-.*-logs$"  # Matches app-service1-logs, app-service2-logs, etc.

Dynamic index naming:

Output:
  ElasticSearch:
    Index: "{.@metadata.kafka.topic}-{{yyyy-MM-dd}}"  # Creates my-topic-2024-01-15

Both features work independently or together. Existing explicit Topics configurations remain fully supported.

Original prompt

This section details on the original issue you should resolve

<issue_title>能实现像logstash中topics_pattern功能么?</issue_title>
<issue_description>目前go-stash只支持配置单个topic,如果有上千个topic,需要一个一个配置还是可以支持正则?
另外,output也可以实现像logstash中的index => "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}" 类似功能么?</issue_description>

Comments on the Issue (you are @copilot in this section)


✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

Copilot AI and others added 4 commits November 6, 2025 07:18
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Copilot AI changed the title [WIP] Add support for topics_pattern in go-stash Add Kafka topic pattern matching and metadata-based dynamic index naming Nov 6, 2025
Copilot AI requested a review from kevwan November 6, 2025 07:26
@drings-liu

Copy link
Copy Markdown

这个功能还没集成到最新的版本中么

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds Logstash-like Kafka topic pattern discovery (TopicsPattern) and enables dynamic Elasticsearch index naming using nested metadata placeholders (e.g., {.@metadata.kafka.topic}), so indices can be derived from the Kafka topic at ingest time.

Changes:

  • Added KafkaConf.TopicsPattern and implemented Kafka topic discovery via regex (FetchMatchingTopics).
  • Injected Kafka topic into message metadata (@metadata.kafka.topic) via a topic-aware handler.
  • Extended index template resolution to support nested fields via dot notation, with new test coverage.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
stash/stash.go Fetches topics via pattern matching and instantiates per-topic handlers for metadata injection.
stash/handler/handler.go Adds topic-aware handler and injects @metadata.kafka.topic.
stash/etc/config.yaml Documents TopicsPattern and nested metadata index placeholder usage.
stash/es/index.go Adds nested (dot-notation) field lookup for index template formatting.
stash/es/index_test.go Adds tests for nested metadata placeholders and getNestedValue.
stash/config/topics.go Implements regex-based topic discovery via broker metadata.
stash/config/topics_test.go Adds unit tests for explicit topics, empty pattern, and invalid regex.
stash/config/config.go Adds TopicsPattern to KafkaConf.
readme.md Documents TopicsPattern and {.@metadata.kafka.topic} index formatting.
readme-cn.md Chinese documentation for topic pattern and dynamic index naming.
go.mod Promotes segmentio/kafka-go to a direct dependency.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread stash/config/topics.go
Comment on lines +25 to +37
// Try connecting to each broker until one succeeds
var conn *kafka.Conn
var lastErr error
for _, broker := range c.Brokers {
conn, err = kafka.DialContext(context.Background(), "tcp", broker)
if err == nil {
break
}
lastErr = err
}
if conn == nil {
return nil, lastErr
}
Comment thread stash/config/topics.go
Comment on lines +28 to +32
for _, broker := range c.Brokers {
conn, err = kafka.DialContext(context.Background(), "tcp", broker)
if err == nil {
break
}
Comment thread stash/config/topics.go
Comment on lines +28 to +34
for _, broker := range c.Brokers {
conn, err = kafka.DialContext(context.Background(), "tcp", broker)
if err == nil {
break
}
lastErr = err
}
Comment thread stash/config/topics.go
topics = append(topics, topic)
}

logx.Infof("Matched %d topics with pattern '%s': %v", len(topics), c.TopicsPattern, topics)
Comment thread stash/stash.go
Comment on lines +83 to 88
for _, k := range kqConfs {
// Create a handler with topic information for metadata injection
handle := handler.NewHandlerWithTopic(writer, indexer, k.Topic)
handle.AddFilters(filters...)
handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
group.Add(kq.MustNewQueue(k, handle))
@drings-liu

Copy link
Copy Markdown

下载编译后,用TopicsPattern: 配置,提示: [type mismatch for field "Clusters[0].Input.Kafka.Topics"](error: config file etc/test.yaml, type mismatch for field "Clusters[0].Input.Kafka.Topics")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

能实现像logstash中topics_pattern功能么?

4 participants