Skip to content

feat: add background job queue system using River#261

Open
adnaan wants to merge 2 commits intomainfrom
feat/background-jobs
Open

feat: add background job queue system using River#261
adnaan wants to merge 2 commits intomainfrom
feat/background-jobs

Conversation

@adnaan
Copy link
Contributor

@adnaan adnaan commented Mar 20, 2026

Summary

  • Add lvt gen queue command to set up background job infrastructure (River queue tables, worker registration, main.go injection)
  • Add lvt gen job <name> command to scaffold individual job handlers with River worker pattern
  • Uses River (4.8k stars, v0.26) — the de facto Go job queue library supporting both SQLite and PostgreSQL
  • Zero custom queue/worker code to maintain — River handles worker pool, retry with exponential backoff, scheduled jobs, dead letter, unique jobs, and graceful shutdown

Architecture

Following the pattern of every major framework (Rails -> Solid Queue, Phoenix -> Oban, Django -> Celery), we wrap an existing battle-tested library rather than building from scratch. Our code is purely the generation layer:

  • Kit templates (migration SQL, worker_init.go, handler.go) in internal/kits/system/{multi,single}/templates/jobs/
  • Generator (internal/generator/jobs.go) with GenerateQueue() and GenerateJob() functions
  • CLI (commands/jobs.go) with help text and validation

Usage

# One-time setup: creates migration, worker.go, injects into main.go
lvt gen queue

# Scaffold job handlers (repeatable)
lvt gen job send_email
lvt gen job process_payment
lvt gen job generate_report

# Run migrations and start
lvt migration up
lvt serve  # worker pool starts automatically

Test plan

  • TestGenerateQueue — verifies migration, schema.sql, worker.go creation
  • TestGenerateQueueIdempotent — second call errors with "already set up"
  • TestGenerateJob — verifies handler file, worker registration injection
  • TestGenerateJobWithoutQueue — errors with "Run lvt gen queue first"
  • TestGenerateJobDuplicate — errors with "already exists"
  • TestGenerateMultipleJobs — 3 jobs all registered correctly
  • Full go test ./... green

🤖 Generated with Claude Code

Add `lvt gen queue` and `lvt gen job <name>` commands that scaffold
background job processing using River (https://riverqueue.com), the
de facto Go job queue library with 4.8k stars.

River handles all queue/worker internals: worker pool, retry with
exponential backoff, scheduled jobs, dead letter queue, unique jobs,
graceful shutdown, and supports both SQLite and PostgreSQL.

Our code is purely the generation layer:
- Kit templates for migration, worker init, and job handler scaffolds
- Generator functions (GenerateQueue, GenerateJob) with main.go injection
- CLI commands with help text

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 20, 2026 21:37
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds first-class background job scaffolding to LiveTemplate via River, wiring new generation commands into the CLI and providing kit templates for queue schema/migrations, worker registration, and job handler scaffolds.

Changes:

  • Add lvt gen queue to generate River queue DB artifacts and attempt to inject worker startup into app main.go.
  • Add lvt gen job <name> to scaffold a River worker + args type and register it in app/jobs/worker.go.
  • Add generator + tests plus CLI routing/help text for the new subcommands.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
internal/kits/system/single/templates/jobs/worker_init.go.tmpl Template for app/jobs/worker.go worker registry (single kit).
internal/kits/system/single/templates/jobs/schema.sql.tmpl River schema appended into database/schema.sql (single kit).
internal/kits/system/single/templates/jobs/migration.sql.tmpl Goose migration template for River tables (single kit).
internal/kits/system/single/templates/jobs/handler.go.tmpl Scaffold for per-job River worker/args (single kit).
internal/kits/system/multi/templates/jobs/worker_init.go.tmpl Template for app/jobs/worker.go worker registry (multi kit).
internal/kits/system/multi/templates/jobs/schema.sql.tmpl River schema appended into database/schema.sql (multi kit).
internal/kits/system/multi/templates/jobs/migration.sql.tmpl Goose migration template for River tables (multi kit).
internal/kits/system/multi/templates/jobs/handler.go.tmpl Scaffold for per-job River worker/args (multi kit).
internal/generator/jobs.go Implements GenerateQueue/GenerateJob + string-based main.go/worker registration injection.
internal/generator/jobs_test.go Unit tests for queue/job generation behaviors.
commands/jobs.go CLI entrypoints for lvt gen queue / lvt gen job.
commands/gen.go Routes new queue/job subcommands and updates interactive help text.

Comment on lines +254 to +304
riverSetup := []string{
"",
"\t// Background job processing (River)",
"\triverDB, err := sql.Open(\"sqlite\", dbPath+\"?_pragma=journal_mode(WAL)\")",
"\tif err != nil {",
"\t\tslog.Error(\"Failed to open River database\", \"error\", err)",
"\t\tos.Exit(1)",
"\t}",
"\triverDB.SetMaxOpenConns(1)",
"\tdefer riverDB.Close()",
"",
"\tjobWorkers := jobs.SetupWorkers()",
"\triverClient, err := river.NewClient(riversqlite.New(riverDB), &river.Config{",
"\t\tQueues: map[string]river.QueueConfig{",
"\t\t\triver.QueueDefault: {MaxWorkers: 100},",
"\t\t},",
"\t\tWorkers: jobWorkers,",
"\t})",
"\tif err != nil {",
"\t\tslog.Error(\"Failed to create River client\", \"error\", err)",
"\t\tos.Exit(1)",
"\t}",
"\tif err := riverClient.Start(appCtx); err != nil {",
"\t\tslog.Error(\"Failed to start job workers\", \"error\", err)",
"\t\tos.Exit(1)",
"\t}",
"\tdefer riverClient.Stop(appCtx)",
"\t_ = riverClient // Available for enqueueing jobs in handlers",
}
result = append(result, riverSetup...)
injected = true
}
}

if !injected {
return fmt.Errorf("could not find injection point in main.go (expected 'defer database.CloseDB()')")
}

// Inject imports
resultStr := strings.Join(result, "\n")

// Add River imports
riverImports := fmt.Sprintf("\t\"%s/app/jobs\"\n", moduleName) +
"\t\"github.com/riverqueue/river\"\n" +
"\t\"github.com/riverqueue/river/riverdriver/riversqlite\""

// Find the import block and add our imports
if idx := strings.Index(resultStr, "\"database/sql\""); idx != -1 {
// Insert after "database/sql" import
insertPos := idx + len("\"database/sql\"")
resultStr = resultStr[:insertPos] + "\n" + riverImports + resultStr[insertPos:]
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

injectJobWorker inserts River setup code that calls sql.Open(...), but the import injection only adds app/jobs, river, and riversqlite imports. Since the generated main.go templates don’t import database/sql, the injected code won’t compile unless database/sql is also added to the import block (or the code avoids using sql.Open).

Copilot uses AI. Check for mistakes.
Comment on lines +241 to +285
// Find injection point: after database.InitDB call, before route registrations
// Look for the database init line
lines := strings.Split(mainStr, "\n")
var result []string
injected := false

for _, line := range lines {
result = append(result, line)

// Inject after the database initialization block
// Look for the line that closes the database error check (the closing brace after InitDB)
if !injected && strings.Contains(line, "defer database.CloseDB()") {
// Insert River setup after this line
riverSetup := []string{
"",
"\t// Background job processing (River)",
"\triverDB, err := sql.Open(\"sqlite\", dbPath+\"?_pragma=journal_mode(WAL)\")",
"\tif err != nil {",
"\t\tslog.Error(\"Failed to open River database\", \"error\", err)",
"\t\tos.Exit(1)",
"\t}",
"\triverDB.SetMaxOpenConns(1)",
"\tdefer riverDB.Close()",
"",
"\tjobWorkers := jobs.SetupWorkers()",
"\triverClient, err := river.NewClient(riversqlite.New(riverDB), &river.Config{",
"\t\tQueues: map[string]river.QueueConfig{",
"\t\t\triver.QueueDefault: {MaxWorkers: 100},",
"\t\t},",
"\t\tWorkers: jobWorkers,",
"\t})",
"\tif err != nil {",
"\t\tslog.Error(\"Failed to create River client\", \"error\", err)",
"\t\tos.Exit(1)",
"\t}",
"\tif err := riverClient.Start(appCtx); err != nil {",
"\t\tslog.Error(\"Failed to start job workers\", \"error\", err)",
"\t\tos.Exit(1)",
"\t}",
"\tdefer riverClient.Stop(appCtx)",
"\t_ = riverClient // Available for enqueueing jobs in handlers",
}
result = append(result, riverSetup...)
injected = true
}
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The injected River setup references appCtx (riverClient.Start(appCtx) / Stop(appCtx)), but injectJobWorker injects immediately after defer database.CloseDB(). In the multi-kit main.go template, appCtx is created later (after route setup), and in the single-kit main.go template there is no appCtx at all. As a result, generated apps will not compile. The injection needs to either (a) inject after appCtx is created (and add an appCtx block for kits that don’t have one), or (b) use a locally-created context for River startup/shutdown.

Copilot uses AI. Check for mistakes.
Comment on lines +105 to +111
// 5. Inject River client setup into main.go
mainGoPath := findMainGo(projectRoot)
if mainGoPath != "" {
if err := injectJobWorker(mainGoPath, moduleName); err != nil {
return fmt.Errorf("failed to inject job worker into main.go: %w", err)
}
}
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenerateQueue’s main.go injection path isn’t exercised by these tests: setupTestProject doesn’t create a cmd/*/main.go, so findMainGo() returns "" and injectJobWorker() is skipped. This leaves the most fragile/critical part (string-based injection + import edits) untested; adding a test fixture main.go matching the kit templates would catch compile-breaking issues like missing imports / undefined identifiers.

Copilot uses AI. Check for mistakes.
Comment on lines +241 to +246
// Create lvt.yaml (project config)
lvtConfig := `kit: multi
module: testmodule
`
if err := os.WriteFile(filepath.Join(dir, "lvt.yaml"), []byte(lvtConfig), 0644); err != nil {
t.Fatalf("Failed to create lvt.yaml: %v", err)
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test writes lvt.yaml as the "project config", but config.LoadProjectConfig reads .lvtrc (and returns defaults if it doesn’t exist). This means the tests aren’t actually validating config loading behavior (e.g., kit selection/module name), and could mask bugs. Update the fixture to write a .lvtrc file with kit= / module= entries (or adjust to whatever config format is actually used).

Suggested change
// Create lvt.yaml (project config)
lvtConfig := `kit: multi
module: testmodule
`
if err := os.WriteFile(filepath.Join(dir, "lvt.yaml"), []byte(lvtConfig), 0644); err != nil {
t.Fatalf("Failed to create lvt.yaml: %v", err)
// Create .lvtrc (project config)
lvtConfig := `kit=multi
module=testmodule
`
if err := os.WriteFile(filepath.Join(dir, ".lvtrc"), []byte(lvtConfig), 0644); err != nil {
t.Fatalf("Failed to create .lvtrc: %v", err)

Copilot uses AI. Check for mistakes.
);

CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind);
CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

database/schema.sql is executed directly at app startup in the generated kits, so it should create the full River schema. This template is missing important indexes/constraints that are present in the generated migration (notably river_job_state_and_finalized_at_index and the partial unique index on unique_key/unique_states used for unique jobs). Consider keeping schema.sql.tmpl in sync with the migration’s Up DDL so River features/performance match expectations.

Suggested change
CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at);
CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_key_and_unique_states_index ON river_job (unique_key, unique_states) WHERE unique_key IS NOT NULL AND unique_states IS NOT NULL;

Copilot uses AI. Check for mistakes.
);

CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind);
CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

database/schema.sql is executed directly at app startup in the generated kits, so it should create the full River schema. This template is missing important indexes/constraints that are present in the generated migration (notably river_job_state_and_finalized_at_index and the partial unique index on unique_key/unique_states used for unique jobs). Consider keeping schema.sql.tmpl in sync with the migration’s Up DDL so River features/performance match expectations.

Suggested change
CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at);
CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_key_unique_states_partial_idx ON river_job (unique_key, unique_states) WHERE unique_key IS NOT NULL;

Copilot uses AI. Check for mistakes.
Comment on lines +41 to +42
fmt.Println(" app/jobs/worker.go Job worker registration")
fmt.Println(" database/migrations/..._river.sql River queue tables")
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The “Generated files” output for the queue migration doesn’t match the filename pattern produced by GenerateQueue (which creates *_setup_river_queue.sql). Printing database/migrations/..._river.sql is misleading for users trying to locate the file; consider updating the message to reflect the actual naming convention.

Suggested change
fmt.Println(" app/jobs/worker.go Job worker registration")
fmt.Println(" database/migrations/..._river.sql River queue tables")
fmt.Println(" app/jobs/worker.go Job worker registration")
fmt.Println(" database/migrations/..._setup_river_queue.sql River queue tables")

Copilot uses AI. Check for mistakes.
commands/jobs.go Outdated
Comment on lines +108 to +118
// Convert snake_case to CamelCase for display
parts := strings.Split(jobName, "_")
for i, p := range parts {
if len(p) > 0 {
parts[i] = strings.ToUpper(p[:1]) + p[1:]
}
}
camelName := strings.Join(parts, "")
fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName)

return nil
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenJob builds the displayed jobs.<Camel>Args name using a local snake_case→CamelCase conversion that doesn’t preserve Go initialisms (e.g. sync_api becomes SyncApiArgs here, but generator.toCamelCase would generate SyncAPIArgs). This can lead to incorrect/misleading CLI instructions compared to the generated type names. Consider reusing the same CamelCase helper as the generator (e.g., export a shared helper) to keep the output consistent.

Suggested change
// Convert snake_case to CamelCase for display
parts := strings.Split(jobName, "_")
for i, p := range parts {
if len(p) > 0 {
parts[i] = strings.ToUpper(p[:1]) + p[1:]
}
}
camelName := strings.Join(parts, "")
fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName)
return nil
// Convert snake_case to CamelCase for display, preserving Go initialisms
camelName := snakeToCamelPreserveInitialisms(jobName)
fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName)
return nil
}
// snakeToCamelPreserveInitialisms converts a snake_case identifier to CamelCase,
// preserving common Go initialisms (e.g., "sync_api" -> "SyncAPI").
func snakeToCamelPreserveInitialisms(s string) string {
if s == "" {
return ""
}
parts := strings.Split(s, "_")
for i, p := range parts {
if p == "" {
continue
}
upper := strings.ToUpper(p)
switch upper {
case "API", "ASCII", "CPU", "CSS", "DNS", "EOF",
"GUID", "HTML", "HTTP", "HTTPS", "ID", "IP",
"JSON", "LHS", "QPS", "RAM", "RHS", "RPC",
"SLA", "SMTP", "SQL", "SSH", "TCP", "TLS",
"TTL", "UDP", "UI", "UID", "UUID", "URI",
"URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS":
parts[i] = upper
default:
parts[i] = strings.ToUpper(p[:1]) + strings.ToLower(p[1:])
}
}
return strings.Join(parts, "")

Copilot uses AI. Check for mistakes.
@claude
Copy link

claude bot commented Mar 20, 2026

Code Review: feat: add background job queue system using River

Great feature addition! River is a solid choice for Go job queues. The generator pattern is clean and happy-path test coverage is good. Here are my observations:

Bugs / Correctness Issues

1. _ = riverClient makes jobs impossible to enqueue

After all the River setup in injectJobWorker, the client is discarded with _ = riverClient. Handlers have no way to actually enqueue jobs. The generated main.go should pass riverClient to the router/handler setup -- otherwise the whole feature is non-functional as shipped.

2. dbPath and appCtx variable name assumptions

The injected code references dbPath and appCtx by exact name. If a project uses different variable names the generated code will not compile. This should be derived from the project or at least documented as a hard constraint.

3. Import injection does not check for duplicates

injectJobWorker guards against re-injecting the setup block by checking for river.NewClient, but the import injection has no such guard. Running after a partial failure could produce duplicate imports.

4. Partial file left on disk if template execution fails in GenerateJob

os.Create(jobPath) runs before tmpl.Execute. If Execute fails, a partial file is left on disk and the next invocation returns "already exists" instead of re-attempting. Add os.Remove(jobPath) in the error path.

5. riverClient.Stop(appCtx) runs with a cancelled context

defer riverClient.Stop(appCtx) executes after appCtx is cancelled, which defeats graceful drain. Use a fresh background context with a timeout instead.

Architecture Concerns

6. PostgreSQL apps get broken generated code

The injection always hardcodes the SQLite driver and riversqlite. PostgreSQL projects would get uncompilable code and a spurious dependency. The generator should check the project database driver and branch accordingly.

7. Template duplication between single and multi kits

All five files in single/templates/jobs/ are byte-for-byte identical to multi/templates/jobs/. They will silently drift. Consider symlinks or a shared base location.

Quality Issues

8. Schema drift between migration and schema.sql templates

migration.sql.tmpl has constraints that schema.sql.tmpl omits: CONSTRAINT version_gte_1 on river_migration, the inline CHECK (name = 'default') on river_leader.name, and the river_job_state_and_finalized_at_index index. schema.sql should match the migration exactly since both represent the same end state.

9. Migration version numbers are hardcoded

When River releases v0.27+ and adds migration 7, generated migrations will be stale. Worth adding a comment warning, and pinning River to the specific version the templates target.

10. go get @latest is non-deterministic

Two runs on the same day could pull different versions. Pin to v0.26. Also riversqlite should not be fetched for PostgreSQL projects (related to point 6).

11. Inconsistent template delimiters

GenerateJob correctly uses <</>> delimiters, but writeTemplateFile uses the default {{/}}. The migration/schema templates have no variables so it works today, but any future variable addition will silently fail.

Test Coverage Gaps

12. injectJobWorker is never tested

setupTestProject creates no main.go, so findMainGo returns empty and main.go injection is skipped in all tests. This is the most fragile code in the PR -- it needs dedicated tests with a real main.go fixture.

13. No test for error paths (partial file cleanup in GenerateJob, injection failure modes).

14. No compilation test for generated code -- the handler template could have a syntax error that only surfaces at runtime.

Minor

  • MaxWorkers: 100 is hardcoded. Consider defaulting to something more conservative like 10, or making it configurable.
  • injectWorkerRegistration silently returns nil on duplicate -- correct for idempotency, but a debug log would help distinguish the two cases.

Summary: Two blockers: (1) riverClient is not wired to handlers so jobs cannot actually be enqueued, and (2) PostgreSQL projects get uncompilable generated code. The schema drift between migration/schema.sql templates and the untested main.go injection are close behind.

- Move injection point from defer database.CloseDB() to after appCtx
  creation (appCtx must exist before River client Start/Stop)
- Add missing "database/sql" import for sql.Open in injected code
- Use existing injectImport() helper instead of manual import injection
- Buffer template output in bytes.Buffer before writing files to prevent
  partial files on template execution failure
- Export ToCamelCase for use by CLI commands (consistent initialism
  handling for display output)
- Sync schema.sql.tmpl with full migration.sql.tmpl (was missing
  constraints and indexes)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants