feat: add background job queue system using River#261
Conversation
Add `lvt gen queue` and `lvt gen job <name>` commands that scaffold background job processing using River (https://riverqueue.com), the de facto Go job queue library with 4.8k stars. River handles all queue/worker internals: worker pool, retry with exponential backoff, scheduled jobs, dead letter queue, unique jobs, graceful shutdown, and supports both SQLite and PostgreSQL. Our code is purely the generation layer: - Kit templates for migration, worker init, and job handler scaffolds - Generator functions (GenerateQueue, GenerateJob) with main.go injection - CLI commands with help text Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds first-class background job scaffolding to LiveTemplate via River, wiring new generation commands into the CLI and providing kit templates for queue schema/migrations, worker registration, and job handler scaffolds.
Changes:
- Add
lvt gen queueto generate River queue DB artifacts and attempt to inject worker startup into appmain.go. - Add
lvt gen job <name>to scaffold a River worker + args type and register it inapp/jobs/worker.go. - Add generator + tests plus CLI routing/help text for the new subcommands.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/kits/system/single/templates/jobs/worker_init.go.tmpl | Template for app/jobs/worker.go worker registry (single kit). |
| internal/kits/system/single/templates/jobs/schema.sql.tmpl | River schema appended into database/schema.sql (single kit). |
| internal/kits/system/single/templates/jobs/migration.sql.tmpl | Goose migration template for River tables (single kit). |
| internal/kits/system/single/templates/jobs/handler.go.tmpl | Scaffold for per-job River worker/args (single kit). |
| internal/kits/system/multi/templates/jobs/worker_init.go.tmpl | Template for app/jobs/worker.go worker registry (multi kit). |
| internal/kits/system/multi/templates/jobs/schema.sql.tmpl | River schema appended into database/schema.sql (multi kit). |
| internal/kits/system/multi/templates/jobs/migration.sql.tmpl | Goose migration template for River tables (multi kit). |
| internal/kits/system/multi/templates/jobs/handler.go.tmpl | Scaffold for per-job River worker/args (multi kit). |
| internal/generator/jobs.go | Implements GenerateQueue/GenerateJob + string-based main.go/worker registration injection. |
| internal/generator/jobs_test.go | Unit tests for queue/job generation behaviors. |
| commands/jobs.go | CLI entrypoints for lvt gen queue / lvt gen job. |
| commands/gen.go | Routes new queue/job subcommands and updates interactive help text. |
internal/generator/jobs.go
Outdated
| riverSetup := []string{ | ||
| "", | ||
| "\t// Background job processing (River)", | ||
| "\triverDB, err := sql.Open(\"sqlite\", dbPath+\"?_pragma=journal_mode(WAL)\")", | ||
| "\tif err != nil {", | ||
| "\t\tslog.Error(\"Failed to open River database\", \"error\", err)", | ||
| "\t\tos.Exit(1)", | ||
| "\t}", | ||
| "\triverDB.SetMaxOpenConns(1)", | ||
| "\tdefer riverDB.Close()", | ||
| "", | ||
| "\tjobWorkers := jobs.SetupWorkers()", | ||
| "\triverClient, err := river.NewClient(riversqlite.New(riverDB), &river.Config{", | ||
| "\t\tQueues: map[string]river.QueueConfig{", | ||
| "\t\t\triver.QueueDefault: {MaxWorkers: 100},", | ||
| "\t\t},", | ||
| "\t\tWorkers: jobWorkers,", | ||
| "\t})", | ||
| "\tif err != nil {", | ||
| "\t\tslog.Error(\"Failed to create River client\", \"error\", err)", | ||
| "\t\tos.Exit(1)", | ||
| "\t}", | ||
| "\tif err := riverClient.Start(appCtx); err != nil {", | ||
| "\t\tslog.Error(\"Failed to start job workers\", \"error\", err)", | ||
| "\t\tos.Exit(1)", | ||
| "\t}", | ||
| "\tdefer riverClient.Stop(appCtx)", | ||
| "\t_ = riverClient // Available for enqueueing jobs in handlers", | ||
| } | ||
| result = append(result, riverSetup...) | ||
| injected = true | ||
| } | ||
| } | ||
|
|
||
| if !injected { | ||
| return fmt.Errorf("could not find injection point in main.go (expected 'defer database.CloseDB()')") | ||
| } | ||
|
|
||
| // Inject imports | ||
| resultStr := strings.Join(result, "\n") | ||
|
|
||
| // Add River imports | ||
| riverImports := fmt.Sprintf("\t\"%s/app/jobs\"\n", moduleName) + | ||
| "\t\"github.com/riverqueue/river\"\n" + | ||
| "\t\"github.com/riverqueue/river/riverdriver/riversqlite\"" | ||
|
|
||
| // Find the import block and add our imports | ||
| if idx := strings.Index(resultStr, "\"database/sql\""); idx != -1 { | ||
| // Insert after "database/sql" import | ||
| insertPos := idx + len("\"database/sql\"") | ||
| resultStr = resultStr[:insertPos] + "\n" + riverImports + resultStr[insertPos:] |
There was a problem hiding this comment.
injectJobWorker inserts River setup code that calls sql.Open(...), but the import injection only adds app/jobs, river, and riversqlite imports. Since the generated main.go templates don’t import database/sql, the injected code won’t compile unless database/sql is also added to the import block (or the code avoids using sql.Open).
internal/generator/jobs.go
Outdated
| // Find injection point: after database.InitDB call, before route registrations | ||
| // Look for the database init line | ||
| lines := strings.Split(mainStr, "\n") | ||
| var result []string | ||
| injected := false | ||
|
|
||
| for _, line := range lines { | ||
| result = append(result, line) | ||
|
|
||
| // Inject after the database initialization block | ||
| // Look for the line that closes the database error check (the closing brace after InitDB) | ||
| if !injected && strings.Contains(line, "defer database.CloseDB()") { | ||
| // Insert River setup after this line | ||
| riverSetup := []string{ | ||
| "", | ||
| "\t// Background job processing (River)", | ||
| "\triverDB, err := sql.Open(\"sqlite\", dbPath+\"?_pragma=journal_mode(WAL)\")", | ||
| "\tif err != nil {", | ||
| "\t\tslog.Error(\"Failed to open River database\", \"error\", err)", | ||
| "\t\tos.Exit(1)", | ||
| "\t}", | ||
| "\triverDB.SetMaxOpenConns(1)", | ||
| "\tdefer riverDB.Close()", | ||
| "", | ||
| "\tjobWorkers := jobs.SetupWorkers()", | ||
| "\triverClient, err := river.NewClient(riversqlite.New(riverDB), &river.Config{", | ||
| "\t\tQueues: map[string]river.QueueConfig{", | ||
| "\t\t\triver.QueueDefault: {MaxWorkers: 100},", | ||
| "\t\t},", | ||
| "\t\tWorkers: jobWorkers,", | ||
| "\t})", | ||
| "\tif err != nil {", | ||
| "\t\tslog.Error(\"Failed to create River client\", \"error\", err)", | ||
| "\t\tos.Exit(1)", | ||
| "\t}", | ||
| "\tif err := riverClient.Start(appCtx); err != nil {", | ||
| "\t\tslog.Error(\"Failed to start job workers\", \"error\", err)", | ||
| "\t\tos.Exit(1)", | ||
| "\t}", | ||
| "\tdefer riverClient.Stop(appCtx)", | ||
| "\t_ = riverClient // Available for enqueueing jobs in handlers", | ||
| } | ||
| result = append(result, riverSetup...) | ||
| injected = true | ||
| } |
There was a problem hiding this comment.
The injected River setup references appCtx (riverClient.Start(appCtx) / Stop(appCtx)), but injectJobWorker injects immediately after defer database.CloseDB(). In the multi-kit main.go template, appCtx is created later (after route setup), and in the single-kit main.go template there is no appCtx at all. As a result, generated apps will not compile. The injection needs to either (a) inject after appCtx is created (and add an appCtx block for kits that don’t have one), or (b) use a locally-created context for River startup/shutdown.
| // 5. Inject River client setup into main.go | ||
| mainGoPath := findMainGo(projectRoot) | ||
| if mainGoPath != "" { | ||
| if err := injectJobWorker(mainGoPath, moduleName); err != nil { | ||
| return fmt.Errorf("failed to inject job worker into main.go: %w", err) | ||
| } | ||
| } |
There was a problem hiding this comment.
GenerateQueue’s main.go injection path isn’t exercised by these tests: setupTestProject doesn’t create a cmd/*/main.go, so findMainGo() returns "" and injectJobWorker() is skipped. This leaves the most fragile/critical part (string-based injection + import edits) untested; adding a test fixture main.go matching the kit templates would catch compile-breaking issues like missing imports / undefined identifiers.
| // Create lvt.yaml (project config) | ||
| lvtConfig := `kit: multi | ||
| module: testmodule | ||
| ` | ||
| if err := os.WriteFile(filepath.Join(dir, "lvt.yaml"), []byte(lvtConfig), 0644); err != nil { | ||
| t.Fatalf("Failed to create lvt.yaml: %v", err) |
There was a problem hiding this comment.
The test writes lvt.yaml as the "project config", but config.LoadProjectConfig reads .lvtrc (and returns defaults if it doesn’t exist). This means the tests aren’t actually validating config loading behavior (e.g., kit selection/module name), and could mask bugs. Update the fixture to write a .lvtrc file with kit= / module= entries (or adjust to whatever config format is actually used).
| // Create lvt.yaml (project config) | |
| lvtConfig := `kit: multi | |
| module: testmodule | |
| ` | |
| if err := os.WriteFile(filepath.Join(dir, "lvt.yaml"), []byte(lvtConfig), 0644); err != nil { | |
| t.Fatalf("Failed to create lvt.yaml: %v", err) | |
| // Create .lvtrc (project config) | |
| lvtConfig := `kit=multi | |
| module=testmodule | |
| ` | |
| if err := os.WriteFile(filepath.Join(dir, ".lvtrc"), []byte(lvtConfig), 0644); err != nil { | |
| t.Fatalf("Failed to create .lvtrc: %v", err) |
| ); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); | ||
| CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); |
There was a problem hiding this comment.
database/schema.sql is executed directly at app startup in the generated kits, so it should create the full River schema. This template is missing important indexes/constraints that are present in the generated migration (notably river_job_state_and_finalized_at_index and the partial unique index on unique_key/unique_states used for unique jobs). Consider keeping schema.sql.tmpl in sync with the migration’s Up DDL so River features/performance match expectations.
| CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); | |
| CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); | |
| CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at); | |
| CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_key_and_unique_states_index ON river_job (unique_key, unique_states) WHERE unique_key IS NOT NULL AND unique_states IS NOT NULL; |
| ); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); | ||
| CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); |
There was a problem hiding this comment.
database/schema.sql is executed directly at app startup in the generated kits, so it should create the full River schema. This template is missing important indexes/constraints that are present in the generated migration (notably river_job_state_and_finalized_at_index and the partial unique index on unique_key/unique_states used for unique jobs). Consider keeping schema.sql.tmpl in sync with the migration’s Up DDL so River features/performance match expectations.
| CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); | |
| CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); | |
| CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at); | |
| CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_key_unique_states_partial_idx ON river_job (unique_key, unique_states) WHERE unique_key IS NOT NULL; |
| fmt.Println(" app/jobs/worker.go Job worker registration") | ||
| fmt.Println(" database/migrations/..._river.sql River queue tables") |
There was a problem hiding this comment.
The “Generated files” output for the queue migration doesn’t match the filename pattern produced by GenerateQueue (which creates *_setup_river_queue.sql). Printing database/migrations/..._river.sql is misleading for users trying to locate the file; consider updating the message to reflect the actual naming convention.
| fmt.Println(" app/jobs/worker.go Job worker registration") | |
| fmt.Println(" database/migrations/..._river.sql River queue tables") | |
| fmt.Println(" app/jobs/worker.go Job worker registration") | |
| fmt.Println(" database/migrations/..._setup_river_queue.sql River queue tables") |
commands/jobs.go
Outdated
| // Convert snake_case to CamelCase for display | ||
| parts := strings.Split(jobName, "_") | ||
| for i, p := range parts { | ||
| if len(p) > 0 { | ||
| parts[i] = strings.ToUpper(p[:1]) + p[1:] | ||
| } | ||
| } | ||
| camelName := strings.Join(parts, "") | ||
| fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName) | ||
|
|
||
| return nil |
There was a problem hiding this comment.
GenJob builds the displayed jobs.<Camel>Args name using a local snake_case→CamelCase conversion that doesn’t preserve Go initialisms (e.g. sync_api becomes SyncApiArgs here, but generator.toCamelCase would generate SyncAPIArgs). This can lead to incorrect/misleading CLI instructions compared to the generated type names. Consider reusing the same CamelCase helper as the generator (e.g., export a shared helper) to keep the output consistent.
| // Convert snake_case to CamelCase for display | |
| parts := strings.Split(jobName, "_") | |
| for i, p := range parts { | |
| if len(p) > 0 { | |
| parts[i] = strings.ToUpper(p[:1]) + p[1:] | |
| } | |
| } | |
| camelName := strings.Join(parts, "") | |
| fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName) | |
| return nil | |
| // Convert snake_case to CamelCase for display, preserving Go initialisms | |
| camelName := snakeToCamelPreserveInitialisms(jobName) | |
| fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName) | |
| return nil | |
| } | |
| // snakeToCamelPreserveInitialisms converts a snake_case identifier to CamelCase, | |
| // preserving common Go initialisms (e.g., "sync_api" -> "SyncAPI"). | |
| func snakeToCamelPreserveInitialisms(s string) string { | |
| if s == "" { | |
| return "" | |
| } | |
| parts := strings.Split(s, "_") | |
| for i, p := range parts { | |
| if p == "" { | |
| continue | |
| } | |
| upper := strings.ToUpper(p) | |
| switch upper { | |
| case "API", "ASCII", "CPU", "CSS", "DNS", "EOF", | |
| "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", | |
| "JSON", "LHS", "QPS", "RAM", "RHS", "RPC", | |
| "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", | |
| "TTL", "UDP", "UI", "UID", "UUID", "URI", | |
| "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS": | |
| parts[i] = upper | |
| default: | |
| parts[i] = strings.ToUpper(p[:1]) + strings.ToLower(p[1:]) | |
| } | |
| } | |
| return strings.Join(parts, "") |
Code Review: feat: add background job queue system using RiverGreat feature addition! River is a solid choice for Go job queues. The generator pattern is clean and happy-path test coverage is good. Here are my observations: Bugs / Correctness Issues1. After all the River setup in 2. The injected code references 3. Import injection does not check for duplicates
4. Partial file left on disk if template execution fails in
5.
Architecture Concerns6. PostgreSQL apps get broken generated code The injection always hardcodes the SQLite driver and 7. Template duplication between All five files in Quality Issues8. Schema drift between migration and schema.sql templates
9. Migration version numbers are hardcoded When River releases v0.27+ and adds migration 7, generated migrations will be stale. Worth adding a comment warning, and pinning River to the specific version the templates target. 10. Two runs on the same day could pull different versions. Pin to 11. Inconsistent template delimiters
Test Coverage Gaps12.
13. No test for error paths (partial file cleanup in 14. No compilation test for generated code -- the handler template could have a syntax error that only surfaces at runtime. Minor
Summary: Two blockers: (1) |
- Move injection point from defer database.CloseDB() to after appCtx creation (appCtx must exist before River client Start/Stop) - Add missing "database/sql" import for sql.Open in injected code - Use existing injectImport() helper instead of manual import injection - Buffer template output in bytes.Buffer before writing files to prevent partial files on template execution failure - Export ToCamelCase for use by CLI commands (consistent initialism handling for display output) - Sync schema.sql.tmpl with full migration.sql.tmpl (was missing constraints and indexes) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
lvt gen queuecommand to set up background job infrastructure (River queue tables, worker registration, main.go injection)lvt gen job <name>command to scaffold individual job handlers with River worker patternArchitecture
Following the pattern of every major framework (Rails -> Solid Queue, Phoenix -> Oban, Django -> Celery), we wrap an existing battle-tested library rather than building from scratch. Our code is purely the generation layer:
internal/kits/system/{multi,single}/templates/jobs/internal/generator/jobs.go) withGenerateQueue()andGenerateJob()functionscommands/jobs.go) with help text and validationUsage
Test plan
TestGenerateQueue— verifies migration, schema.sql, worker.go creationTestGenerateQueueIdempotent— second call errors with "already set up"TestGenerateJob— verifies handler file, worker registration injectionTestGenerateJobWithoutQueue— errors with "Run lvt gen queue first"TestGenerateJobDuplicate— errors with "already exists"TestGenerateMultipleJobs— 3 jobs all registered correctlygo test ./...green🤖 Generated with Claude Code