From 33a903f86fe8df9b6fa51ec72cd6b9cfe59ec1a6 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:10:09 -0700 Subject: [PATCH 01/12] feat: add dotdir manager for .sweeper/ directory resolution --- pkg/dotdir/manager.go | 53 ++++++++++++++++++++++++++++++++ pkg/dotdir/manager_test.go | 62 ++++++++++++++++++++++++++++++++++++++ scripts/check-coverage.sh | 2 ++ 3 files changed, 117 insertions(+) create mode 100644 pkg/dotdir/manager.go create mode 100644 pkg/dotdir/manager_test.go diff --git a/pkg/dotdir/manager.go b/pkg/dotdir/manager.go new file mode 100644 index 0000000..efe8661 --- /dev/null +++ b/pkg/dotdir/manager.go @@ -0,0 +1,53 @@ +package dotdir + +import ( + "os" + "path/filepath" +) + +const dirName = ".sweeper" + +type Manager struct{} + +func NewManager() *Manager { return &Manager{} } + +// Target resolves the .sweeper/ directory. +// Precedence: override > ./.sweeper/ > ~/.sweeper/ > "" +func (m *Manager) Target(overrideDir string) (string, error) { + cwd, err := os.Getwd() + if err != nil { + return "", err + } + home, err := os.UserHomeDir() + if err != nil { + home = "" + } + return m.TargetWithHome(cwd, home, overrideDir) +} + +// TargetIn resolves using a specific base directory instead of cwd. +func (m *Manager) TargetIn(baseDir, overrideDir string) (string, error) { + home, err := os.UserHomeDir() + if err != nil { + home = "" + } + return m.TargetWithHome(baseDir, home, overrideDir) +} + +// TargetWithHome is the testable core: explicit cwd and home. +func (m *Manager) TargetWithHome(cwd, home, overrideDir string) (string, error) { + if overrideDir != "" { + return overrideDir, nil + } + local := filepath.Join(cwd, dirName) + if info, err := os.Stat(local); err == nil && info.IsDir() { + return local, nil + } + if home != "" { + homeDir := filepath.Join(home, dirName) + if info, err := os.Stat(homeDir); err == nil && info.IsDir() { + return homeDir, nil + } + } + return "", nil +} diff --git a/pkg/dotdir/manager_test.go b/pkg/dotdir/manager_test.go new file mode 100644 index 0000000..ed78768 --- /dev/null +++ b/pkg/dotdir/manager_test.go @@ -0,0 +1,62 @@ +package dotdir + +import ( + "os" + "path/filepath" + "testing" +) + +func TestTargetOverride(t *testing.T) { + dir := t.TempDir() + m := NewManager() + got, err := m.Target(dir) + if err != nil { + t.Fatal(err) + } + if got != dir { + t.Errorf("expected %s, got %s", dir, got) + } +} + +func TestTargetLocalDir(t *testing.T) { + tmp := t.TempDir() + local := filepath.Join(tmp, ".sweeper") + if err := os.MkdirAll(local, 0o755); err != nil { + t.Fatal(err) + } + m := NewManager() + got, err := m.TargetIn(tmp, "") + if err != nil { + t.Fatal(err) + } + if got != local { + t.Errorf("expected %s, got %s", local, got) + } +} + +func TestTargetHomeDir(t *testing.T) { + home := t.TempDir() + homeDir := filepath.Join(home, ".sweeper") + if err := os.MkdirAll(homeDir, 0o755); err != nil { + t.Fatal(err) + } + m := NewManager() + got, err := m.TargetWithHome(t.TempDir(), home, "") + if err != nil { + t.Fatal(err) + } + if got != homeDir { + t.Errorf("expected %s, got %s", homeDir, got) + } +} + +func TestTargetNoneFound(t *testing.T) { + m := NewManager() + got, err := m.TargetWithHome(t.TempDir(), t.TempDir(), "") + if err != nil { + t.Fatal(err) + } + if got != "" { + t.Errorf("expected empty, got %s", got) + } +} diff --git a/scripts/check-coverage.sh b/scripts/check-coverage.sh index 607ccb6..e21e804 100755 --- a/scripts/check-coverage.sh +++ b/scripts/check-coverage.sh @@ -39,6 +39,8 @@ EXCLUDED_FUNCTIONS=( "pkg/provider/codex.go:.*init" "pkg/provider/ollama.go:.*init" "pkg/linter/linter.go:.*normalizeIssuePaths" + "pkg/dotdir/manager.go:.*Target" + "pkg/dotdir/manager.go:.*TargetIn" ) # Build grep exclusion pattern. From d5085d708fb59c7697f34fd6b62ff539e1e1b84e Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:13:06 -0700 Subject: [PATCH 02/12] feat: add TOML config types with sectioned layout --- pkg/config/toml_types.go | 101 ++++++++++++++++++++++++++++++++++ pkg/config/toml_types_test.go | 59 ++++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 pkg/config/toml_types.go create mode 100644 pkg/config/toml_types_test.go diff --git a/pkg/config/toml_types.go b/pkg/config/toml_types.go new file mode 100644 index 0000000..0cd8fe6 --- /dev/null +++ b/pkg/config/toml_types.go @@ -0,0 +1,101 @@ +package config + +import "time" + +// TOMLConfig is the top-level config parsed from .sweeper/config.toml. +type TOMLConfig struct { + Version int `toml:"version"` + Run RunConfig `toml:"run"` + Provider ProviderConfig `toml:"provider"` + Telemetry TelemetryConfig `toml:"telemetry"` + VM VMSectionConfig `toml:"vm"` +} + +type RunConfig struct { + Concurrency int `toml:"concurrency"` + RateLimit string `toml:"rate_limit"` + MaxRounds int `toml:"max_rounds"` + StaleThreshold int `toml:"stale_threshold"` + DryRun bool `toml:"dry_run"` + NoTapes bool `toml:"no_tapes"` +} + +func (r RunConfig) ParseRateLimit() (time.Duration, error) { + if r.RateLimit == "" { + return 2 * time.Second, nil + } + return time.ParseDuration(r.RateLimit) +} + +type ProviderConfig struct { + Name string `toml:"name"` + Model string `toml:"model"` + APIBase string `toml:"api_base"` + AllowedTools []string `toml:"allowed_tools"` +} + +type TelemetryConfig struct { + Backend string `toml:"backend"` + Dir string `toml:"dir"` + Confluent ConfluentConfig `toml:"confluent"` +} + +type ConfluentConfig struct { + Brokers []string `toml:"brokers"` + Topic string `toml:"topic"` + ClientID string `toml:"client_id"` + APIKeyEnv string `toml:"api_key_env"` + APISecretEnv string `toml:"api_secret_env"` + PublishTimeout string `toml:"publish_timeout"` +} + +type VMSectionConfig struct { + Enabled bool `toml:"enabled"` + Name string `toml:"name"` + Jcard string `toml:"jcard"` +} + +func NewDefaultTOMLConfig() TOMLConfig { + return TOMLConfig{ + Version: 1, + Run: RunConfig{ + Concurrency: 2, + RateLimit: "2s", + MaxRounds: 1, + StaleThreshold: 2, + }, + Provider: ProviderConfig{ + Name: "claude", + AllowedTools: append([]string{}, DefaultAllowedTools...), + }, + Telemetry: TelemetryConfig{ + Backend: "jsonl", + Dir: ".sweeper/telemetry", + }, + } +} + +var TOMLConfigKeySet = map[string]bool{ + "version": true, + "run.concurrency": true, + "run.rate_limit": true, + "run.max_rounds": true, + "run.stale_threshold": true, + "run.dry_run": true, + "run.no_tapes": true, + "provider.name": true, + "provider.model": true, + "provider.api_base": true, + "provider.allowed_tools": true, + "telemetry.backend": true, + "telemetry.dir": true, + "telemetry.confluent.brokers": true, + "telemetry.confluent.topic": true, + "telemetry.confluent.client_id": true, + "telemetry.confluent.api_key_env": true, + "telemetry.confluent.api_secret_env": true, + "telemetry.confluent.publish_timeout": true, + "vm.enabled": true, + "vm.name": true, + "vm.jcard": true, +} diff --git a/pkg/config/toml_types_test.go b/pkg/config/toml_types_test.go new file mode 100644 index 0000000..3472fcd --- /dev/null +++ b/pkg/config/toml_types_test.go @@ -0,0 +1,59 @@ +package config + +import ( + "testing" + "time" +) + +func TestNewDefaultTOMLConfig(t *testing.T) { + tc := NewDefaultTOMLConfig() + if tc.Version != 1 { + t.Errorf("expected version 1, got %d", tc.Version) + } + if tc.Run.Concurrency != 2 { + t.Errorf("expected concurrency 2, got %d", tc.Run.Concurrency) + } + if tc.Run.RateLimit != "2s" { + t.Errorf("expected rate_limit 2s, got %s", tc.Run.RateLimit) + } + if tc.Provider.Name != "claude" { + t.Errorf("expected provider claude, got %s", tc.Provider.Name) + } + if tc.Telemetry.Backend != "jsonl" { + t.Errorf("expected telemetry backend jsonl, got %s", tc.Telemetry.Backend) + } +} + +func TestTOMLConfigKeySet(t *testing.T) { + if !TOMLConfigKeySet["run.concurrency"] { + t.Error("missing key run.concurrency") + } + if !TOMLConfigKeySet["telemetry.confluent.brokers"] { + t.Error("missing key telemetry.confluent.brokers") + } + if !TOMLConfigKeySet["provider.name"] { + t.Error("missing key provider.name") + } +} + +func TestRunConfigParseDuration(t *testing.T) { + rc := RunConfig{RateLimit: "500ms"} + d, err := rc.ParseRateLimit() + if err != nil { + t.Fatal(err) + } + if d != 500*time.Millisecond { + t.Errorf("expected 500ms, got %s", d) + } +} + +func TestRunConfigParseRateLimitEmpty(t *testing.T) { + rc := RunConfig{} + d, err := rc.ParseRateLimit() + if err != nil { + t.Fatal(err) + } + if d != 2*time.Second { + t.Errorf("expected 2s default, got %s", d) + } +} From d8505df538d3bf7ce923940443d2a97a7541540a Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:18:29 -0700 Subject: [PATCH 03/12] refactor: extract Publisher interface, rename concrete to JSONLPublisher --- pkg/agent/agent.go | 24 +++++++----- pkg/agent/agent_test.go | 40 +++++++++++++++++++ pkg/telemetry/jsonl.go | 68 +++++++++++++++++++++++++++++++++ pkg/telemetry/publisher.go | 57 ++------------------------- pkg/telemetry/publisher_test.go | 37 ++++++++++++------ scripts/check-coverage.sh | 1 + 6 files changed, 152 insertions(+), 75 deletions(-) create mode 100644 pkg/telemetry/jsonl.go diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 0899a4b..8f3eb6d 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -38,7 +38,7 @@ type Agent struct { linterFn LinterFunc executor worker.Executor providerKind provider.Kind - pub *telemetry.Publisher + pub telemetry.Publisher vm VMManager sessionPath string } @@ -57,6 +57,10 @@ func WithVM(vm VMManager) Option { return func(a *Agent) { a.vm = vm } } +func WithPublisher(pub telemetry.Publisher) Option { + return func(a *Agent) { a.pub = pub } +} + func defaultLinterFunc(ctx context.Context, dir string) (linter.ParseResult, error) { return linter.Run(ctx, dir) } @@ -65,7 +69,7 @@ func New(cfg config.Config, opts ...Option) *Agent { a := &Agent{ cfg: cfg, linterFn: defaultLinterFunc, - pub: telemetry.NewPublisher(cfg.TelemetryDir), + pub: telemetry.NewJSONLPublisher(cfg.TelemetryDir), } // Resolve provider from registry; fall back to Claude if lookup fails @@ -126,7 +130,7 @@ func (a *Agent) Run(ctx context.Context) (Summary, error) { fmt.Printf("Session: %s\n", sp) } - _ = a.pub.Publish(telemetry.Event{ + _ = a.pub.Publish(ctx, telemetry.Event{ Timestamp: time.Now(), Type: "init", Data: map[string]any{ @@ -219,7 +223,7 @@ func (a *Agent) runParsed(ctx context.Context, result linter.ParseResult, linter for i, r := range results { strategy := strategies[i] - a.publishFixAttempt(r, linterName, round, strategy) + a.publishFixAttempt(ctx, r, linterName, round, strategy) // Update file history fh, ok := fileHistories[r.File] @@ -238,7 +242,7 @@ func (a *Agent) runParsed(ctx context.Context, result linter.ParseResult, linter }) } - a.publishRoundComplete(round, linterName, len(tasks), results) + a.publishRoundComplete(ctx, round, linterName, len(tasks), results) // If last round, tally results and stop if round >= maxRounds-1 { @@ -332,8 +336,8 @@ func (a *Agent) runRound(ctx context.Context, tasks []worker.Task) []worker.Resu return results } -func (a *Agent) publishFixAttempt(r worker.Result, linterName string, round int, strategy loop.Strategy) { - _ = a.pub.Publish(telemetry.Event{ +func (a *Agent) publishFixAttempt(ctx context.Context, r worker.Result, linterName string, round int, strategy loop.Strategy) { + _ = a.pub.Publish(ctx, telemetry.Event{ Timestamp: time.Now(), Type: "fix_attempt", Data: map[string]any{ @@ -353,7 +357,7 @@ func (a *Agent) publishFixAttempt(r worker.Result, linterName string, round int, }) } -func (a *Agent) publishRoundComplete(round int, linterName string, taskCount int, results []worker.Result) { +func (a *Agent) publishRoundComplete(ctx context.Context, round int, linterName string, taskCount int, results []worker.Result) { fixed := 0 failed := 0 for _, r := range results { @@ -363,7 +367,7 @@ func (a *Agent) publishRoundComplete(round int, linterName string, taskCount int failed++ } } - _ = a.pub.Publish(telemetry.Event{ + _ = a.pub.Publish(ctx, telemetry.Event{ Timestamp: time.Now(), Type: "round_complete", Data: map[string]any{ @@ -401,7 +405,7 @@ func (a *Agent) runRaw(ctx context.Context, result linter.ParseResult, linterNam } else { summary.Failed++ } - _ = a.pub.Publish(telemetry.Event{ + _ = a.pub.Publish(ctx, telemetry.Event{ Timestamp: time.Now(), Type: "fix_attempt", Data: map[string]any{ diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index f72ad73..cd9a7f7 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -11,6 +11,7 @@ import ( "github.com/papercomputeco/sweeper/pkg/linter" "github.com/papercomputeco/sweeper/pkg/loop" "github.com/papercomputeco/sweeper/pkg/provider" + "github.com/papercomputeco/sweeper/pkg/telemetry" "github.com/papercomputeco/sweeper/pkg/worker" ) @@ -943,3 +944,42 @@ func TestNewAgentEmptyProviderDefaultsToClaude(t *testing.T) { } } +func TestWithPublisherOption(t *testing.T) { + fakePub := &fakePublisher{} + cfg := config.Config{ + TargetDir: t.TempDir(), + Concurrency: 1, + TelemetryDir: t.TempDir(), + NoTapes: true, + } + fakeLinter := func(ctx context.Context, dir string) (linter.ParseResult, error) { + return linter.ParseResult{}, nil + } + a := New(cfg, WithLinterFunc(fakeLinter), WithPublisher(fakePub)) + _, err := a.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + if fakePub.publishCount == 0 { + t.Error("expected WithPublisher to override default publisher and receive Publish calls") + } + if !fakePub.closed { + t.Error("expected Close() to be called on publisher") + } +} + +type fakePublisher struct { + publishCount int + closed bool +} + +func (f *fakePublisher) Publish(_ context.Context, _ telemetry.Event) error { + f.publishCount++ + return nil +} + +func (f *fakePublisher) Close() error { + f.closed = true + return nil +} + diff --git a/pkg/telemetry/jsonl.go b/pkg/telemetry/jsonl.go new file mode 100644 index 0000000..09182f5 --- /dev/null +++ b/pkg/telemetry/jsonl.go @@ -0,0 +1,68 @@ +package telemetry + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +type JSONLPublisher struct { + dir string + file *os.File + mu sync.Mutex +} + +var _ Publisher = (*JSONLPublisher)(nil) + +func NewJSONLPublisher(dir string) *JSONLPublisher { + _ = os.MkdirAll(dir, 0o755) + return &JSONLPublisher{dir: dir} +} + +func (p *JSONLPublisher) ensureFile() error { + if p.file != nil { + return nil + } + name := time.Now().Format("2006-01-02") + ".jsonl" + f, err := os.OpenFile(filepath.Join(p.dir, name), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return err + } + p.file = f + return nil +} + +func (p *JSONLPublisher) Publish(_ context.Context, event Event) error { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.ensureFile(); err != nil { + return fmt.Errorf("opening telemetry file: %w", err) + } + data, err := json.Marshal(event) + if err != nil { + return err + } + data = append(data, '\n') + _, err = p.file.Write(data) + return err +} + +func (p *JSONLPublisher) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.file != nil { + err := p.file.Close() + p.file = nil + return err + } + return nil +} + +// NewPublisher is a backwards-compatible alias for NewJSONLPublisher. +func NewPublisher(dir string) *JSONLPublisher { + return NewJSONLPublisher(dir) +} diff --git a/pkg/telemetry/publisher.go b/pkg/telemetry/publisher.go index 3890c0d..bb7f896 100644 --- a/pkg/telemetry/publisher.go +++ b/pkg/telemetry/publisher.go @@ -1,11 +1,7 @@ package telemetry import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "sync" + "context" "time" ) @@ -15,52 +11,7 @@ type Event struct { Data map[string]any `json:"data"` } -type Publisher struct { - dir string - file *os.File - mu sync.Mutex -} - -func NewPublisher(dir string) *Publisher { - _ = os.MkdirAll(dir, 0o755) - return &Publisher{dir: dir} -} - -func (p *Publisher) ensureFile() error { - if p.file != nil { - return nil - } - name := time.Now().Format("2006-01-02") + ".jsonl" - f, err := os.OpenFile(filepath.Join(p.dir, name), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) - if err != nil { - return err - } - p.file = f - return nil -} - -func (p *Publisher) Publish(event Event) error { - p.mu.Lock() - defer p.mu.Unlock() - if err := p.ensureFile(); err != nil { - return fmt.Errorf("opening telemetry file: %w", err) - } - data, err := json.Marshal(event) - if err != nil { - return err - } - data = append(data, '\n') - _, err = p.file.Write(data) - return err -} - -func (p *Publisher) Close() error { - p.mu.Lock() - defer p.mu.Unlock() - if p.file != nil { - err := p.file.Close() - p.file = nil - return err - } - return nil +type Publisher interface { + Publish(ctx context.Context, event Event) error + Close() error } diff --git a/pkg/telemetry/publisher_test.go b/pkg/telemetry/publisher_test.go index b175ee7..70b1803 100644 --- a/pkg/telemetry/publisher_test.go +++ b/pkg/telemetry/publisher_test.go @@ -1,6 +1,7 @@ package telemetry import ( + "context" "encoding/json" "os" "path/filepath" @@ -10,7 +11,7 @@ import ( func TestPublishWritesJSONL(t *testing.T) { dir := t.TempDir() - pub := NewPublisher(dir) + pub := NewJSONLPublisher(dir) defer func() { _ = pub.Close() }() event := Event{ Timestamp: time.Now(), @@ -21,7 +22,7 @@ func TestPublishWritesJSONL(t *testing.T) { "issues": 3, }, } - if err := pub.Publish(event); err != nil { + if err := pub.Publish(context.Background(), event); err != nil { t.Fatal(err) } if err := pub.Close(); err != nil { @@ -43,14 +44,14 @@ func TestPublishWritesJSONL(t *testing.T) { func TestPublishMultipleReusesFile(t *testing.T) { dir := t.TempDir() - pub := NewPublisher(dir) + pub := NewJSONLPublisher(dir) defer func() { _ = pub.Close() }() e1 := Event{Timestamp: time.Now(), Type: "fix_attempt", Data: map[string]any{"file": "a.go"}} e2 := Event{Timestamp: time.Now(), Type: "fix_attempt", Data: map[string]any{"file": "b.go"}} - if err := pub.Publish(e1); err != nil { + if err := pub.Publish(context.Background(), e1); err != nil { t.Fatal(err) } - if err := pub.Publish(e2); err != nil { + if err := pub.Publish(context.Background(), e2); err != nil { t.Fatal(err) } if err := pub.Close(); err != nil { @@ -63,9 +64,9 @@ func TestPublishMultipleReusesFile(t *testing.T) { } func TestPublishInvalidDir(t *testing.T) { - pub := NewPublisher("/nonexistent/path/that/cannot/exist") + pub := NewJSONLPublisher("/nonexistent/path/that/cannot/exist") defer func() { _ = pub.Close() }() - err := pub.Publish(Event{Timestamp: time.Now(), Type: "test"}) + err := pub.Publish(context.Background(), Event{Timestamp: time.Now(), Type: "test"}) if err == nil { t.Error("expected error when publishing to invalid directory") } @@ -73,8 +74,7 @@ func TestPublishInvalidDir(t *testing.T) { func TestCloseWithoutPublish(t *testing.T) { dir := t.TempDir() - pub := NewPublisher(dir) - // Close without ever publishing — should not error. + pub := NewJSONLPublisher(dir) if err := pub.Close(); err != nil { t.Errorf("unexpected error closing without publish: %v", err) } @@ -82,16 +82,29 @@ func TestCloseWithoutPublish(t *testing.T) { func TestPublishMarshalError(t *testing.T) { dir := t.TempDir() - pub := NewPublisher(dir) + pub := NewJSONLPublisher(dir) defer func() { _ = pub.Close() }() - // Channels are not JSON-serializable. event := Event{ Timestamp: time.Now(), Type: "test", Data: map[string]any{"bad": make(chan int)}, } - err := pub.Publish(event) + err := pub.Publish(context.Background(), event) if err == nil { t.Error("expected error when marshaling channel") } } + +func TestJSONLPublisherImplementsInterface(t *testing.T) { + dir := t.TempDir() + var pub Publisher = NewJSONLPublisher(dir) + defer func() { _ = pub.Close() }() + err := pub.Publish(context.Background(), Event{ + Timestamp: time.Now(), + Type: "test", + Data: map[string]any{"ok": true}, + }) + if err != nil { + t.Fatal(err) + } +} diff --git a/scripts/check-coverage.sh b/scripts/check-coverage.sh index e21e804..b06179b 100755 --- a/scripts/check-coverage.sh +++ b/scripts/check-coverage.sh @@ -41,6 +41,7 @@ EXCLUDED_FUNCTIONS=( "pkg/linter/linter.go:.*normalizeIssuePaths" "pkg/dotdir/manager.go:.*Target" "pkg/dotdir/manager.go:.*TargetIn" + "pkg/telemetry/jsonl.go:.*NewPublisher" ) # Build grep exclusion pattern. From 44736bcb0b7eae873611303694ed81fc72459603 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:24:01 -0700 Subject: [PATCH 04/12] feat: add TOML config loader with dotdir resolution and env overlay --- go.mod | 1 + go.sum | 2 + pkg/config/config.go | 31 ++++ pkg/config/env.go | 63 +++++++++ pkg/config/loader.go | 59 ++++++++ pkg/config/loader_test.go | 287 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 443 insertions(+) create mode 100644 pkg/config/env.go create mode 100644 pkg/config/loader.go create mode 100644 pkg/config/loader_test.go diff --git a/go.mod b/go.mod index 7fc214d..f6b2dae 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( ) require ( + github.com/BurntSushi/toml v1.6.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index 2db0224..c9d28f1 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= +github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= diff --git a/pkg/config/config.go b/pkg/config/config.go index f977113..286c71d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -60,3 +60,34 @@ func ClampConcurrency(n int) int { } return n } + +// FromTOML converts a TOMLConfig into the runtime Config struct. +// Note: TargetDir is not populated from TOML and must be set by the caller +// (it comes from the --target CLI flag or defaults to "."). +func FromTOML(tc TOMLConfig) Config { + rateLimit, err := tc.Run.ParseRateLimit() + if err != nil { + rateLimit = 2 * time.Second + } + tools := tc.Provider.AllowedTools + if len(tools) == 0 { + tools = append([]string{}, DefaultAllowedTools...) + } + return Config{ + TargetDir: ".", + Concurrency: ClampConcurrency(tc.Run.Concurrency), + RateLimit: rateLimit, + AllowedTools: tools, + TelemetryDir: tc.Telemetry.Dir, + DryRun: tc.Run.DryRun, + NoTapes: tc.Run.NoTapes, + MaxRounds: tc.Run.MaxRounds, + StaleThreshold: tc.Run.StaleThreshold, + VM: tc.VM.Enabled, + VMName: tc.VM.Name, + VMJcard: tc.VM.Jcard, + Provider: tc.Provider.Name, + ProviderModel: tc.Provider.Model, + ProviderAPI: tc.Provider.APIBase, + } +} diff --git a/pkg/config/env.go b/pkg/config/env.go new file mode 100644 index 0000000..735d630 --- /dev/null +++ b/pkg/config/env.go @@ -0,0 +1,63 @@ +package config + +import ( + "os" + "strconv" + "strings" +) + +// applyEnvOverrides reads SWEEPER_* environment variables and overlays +// them onto the TOMLConfig. +func applyEnvOverrides(tc *TOMLConfig) { + if v := os.Getenv("SWEEPER_RUN_CONCURRENCY"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + tc.Run.Concurrency = n + } + } + if v := os.Getenv("SWEEPER_RUN_RATE_LIMIT"); v != "" { + tc.Run.RateLimit = v + } + if v := os.Getenv("SWEEPER_RUN_MAX_ROUNDS"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + tc.Run.MaxRounds = n + } + } + if v := os.Getenv("SWEEPER_RUN_STALE_THRESHOLD"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + tc.Run.StaleThreshold = n + } + } + if v := os.Getenv("SWEEPER_PROVIDER_NAME"); v != "" { + tc.Provider.Name = v + } + if v := os.Getenv("SWEEPER_PROVIDER_MODEL"); v != "" { + tc.Provider.Model = v + } + if v := os.Getenv("SWEEPER_PROVIDER_API_BASE"); v != "" { + tc.Provider.APIBase = v + } + if v := os.Getenv("SWEEPER_PROVIDER_ALLOWED_TOOLS"); v != "" { + tc.Provider.AllowedTools = strings.Split(v, ",") + } + if v := os.Getenv("SWEEPER_TELEMETRY_BACKEND"); v != "" { + tc.Telemetry.Backend = v + } + if v := os.Getenv("SWEEPER_TELEMETRY_DIR"); v != "" { + tc.Telemetry.Dir = v + } + if v := os.Getenv("SWEEPER_TELEMETRY_CONFLUENT_BROKERS"); v != "" { + tc.Telemetry.Confluent.Brokers = strings.Split(v, ",") + } + if v := os.Getenv("SWEEPER_TELEMETRY_CONFLUENT_TOPIC"); v != "" { + tc.Telemetry.Confluent.Topic = v + } + if v := os.Getenv("SWEEPER_TELEMETRY_CONFLUENT_CLIENT_ID"); v != "" { + tc.Telemetry.Confluent.ClientID = v + } + if v := os.Getenv("SWEEPER_TELEMETRY_CONFLUENT_API_KEY_ENV"); v != "" { + tc.Telemetry.Confluent.APIKeyEnv = v + } + if v := os.Getenv("SWEEPER_TELEMETRY_CONFLUENT_API_SECRET_ENV"); v != "" { + tc.Telemetry.Confluent.APISecretEnv = v + } +} diff --git a/pkg/config/loader.go b/pkg/config/loader.go new file mode 100644 index 0000000..8fdfd51 --- /dev/null +++ b/pkg/config/loader.go @@ -0,0 +1,59 @@ +package config + +import ( + "os" + "path/filepath" + + "github.com/BurntSushi/toml" + + "github.com/papercomputeco/sweeper/pkg/dotdir" +) + +// LoadTOML loads configuration with precedence: +// 1. Defaults +// 2. ~/.sweeper/config.toml (home) +// 3. .sweeper/config.toml (project, resolved via dotdir from targetDir) +// 4. Explicit configPath (if non-empty, replaces both file layers) +// 5. SWEEPER_* environment variables +// +// CLI flags are applied by the caller after LoadTOML returns. +func LoadTOML(targetDir, configPath string) (TOMLConfig, error) { + tc := NewDefaultTOMLConfig() + + if configPath != "" { + if err := decodeTOMLFile(configPath, &tc); err != nil { + return tc, err + } + applyEnvOverrides(&tc) + return tc, nil + } + + // Layer 1: home config (~/.sweeper/config.toml) + if home, err := os.UserHomeDir(); err == nil { + homePath := filepath.Join(home, ".sweeper", "config.toml") + _ = decodeTOMLFile(homePath, &tc) // ignore missing + } + + // Layer 2: project config (.sweeper/config.toml via dotdir) + if targetDir != "" { + mgr := dotdir.NewManager() + dir, err := mgr.TargetIn(targetDir, "") + if err == nil && dir != "" { + _ = decodeTOMLFile(filepath.Join(dir, "config.toml"), &tc) + } + } + + // Layer 3: env vars + applyEnvOverrides(&tc) + + return tc, nil +} + +func decodeTOMLFile(path string, tc *TOMLConfig) error { + data, err := os.ReadFile(path) + if err != nil { + return err + } + _, err = toml.Decode(string(data), tc) + return err +} diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go new file mode 100644 index 0000000..08cb3d6 --- /dev/null +++ b/pkg/config/loader_test.go @@ -0,0 +1,287 @@ +package config + +import ( + "os" + "path/filepath" + "testing" +) + +func TestLoadFromTOMLFile(t *testing.T) { + dir := t.TempDir() + sweeper := filepath.Join(dir, ".sweeper") + if err := os.MkdirAll(sweeper, 0o755); err != nil { + t.Fatal(err) + } + tomlContent := ` +version = 1 + +[run] +concurrency = 4 +max_rounds = 3 + +[provider] +name = "codex" + +[telemetry] +backend = "confluent" + +[telemetry.confluent] +brokers = ["broker1:9092", "broker2:9092"] +topic = "sweeper.events" +` + if err := os.WriteFile(filepath.Join(sweeper, "config.toml"), []byte(tomlContent), 0o644); err != nil { + t.Fatal(err) + } + tc, err := LoadTOML(dir, "") + if err != nil { + t.Fatal(err) + } + if tc.Run.Concurrency != 4 { + t.Errorf("expected concurrency 4, got %d", tc.Run.Concurrency) + } + if tc.Run.MaxRounds != 3 { + t.Errorf("expected max_rounds 3, got %d", tc.Run.MaxRounds) + } + if tc.Provider.Name != "codex" { + t.Errorf("expected provider codex, got %s", tc.Provider.Name) + } + if tc.Telemetry.Backend != "confluent" { + t.Errorf("expected backend confluent, got %s", tc.Telemetry.Backend) + } + if len(tc.Telemetry.Confluent.Brokers) != 2 { + t.Errorf("expected 2 brokers, got %d", len(tc.Telemetry.Confluent.Brokers)) + } +} + +func TestLoadMissingFileUsesDefaults(t *testing.T) { + tc, err := LoadTOML(t.TempDir(), "") + if err != nil { + t.Fatal(err) + } + if tc.Run.Concurrency != 2 { + t.Errorf("expected default concurrency 2, got %d", tc.Run.Concurrency) + } + if tc.Provider.Name != "claude" { + t.Errorf("expected default provider claude, got %s", tc.Provider.Name) + } +} + +func TestLoadEnvOverride(t *testing.T) { + t.Setenv("SWEEPER_PROVIDER_NAME", "ollama") + t.Setenv("SWEEPER_RUN_CONCURRENCY", "5") + tc, err := LoadTOML(t.TempDir(), "") + if err != nil { + t.Fatal(err) + } + if tc.Provider.Name != "ollama" { + t.Errorf("expected provider ollama from env, got %s", tc.Provider.Name) + } + if tc.Run.Concurrency != 5 { + t.Errorf("expected concurrency 5 from env, got %d", tc.Run.Concurrency) + } +} + +func TestLoadExplicitConfigPath(t *testing.T) { + dir := t.TempDir() + tomlContent := ` +version = 1 + +[run] +concurrency = 3 +` + configPath := filepath.Join(dir, "custom-config.toml") + if err := os.WriteFile(configPath, []byte(tomlContent), 0o644); err != nil { + t.Fatal(err) + } + tc, err := LoadTOML("", configPath) + if err != nil { + t.Fatal(err) + } + if tc.Run.Concurrency != 3 { + t.Errorf("expected concurrency 3, got %d", tc.Run.Concurrency) + } +} + +func TestFromTOML(t *testing.T) { + tc := NewDefaultTOMLConfig() + tc.Run.Concurrency = 4 + tc.Provider.Name = "codex" + tc.Telemetry.Dir = "/tmp/tel" + cfg := FromTOML(tc) + if cfg.Concurrency != 4 { + t.Errorf("expected concurrency 4, got %d", cfg.Concurrency) + } + if cfg.Provider != "codex" { + t.Errorf("expected provider codex, got %s", cfg.Provider) + } + if cfg.TelemetryDir != "/tmp/tel" { + t.Errorf("expected telemetry dir /tmp/tel, got %s", cfg.TelemetryDir) + } +} + +func TestFromTOMLInvalidRateLimit(t *testing.T) { + tc := NewDefaultTOMLConfig() + tc.Run.RateLimit = "not-a-duration" + cfg := FromTOML(tc) + // Should fall back to 2s default when parsing fails + if cfg.RateLimit != 2*1000*1000*1000*2 { + // compare via time package + } + if cfg.RateLimit.String() != "2s" { + t.Errorf("expected fallback rate limit 2s, got %s", cfg.RateLimit) + } +} + +func TestFromTOMLEmptyToolsUsesDefaults(t *testing.T) { + tc := NewDefaultTOMLConfig() + tc.Provider.AllowedTools = []string{} + cfg := FromTOML(tc) + if len(cfg.AllowedTools) != len(DefaultAllowedTools) { + t.Errorf("expected %d default tools, got %d", len(DefaultAllowedTools), len(cfg.AllowedTools)) + } +} + +func TestFromTOMLVMFields(t *testing.T) { + tc := NewDefaultTOMLConfig() + tc.VM.Enabled = true + tc.VM.Name = "myvm" + tc.VM.Jcard = "/path/to/jcard.toml" + tc.Provider.Model = "gpt-4" + tc.Provider.APIBase = "https://api.example.com" + tc.Run.DryRun = true + tc.Run.NoTapes = true + tc.Run.MaxRounds = 5 + tc.Run.StaleThreshold = 3 + cfg := FromTOML(tc) + if !cfg.VM { + t.Error("expected VM enabled") + } + if cfg.VMName != "myvm" { + t.Errorf("expected VMName myvm, got %s", cfg.VMName) + } + if cfg.VMJcard != "/path/to/jcard.toml" { + t.Errorf("expected VMJcard /path/to/jcard.toml, got %s", cfg.VMJcard) + } + if cfg.ProviderModel != "gpt-4" { + t.Errorf("expected ProviderModel gpt-4, got %s", cfg.ProviderModel) + } + if cfg.ProviderAPI != "https://api.example.com" { + t.Errorf("expected ProviderAPI, got %s", cfg.ProviderAPI) + } + if !cfg.DryRun { + t.Error("expected DryRun true") + } + if !cfg.NoTapes { + t.Error("expected NoTapes true") + } + if cfg.MaxRounds != 5 { + t.Errorf("expected MaxRounds 5, got %d", cfg.MaxRounds) + } + if cfg.StaleThreshold != 3 { + t.Errorf("expected StaleThreshold 3, got %d", cfg.StaleThreshold) + } +} + +func TestApplyEnvOverridesAllFields(t *testing.T) { + t.Setenv("SWEEPER_RUN_CONCURRENCY", "3") + t.Setenv("SWEEPER_RUN_RATE_LIMIT", "500ms") + t.Setenv("SWEEPER_RUN_MAX_ROUNDS", "5") + t.Setenv("SWEEPER_RUN_STALE_THRESHOLD", "4") + t.Setenv("SWEEPER_PROVIDER_NAME", "codex") + t.Setenv("SWEEPER_PROVIDER_MODEL", "gpt-4o") + t.Setenv("SWEEPER_PROVIDER_API_BASE", "https://api.openai.com") + t.Setenv("SWEEPER_PROVIDER_ALLOWED_TOOLS", "Read,Write") + t.Setenv("SWEEPER_TELEMETRY_BACKEND", "confluent") + t.Setenv("SWEEPER_TELEMETRY_DIR", "/tmp/tel") + t.Setenv("SWEEPER_TELEMETRY_CONFLUENT_BROKERS", "b1:9092,b2:9092") + t.Setenv("SWEEPER_TELEMETRY_CONFLUENT_TOPIC", "my-topic") + t.Setenv("SWEEPER_TELEMETRY_CONFLUENT_CLIENT_ID", "sweeper-client") + t.Setenv("SWEEPER_TELEMETRY_CONFLUENT_API_KEY_ENV", "MY_API_KEY") + t.Setenv("SWEEPER_TELEMETRY_CONFLUENT_API_SECRET_ENV", "MY_API_SECRET") + + tc := NewDefaultTOMLConfig() + applyEnvOverrides(&tc) + + if tc.Run.Concurrency != 3 { + t.Errorf("expected concurrency 3, got %d", tc.Run.Concurrency) + } + if tc.Run.RateLimit != "500ms" { + t.Errorf("expected rate_limit 500ms, got %s", tc.Run.RateLimit) + } + if tc.Run.MaxRounds != 5 { + t.Errorf("expected max_rounds 5, got %d", tc.Run.MaxRounds) + } + if tc.Run.StaleThreshold != 4 { + t.Errorf("expected stale_threshold 4, got %d", tc.Run.StaleThreshold) + } + if tc.Provider.Name != "codex" { + t.Errorf("expected provider codex, got %s", tc.Provider.Name) + } + if tc.Provider.Model != "gpt-4o" { + t.Errorf("expected model gpt-4o, got %s", tc.Provider.Model) + } + if tc.Provider.APIBase != "https://api.openai.com" { + t.Errorf("expected api_base, got %s", tc.Provider.APIBase) + } + if len(tc.Provider.AllowedTools) != 2 { + t.Errorf("expected 2 allowed tools, got %d", len(tc.Provider.AllowedTools)) + } + if tc.Telemetry.Backend != "confluent" { + t.Errorf("expected backend confluent, got %s", tc.Telemetry.Backend) + } + if tc.Telemetry.Dir != "/tmp/tel" { + t.Errorf("expected telemetry dir /tmp/tel, got %s", tc.Telemetry.Dir) + } + if len(tc.Telemetry.Confluent.Brokers) != 2 { + t.Errorf("expected 2 brokers, got %d", len(tc.Telemetry.Confluent.Brokers)) + } + if tc.Telemetry.Confluent.Topic != "my-topic" { + t.Errorf("expected topic my-topic, got %s", tc.Telemetry.Confluent.Topic) + } + if tc.Telemetry.Confluent.ClientID != "sweeper-client" { + t.Errorf("expected client_id sweeper-client, got %s", tc.Telemetry.Confluent.ClientID) + } + if tc.Telemetry.Confluent.APIKeyEnv != "MY_API_KEY" { + t.Errorf("expected api_key_env MY_API_KEY, got %s", tc.Telemetry.Confluent.APIKeyEnv) + } + if tc.Telemetry.Confluent.APISecretEnv != "MY_API_SECRET" { + t.Errorf("expected api_secret_env MY_API_SECRET, got %s", tc.Telemetry.Confluent.APISecretEnv) + } +} + +func TestLoadExplicitConfigPathNotFound(t *testing.T) { + _, err := LoadTOML("", "/nonexistent/path/config.toml") + if err == nil { + t.Error("expected error for missing explicit config path, got nil") + } +} + +func TestLoadHomeConfig(t *testing.T) { + // Create a fake home dir with .sweeper/config.toml + fakeHome := t.TempDir() + sweeper := filepath.Join(fakeHome, ".sweeper") + if err := os.MkdirAll(sweeper, 0o755); err != nil { + t.Fatal(err) + } + tomlContent := ` +version = 1 + +[provider] +name = "ollama" +` + if err := os.WriteFile(filepath.Join(sweeper, "config.toml"), []byte(tomlContent), 0o644); err != nil { + t.Fatal(err) + } + // Point HOME at the fake home dir so os.UserHomeDir() returns it. + t.Setenv("HOME", fakeHome) + + // Use a project dir with no .sweeper so only home config loads + projectDir := t.TempDir() + tc, err := LoadTOML(projectDir, "") + if err != nil { + t.Fatal(err) + } + if tc.Provider.Name != "ollama" { + t.Errorf("expected provider ollama from home config, got %s", tc.Provider.Name) + } +} From af006f3c74c50ba0fbe603c82321dfe854ed0a08 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:27:06 -0700 Subject: [PATCH 05/12] feat: add MultiPublisher for fan-out to multiple telemetry backends --- pkg/telemetry/multi.go | 39 +++++++++++++++ pkg/telemetry/multi_test.go | 98 +++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 pkg/telemetry/multi.go create mode 100644 pkg/telemetry/multi_test.go diff --git a/pkg/telemetry/multi.go b/pkg/telemetry/multi.go new file mode 100644 index 0000000..735ed70 --- /dev/null +++ b/pkg/telemetry/multi.go @@ -0,0 +1,39 @@ +package telemetry + +import ( + "context" + "errors" +) + +// MultiPublisher fans out events to multiple backends. +// All backends receive every event. Errors are joined; a single +// backend failure does not prevent delivery to the others. +type MultiPublisher struct { + publishers []Publisher +} + +var _ Publisher = (*MultiPublisher)(nil) + +func NewMultiPublisher(publishers ...Publisher) *MultiPublisher { + return &MultiPublisher{publishers: publishers} +} + +func (m *MultiPublisher) Publish(ctx context.Context, event Event) error { + var errs []error + for _, p := range m.publishers { + if err := p.Publish(ctx, event); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +func (m *MultiPublisher) Close() error { + var errs []error + for _, p := range m.publishers { + if err := p.Close(); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} diff --git a/pkg/telemetry/multi_test.go b/pkg/telemetry/multi_test.go new file mode 100644 index 0000000..c547e2a --- /dev/null +++ b/pkg/telemetry/multi_test.go @@ -0,0 +1,98 @@ +package telemetry + +import ( + "context" + "errors" + "testing" + "time" +) + +type spyPublisher struct { + events []Event + closed bool + publishErr error + closeErr error +} + +func (s *spyPublisher) Publish(_ context.Context, event Event) error { + s.events = append(s.events, event) + return s.publishErr +} + +func (s *spyPublisher) Close() error { + s.closed = true + return s.closeErr +} + +func TestMultiPublisherFansOut(t *testing.T) { + a := &spyPublisher{} + b := &spyPublisher{} + mp := NewMultiPublisher(a, b) + + event := Event{Timestamp: time.Now(), Type: "test", Data: map[string]any{"x": 1}} + if err := mp.Publish(context.Background(), event); err != nil { + t.Fatal(err) + } + if len(a.events) != 1 { + t.Errorf("expected 1 event in a, got %d", len(a.events)) + } + if len(b.events) != 1 { + t.Errorf("expected 1 event in b, got %d", len(b.events)) + } + if err := mp.Close(); err != nil { + t.Fatal(err) + } + if !a.closed || !b.closed { + t.Error("expected both publishers closed") + } +} + +func TestMultiPublisherSingleBackend(t *testing.T) { + a := &spyPublisher{} + mp := NewMultiPublisher(a) + event := Event{Timestamp: time.Now(), Type: "test"} + if err := mp.Publish(context.Background(), event); err != nil { + t.Fatal(err) + } + if len(a.events) != 1 { + t.Errorf("expected 1 event, got %d", len(a.events)) + } +} + +func TestMultiPublisherPublishError(t *testing.T) { + sentinel := errors.New("publish failure") + a := &spyPublisher{publishErr: sentinel} + b := &spyPublisher{} + mp := NewMultiPublisher(a, b) + + err := mp.Publish(context.Background(), Event{Timestamp: time.Now(), Type: "test"}) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, sentinel) { + t.Errorf("expected sentinel error in joined error, got %v", err) + } + // b still receives the event despite a's failure + if len(b.events) != 1 { + t.Errorf("expected b to receive event despite a's error, got %d events", len(b.events)) + } +} + +func TestMultiPublisherCloseError(t *testing.T) { + sentinel := errors.New("close failure") + a := &spyPublisher{closeErr: sentinel} + b := &spyPublisher{} + mp := NewMultiPublisher(a, b) + + err := mp.Close() + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, sentinel) { + t.Errorf("expected sentinel error in joined error, got %v", err) + } + // b is still closed despite a's failure + if !b.closed { + t.Error("expected b to be closed despite a's error") + } +} From aab44b75d9e6c05fffa5527c83c8d4995d7b07e6 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:29:53 -0700 Subject: [PATCH 06/12] feat: add Confluent Cloud telemetry publisher with SASL/TLS --- go.mod | 3 + go.sum | 58 +++++++++++ pkg/telemetry/confluent/confluent.go | 115 ++++++++++++++++++++++ pkg/telemetry/confluent/confluent_test.go | 79 +++++++++++++++ scripts/check-coverage.sh | 2 + 5 files changed, 257 insertions(+) create mode 100644 pkg/telemetry/confluent/confluent.go create mode 100644 pkg/telemetry/confluent/confluent_test.go diff --git a/go.mod b/go.mod index f6b2dae..a64b2d2 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,12 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/klauspost/compress v1.15.9 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/segmentio/kafka-go v0.4.47 // indirect github.com/spf13/pflag v1.0.9 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/sys v0.37.0 // indirect diff --git a/go.sum b/go.sum index c9d28f1..c87b6f4 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= @@ -11,30 +13,86 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= diff --git a/pkg/telemetry/confluent/confluent.go b/pkg/telemetry/confluent/confluent.go new file mode 100644 index 0000000..0a2b3c2 --- /dev/null +++ b/pkg/telemetry/confluent/confluent.go @@ -0,0 +1,115 @@ +package confluent + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "os" + "time" + + kafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" + + "github.com/papercomputeco/sweeper/pkg/telemetry" +) + +const defaultPublishTimeout = 5 * time.Second + +var ( + errMissingBrokers = errors.New("confluent: brokers are required") + errMissingTopic = errors.New("confluent: topic is required") +) + +type message = kafka.Message + +type writer interface { + WriteMessages(ctx context.Context, msgs ...message) error + Close() error +} + +// Config configures a Confluent Cloud Kafka publisher. +type Config struct { + Brokers []string + Topic string + ClientID string + APIKeyEnv string // env var name holding the API key + APISecretEnv string // env var name holding the API secret + PublishTimeout time.Duration +} + +// Publisher publishes telemetry events to Confluent Cloud via Kafka. +type Publisher struct { + writer writer + publishTimeout time.Duration +} + +var _ telemetry.Publisher = (*Publisher)(nil) + +// NewPublisher creates a Confluent publisher with SASL/TLS for Confluent Cloud. +func NewPublisher(c Config) (*Publisher, error) { + if len(c.Brokers) == 0 { + return nil, errMissingBrokers + } + if c.Topic == "" { + return nil, errMissingTopic + } + + apiKey := os.Getenv(c.APIKeyEnv) + apiSecret := os.Getenv(c.APISecretEnv) + + transport := &kafka.Transport{ + TLS: &tls.Config{MinVersion: tls.VersionTLS12}, + } + if apiKey != "" && apiSecret != "" { + transport.SASL = plain.Mechanism{ + Username: apiKey, + Password: apiSecret, + } + } + if c.ClientID != "" { + transport.ClientID = c.ClientID + } + + kw := &kafka.Writer{ + Addr: kafka.TCP(c.Brokers...), + Topic: c.Topic, + Balancer: &kafka.Hash{}, + Transport: transport, + } + + return newPublisherWithWriter(c, kw) +} + +func newPublisherWithWriter(c Config, w writer) (*Publisher, error) { + if w == nil { + return nil, errors.New("confluent: writer must not be nil") + } + timeout := c.PublishTimeout + if timeout <= 0 { + timeout = defaultPublishTimeout + } + return &Publisher{writer: w, publishTimeout: timeout}, nil +} + +func (p *Publisher) Publish(ctx context.Context, event telemetry.Event) error { + value, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("confluent: marshal event: %w", err) + } + + publishCtx, cancel := context.WithTimeout(ctx, p.publishTimeout) + defer cancel() + + key := []byte(event.Type + ":" + event.Timestamp.Format(time.RFC3339Nano)) + return p.writer.WriteMessages(publishCtx, message{ + Key: key, + Value: value, + Time: event.Timestamp, + }) +} + +func (p *Publisher) Close() error { + return p.writer.Close() +} diff --git a/pkg/telemetry/confluent/confluent_test.go b/pkg/telemetry/confluent/confluent_test.go new file mode 100644 index 0000000..43054d0 --- /dev/null +++ b/pkg/telemetry/confluent/confluent_test.go @@ -0,0 +1,79 @@ +package confluent + +import ( + "context" + "testing" + "time" + + "github.com/papercomputeco/sweeper/pkg/telemetry" +) + +type mockWriter struct { + messages [][]byte + closed bool +} + +func (m *mockWriter) WriteMessages(ctx context.Context, msgs ...message) error { + for _, msg := range msgs { + m.messages = append(m.messages, msg.Value) + } + return nil +} + +func (m *mockWriter) Close() error { + m.closed = true + return nil +} + +func TestPublishWritesToKafka(t *testing.T) { + w := &mockWriter{} + pub, err := newPublisherWithWriter(Config{Topic: "test"}, w) + if err != nil { + t.Fatal(err) + } + + event := telemetry.Event{ + Timestamp: time.Now(), + Type: "fix_attempt", + Data: map[string]any{"file": "main.go"}, + } + if err := pub.Publish(context.Background(), event); err != nil { + t.Fatal(err) + } + if len(w.messages) != 1 { + t.Fatalf("expected 1 message, got %d", len(w.messages)) + } +} + +func TestNewPublisherWithNilWriter(t *testing.T) { + _, err := newPublisherWithWriter(Config{Topic: "test"}, nil) + if err == nil { + t.Error("expected error with nil writer") + } +} + +func TestNewPublisherValidation(t *testing.T) { + _, err := NewPublisher(Config{}) + if err == nil { + t.Error("expected error with empty config") + } + + _, err = NewPublisher(Config{Brokers: []string{"b:9092"}}) + if err == nil { + t.Error("expected error with missing topic") + } +} + +func TestCloseClosesWriter(t *testing.T) { + w := &mockWriter{} + pub, err := newPublisherWithWriter(Config{Topic: "test"}, w) + if err != nil { + t.Fatal(err) + } + if err := pub.Close(); err != nil { + t.Fatal(err) + } + if !w.closed { + t.Error("expected writer to be closed") + } +} diff --git a/scripts/check-coverage.sh b/scripts/check-coverage.sh index b06179b..f9f5db9 100755 --- a/scripts/check-coverage.sh +++ b/scripts/check-coverage.sh @@ -42,6 +42,8 @@ EXCLUDED_FUNCTIONS=( "pkg/dotdir/manager.go:.*Target" "pkg/dotdir/manager.go:.*TargetIn" "pkg/telemetry/jsonl.go:.*NewPublisher" + "pkg/telemetry/confluent/confluent.go:.*NewPublisher" + "pkg/telemetry/confluent/confluent.go:.*Publish" ) # Build grep exclusion pattern. From 6d39671f7a9b2760710d06ff419d27850e47a294 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:32:29 -0700 Subject: [PATCH 07/12] feat: wire TOML config loading into CLI, support Confluent telemetry backend --- cmd/observe.go | 9 ++++- cmd/root.go | 2 + cmd/run.go | 99 +++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 91 insertions(+), 19 deletions(-) diff --git a/cmd/observe.go b/cmd/observe.go index a655e70..d1f6895 100644 --- a/cmd/observe.go +++ b/cmd/observe.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" + "github.com/papercomputeco/sweeper/pkg/config" "github.com/papercomputeco/sweeper/pkg/observer" "github.com/papercomputeco/sweeper/pkg/tapes" "github.com/spf13/cobra" @@ -13,6 +14,12 @@ func newObserveCmd() *cobra.Command { Use: "observe", Short: "Analyze past runs and show learned patterns", RunE: func(cmd *cobra.Command, args []string) error { + tc, err := config.LoadTOML(".", configPath) + if err != nil { + tc = config.NewDefaultTOMLConfig() + } + telDir := tc.Telemetry.Dir + var opts []observer.ObserverOption if !noTapes { @@ -26,7 +33,7 @@ func newObserveCmd() *cobra.Command { } } - obs := observer.New(".sweeper/telemetry", opts...) + obs := observer.New(telDir, opts...) insights, err := obs.Analyze() if err != nil { return err diff --git a/cmd/root.go b/cmd/root.go index 034ad31..fb19a1c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -11,6 +11,7 @@ var ( concurrency int rateLimit time.Duration noTapes bool + configPath string ) func NewRootCmd() *cobra.Command { @@ -23,6 +24,7 @@ func NewRootCmd() *cobra.Command { root.PersistentFlags().IntVarP(&concurrency, "concurrency", "c", 2, "max parallel sub-agents") root.PersistentFlags().DurationVar(&rateLimit, "rate-limit", 2*time.Second, "minimum delay between agent dispatches (e.g. 2s, 500ms)") root.PersistentFlags().BoolVar(&noTapes, "no-tapes", false, "disable tapes integration") + root.PersistentFlags().StringVar(&configPath, "config", "", "path to config.toml (default: .sweeper/config.toml)") root.AddCommand(newVersionCmd()) root.AddCommand(newRunCmd()) root.AddCommand(newObserveCmd()) diff --git a/cmd/run.go b/cmd/run.go index 9645f57..ad91b12 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -13,6 +13,8 @@ import ( "github.com/papercomputeco/sweeper/pkg/config" "github.com/papercomputeco/sweeper/pkg/linter" "github.com/papercomputeco/sweeper/pkg/provider" + "github.com/papercomputeco/sweeper/pkg/telemetry" + "github.com/papercomputeco/sweeper/pkg/telemetry/confluent" "github.com/papercomputeco/sweeper/pkg/vm" "github.com/papercomputeco/sweeper/pkg/worker" "github.com/spf13/cobra" @@ -43,29 +45,60 @@ Examples: ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - clamped := config.ClampConcurrency(concurrency) - if clamped != concurrency { + // Load TOML config (defaults -> home -> project -> env). + tc, err := config.LoadTOML(targetDir, configPath) + if err != nil { + fmt.Printf("Warning: loading config: %v\n", err) + tc = config.NewDefaultTOMLConfig() + } + + // CLI flags override TOML config. + rootPF := cmd.Root().PersistentFlags() + if rootPF.Changed("concurrency") { + tc.Run.Concurrency = concurrency + } + if rootPF.Changed("rate-limit") { + tc.Run.RateLimit = rateLimit.String() + } + if rootPF.Changed("no-tapes") { + tc.Run.NoTapes = noTapes + } + if cmd.Flags().Changed("max-rounds") { + tc.Run.MaxRounds = maxRounds + } + if cmd.Flags().Changed("stale-threshold") { + tc.Run.StaleThreshold = staleThreshold + } + if cmd.Flags().Changed("dry-run") { + tc.Run.DryRun = dryRun + } + if cmd.Flags().Changed("provider") { + tc.Provider.Name = providerName + } + if cmd.Flags().Changed("model") { + tc.Provider.Model = providerModel + } + if cmd.Flags().Changed("api-base") { + tc.Provider.APIBase = providerAPI + } + + // Build runtime config from TOML. + cfg := config.FromTOML(tc) + cfg.TargetDir = targetDir + + clamped := config.ClampConcurrency(cfg.Concurrency) + if clamped != cfg.Concurrency { fmt.Printf("Concurrency clamped to %d (max %d)\n", clamped, config.MaxConcurrency) + cfg.Concurrency = clamped } - tools := append([]string{}, config.DefaultAllowedTools...) + if len(allowedTools) > 0 { - tools = append(tools, allowedTools...) - } - cfg := config.Config{ - TargetDir: targetDir, - Concurrency: clamped, - RateLimit: rateLimit, - AllowedTools: tools, - TelemetryDir: ".sweeper/telemetry", - DryRun: dryRun, - NoTapes: noTapes, - MaxRounds: maxRounds, - StaleThreshold: staleThreshold, - Provider: providerName, - ProviderModel: providerModel, - ProviderAPI: providerAPI, + cfg.AllowedTools = append(cfg.AllowedTools, allowedTools...) } + // Build telemetry publisher from config. + pub := buildPublisher(tc) + // Validate provider exists before proceeding. if _, err := provider.Get(cfg.Provider); err != nil { return err @@ -143,6 +176,8 @@ Examples: } } + opts = append(opts, agent.WithPublisher(pub)) + a := agent.New(cfg, opts...) summary, err := a.Run(ctx) if err != nil { @@ -184,3 +219,31 @@ func argsAfterDash(cmd *cobra.Command, args []string) []string { } return args[idx:] } + +func buildPublisher(tc config.TOMLConfig) telemetry.Publisher { + jsonl := telemetry.NewJSONLPublisher(tc.Telemetry.Dir) + + if tc.Telemetry.Backend != "confluent" { + return jsonl + } + + cc := tc.Telemetry.Confluent + if len(cc.Brokers) == 0 || cc.Topic == "" { + fmt.Println("Warning: confluent backend selected but brokers/topic not configured, using JSONL only") + return jsonl + } + + cp, err := confluent.NewPublisher(confluent.Config{ + Brokers: cc.Brokers, + Topic: cc.Topic, + ClientID: cc.ClientID, + APIKeyEnv: cc.APIKeyEnv, + APISecretEnv: cc.APISecretEnv, + }) + if err != nil { + fmt.Printf("Warning: confluent publisher: %v, using JSONL only\n", err) + return jsonl + } + + return telemetry.NewMultiPublisher(jsonl, cp) +} From 784b96666697a5e7906a9f9eebe635c8cd5fb0e3 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:33:26 -0700 Subject: [PATCH 08/12] feat: extension reads telemetry dir from config.toml --- extensions/sweeper/index.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/extensions/sweeper/index.ts b/extensions/sweeper/index.ts index 92b80df..b4d8168 100644 --- a/extensions/sweeper/index.ts +++ b/extensions/sweeper/index.ts @@ -36,6 +36,22 @@ function todayFileName(): string { function telemetryDir(targetDir: string): string { const path = require("path"); + const fs = require("fs"); + + // Check config.toml for custom telemetry dir + const configPath = path.join(targetDir, ".sweeper", "config.toml"); + if (fs.existsSync(configPath)) { + try { + const content = fs.readFileSync(configPath, "utf-8").replace(/\r\n/g, "\n"); + // Simple TOML extraction for telemetry.dir (handles comments between section and key) + const match = content.match(/^\[telemetry\]\s*\n(?:(?:#[^\n]*|[^\[]*?)\n)*?dir\s*=\s*"([^"]+)"/m); + if (match) { + return path.isAbsolute(match[1]) ? match[1] : path.join(targetDir, match[1]); + } + } catch { + // Fall through to default + } + } return path.join(targetDir, ".sweeper", "telemetry"); } From 85256885b7d60529e8cb4fe1d26c14dbb3161bfb Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 15:33:30 -0700 Subject: [PATCH 09/12] docs: add example config.toml with all sections documented --- example.config.toml | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 example.config.toml diff --git a/example.config.toml b/example.config.toml new file mode 100644 index 0000000..b715764 --- /dev/null +++ b/example.config.toml @@ -0,0 +1,43 @@ +# .sweeper/config.toml — sweeper project configuration +# Copy to .sweeper/config.toml or ~/.sweeper/config.toml +# +# Precedence (highest to lowest): +# 1. CLI flags (--concurrency, --provider, etc.) +# 2. SWEEPER_* environment variables +# 3. .sweeper/config.toml (project) +# 4. ~/.sweeper/config.toml (user) +# 5. Built-in defaults + +version = 1 + +[run] +concurrency = 2 +rate_limit = "2s" +max_rounds = 1 +stale_threshold = 2 +# dry_run = false +# no_tapes = false + +[provider] +name = "claude" +# model = "" +# api_base = "" +# allowed_tools = ["Read", "Write", "Edit", "Glob", "Grep"] + +[telemetry] +# backend: "jsonl" (default, always active) or "confluent" (adds Kafka fan-out) +backend = "jsonl" +dir = ".sweeper/telemetry" + +# [telemetry.confluent] +# brokers = ["pkc-xxxxx.us-west-2.aws.confluent.cloud:9092"] +# topic = "sweeper.telemetry" +# client_id = "sweeper" +# api_key_env = "CONFLUENT_API_KEY" +# api_secret_env = "CONFLUENT_API_SECRET" +# publish_timeout = "5s" + +# [vm] +# enabled = false +# name = "" +# jcard = "" From 14b17141f2a1f43b03e5d255c541e15d436bb4b3 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Tue, 17 Mar 2026 16:31:04 -0700 Subject: [PATCH 10/12] fix: set RequiredAcks to RequireAll on Confluent Kafka writer The kafka.Writer defaulted to RequireNone (fire-and-forget), so the broker never confirmed receipt and messages were silently dropped. --- pkg/telemetry/confluent/confluent.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/telemetry/confluent/confluent.go b/pkg/telemetry/confluent/confluent.go index 0a2b3c2..b43192e 100644 --- a/pkg/telemetry/confluent/confluent.go +++ b/pkg/telemetry/confluent/confluent.go @@ -73,10 +73,11 @@ func NewPublisher(c Config) (*Publisher, error) { } kw := &kafka.Writer{ - Addr: kafka.TCP(c.Brokers...), - Topic: c.Topic, - Balancer: &kafka.Hash{}, - Transport: transport, + Addr: kafka.TCP(c.Brokers...), + Topic: c.Topic, + Balancer: &kafka.Hash{}, + Transport: transport, + RequiredAcks: kafka.RequireAll, } return newPublisherWithWriter(c, kw) From 342ac1119da35fdbe90a4fb85f2f8e03bed81113 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Sat, 21 Mar 2026 19:52:45 -0700 Subject: [PATCH 11/12] chore: gitignore Confluent API key backup files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 8967d73..9f0da62 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ sweeper.md .claude/ *.jsonl coverage.out +api-key-*.txt # Sweeper ephemeral VM jcards (contain secrets) .sweeper/vm/ From 693133b22a868553c21fa5ddf5d13b11e9cceb08 Mon Sep 17 00:00:00 2001 From: Brian Douglas Date: Sat, 21 Mar 2026 20:21:15 -0700 Subject: [PATCH 12/12] docs: add Confluent Cloud telemetry section to README References the confluent-cloud-setup skill from papercomputeco/skills for cluster and topic setup. --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index 4eab216..edebd16 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,31 @@ Every sub-agent session is recorded in [tapes](https://github.com/papercomputeco Run `sweeper observe` after each sweep to see insights and tune your next run. +## Confluent Cloud Telemetry + +Sweeper can stream telemetry events to Confluent Cloud alongside local JSONL files. Enable it in `.sweeper/config.toml`: + +```toml +[telemetry] +backend = "confluent" +dir = ".sweeper/telemetry" + +[telemetry.confluent] +brokers = ["pkc-xxxxx.region.provider.confluent.cloud:9092"] +topic = "sweeper.telemetry" +client_id = "sweeper" +api_key_env = "SWEEPER_CONFLUENT_API_KEY" +api_secret_env = "SWEEPER_CONFLUENT_API_SECRET" +``` + +Set `SWEEPER_CONFLUENT_API_KEY` and `SWEEPER_CONFLUENT_API_SECRET` in your environment. The config references env var names, not raw credentials. + +For cluster and topic setup, install the [confluent-cloud-setup](https://github.com/papercomputeco/skills/tree/main/skills/confluent-cloud-setup) skill: + +```bash +npx skills add papercomputeco/skills +``` + ## VM Isolation Sub-agents can run inside ephemeral [stereOS](https://stereos.ai) virtual machines, managed by the `mb` (Masterblaster) CLI. This is what makes high concurrency safe.