Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Copyright 2025 Deutsche Telekom IT GmbH
#
# SPDX-License-Identifier: Apache-2.0

name: Run CI

on:
Expand All @@ -8,6 +12,8 @@ on:
tags:
- 'v*'
pull_request:
schedule:
- cron: '0 0 * * *' # Runs every day at midnight UTC

permissions:
contents: read
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/reusable-go-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ jobs:
${{ runner.os }}-go-${{ inputs.module }}-

- name: Initialize CodeQL
uses: github/codeql-action/init@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
languages: go
build-mode: manual # Set to manual as we provide a build step
Expand All @@ -308,7 +308,7 @@ jobs:
fi

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
category: "/language:go/${{ inputs.name }}"

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/scorecard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard (optional).
# Commenting out will disable upload of results to your repo's Code Scanning dashboard
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.31.1
uses: github/codeql-action/upload-sarif@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.31.1
with:
sarif_file: results.sarif
4 changes: 2 additions & 2 deletions app/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
local bool
dryRun bool
color bool
concurrency string
concurrency int
baseDirs []string
)

Expand Down Expand Up @@ -71,7 +71,7 @@ func init() {
rootCmd.PersistentFlags().BoolVarP(&local, "local", "l", false, "run with working directory used as base directory")
rootCmd.PersistentFlags().BoolVarP(&dryRun, "dryRun", "D", false, "run with dry run mode")
rootCmd.PersistentFlags().BoolVarP(&color, "color", "c", true, "color output")
rootCmd.PersistentFlags().StringVarP(&concurrency, "concurrency", "C", "1", "number of concurrent operations")
rootCmd.PersistentFlags().IntVarP(&concurrency, "concurrency", "C", 1, "number of concurrent operations")
rootCmd.PersistentFlags().StringSliceVar(&baseDirs, "base.dirs", []string{}, "base directories for git repositories")

// Bind flags to Viper settings
Expand Down
6 changes: 3 additions & 3 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func IsColored() bool {
return viper.GetBool(GitCtlColor)
}

// GetConcurrency returns the concurrency level as a string
func GetConcurrency() string {
return viper.GetString(GitCtlConcurrency)
// GetConcurrency returns the concurrency level as an integer
func GetConcurrency() int {
return viper.GetInt(GitCtlConcurrency)
}

// GetBaseDirs returns the base directories as a slice of strings
Expand Down
12 changes: 11 additions & 1 deletion config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,24 @@ func TestIsColoredReturnsFalseWhenDisabled(t *testing.T) {
}

func TestGetConcurrencyReturnsCorrectValue(t *testing.T) {
expected := "4"
expected := 4
viper.Set(GitCtlConcurrency, expected)
result := GetConcurrency()
if result != expected {
t.Errorf("expected %v, got %v", expected, result)
}
}

func TestGetConcurrencyReturnsZeroWhenUnset(t *testing.T) {
viper.Reset()
t.Cleanup(viper.Reset)
result := GetConcurrency()
// viper returns 0 for unset int keys; clamping to 1 is the caller's responsibility
if result != 0 {
t.Errorf("expected 0 (unset), got %v", result)
}
}

func TestGetBaseDirsReturnsCorrectValueWhenLocal(t *testing.T) {
viper.Reset()
t.Cleanup(viper.Reset)
Expand Down
26 changes: 22 additions & 4 deletions gitrepo/gitrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func FindGitRepos(root string) ([]GitRepo, error) {
}
}

func (gitRepo *GitRepo) RunGitCommand(command string) ([]byte, error) {
// runRaw executes the git command and returns raw combined output without any
// color formatting or global state mutations. Safe to call from goroutines.
func (gitRepo *GitRepo) runRaw(command string) ([]byte, error) {
verbose := config.IsVerbose()
dryRun := config.IsDryRun()
repoPath := ""
Expand Down Expand Up @@ -88,10 +90,26 @@ func (gitRepo *GitRepo) RunGitCommand(command string) ([]byte, error) {

gitCmd.Dir = repoPath
out, err := gitCmd.CombinedOutput()
// Format the output with headers and separators and color
formattedOutput := FormatOutput(repoPath, out)
if err != nil {
return []byte(formattedOutput), fmt.Errorf("git %s failed for %s: %w", command, repoPath, err)
return out, fmt.Errorf("git %s failed for %s: %w", command, repoPath, err)
}
return out, nil
}

// RunGitCommand executes the git command and returns color-formatted output.
// Not safe to call from concurrent goroutines (mutates global color state).
func (gitRepo *GitRepo) RunGitCommand(command string) ([]byte, error) {
repoPath := ""
if gitRepo != nil {
repoPath = gitRepo.path
}
raw, err := gitRepo.runRaw(command)
if raw == nil && err == nil {
return nil, nil
}
formattedOutput := FormatOutput(repoPath, raw)
if err != nil {
return []byte(formattedOutput), err
}
return []byte(formattedOutput), nil
}
Expand Down
72 changes: 62 additions & 10 deletions gitrepo/gitrepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import (
"errors"
"fmt"
"strings"
"sync"

"github.com/bjoernkarma/gitctl/color"
"github.com/bjoernkarma/gitctl/config"
)

type repoResult struct {
rawOutput []byte
err error
}

func RunGitCommand(command string, baseDirs []string) error {
allGitRepos, findErr := findGitReposInBaseDirs(baseDirs)
if findErr != nil {
Expand All @@ -21,25 +27,28 @@ func RunGitCommand(command string, baseDirs []string) error {
if isVerbose && !isQuiet {
fmt.Printf("\n============ GIT OUTPUT (VERBOSE) ============\n")
}

results := runWithWorkerPool(command, allGitRepos)

var commandErrors []error
if findErr != nil {
commandErrors = append(commandErrors, findErr)
}
for _, gitRepo := range allGitRepos {
output, err := gitRepo.RunGitCommand(command)
if err != nil {
commandErrors = append(commandErrors, err)
errorMsg := extractErrorMessage(string(output))
color.AddGitCommandFailure(gitRepo.path, errorMsg, string(output))
// In verbose mode, show the full formatted output immediately
for i, result := range results {
// FormatOutput mutates global color state — must stay in the main goroutine.
formattedOutput := FormatOutput(allGitRepos[i].path, result.rawOutput)
if result.err != nil {
commandErrors = append(commandErrors, result.err)
errorMsg := extractErrorMessage(formattedOutput)
color.AddGitCommandFailure(allGitRepos[i].path, errorMsg, formattedOutput)
if isVerbose && !isQuiet {
fmt.Printf("%s", output)
fmt.Printf("%s", formattedOutput)
}
} else if isVerbose && !isQuiet {
fmt.Printf("%s", output)
fmt.Printf("%s", formattedOutput)
}

}

if isVerbose && !isQuiet {
fmt.Printf("\n============ GIT OUTPUT END ============\n")
}
Expand All @@ -51,6 +60,49 @@ func RunGitCommand(command string, baseDirs []string) error {
return errors.Join(commandErrors...)
}

// runWithWorkerPool executes the git command across all repos using a bounded
// goroutine pool. Results are stored at each repo's discovery index so that
// the caller can iterate them in deterministic order. Workers call runRaw to
// avoid concurrent mutations of global color state.
func runWithWorkerPool(command string, repos []GitRepo) []repoResult {
results := make([]repoResult, len(repos))
if len(repos) == 0 {
return results
}

concurrency := config.GetConcurrency()
if concurrency < 1 {
concurrency = 1
}

type job struct {
index int
repo GitRepo
}

jobs := make(chan job, len(repos))
var wg sync.WaitGroup

for range concurrency {
wg.Add(1)
go func() {
defer wg.Done()
for j := range jobs {
raw, err := j.repo.runRaw(command)
results[j.index] = repoResult{rawOutput: raw, err: err}
}
}()
}

for i, repo := range repos {
jobs <- job{index: i, repo: repo}
}
close(jobs)

wg.Wait()
return results
}

func findGitReposInBaseDirs(baseDirs []string) ([]GitRepo, error) {
var allGitRepos []GitRepo
verbose := config.IsVerbose()
Expand Down
62 changes: 62 additions & 0 deletions gitrepo/gitrepos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,65 @@ func TestRunGitCommandAggregatesErrorsFromInvalidAndValidBaseDirs(t *testing.T)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "failed to find repositories"))
}

func TestRunGitCommandWithConcurrencyGreaterThanOneProcessesAllRepos(t *testing.T) {
viper.Reset()
t.Cleanup(viper.Reset)
viper.Set(config.GitCtlDryRun, true)
viper.Set(config.GitCtlConcurrency, 3)

testDir, _ := filepath.Abs(testDirPath)
baseDirs := []string{testDir}

err := RunGitCommand(GitStatus, baseDirs)
assert.NoError(t, err)
}

func TestRunWithWorkerPoolClampsNegativeConcurrencyToOne(t *testing.T) {
viper.Reset()
t.Cleanup(viper.Reset)
viper.Set(config.GitCtlDryRun, true)
viper.Set(config.GitCtlConcurrency, -1)

testDir, _ := filepath.Abs(testDirPath)
repos, err := findGitReposInBaseDirs([]string{testDir})
assert.NoError(t, err)

results := runWithWorkerPool(GitStatus, repos)
assert.Len(t, results, len(repos))
}

func TestRunWithWorkerPoolClampsZeroConcurrencyToOne(t *testing.T) {
viper.Reset()
t.Cleanup(viper.Reset)
viper.Set(config.GitCtlDryRun, true)
viper.Set(config.GitCtlConcurrency, 0)

testDir, _ := filepath.Abs(testDirPath)
repos, err := findGitReposInBaseDirs([]string{testDir})
assert.NoError(t, err)

results := runWithWorkerPool(GitStatus, repos)
assert.Len(t, results, len(repos))
}

func TestRunWithWorkerPoolPreservesDiscoveryOrder(t *testing.T) {
viper.Reset()
t.Cleanup(viper.Reset)
viper.Set(config.GitCtlConcurrency, 3)

testDir, _ := filepath.Abs(microserviceDirPath)
// Mix valid and invalid repos to confirm results are indexed by discovery order.
repos := []GitRepo{
{path: testDir}, // index 0: valid — no error expected
{path: invalidPath}, // index 1: invalid — error expected
{path: testDir}, // index 2: valid — no error expected
}

results := runWithWorkerPool(GitStatus, repos)

assert.Len(t, results, 3)
assert.NoError(t, results[0].err)
assert.Error(t, results[1].err)
assert.NoError(t, results[2].err)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/bjoernkarma/gitctl

go 1.26.2
go 1.26.3

require (
github.com/charmbracelet/lipgloss v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions openspec/changes/concurrent-execution/.openspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-05-28
63 changes: 63 additions & 0 deletions openspec/changes/concurrent-execution/design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
## Context

`gitctl` discovers git repositories across one or more base directories and runs a git command (`status` or `pull`) on each. Currently, execution is strictly sequential — one repo at a time. Git operations are I/O-bound (network for pull, filesystem for status), making them natural candidates for concurrency.

The configuration layer already exposes `run_mode.concurrency` (YAML, env, and CLI flag), and `config.GetConcurrency()` exists but returns a `string` and is never called by the execution layer. The groundwork is there; it just needs to be wired up correctly.

## Goals / Non-Goals

**Goals:**
- Implement a bounded goroutine worker pool in `gitrepo/gitrepos.go`, limited by `run_mode.concurrency`.
- Print results in deterministic discovery order regardless of completion order.
- Fix `GetConcurrency()` return type from `string` to `int`.
- Keep `concurrency=1` behaviour identical to current sequential behaviour.

**Non-Goals:**
- Streaming live output as repos complete (deferred; collect-then-print is chosen for stability).
- Progress bars or live status indicators.
- Per-command concurrency limits (single limit applies to all commands).
- Changing how errors are reported or aggregated.

## Decisions

### Decision 1: Worker pool with result collection (vs. fan-out goroutines)

**Chosen**: A fixed-size worker pool reads from a job channel. Each worker writes its result (output + error) to a pre-allocated result slice at the repo's discovery index. After all workers finish, the main goroutine iterates the result slice in order and prints.

**Why not unbounded goroutines (one per repo)?** With hundreds of repos, spawning unlimited goroutines risks exhausting file descriptors or memory. The pool naturally applies backpressure.

**Why not channels for results?** A pre-allocated `[]result` indexed by position gives us ordered output for free, without sorting or a second coordination step.

```
Discovery order: [repo0, repo1, repo2, repo3, repo4]
job channel
┌──────────┐
worker 1 ◀───┤ ├───▶ results[0], results[2], results[4]
worker 2 ◀───┤ ├───▶ results[1], results[3]
└──────────┘
(all done)
print results[0..4] in order
```

### Decision 2: Fix `GetConcurrency()` to return `int`

**Chosen**: Change signature to `GetConcurrency() int` using `viper.GetInt`. Update the CLI flag binding and default from `"1"` (string) to `1` (int). The flag type changes from `StringVarP` to `IntVarP`.

**Why now?** The type is only used by the new worker pool. Fixing it as part of this change avoids a later migration.

### Decision 3: Minimum concurrency of 1

**Chosen**: If the configured value is `< 1`, clamp to `1`. This prevents deadlocks from a zero-worker pool and makes the behaviour predictable.

## Risks / Trade-offs

- **Interleaved filesystem access** — Multiple `git pull` operations on repos sharing a common remote could hit rate limits on the remote. This is a user-configurable concern; the default of `1` is safe. → _No mitigation needed; document in help text._
- **Type change is breaking** — Any external code importing `config.GetConcurrency()` must update. Since this is a CLI tool (not a library), impact is limited to internal callers. → _Fix all internal call sites as part of this change._
- **Result slice pre-allocation** — Requires knowing the full repo list upfront (already the case). Not a concern for typical repo counts.

## Open Questions

- None. Design is fully resolved for the agreed scope.
Loading
Loading