Reactive configuration synchronization for Go.
Watch external sources, validate changes, and apply them safely with automatic rollback on failure.
A capacitor watches a source, validates incoming data, and calls you back when it changes.
type Config struct {
Port int `json:"port"`
Host string `json:"host"`
}
func (c Config) Validate() error {
if c.Port < 1 || c.Port > 65535 {
return errors.New("invalid port")
}
return nil
}
capacitor := flux.New[Config](
file.New("/etc/app/config.json"),
func(ctx context.Context, prev, curr Config) error {
log.Printf("Port changed: %d -> %d", prev.Port, curr.Port)
return reconfigureServer(curr)
},
)Start once, react forever. Invalid config is rejected, previous valid config is retained.
if err := capacitor.Start(ctx); err != nil {
log.Fatal("Initial load failed:", err)
}
// State machine tracks health
capacitor.State() // Loading -> Healthy -> Degraded -> Empty
// Current config always available
if cfg, ok := capacitor.Current(); ok {
server.Listen(cfg.Host, cfg.Port)
}
// Last error for observability
if err := capacitor.LastError(); err != nil {
metrics.RecordConfigError(err)
}Multiple sources? Compose them with a reducer.
capacitor := flux.Compose[Config](
func(ctx context.Context, prev, curr []Config) (Config, error) {
// Merge defaults, file config, and environment overrides
return mergeConfigs(curr[0], curr[1], curr[2]), nil
},
file.New("/etc/app/defaults.json"),
file.New("/etc/app/config.json"),
env.New("APP_"),
)Same guarantees: validate first, reject invalid, retain previous, call back with valid.
go get github.com/zoobz-io/fluxRequires Go 1.24+.
package main
import (
"context"
"errors"
"log"
"github.com/zoobz-io/flux"
"github.com/zoobz-io/flux/file"
)
type Config struct {
Port int `json:"port"`
Host string `json:"host"`
}
func (c Config) Validate() error {
if c.Port < 1 || c.Port > 65535 {
return errors.New("port must be between 1 and 65535")
}
if c.Host == "" {
return errors.New("host is required")
}
return nil
}
func main() {
ctx := context.Background()
capacitor := flux.New[Config](
file.New("/etc/myapp/config.json"),
func(ctx context.Context, prev, curr Config) error {
log.Printf("Config updated: %+v", curr)
return nil
},
)
if err := capacitor.Start(ctx); err != nil {
log.Fatalf("Initial load failed: %v", err)
}
log.Printf("State: %s", capacitor.State())
if cfg, ok := capacitor.Current(); ok {
log.Printf("Listening on %s:%d", cfg.Host, cfg.Port)
}
// Capacitor watches in background until context is cancelled
<-ctx.Done()
}| Feature | Description | Docs |
|---|---|---|
| State Machine | Loading, Healthy, Degraded, Empty with clear transitions | Concepts |
| Multi-Source Composition | Merge configs from multiple watchers with custom reducers | Multi-Source |
| Pluggable Providers | File, Redis, Consul, etcd, NATS, Kubernetes, ZooKeeper, Firestore | Providers |
| Validation Pipeline | Type-safe validation with automatic rejection and rollback | Architecture |
| Debouncing | Configurable delay to batch rapid changes | Best Practices |
| Signal Observability | State changes and errors via capitan | Fields |
| Testing Utilities | Sync mode and channel watchers for deterministic tests | Testing |
- Safe by default — Invalid config rejected, previous retained, callback only sees valid data
- Four-state machine — Loading, Healthy, Degraded, Empty with clear transitions
- Multi-source composition — Merge configs from files, Redis, Kubernetes, environment
- Pluggable providers — File, Redis, Consul, etcd, NATS, Kubernetes, ZooKeeper, Firestore
- Observable — capitan signals for state changes and failures
- Testable — Sync mode and channel watchers for deterministic tests
Flux enables a pattern: define once, update anywhere, validate always.
Your configuration lives in external sources — files, Redis, Kubernetes ConfigMaps. Flux watches, validates, and delivers changes. Your application just reacts.
// In your infrastructure
configMap := &corev1.ConfigMap{
Data: map[string]string{
"config": `{"port": 8080, "host": "0.0.0.0"}`,
},
}
// In your application
capacitor := flux.New[Config](
kubernetes.New(client, "default", "app-config", "config"),
func(ctx context.Context, prev, curr Config) error {
return server.Reconfigure(curr)
},
)Update the ConfigMap, flux delivers the change. Invalid update? Rejected. Previous config retained. Your application never sees bad data.
- Overview — Design philosophy and architecture
- Quickstart — Get started in minutes
- Core Concepts — Capacitor, watchers, state machine, validation
- Architecture — Processing pipeline, debouncing, error handling
- Testing — Sync mode, channel watchers, deterministic tests
- Providers — Configuring file, Redis, Kubernetes, and other watchers
- State Management — State transitions, error recovery, circuit breakers
- Best Practices — Validation design, graceful degradation, observability
- File Config — Hot-reloading configuration files
- Multi-Source — Merging defaults, files, and environment
- Custom Watcher — Building watchers for custom sources
- API Reference — Complete function and type documentation
- Fields Reference — Capitan signal fields
- Providers Reference — All provider packages and options
See CONTRIBUTING.md for guidelines. Run make help for available commands.
MIT License — see LICENSE for details.