Skip to content

Commit f66d96e

Browse files
committed
Parallelize file processing.
1 parent 859ac18 commit f66d96e

3 files changed

Lines changed: 62 additions & 19 deletions

File tree

cmd/root.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ func init() {
2323
rootCmd.Flags().StringP("migration-archive", "m", "", "Path to the migration archive Example: /path/to/migration-archive.tar.gz")
2424
rootCmd.MarkFlagRequired("migration-archive")
2525

26+
rootCmd.Flags().IntP("threads", "t", 0, "Number of parallel goroutines for metadata processing (default: number of CPUs)")
27+
2628
rootCmd.SilenceErrors = true
2729
rootCmd.SilenceUsage = true
2830
}
@@ -94,7 +96,8 @@ var rootCmd = &cobra.Command{
9496

9597
pterm.DefaultSection.Println("Remap")
9698
remapSpinner, _ := pterm.DefaultSpinner.Start("Remapping SHAs...")
97-
stats, err := commitremap.ProcessFiles(extractedDir, commitremap.DefaultPrefixes(), commitMap)
99+
threads, _ := cmd.Flags().GetInt("threads")
100+
stats, err := commitremap.ProcessFiles(extractedDir, commitremap.DefaultPrefixes(), commitMap, threads)
98101
if err != nil {
99102
remapSpinner.Fail("Remap failed")
100103
renderSummaryTable(stats, extractedDir)

pkg/commitremap/commitremap.go

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"fmt"
77
"os"
88
"path/filepath"
9+
"runtime"
910
"strings"
11+
"sync"
1012
)
1113

1214
// DefaultPrefixes returns the set of archive metadata file prefixes that
@@ -61,29 +63,67 @@ func ParseCommitMap(filePath string) (map[string]string, error) {
6163
// Each file is walked once, replacing string values that exactly match a key in
6264
// commitMap. Only whole-string SHA values are replaced. SHAs embedded in URLs,
6365
// markdown, or composite strings are not rewritten.
64-
func ProcessFiles(archiveDir string, prefixes []string, commitMap map[string]string) (Stats, error) {
66+
//
67+
// numWorkers controls how many goroutines process files in parallel.
68+
// If numWorkers <= 0, it defaults to runtime.NumCPU().
69+
func ProcessFiles(archiveDir string, prefixes []string, commitMap map[string]string, numWorkers int) (Stats, error) {
6570
stats := Stats{PerFile: make(map[string]int)}
6671

72+
if numWorkers <= 0 {
73+
numWorkers = runtime.NumCPU()
74+
}
75+
76+
// Collect all files to process
77+
var allFiles []string
6778
for _, prefix := range prefixes {
6879
pattern := filepath.Join(archiveDir, prefix+"_*.json")
6980
files, err := filepath.Glob(pattern)
7081
if err != nil {
7182
return stats, fmt.Errorf("globbing %s: %w", pattern, err)
7283
}
84+
allFiles = append(allFiles, files...)
85+
}
7386

74-
for _, file := range files {
75-
stats.FilesScanned++
76-
n, err := updateMetadataFile(file, commitMap)
77-
if err != nil {
78-
return stats, fmt.Errorf("updating metadata file %s: %w", file, err)
79-
}
80-
if n > 0 {
81-
stats.PerFile[file] = n
87+
stats.FilesScanned = len(allFiles)
88+
89+
type fileResult struct {
90+
file string
91+
count int
92+
err error
93+
}
94+
95+
results := make([]fileResult, len(allFiles))
96+
workCh := make(chan int, len(allFiles))
97+
for i := range allFiles {
98+
workCh <- i
99+
}
100+
close(workCh)
101+
102+
var wg sync.WaitGroup
103+
for w := 0; w < numWorkers; w++ {
104+
wg.Add(1)
105+
go func() {
106+
defer wg.Done()
107+
for idx := range workCh {
108+
n, err := updateMetadataFile(allFiles[idx], commitMap)
109+
results[idx] = fileResult{file: allFiles[idx], count: n, err: err}
82110
}
111+
}()
112+
}
113+
wg.Wait()
114+
115+
// Merge results in order, returning partial stats on first error
116+
var firstErr error
117+
for _, res := range results {
118+
if res.count > 0 {
119+
stats.PerFile[res.file] = res.count
120+
}
121+
if res.err != nil && firstErr == nil {
122+
firstErr = fmt.Errorf("updating metadata file %s: %w", res.file, res.err)
83123
}
84124
}
85125

86-
return stats, nil
126+
return stats, firstErr
87127
}
88128

89129
func updateMetadataFile(filePath string, commitMap map[string]string) (int, error) {

pkg/commitremap/commitremap_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func TestProcessFiles(t *testing.T) {
173173
writeFile(t, p, fixture.input)
174174
}
175175

176-
stats, err := ProcessFiles(dir, DefaultPrefixes(), commitMap)
176+
stats, err := ProcessFiles(dir, DefaultPrefixes(), commitMap, 0)
177177
if err != nil {
178178
t.Fatalf("ProcessFiles returned error: %v", err)
179179
}
@@ -207,15 +207,15 @@ func TestProcessFiles(t *testing.T) {
207207
want := `{"sha":"oldSHA1","nested":[{"sha":"oldSHA2"}]}`
208208
writeFile(t, filePath, want)
209209

210-
if _, err := ProcessFiles(dir, []string{"pull_requests"}, map[string]string{}); err != nil {
210+
if _, err := ProcessFiles(dir, []string{"pull_requests"}, map[string]string{}, 0); err != nil {
211211
t.Fatalf("ProcessFiles returned error: %v", err)
212212
}
213213

214214
assertJSONFileEqual(t, filePath, want)
215215
})
216216

217217
t.Run("no matching files", func(t *testing.T) {
218-
if _, err := ProcessFiles(t.TempDir(), DefaultPrefixes(), map[string]string{"oldSHA1": "newSHA1"}); err != nil {
218+
if _, err := ProcessFiles(t.TempDir(), DefaultPrefixes(), map[string]string{"oldSHA1": "newSHA1"}, 0); err != nil {
219219
t.Fatalf("ProcessFiles returned error: %v", err)
220220
}
221221
})
@@ -227,7 +227,7 @@ func TestProcessFiles(t *testing.T) {
227227
writeFile(t, fooPath, `{"sha":"oldSHA1"}`)
228228
writeFile(t, pullPath, `{"sha":"oldSHA1"}`)
229229

230-
stats, err := ProcessFiles(dir, []string{"foo"}, map[string]string{"oldSHA1": "newSHA1"})
230+
stats, err := ProcessFiles(dir, []string{"foo"}, map[string]string{"oldSHA1": "newSHA1"}, 0)
231231
if err != nil {
232232
t.Fatalf("ProcessFiles returned error: %v", err)
233233
}
@@ -257,7 +257,7 @@ func TestProcessFiles(t *testing.T) {
257257
"oldSHA3": "newSHA3",
258258
"oldSHA4": "newSHA4",
259259
}
260-
if _, err := ProcessFiles(dir, []string{"pull_requests"}, commitMap); err != nil {
260+
if _, err := ProcessFiles(dir, []string{"pull_requests"}, commitMap, 0); err != nil {
261261
t.Fatalf("ProcessFiles returned error: %v", err)
262262
}
263263

@@ -270,7 +270,7 @@ func TestProcessFiles(t *testing.T) {
270270
original := `{"title":"no SHA here","body":"https://example.invalid/oldSHA1","labels":["bug","help wanted"]}`
271271
writeFile(t, filePath, original)
272272

273-
if _, err := ProcessFiles(dir, []string{"issues"}, map[string]string{"oldSHA1": "newSHA1"}); err != nil {
273+
if _, err := ProcessFiles(dir, []string{"issues"}, map[string]string{"oldSHA1": "newSHA1"}, 0); err != nil {
274274
t.Fatalf("ProcessFiles returned error: %v", err)
275275
}
276276

@@ -305,7 +305,7 @@ func TestProcessFiles_SkipsWriteWhenNoReplacements(t *testing.T) {
305305
t.Fatalf("stat before: %v", err)
306306
}
307307

308-
stats, err := ProcessFiles(dir, []string{"pull_requests"}, map[string]string{"unrelated": "x"})
308+
stats, err := ProcessFiles(dir, []string{"pull_requests"}, map[string]string{"unrelated": "x"}, 0)
309309
if err != nil {
310310
t.Fatalf("ProcessFiles returned error: %v", err)
311311
}
@@ -345,7 +345,7 @@ func TestProcessFiles_ReturnsPartialStatsOnError(t *testing.T) {
345345
writeFile(t, validPath, `{"sha":"oldSHA1"}`)
346346
writeFile(t, badPath, `{not valid json`)
347347

348-
stats, err := ProcessFiles(dir, []string{"pull_requests"}, map[string]string{"oldSHA1": "newSHA1"})
348+
stats, err := ProcessFiles(dir, []string{"pull_requests"}, map[string]string{"oldSHA1": "newSHA1"}, 0)
349349
if err == nil {
350350
t.Fatal("expected error from malformed JSON file")
351351
}

0 commit comments

Comments
 (0)