Add Kafka topic pattern matching and metadata-based dynamic index naming#85
Draft
Copilot wants to merge 5 commits into
Draft
Add Kafka topic pattern matching and metadata-based dynamic index naming#85Copilot wants to merge 5 commits into
Copilot wants to merge 5 commits into
Conversation
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com>
Copilot
AI
changed the title
[WIP] Add support for topics_pattern in go-stash
Add Kafka topic pattern matching and metadata-based dynamic index naming
Nov 6, 2025
|
这个功能还没集成到最新的版本中么 |
There was a problem hiding this comment.
Pull request overview
This PR adds Logstash-like Kafka topic pattern discovery (TopicsPattern) and enables dynamic Elasticsearch index naming using nested metadata placeholders (e.g., {.@metadata.kafka.topic}), so indices can be derived from the Kafka topic at ingest time.
Changes:
- Added
KafkaConf.TopicsPatternand implemented Kafka topic discovery via regex (FetchMatchingTopics). - Injected Kafka topic into message metadata (
@metadata.kafka.topic) via a topic-aware handler. - Extended index template resolution to support nested fields via dot notation, with new test coverage.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| stash/stash.go | Fetches topics via pattern matching and instantiates per-topic handlers for metadata injection. |
| stash/handler/handler.go | Adds topic-aware handler and injects @metadata.kafka.topic. |
| stash/etc/config.yaml | Documents TopicsPattern and nested metadata index placeholder usage. |
| stash/es/index.go | Adds nested (dot-notation) field lookup for index template formatting. |
| stash/es/index_test.go | Adds tests for nested metadata placeholders and getNestedValue. |
| stash/config/topics.go | Implements regex-based topic discovery via broker metadata. |
| stash/config/topics_test.go | Adds unit tests for explicit topics, empty pattern, and invalid regex. |
| stash/config/config.go | Adds TopicsPattern to KafkaConf. |
| readme.md | Documents TopicsPattern and {.@metadata.kafka.topic} index formatting. |
| readme-cn.md | Chinese documentation for topic pattern and dynamic index naming. |
| go.mod | Promotes segmentio/kafka-go to a direct dependency. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+25
to
+37
| // Try connecting to each broker until one succeeds | ||
| var conn *kafka.Conn | ||
| var lastErr error | ||
| for _, broker := range c.Brokers { | ||
| conn, err = kafka.DialContext(context.Background(), "tcp", broker) | ||
| if err == nil { | ||
| break | ||
| } | ||
| lastErr = err | ||
| } | ||
| if conn == nil { | ||
| return nil, lastErr | ||
| } |
Comment on lines
+28
to
+32
| for _, broker := range c.Brokers { | ||
| conn, err = kafka.DialContext(context.Background(), "tcp", broker) | ||
| if err == nil { | ||
| break | ||
| } |
Comment on lines
+28
to
+34
| for _, broker := range c.Brokers { | ||
| conn, err = kafka.DialContext(context.Background(), "tcp", broker) | ||
| if err == nil { | ||
| break | ||
| } | ||
| lastErr = err | ||
| } |
| topics = append(topics, topic) | ||
| } | ||
|
|
||
| logx.Infof("Matched %d topics with pattern '%s': %v", len(topics), c.TopicsPattern, topics) |
Comment on lines
+83
to
88
| for _, k := range kqConfs { | ||
| // Create a handler with topic information for metadata injection | ||
| handle := handler.NewHandlerWithTopic(writer, indexer, k.Topic) | ||
| handle.AddFilters(filters...) | ||
| handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) | ||
| group.Add(kq.MustNewQueue(k, handle)) |
|
下载编译后,用TopicsPattern: 配置,提示: [type mismatch for field "Clusters[0].Input.Kafka.Topics"](error: config file etc/test.yaml, type mismatch for field "Clusters[0].Input.Kafka.Topics") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implements regex-based topic discovery and dynamic index naming using Kafka metadata, similar to Logstash's
topics_patternand%{[@metadata][kafka][topic]}functionality.Changes
Configuration (
stash/config/)TopicsPatternfield toKafkaConffor regex-based topic matchingFetchMatchingTopics()with broker failover to discover topics dynamicallyMetadata Injection (
stash/handler/)MessageHandlerto inject Kafka topic name into@metadata.kafka.topicensureMetadataStructure()helper for cleaner metadata initializationIndex Formatting (
stash/es/)getNestedValue()to support dot-notation field access (e.g.,@metadata.kafka.topic)buildIndexFormatter()to resolve nested fields in index templatesMain (
stash/stash.go)toKqConf()to fetch topics via pattern matchingUsage
Topic pattern matching:
Dynamic index naming:
Both features work independently or together. Existing explicit
Topicsconfigurations remain fully supported.Original prompt
✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.