Dior is a high-performance middleware pressure testing and data migration tool written in Go, supporting Kafka and NSQ.
Its design is inspired by Flume, with core concepts including:
- Source: Responsible for reading data.
- Channel: Buffers data between Source and Sink.
- Sink: Responsible for writing data.
- 📨 KafkaSource: Consume data from Kafka.
- 📬 NSQSource: Consume data from NSQ.
- ⚡ PressSource: Pressure testing source. Reads a specified file (line by line) and emits data to Sink at a specified rate.
- 📨 KafkaSink: Write data to Kafka.
- 📬 NSQSink: Write data to NSQ.
- 💾 FileSink: Write data to local file.
- 🗑️ NilSink: Empty Sink, used for testing Source performance or discarding data.
- Go 1.20+
- GNU Make 3.81+
makeAfter compilation, binary files will be generated in the build/ directory.
Dior supports cross-compilation to different platforms and architectures. The Makefile automatically detects your current system, but you can override it:
| Target Platform | Command |
|---|---|
| Linux AMD64 | make GOOS=linux GOARCH=amd64 |
| Linux ARM64 | make GOOS=linux GOARCH=arm64 |
| Windows AMD64 | make GOOS=windows GOARCH=amd64 |
| macOS ARM64 | make GOOS=darwin GOARCH=arm64 |
| macOS AMD64 | make GOOS=darwin GOARCH=amd64 |
注意:为了确保交叉编译正常工作,请确保在命令行中直接设置环境变量,而不是在 Makefile 中修改。Makefile 会自动检测并应用这些环境变量。
The build system automatically adds .exe extension for Windows targets.
To see the current build configuration, use:
make show-config示例:
# 编译 Linux AMD64 版本
make GOOS=linux GOARCH=amd64
# 编译 Windows AMD64 版本
make GOOS=windows GOARCH=amd64
# 编译 macOS ARM64 版本
make GOOS=darwin GOARCH=arm64Dior can be configured via command-line arguments or environment variables.
| Flag | Default | Description |
|---|---|---|
--version |
false | Show dior version |
--log-level |
info | Log verbosity: debug, info, warn, error, fatal |
--log-prefix |
[dior] | Log message prefix |
--chan-size |
100 | Size of queue between source and sink (0 = non-blocking) |
| Flag | Default | Description |
|---|---|---|
--src |
- | Source type: nsq, kafka, press |
--src-topic |
- | Source topic (for NSQ/Kafka) |
--src-channel |
- | Source channel (for NSQ) |
--src-group |
- | Consumer group (for Kafka) |
--src-bootstrap-servers |
- | Kafka brokers (comma-separated) |
--src-lookupd-http-addresses |
- | NSQ Lookupd HTTP addresses (comma-separated) |
--src-nsqd-tcp-addresses |
- | NSQD addresses (comma-separated) |
--src-speed |
10 | Messages per second (for press, 0 = unlimited) |
--src-file |
- | Data file path (for press) |
--src-scanner-buf-size-mb |
1 | Scanner buffer size in MB (for press) |
| Flag | Default | Description |
|---|---|---|
--dst |
- | Destination type: nsq, kafka, file, nil |
--dst-topic |
- | Destination topic (for NSQ/Kafka) |
--dst-bootstrap-servers |
- | Kafka brokers (comma-separated) |
--dst-nsqd-tcp-addresses |
- | NSQD TCP addresses (comma-separated) |
--dst-lookupd-http-addresses |
- | NSQ Lookupd HTTP addresses (comma-separated) |
--dst-file |
- | Output file path (for file sink) |
--dst-buf-size-byte |
4096 | Write buffer size in bytes (for file sink) |
All command-line options can also be set via environment variables (use lowercase with hyphens replaced by underscores):
# Example: Set source configuration via environment variables
export src=kafka
export src-bootstrap-servers=127.0.0.1:9092
export src-topic=my-topic
export src-group=my-group
# Example: Set destination configuration via environment variables
export dst=file
export dst-file=output.txtDior is configured via command-line arguments. Here are some common usage scenarios:
Read local file source.txt and send to Kafka at a rate of 10 messages per second.
./build/dior \
--src press \
--src-file source.txt \
--src-speed 10 \
--dst kafka \
--dst-bootstrap-servers 127.0.0.1:9092 \
--dst-topic topic_to./build/dior \
--src press \
--src-file source.txt \
--src-speed 10 \
--dst nsq \
--dst-nsqd-tcp-addresses 127.0.0.1:4150 \
--dst-topic topic_toTest source performance without actual write operations:
./build/dior \
--src press \
--src-file source.txt \
--src-speed 100 \
--dst nil./build/dior \
--src kafka \
--src-bootstrap-servers 127.0.0.1:9092 \
--src-topic topic_from \
--src-group benson \
--dst kafka \
--dst-bootstrap-servers 127.0.0.1:9092 \
--dst-topic topic_to./build/dior \
--src nsq \
--src-lookupd-http-addresses 127.0.0.1:4161 \
--src-topic topic_from \
--src-channel benson \
--dst nsq \
--dst-nsqd-tcp-addresses 127.0.0.1:4150 \
--dst-topic topic_to./build/dior \
--src kafka \
--src-bootstrap-servers 127.0.0.1:9092 \
--src-topic topic_from \
--src-group benson \
--dst nsq \
--dst-nsqd-tcp-addresses 127.0.0.1:4150 \
--dst-topic topic_to./build/dior \
--src nsq \
--src-lookupd-http-addresses 127.0.0.1:4161 \
--src-topic topic_from \
--src-channel benson \
--dst kafka \
--dst-bootstrap-servers 127.0.0.1:9092 \
--dst-topic topic_to./build/dior \
--src kafka \
--src-bootstrap-servers 127.0.0.1:9092 \
--src-topic topic_from \
--src-group benson \
--dst file \
--dst-file sink.txt./build/dior \
--src nsq \
--src-lookupd-http-addresses 127.0.0.1:4161 \
--src-topic topic_from \
--src-channel benson \
--dst file \
--dst-file sink.txt# Run all tests with verbose output and race detection
make test
# Or run directly
go test -v -race -cover ./...make cleanmake install
# Installs to /usr/local/bin by default
# Use DESTDIR for custom installation:
make install DESTDIR=/custom/pathThe application follows a modular architecture:
- cmd/: Application entry points.
- component/: Defines core interfaces and the Controller which manages the lifecycle of Source and Sink components.
- internal/: Contains specific implementations of Sources and Sinks, as well as internal utilities like logging and caching.
- option/: Handles configuration parsing and validation.
For detailed architecture documentation, see docs/architecture.md.
The Controller manages the lifecycle of Source and Sink components:
- Creates and manages data transfer channel
- Coordinates startup and shutdown of components
- Handles system signals for graceful shutdown
- Ensures data integrity and proper resource cleanup
The Asynchronizer provides asynchronous processing capabilities:
- Manages data channel reading
- Provides error handling and statistics
- Supports graceful shutdown
The Component interface defines the contract for all sources and sinks:
Init(channel chan []byte): Initialize with data channelStart(ctx context.Context): Start processingStop(): Stop and cleanup
Dior implements a 5-phase graceful shutdown process:
- Stop Source (stop producing data)
- Wait for Source goroutines to exit
- Close Channel (signal Sink no more data)
- Wait for Sink to drain remaining data
- Stop Sink (release resources)
- Panic recovery in all goroutines
- Exponential backoff retry for Kafka operations
- Error counting and statistics
- Configurable error handling callbacks
- Atomic operations for state management
- Read-write mutex for state access
- WaitGroup for goroutine coordination
- github.com/IBM/sarama v1.41.0 - Kafka client library
- github.com/nsqio/go-nsq v1.1.0 - NSQ client library
MIT License
Contributions are welcome! Please feel free to submit a Pull Request.