From a98766968a103fefc8eda3cb868eb8ac944f9352 Mon Sep 17 00:00:00 2001 From: Adnaan Date: Fri, 20 Mar 2026 22:36:26 +0100 Subject: [PATCH 1/3] feat: add background job queue system using River (#2.1) Add `lvt gen queue` and `lvt gen job ` commands that scaffold background job processing using River (https://riverqueue.com), the de facto Go job queue library with 4.8k stars. River handles all queue/worker internals: worker pool, retry with exponential backoff, scheduled jobs, dead letter queue, unique jobs, graceful shutdown, and supports both SQLite and PostgreSQL. Our code is purely the generation layer: - Kit templates for migration, worker init, and job handler scaffolds - Generator functions (GenerateQueue, GenerateJob) with main.go injection - CLI commands with help text Co-Authored-By: Claude Opus 4.6 (1M context) --- commands/gen.go | 8 +- commands/jobs.go | 157 ++++++++ internal/generator/jobs.go | 354 ++++++++++++++++++ internal/generator/jobs_test.go | 248 ++++++++++++ .../multi/templates/jobs/handler.go.tmpl | 37 ++ .../multi/templates/jobs/migration.sql.tmpl | 123 ++++++ .../multi/templates/jobs/schema.sql.tmpl | 75 ++++ .../multi/templates/jobs/worker_init.go.tmpl | 15 + .../single/templates/jobs/handler.go.tmpl | 37 ++ .../single/templates/jobs/migration.sql.tmpl | 123 ++++++ .../single/templates/jobs/schema.sql.tmpl | 75 ++++ .../single/templates/jobs/worker_init.go.tmpl | 15 + 12 files changed, 1266 insertions(+), 1 deletion(-) create mode 100644 commands/jobs.go create mode 100644 internal/generator/jobs.go create mode 100644 internal/generator/jobs_test.go create mode 100644 internal/kits/system/multi/templates/jobs/handler.go.tmpl create mode 100644 internal/kits/system/multi/templates/jobs/migration.sql.tmpl create mode 100644 internal/kits/system/multi/templates/jobs/schema.sql.tmpl create mode 100644 internal/kits/system/multi/templates/jobs/worker_init.go.tmpl create mode 100644 internal/kits/system/single/templates/jobs/handler.go.tmpl 
create mode 100644 internal/kits/system/single/templates/jobs/migration.sql.tmpl create mode 100644 internal/kits/system/single/templates/jobs/schema.sql.tmpl create mode 100644 internal/kits/system/single/templates/jobs/worker_init.go.tmpl diff --git a/commands/gen.go b/commands/gen.go index a64de88..9097a68 100644 --- a/commands/gen.go +++ b/commands/gen.go @@ -47,8 +47,12 @@ func Gen(args []string) error { return Auth(args[1:]) case "stack": return GenStack(args[1:]) + case "queue": + return GenQueue(args[1:]) + case "job": + return GenJob(args[1:]) default: - return fmt.Errorf("unknown subcommand: %s\n\nAvailable subcommands:\n resource Generate full CRUD resource with database\n view Generate view-only handler (no database)\n schema Generate database schema only\n auth Generate authentication system\n stack Generate deployment stack configuration\n\nRun 'lvt gen' for interactive mode", subcommand) + return fmt.Errorf("unknown subcommand: %s\n\nAvailable subcommands:\n resource Generate full CRUD resource with database\n view Generate view-only handler (no database)\n schema Generate database schema only\n auth Generate authentication system\n stack Generate deployment stack configuration\n queue Set up background job processing (River)\n job Scaffold a new background job handler\n\nRun 'lvt gen' for interactive mode", subcommand) } } @@ -61,6 +65,8 @@ func interactiveGen() error { fmt.Println(" schema ... 
Generate database schema only") fmt.Println(" auth [StructName] [table_name] Generate authentication system") fmt.Println(" stack Generate deployment stack configuration") + fmt.Println(" queue Set up background job processing (River)") + fmt.Println(" job Scaffold a new background job handler") fmt.Println() fmt.Println("Examples:") fmt.Println(" lvt gen resource posts title content:text published:bool") diff --git a/commands/jobs.go b/commands/jobs.go new file mode 100644 index 0000000..b2dc636 --- /dev/null +++ b/commands/jobs.go @@ -0,0 +1,157 @@ +package commands + +import ( + "fmt" + "os" + "strings" + + "github.com/livetemplate/lvt/internal/config" + "github.com/livetemplate/lvt/internal/generator" +) + +// GenQueue sets up background job infrastructure using River. +func GenQueue(args []string) error { + if ShowHelpIfRequested(args, printGenQueueHelp) { + return nil + } + + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get working directory: %w", err) + } + + projectConfig, err := config.LoadProjectConfig(cwd) + if err != nil { + return fmt.Errorf("failed to load project config: %w", err) + } + + moduleName := projectConfig.Module + if moduleName == "" { + return fmt.Errorf("could not determine module name from project config") + } + + if err := generator.GenerateQueue(cwd, moduleName); err != nil { + return err + } + + fmt.Println() + fmt.Println("✅ Background job queue set up successfully!") + fmt.Println() + fmt.Println("Generated files:") + fmt.Println(" app/jobs/worker.go Job worker registration") + fmt.Println(" database/migrations/..._river.sql River queue tables") + fmt.Println() + fmt.Println("Next steps:") + fmt.Println(" 1. Run 'lvt gen job ' to create your first job handler") + fmt.Println(" 2. Run 'go mod tidy' to fetch River dependencies") + fmt.Println(" 3. Run 'lvt migration up' to create River tables") + fmt.Println(" 4. 
Start your app — the job worker will start automatically") + fmt.Println() + fmt.Println("Example:") + fmt.Println(" lvt gen job send_email") + + return nil +} + +// GenJob scaffolds a new background job handler. +func GenJob(args []string) error { + if ShowHelpIfRequested(args, printGenJobHelp) { + return nil + } + + if len(args) < 1 { + return fmt.Errorf("job name required\n\nUsage: lvt gen job \n\nExamples:\n lvt gen job send_email\n lvt gen job process_payment\n lvt gen job generate_report") + } + + jobName := strings.TrimSpace(args[0]) + if jobName == "" { + return fmt.Errorf("job name cannot be empty") + } + + if err := ValidatePositionalArg(jobName, "job name"); err != nil { + return err + } + + // Normalize to snake_case + jobName = strings.ToLower(jobName) + + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get working directory: %w", err) + } + + projectConfig, err := config.LoadProjectConfig(cwd) + if err != nil { + return fmt.Errorf("failed to load project config: %w", err) + } + + moduleName := projectConfig.Module + if moduleName == "" { + return fmt.Errorf("could not determine module name from project config") + } + + if err := generator.GenerateJob(cwd, moduleName, jobName); err != nil { + return err + } + + fmt.Println() + fmt.Printf("✅ Job '%s' created successfully!\n", jobName) + fmt.Println() + fmt.Println("Generated files:") + fmt.Printf(" app/jobs/%s.go Job handler\n", jobName) + fmt.Println() + fmt.Println("Next steps:") + fmt.Printf(" 1. Edit app/jobs/%s.go to define your payload and logic\n", jobName) + fmt.Println(" 2. 
Enqueue jobs from your handlers:") + fmt.Println() + fmt.Printf(" // In any handler with access to riverClient:\n") + // Convert snake_case to CamelCase for display + parts := strings.Split(jobName, "_") + for i, p := range parts { + if len(p) > 0 { + parts[i] = strings.ToUpper(p[:1]) + p[1:] + } + } + camelName := strings.Join(parts, "") + fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName) + + return nil +} + +func printGenQueueHelp() { + fmt.Println("Usage: lvt gen queue") + fmt.Println() + fmt.Println("Set up background job processing infrastructure using River.") + fmt.Println("This is a one-time setup command that creates:") + fmt.Println() + fmt.Println(" - Database migration for River queue tables") + fmt.Println(" - Worker registration file (app/jobs/worker.go)") + fmt.Println(" - River client setup in main.go") + fmt.Println() + fmt.Println("River (https://riverqueue.com) provides:") + fmt.Println(" - Worker pool with configurable concurrency") + fmt.Println(" - Retry with exponential backoff") + fmt.Println(" - Scheduled and periodic jobs") + fmt.Println(" - Dead letter queue (discarded jobs)") + fmt.Println(" - Unique/deduplicated jobs") + fmt.Println(" - Graceful shutdown") + fmt.Println(" - SQLite and PostgreSQL support") +} + +func printGenJobHelp() { + fmt.Println("Usage: lvt gen job ") + fmt.Println() + fmt.Println("Scaffold a new background job handler.") + fmt.Println() + fmt.Println("Arguments:") + fmt.Println(" name Job name in snake_case (e.g., send_email, process_payment)") + fmt.Println() + fmt.Println("Examples:") + fmt.Println(" lvt gen job send_email") + fmt.Println(" lvt gen job process_payment") + fmt.Println(" lvt gen job generate_report") + fmt.Println(" lvt gen job cleanup_expired_sessions") + fmt.Println() + fmt.Println("Prerequisites:") + fmt.Println(" Run 'lvt gen queue' first to set up the job infrastructure.") +} diff --git a/internal/generator/jobs.go b/internal/generator/jobs.go new file mode 100644 
index 0000000..0633157 --- /dev/null +++ b/internal/generator/jobs.go @@ -0,0 +1,354 @@ +package generator + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "text/template" + "time" + + "github.com/livetemplate/lvt/internal/config" + "github.com/livetemplate/lvt/internal/kits" +) + +// JobsConfig holds configuration for generating the queue infrastructure. +type JobsConfig struct { + ModuleName string +} + +// JobConfig holds configuration for generating a single job handler. +type JobConfig struct { + ModuleName string + JobName string // snake_case, e.g. "send_email" + JobNameCamel string // CamelCase, e.g. "SendEmail" +} + +// GenerateQueue sets up the background job infrastructure using River. +// It creates the migration, schema, worker init file, and injects setup into main.go. +func GenerateQueue(projectRoot string, moduleName string) error { + projectConfig, err := config.LoadProjectConfig(projectRoot) + if err != nil { + return fmt.Errorf("failed to load project config: %w", err) + } + kitName := projectConfig.GetKit() + kitLoader := kits.DefaultLoader() + + // Check if queue already set up + workerPath := filepath.Join(projectRoot, "app", "jobs", "worker.go") + if _, err := os.Stat(workerPath); err == nil { + return fmt.Errorf("job queue already set up (app/jobs/worker.go exists)") + } + + // 1. 
Create migration file + migrationsDir := filepath.Join(projectRoot, "database", "migrations") + if err := os.MkdirAll(migrationsDir, 0755); err != nil { + return fmt.Errorf("failed to create migrations directory: %w", err) + } + + timestamp := time.Now() + var migrationPath string + for i := 0; i < 3600; i++ { + timestampStr := timestamp.Format("20060102150405") + migrationFile := fmt.Sprintf("%s_setup_river_queue.sql", timestampStr) + migrationPath = filepath.Join(migrationsDir, migrationFile) + + matches, err := filepath.Glob(filepath.Join(migrationsDir, timestampStr+"_*")) + if err != nil { + return fmt.Errorf("failed to check for existing migrations: %w", err) + } + if len(matches) == 0 { + break + } + timestamp = timestamp.Add(1 * time.Second) + if i == 3599 { + return fmt.Errorf("failed to generate unique migration timestamp") + } + } + + if err := writeTemplateFile(kitLoader, kitName, "jobs/migration.sql.tmpl", migrationPath, nil); err != nil { + return fmt.Errorf("failed to generate migration: %w", err) + } + + // 2. Append to schema.sql + schemaPath := filepath.Join(projectRoot, "database", "schema.sql") + if err := appendTemplateFile(kitLoader, kitName, "jobs/schema.sql.tmpl", schemaPath, nil); err != nil { + return fmt.Errorf("failed to append to schema.sql: %w", err) + } + + // 3. Create app/jobs/worker.go + jobsDir := filepath.Join(projectRoot, "app", "jobs") + if err := os.MkdirAll(jobsDir, 0755); err != nil { + return fmt.Errorf("failed to create app/jobs directory: %w", err) + } + + if err := writeTemplateFile(kitLoader, kitName, "jobs/worker_init.go.tmpl", workerPath, nil); err != nil { + return fmt.Errorf("failed to generate worker.go: %w", err) + } + + // 4. 
Add River dependencies + goModPath := filepath.Join(projectRoot, "go.mod") + if _, err := os.Stat(goModPath); err == nil { + dependencies := []string{ + "github.com/riverqueue/river@latest", + "github.com/riverqueue/river/riverdriver/riversqlite@latest", + } + args := append([]string{"get"}, dependencies...) + cmd := exec.Command("go", args...) + cmd.Dir = projectRoot + if output, err := cmd.CombinedOutput(); err != nil { + fmt.Fprintf(os.Stderr, "Warning: could not fetch River dependencies (run 'go mod tidy' in %s to resolve):\n%s\n", projectRoot, string(output)) + } + } + + // 5. Inject River client setup into main.go + mainGoPath := findMainGo(projectRoot) + if mainGoPath != "" { + if err := injectJobWorker(mainGoPath, moduleName); err != nil { + return fmt.Errorf("failed to inject job worker into main.go: %w", err) + } + } + + return nil +} + +// GenerateJob scaffolds a new job handler and registers it with the worker. +func GenerateJob(projectRoot string, moduleName string, jobName string) error { + // Validate queue is set up + workerPath := filepath.Join(projectRoot, "app", "jobs", "worker.go") + if _, err := os.Stat(workerPath); os.IsNotExist(err) { + return fmt.Errorf("job queue not set up yet. Run 'lvt gen queue' first") + } + + projectConfig, err := config.LoadProjectConfig(projectRoot) + if err != nil { + return fmt.Errorf("failed to load project config: %w", err) + } + kitName := projectConfig.GetKit() + kitLoader := kits.DefaultLoader() + + jobNameCamel := toCamelCase(jobName) + + jobConfig := &JobConfig{ + ModuleName: moduleName, + JobName: jobName, + JobNameCamel: jobNameCamel, + } + + // Check if job already exists + jobPath := filepath.Join(projectRoot, "app", "jobs", jobName+".go") + if _, err := os.Stat(jobPath); err == nil { + return fmt.Errorf("job '%s' already exists (app/jobs/%s.go)", jobName, jobName) + } + + // 1. 
Generate job handler file + templateContent, err := kitLoader.LoadKitTemplate(kitName, "jobs/handler.go.tmpl") + if err != nil { + return fmt.Errorf("failed to load handler template: %w", err) + } + + tmpl, err := template.New("job_handler").Delims("<<", ">>").Parse(string(templateContent)) + if err != nil { + return fmt.Errorf("failed to parse handler template: %w", err) + } + + file, err := os.Create(jobPath) + if err != nil { + return fmt.Errorf("failed to create %s.go: %w", jobName, err) + } + + if err := tmpl.Execute(file, jobConfig); err != nil { + file.Close() + return fmt.Errorf("failed to execute handler template: %w", err) + } + file.Close() + + // 2. Register worker in app/jobs/worker.go + if err := injectWorkerRegistration(workerPath, jobNameCamel); err != nil { + return fmt.Errorf("failed to register worker: %w", err) + } + + return nil +} + +// writeTemplateFile loads a kit template and writes it to the output path. +func writeTemplateFile(kitLoader *kits.KitLoader, kitName, templatePath, outputPath string, data interface{}) error { + content, err := kitLoader.LoadKitTemplate(kitName, templatePath) + if err != nil { + return err + } + + tmpl, err := template.New(filepath.Base(templatePath)).Parse(string(content)) + if err != nil { + return err + } + + file, err := os.Create(outputPath) + if err != nil { + return err + } + defer file.Close() + + return tmpl.Execute(file, data) +} + +// appendTemplateFile loads a kit template and appends it to the output file. 
+func appendTemplateFile(kitLoader *kits.KitLoader, kitName, templatePath, outputPath string, data interface{}) error { + content, err := kitLoader.LoadKitTemplate(kitName, templatePath) + if err != nil { + return err + } + + tmpl, err := template.New(filepath.Base(templatePath)).Parse(string(content)) + if err != nil { + return err + } + + file, err := os.OpenFile(outputPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return err + } + if stat.Size() > 0 { + if _, err := file.WriteString("\n"); err != nil { + return err + } + } + + return tmpl.Execute(file, data) +} + +// injectJobWorker injects River client setup into main.go. +func injectJobWorker(mainGoPath string, moduleName string) error { + content, err := os.ReadFile(mainGoPath) + if err != nil { + return fmt.Errorf("failed to read main.go: %w", err) + } + + mainStr := string(content) + + // Check if already injected + if strings.Contains(mainStr, "river.NewClient") { + return nil // Already injected + } + + // Find injection point: after database.InitDB call, before route registrations + // Look for the database init line + lines := strings.Split(mainStr, "\n") + var result []string + injected := false + + for _, line := range lines { + result = append(result, line) + + // Inject after the database initialization block + // Look for the line that closes the database error check (the closing brace after InitDB) + if !injected && strings.Contains(line, "defer database.CloseDB()") { + // Insert River setup after this line + riverSetup := []string{ + "", + "\t// Background job processing (River)", + "\triverDB, err := sql.Open(\"sqlite\", dbPath+\"?_pragma=journal_mode(WAL)\")", + "\tif err != nil {", + "\t\tslog.Error(\"Failed to open River database\", \"error\", err)", + "\t\tos.Exit(1)", + "\t}", + "\triverDB.SetMaxOpenConns(1)", + "\tdefer riverDB.Close()", + "", + "\tjobWorkers := jobs.SetupWorkers()", 
+ "\triverClient, err := river.NewClient(riversqlite.New(riverDB), &river.Config{", + "\t\tQueues: map[string]river.QueueConfig{", + "\t\t\triver.QueueDefault: {MaxWorkers: 100},", + "\t\t},", + "\t\tWorkers: jobWorkers,", + "\t})", + "\tif err != nil {", + "\t\tslog.Error(\"Failed to create River client\", \"error\", err)", + "\t\tos.Exit(1)", + "\t}", + "\tif err := riverClient.Start(appCtx); err != nil {", + "\t\tslog.Error(\"Failed to start job workers\", \"error\", err)", + "\t\tos.Exit(1)", + "\t}", + "\tdefer riverClient.Stop(appCtx)", + "\t_ = riverClient // Available for enqueueing jobs in handlers", + } + result = append(result, riverSetup...) + injected = true + } + } + + if !injected { + return fmt.Errorf("could not find injection point in main.go (expected 'defer database.CloseDB()')") + } + + // Inject imports + resultStr := strings.Join(result, "\n") + + // Add River imports + riverImports := fmt.Sprintf("\t\"%s/app/jobs\"\n", moduleName) + + "\t\"github.com/riverqueue/river\"\n" + + "\t\"github.com/riverqueue/river/riverdriver/riversqlite\"" + + // Find the import block and add our imports + if idx := strings.Index(resultStr, "\"database/sql\""); idx != -1 { + // Insert after "database/sql" import + insertPos := idx + len("\"database/sql\"") + resultStr = resultStr[:insertPos] + "\n" + riverImports + resultStr[insertPos:] + } else if idx := strings.Index(resultStr, "_ \"modernc.org/sqlite\""); idx != -1 { + // Insert before the sqlite import + resultStr = resultStr[:idx] + riverImports + "\n\t" + resultStr[idx:] + } else { + // Fallback: find first import line and add after it + importIdx := strings.Index(resultStr, "import (") + if importIdx == -1 { + return fmt.Errorf("could not find import block in main.go") + } + // Find the next newline after "import (" + nextNewline := strings.Index(resultStr[importIdx:], "\n") + if nextNewline == -1 { + return fmt.Errorf("malformed import block in main.go") + } + insertPos := importIdx + nextNewline + 1 + 
resultStr = resultStr[:insertPos] + "\t" + riverImports + "\n" + resultStr[insertPos:] + } + + return os.WriteFile(mainGoPath, []byte(resultStr), 0644) +} + +// injectWorkerRegistration adds a river.AddWorker call to app/jobs/worker.go. +func injectWorkerRegistration(workerPath string, jobNameCamel string) error { + content, err := os.ReadFile(workerPath) + if err != nil { + return fmt.Errorf("failed to read worker.go: %w", err) + } + + workerStr := string(content) + + // Check if already registered + registrationLine := fmt.Sprintf("river.AddWorker(workers, &%sWorker{})", jobNameCamel) + if strings.Contains(workerStr, registrationLine) { + return nil // Already registered + } + + // Find the marker comment and insert after it + marker := "// Register job workers below (added by `lvt gen job`)" + idx := strings.Index(workerStr, marker) + if idx == -1 { + return fmt.Errorf("could not find registration marker in worker.go") + } + + insertPos := idx + len(marker) + registration := "\n\t" + registrationLine + + workerStr = workerStr[:insertPos] + registration + workerStr[insertPos:] + + return os.WriteFile(workerPath, []byte(workerStr), 0644) +} diff --git a/internal/generator/jobs_test.go b/internal/generator/jobs_test.go new file mode 100644 index 0000000..64a7d84 --- /dev/null +++ b/internal/generator/jobs_test.go @@ -0,0 +1,248 @@ +package generator + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestGenerateQueue(t *testing.T) { + tmpDir := t.TempDir() + + // Set up minimal project structure + setupTestProject(t, tmpDir) + + err := GenerateQueue(tmpDir, "testmodule") + if err != nil { + t.Fatalf("GenerateQueue failed: %v", err) + } + + // Verify worker.go was created + workerPath := filepath.Join(tmpDir, "app", "jobs", "worker.go") + if _, err := os.Stat(workerPath); os.IsNotExist(err) { + t.Error("app/jobs/worker.go was not created") + } + + workerContent, err := os.ReadFile(workerPath) + if err != nil { + t.Fatalf("Failed to read 
worker.go: %v", err) + } + if !strings.Contains(string(workerContent), "river.NewWorkers()") { + t.Error("worker.go missing river.NewWorkers() call") + } + if !strings.Contains(string(workerContent), "Register job workers below") { + t.Error("worker.go missing registration marker comment") + } + + // Verify migration was created + migrationsDir := filepath.Join(tmpDir, "database", "migrations") + entries, err := os.ReadDir(migrationsDir) + if err != nil { + t.Fatalf("Failed to read migrations dir: %v", err) + } + + var migrationFound bool + for _, entry := range entries { + if strings.Contains(entry.Name(), "setup_river_queue") { + migrationFound = true + // Read and verify content + content, err := os.ReadFile(filepath.Join(migrationsDir, entry.Name())) + if err != nil { + t.Fatalf("Failed to read migration: %v", err) + } + if !strings.Contains(string(content), "river_job") { + t.Error("Migration missing river_job table") + } + if !strings.Contains(string(content), "river_leader") { + t.Error("Migration missing river_leader table") + } + if !strings.Contains(string(content), "+goose Up") { + t.Error("Migration missing goose Up marker") + } + } + } + if !migrationFound { + t.Error("River migration file was not created") + } + + // Verify schema.sql was appended + schemaPath := filepath.Join(tmpDir, "database", "schema.sql") + schemaContent, err := os.ReadFile(schemaPath) + if err != nil { + t.Fatalf("Failed to read schema.sql: %v", err) + } + if !strings.Contains(string(schemaContent), "river_job") { + t.Error("schema.sql missing river_job table") + } +} + +func TestGenerateQueueIdempotent(t *testing.T) { + tmpDir := t.TempDir() + setupTestProject(t, tmpDir) + + // First call should succeed + if err := GenerateQueue(tmpDir, "testmodule"); err != nil { + t.Fatalf("First GenerateQueue failed: %v", err) + } + + // Second call should fail (already set up) + err := GenerateQueue(tmpDir, "testmodule") + if err == nil { + t.Error("Expected error on second GenerateQueue 
call") + } + if err != nil && !strings.Contains(err.Error(), "already set up") { + t.Errorf("Expected 'already set up' error, got: %v", err) + } +} + +func TestGenerateJob(t *testing.T) { + tmpDir := t.TempDir() + setupTestProject(t, tmpDir) + + // Set up queue first + if err := GenerateQueue(tmpDir, "testmodule"); err != nil { + t.Fatalf("GenerateQueue failed: %v", err) + } + + // Generate a job + if err := GenerateJob(tmpDir, "testmodule", "send_email"); err != nil { + t.Fatalf("GenerateJob failed: %v", err) + } + + // Verify job handler was created + jobPath := filepath.Join(tmpDir, "app", "jobs", "send_email.go") + if _, err := os.Stat(jobPath); os.IsNotExist(err) { + t.Error("app/jobs/send_email.go was not created") + } + + jobContent, err := os.ReadFile(jobPath) + if err != nil { + t.Fatalf("Failed to read send_email.go: %v", err) + } + + // Check for expected content + checks := []string{ + "SendEmailArgs", + "SendEmailWorker", + `Kind() string { return "send_email" }`, + "river.WorkerDefaults[SendEmailArgs]", + "func (w *SendEmailWorker) Work(", + } + for _, check := range checks { + if !strings.Contains(string(jobContent), check) { + t.Errorf("send_email.go missing expected content: %s", check) + } + } + + // Verify worker registration was injected + workerPath := filepath.Join(tmpDir, "app", "jobs", "worker.go") + workerContent, err := os.ReadFile(workerPath) + if err != nil { + t.Fatalf("Failed to read worker.go: %v", err) + } + if !strings.Contains(string(workerContent), "river.AddWorker(workers, &SendEmailWorker{})") { + t.Error("worker.go missing SendEmailWorker registration") + } +} + +func TestGenerateJobWithoutQueue(t *testing.T) { + tmpDir := t.TempDir() + setupTestProject(t, tmpDir) + + // Try to generate job without queue setup + err := GenerateJob(tmpDir, "testmodule", "send_email") + if err == nil { + t.Error("Expected error when generating job without queue") + } + if err != nil && !strings.Contains(err.Error(), "Run 'lvt gen queue' first") { 
+ t.Errorf("Expected 'Run lvt gen queue first' error, got: %v", err) + } +} + +func TestGenerateJobDuplicate(t *testing.T) { + tmpDir := t.TempDir() + setupTestProject(t, tmpDir) + + if err := GenerateQueue(tmpDir, "testmodule"); err != nil { + t.Fatalf("GenerateQueue failed: %v", err) + } + + // First job should succeed + if err := GenerateJob(tmpDir, "testmodule", "send_email"); err != nil { + t.Fatalf("First GenerateJob failed: %v", err) + } + + // Duplicate should fail + err := GenerateJob(tmpDir, "testmodule", "send_email") + if err == nil { + t.Error("Expected error on duplicate job") + } + if err != nil && !strings.Contains(err.Error(), "already exists") { + t.Errorf("Expected 'already exists' error, got: %v", err) + } +} + +func TestGenerateMultipleJobs(t *testing.T) { + tmpDir := t.TempDir() + setupTestProject(t, tmpDir) + + if err := GenerateQueue(tmpDir, "testmodule"); err != nil { + t.Fatalf("GenerateQueue failed: %v", err) + } + + jobs := []string{"send_email", "process_payment", "generate_report"} + for _, job := range jobs { + if err := GenerateJob(tmpDir, "testmodule", job); err != nil { + t.Fatalf("GenerateJob(%s) failed: %v", job, err) + } + } + + // Verify all jobs registered in worker.go + workerContent, err := os.ReadFile(filepath.Join(tmpDir, "app", "jobs", "worker.go")) + if err != nil { + t.Fatalf("Failed to read worker.go: %v", err) + } + + expectedRegistrations := []string{ + "river.AddWorker(workers, &SendEmailWorker{})", + "river.AddWorker(workers, &ProcessPaymentWorker{})", + "river.AddWorker(workers, &GenerateReportWorker{})", + } + for _, reg := range expectedRegistrations { + if !strings.Contains(string(workerContent), reg) { + t.Errorf("worker.go missing registration: %s", reg) + } + } +} + +// setupTestProject creates a minimal project structure for testing. 
+func setupTestProject(t *testing.T, dir string) { + t.Helper() + + // Create database directory + dbDir := filepath.Join(dir, "database") + if err := os.MkdirAll(dbDir, 0755); err != nil { + t.Fatalf("Failed to create database directory: %v", err) + } + + // Create migrations directory + migrationsDir := filepath.Join(dbDir, "migrations") + if err := os.MkdirAll(migrationsDir, 0755); err != nil { + t.Fatalf("Failed to create migrations directory: %v", err) + } + + // Create schema.sql + schemaPath := filepath.Join(dbDir, "schema.sql") + if err := os.WriteFile(schemaPath, []byte("-- existing schema\n"), 0644); err != nil { + t.Fatalf("Failed to create schema.sql: %v", err) + } + + // Create lvt.yaml (project config) + lvtConfig := `kit: multi +module: testmodule +` + if err := os.WriteFile(filepath.Join(dir, "lvt.yaml"), []byte(lvtConfig), 0644); err != nil { + t.Fatalf("Failed to create lvt.yaml: %v", err) + } +} diff --git a/internal/kits/system/multi/templates/jobs/handler.go.tmpl b/internal/kits/system/multi/templates/jobs/handler.go.tmpl new file mode 100644 index 0000000..972348c --- /dev/null +++ b/internal/kits/system/multi/templates/jobs/handler.go.tmpl @@ -0,0 +1,37 @@ +package jobs + +import ( + "context" + "log/slog" + + "github.com/riverqueue/river" +) + +// <<.JobNameCamel>>Args defines the payload for <<.JobName>> jobs. +type <<.JobNameCamel>>Args struct { + // TODO: Define your job payload fields here. + // Example: + // To string `json:"to"` + // Subject string `json:"subject"` +} + +// Kind returns the unique job type identifier used by River. +func (<<.JobNameCamel>>Args) Kind() string { return "<<.JobName>>" } + +// <<.JobNameCamel>>Worker processes <<.JobName>> jobs. +type <<.JobNameCamel>>Worker struct { + river.WorkerDefaults[<<.JobNameCamel>>Args] +} + +// Work executes the <<.JobName>> job. 
+func (w *<<.JobNameCamel>>Worker) Work(ctx context.Context, job *river.Job[<<.JobNameCamel>>Args]) error { + slog.Info("Processing <<.JobName>> job", "job_id", job.ID) + + _ = job.Args // TODO: use job.Args fields + + // TODO: Implement your job logic here. + // Return nil on success, or an error to trigger a retry. + // After max attempts are exhausted, the job moves to "discarded" state. + + return nil +} diff --git a/internal/kits/system/multi/templates/jobs/migration.sql.tmpl b/internal/kits/system/multi/templates/jobs/migration.sql.tmpl new file mode 100644 index 0000000..d2adad5 --- /dev/null +++ b/internal/kits/system/multi/templates/jobs/migration.sql.tmpl @@ -0,0 +1,123 @@ +-- +goose Up +-- +goose StatementBegin + +-- River job queue tables (SQLite) +-- Generated by: river migrate-get --line main --up --all --database-url "sqlite://" +-- River version: v0.26.x + +CREATE TABLE IF NOT EXISTS river_migration ( + line TEXT NOT NULL, + version INTEGER NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); + +CREATE TABLE IF NOT EXISTS river_job ( + id INTEGER PRIMARY KEY, + args BLOB NOT NULL DEFAULT '{}', + attempt INTEGER NOT NULL DEFAULT 0, + attempted_at TIMESTAMP, + attempted_by BLOB, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + errors BLOB, + finalized_at TIMESTAMP, + kind TEXT NOT NULL, + max_attempts INTEGER NOT NULL, + metadata BLOB NOT NULL DEFAULT (json('{}')), + priority INTEGER NOT NULL DEFAULT 1, + queue TEXT NOT NULL DEFAULT 'default', + state TEXT NOT NULL DEFAULT 'available', + scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + tags BLOB NOT NULL DEFAULT (json('[]')), + unique_key BLOB, + unique_states INTEGER, + CONSTRAINT finalized_or_finalized_at_null CHECK ( + (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR + 
(finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) + ), + CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), + CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128), + CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128), + CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled')) +); + +CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); +CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL; +CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); +CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND CASE state + WHEN 'available' THEN unique_states & (1 << 0) + WHEN 'cancelled' THEN unique_states & (1 << 1) + WHEN 'completed' THEN unique_states & (1 << 2) + WHEN 'discarded' THEN unique_states & (1 << 3) + WHEN 'pending' THEN unique_states & (1 << 4) + WHEN 'retryable' THEN unique_states & (1 << 5) + WHEN 'running' THEN unique_states & (1 << 6) + WHEN 'scheduled' THEN unique_states & (1 << 7) + ELSE 0 + END >= 1; + +CREATE TABLE IF NOT EXISTS river_leader ( + elected_at TIMESTAMP NOT NULL, + expires_at TIMESTAMP NOT NULL, + leader_id TEXT NOT NULL, + name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); + +CREATE TABLE IF NOT EXISTS river_queue ( + name TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT 
EXISTS river_client ( + id TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL, + CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128) +); + +CREATE TABLE IF NOT EXISTS river_client_queue ( + river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, + name TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + max_workers INTEGER NOT NULL DEFAULT 0, + metadata BLOB NOT NULL DEFAULT (json('{}')), + num_jobs_completed INTEGER NOT NULL DEFAULT 0, + num_jobs_running INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (river_client_id, name), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), + CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) +); + +-- Record that all River migrations have been applied +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS river_client_queue; +DROP TABLE IF EXISTS river_client; +DROP TABLE IF EXISTS river_queue; +DROP TABLE IF EXISTS river_leader; +DROP TABLE IF EXISTS river_job; +DROP TABLE IF EXISTS river_migration; +-- +goose StatementEnd diff --git a/internal/kits/system/multi/templates/jobs/schema.sql.tmpl b/internal/kits/system/multi/templates/jobs/schema.sql.tmpl new file mode 100644 index 0000000..eabc8b8 --- 
/dev/null +++ b/internal/kits/system/multi/templates/jobs/schema.sql.tmpl @@ -0,0 +1,75 @@ +-- River job queue tables +CREATE TABLE IF NOT EXISTS river_migration ( + line TEXT NOT NULL, + version INTEGER NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (line, version) +); + +CREATE TABLE IF NOT EXISTS river_job ( + id INTEGER PRIMARY KEY, + args BLOB NOT NULL DEFAULT '{}', + attempt INTEGER NOT NULL DEFAULT 0, + attempted_at TIMESTAMP, + attempted_by BLOB, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + errors BLOB, + finalized_at TIMESTAMP, + kind TEXT NOT NULL, + max_attempts INTEGER NOT NULL, + metadata BLOB NOT NULL DEFAULT (json('{}')), + priority INTEGER NOT NULL DEFAULT 1, + queue TEXT NOT NULL DEFAULT 'default', + state TEXT NOT NULL DEFAULT 'available', + scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + tags BLOB NOT NULL DEFAULT (json('[]')), + unique_key BLOB, + unique_states INTEGER +); + +CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); +CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); + +CREATE TABLE IF NOT EXISTS river_leader ( + elected_at TIMESTAMP NOT NULL, + expires_at TIMESTAMP NOT NULL, + leader_id TEXT NOT NULL, + name TEXT PRIMARY KEY NOT NULL DEFAULT 'default', + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); + +CREATE TABLE IF NOT EXISTS river_queue ( + name TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT EXISTS river_client ( + id TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL +); 
+ +CREATE TABLE IF NOT EXISTS river_client_queue ( + river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, + name TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + max_workers INTEGER NOT NULL DEFAULT 0, + metadata BLOB NOT NULL DEFAULT (json('{}')), + num_jobs_completed INTEGER NOT NULL DEFAULT 0, + num_jobs_running INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (river_client_id, name) +); + +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6); diff --git a/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl b/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl new file mode 100644 index 0000000..a36df8e --- /dev/null +++ b/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl @@ -0,0 +1,15 @@ +package jobs + +import ( + "github.com/riverqueue/river" +) + +// SetupWorkers registers all job workers with River. +// New workers are added here by `lvt gen job`. +func SetupWorkers() *river.Workers { + workers := river.NewWorkers() + + // Register job workers below (added by `lvt gen job`) + + return workers +} diff --git a/internal/kits/system/single/templates/jobs/handler.go.tmpl b/internal/kits/system/single/templates/jobs/handler.go.tmpl new file mode 100644 index 0000000..972348c --- /dev/null +++ b/internal/kits/system/single/templates/jobs/handler.go.tmpl @@ -0,0 +1,37 @@ +package jobs + +import ( + "context" + "log/slog" + + "github.com/riverqueue/river" +) + +// <<.JobNameCamel>>Args defines the payload for <<.JobName>> jobs. 
+type <<.JobNameCamel>>Args struct { + // TODO: Define your job payload fields here. + // Example: + // To string `json:"to"` + // Subject string `json:"subject"` +} + +// Kind returns the unique job type identifier used by River. +func (<<.JobNameCamel>>Args) Kind() string { return "<<.JobName>>" } + +// <<.JobNameCamel>>Worker processes <<.JobName>> jobs. +type <<.JobNameCamel>>Worker struct { + river.WorkerDefaults[<<.JobNameCamel>>Args] +} + +// Work executes the <<.JobName>> job. +func (w *<<.JobNameCamel>>Worker) Work(ctx context.Context, job *river.Job[<<.JobNameCamel>>Args]) error { + slog.Info("Processing <<.JobName>> job", "job_id", job.ID) + + _ = job.Args // TODO: use job.Args fields + + // TODO: Implement your job logic here. + // Return nil on success, or an error to trigger a retry. + // After max attempts are exhausted, the job moves to "discarded" state. + + return nil +} diff --git a/internal/kits/system/single/templates/jobs/migration.sql.tmpl b/internal/kits/system/single/templates/jobs/migration.sql.tmpl new file mode 100644 index 0000000..d2adad5 --- /dev/null +++ b/internal/kits/system/single/templates/jobs/migration.sql.tmpl @@ -0,0 +1,123 @@ +-- +goose Up +-- +goose StatementBegin + +-- River job queue tables (SQLite) +-- Generated by: river migrate-get --line main --up --all --database-url "sqlite://" +-- River version: v0.26.x + +CREATE TABLE IF NOT EXISTS river_migration ( + line TEXT NOT NULL, + version INTEGER NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); + +CREATE TABLE IF NOT EXISTS river_job ( + id INTEGER PRIMARY KEY, + args BLOB NOT NULL DEFAULT '{}', + attempt INTEGER NOT NULL DEFAULT 0, + attempted_at TIMESTAMP, + attempted_by BLOB, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + errors BLOB, + finalized_at TIMESTAMP, + kind TEXT NOT NULL, 
+ max_attempts INTEGER NOT NULL, + metadata BLOB NOT NULL DEFAULT (json('{}')), + priority INTEGER NOT NULL DEFAULT 1, + queue TEXT NOT NULL DEFAULT 'default', + state TEXT NOT NULL DEFAULT 'available', + scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + tags BLOB NOT NULL DEFAULT (json('[]')), + unique_key BLOB, + unique_states INTEGER, + CONSTRAINT finalized_or_finalized_at_null CHECK ( + (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR + (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) + ), + CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), + CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128), + CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128), + CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled')) +); + +CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); +CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL; +CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); +CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND CASE state + WHEN 'available' THEN unique_states & (1 << 0) + WHEN 'cancelled' THEN unique_states & (1 << 1) + WHEN 'completed' THEN unique_states & (1 << 2) + WHEN 'discarded' THEN unique_states & (1 << 3) + WHEN 'pending' THEN unique_states & (1 << 4) + WHEN 'retryable' THEN unique_states & (1 << 5) + WHEN 'running' THEN unique_states & (1 << 6) + WHEN 'scheduled' THEN unique_states & (1 << 7) + ELSE 0 + END >= 1; + +CREATE TABLE IF NOT EXISTS river_leader ( + elected_at TIMESTAMP NOT NULL, + expires_at TIMESTAMP NOT NULL, + leader_id TEXT NOT NULL, + name TEXT PRIMARY KEY 
NOT NULL DEFAULT 'default' CHECK (name = 'default'), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); + +CREATE TABLE IF NOT EXISTS river_queue ( + name TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT EXISTS river_client ( + id TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL, + CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128) +); + +CREATE TABLE IF NOT EXISTS river_client_queue ( + river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, + name TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + max_workers INTEGER NOT NULL DEFAULT 0, + metadata BLOB NOT NULL DEFAULT (json('{}')), + num_jobs_completed INTEGER NOT NULL DEFAULT 0, + num_jobs_running INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (river_client_id, name), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), + CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) +); + +-- Record that all River migrations have been applied +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6); + +-- +goose 
StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS river_client_queue; +DROP TABLE IF EXISTS river_client; +DROP TABLE IF EXISTS river_queue; +DROP TABLE IF EXISTS river_leader; +DROP TABLE IF EXISTS river_job; +DROP TABLE IF EXISTS river_migration; +-- +goose StatementEnd diff --git a/internal/kits/system/single/templates/jobs/schema.sql.tmpl b/internal/kits/system/single/templates/jobs/schema.sql.tmpl new file mode 100644 index 0000000..eabc8b8 --- /dev/null +++ b/internal/kits/system/single/templates/jobs/schema.sql.tmpl @@ -0,0 +1,75 @@ +-- River job queue tables +CREATE TABLE IF NOT EXISTS river_migration ( + line TEXT NOT NULL, + version INTEGER NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (line, version) +); + +CREATE TABLE IF NOT EXISTS river_job ( + id INTEGER PRIMARY KEY, + args BLOB NOT NULL DEFAULT '{}', + attempt INTEGER NOT NULL DEFAULT 0, + attempted_at TIMESTAMP, + attempted_by BLOB, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + errors BLOB, + finalized_at TIMESTAMP, + kind TEXT NOT NULL, + max_attempts INTEGER NOT NULL, + metadata BLOB NOT NULL DEFAULT (json('{}')), + priority INTEGER NOT NULL DEFAULT 1, + queue TEXT NOT NULL DEFAULT 'default', + state TEXT NOT NULL DEFAULT 'available', + scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + tags BLOB NOT NULL DEFAULT (json('[]')), + unique_key BLOB, + unique_states INTEGER +); + +CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); +CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); + +CREATE TABLE IF NOT EXISTS river_leader ( + elected_at TIMESTAMP NOT NULL, + expires_at TIMESTAMP NOT NULL, + leader_id TEXT NOT NULL, + name TEXT PRIMARY KEY NOT NULL DEFAULT 'default', + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); 
+ +CREATE TABLE IF NOT EXISTS river_queue ( + name TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT EXISTS river_client ( + id TEXT PRIMARY KEY NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + metadata BLOB NOT NULL DEFAULT (json('{}')), + paused_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT EXISTS river_client_queue ( + river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, + name TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + max_workers INTEGER NOT NULL DEFAULT 0, + metadata BLOB NOT NULL DEFAULT (json('{}')), + num_jobs_completed INTEGER NOT NULL DEFAULT 0, + num_jobs_running INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (river_client_id, name) +); + +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5); +INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6); diff --git a/internal/kits/system/single/templates/jobs/worker_init.go.tmpl b/internal/kits/system/single/templates/jobs/worker_init.go.tmpl new file mode 100644 index 0000000..a36df8e --- /dev/null +++ b/internal/kits/system/single/templates/jobs/worker_init.go.tmpl @@ -0,0 +1,15 @@ +package jobs + +import ( + "github.com/riverqueue/river" +) + +// SetupWorkers registers all job workers with River. +// New workers are added here by `lvt gen job`. 
+func SetupWorkers() *river.Workers { + workers := river.NewWorkers() + + // Register job workers below (added by `lvt gen job`) + + return workers +} From d55a800e0a884d859f41b1439f1278b13ef346f4 Mon Sep 17 00:00:00 2001 From: Adnaan Date: Fri, 20 Mar 2026 23:22:00 +0100 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20review=20findings=20?= =?UTF-8?q?=E2=80=94=20injection=20point,=20imports,=20atomicity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move injection point from defer database.CloseDB() to after appCtx creation (appCtx must exist before River client Start/Stop) - Add missing "database/sql" import for sql.Open in injected code - Use existing injectImport() helper instead of manual import injection - Buffer template output in bytes.Buffer before writing files to prevent partial files on template execution failure - Export ToCamelCase for use by CLI commands (consistent initialism handling for display output) - Sync schema.sql.tmpl with full migration.sql.tmpl (was missing constraints and indexes) Co-Authored-By: Claude Opus 4.6 (1M context) --- commands/jobs.go | 10 +-- internal/generator/jobs.go | 75 +++++++------------ internal/generator/types.go | 7 ++ .../multi/templates/jobs/schema.sql.tmpl | 40 ++++++++-- .../single/templates/jobs/schema.sql.tmpl | 40 ++++++++-- 5 files changed, 107 insertions(+), 65 deletions(-) diff --git a/commands/jobs.go b/commands/jobs.go index b2dc636..776f2c8 100644 --- a/commands/jobs.go +++ b/commands/jobs.go @@ -105,15 +105,7 @@ func GenJob(args []string) error { fmt.Println(" 2. 
Enqueue jobs from your handlers:") fmt.Println() fmt.Printf(" // In any handler with access to riverClient:\n") - // Convert snake_case to CamelCase for display - parts := strings.Split(jobName, "_") - for i, p := range parts { - if len(p) > 0 { - parts[i] = strings.ToUpper(p[:1]) + p[1:] - } - } - camelName := strings.Join(parts, "") - fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", camelName) + fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", generator.ToCamelCase(jobName)) return nil } diff --git a/internal/generator/jobs.go b/internal/generator/jobs.go index 0633157..06f7e42 100644 --- a/internal/generator/jobs.go +++ b/internal/generator/jobs.go @@ -1,6 +1,7 @@ package generator import ( + "bytes" "fmt" "os" "os/exec" @@ -153,16 +154,13 @@ func GenerateJob(projectRoot string, moduleName string, jobName string) error { return fmt.Errorf("failed to parse handler template: %w", err) } - file, err := os.Create(jobPath) - if err != nil { - return fmt.Errorf("failed to create %s.go: %w", jobName, err) - } - - if err := tmpl.Execute(file, jobConfig); err != nil { - file.Close() + var buf bytes.Buffer + if err := tmpl.Execute(&buf, jobConfig); err != nil { return fmt.Errorf("failed to execute handler template: %w", err) } - file.Close() + if err := os.WriteFile(jobPath, buf.Bytes(), 0644); err != nil { + return fmt.Errorf("failed to write %s.go: %w", jobName, err) + } // 2. Register worker in app/jobs/worker.go if err := injectWorkerRegistration(workerPath, jobNameCamel); err != nil { @@ -172,7 +170,7 @@ func GenerateJob(projectRoot string, moduleName string, jobName string) error { return nil } -// writeTemplateFile loads a kit template and writes it to the output path. +// writeTemplateFile loads a kit template, executes it, and writes the result atomically. 
func writeTemplateFile(kitLoader *kits.KitLoader, kitName, templatePath, outputPath string, data interface{}) error { content, err := kitLoader.LoadKitTemplate(kitName, templatePath) if err != nil { @@ -184,13 +182,12 @@ func writeTemplateFile(kitLoader *kits.KitLoader, kitName, templatePath, outputP return err } - file, err := os.Create(outputPath) - if err != nil { + var buf bytes.Buffer + if err := tmpl.Execute(&buf, data); err != nil { return err } - defer file.Close() - return tmpl.Execute(file, data) + return os.WriteFile(outputPath, buf.Bytes(), 0644) } // appendTemplateFile loads a kit template and appends it to the output file. @@ -238,8 +235,7 @@ func injectJobWorker(mainGoPath string, moduleName string) error { return nil // Already injected } - // Find injection point: after database.InitDB call, before route registrations - // Look for the database init line + // Find injection point: after appCtx creation (needed by River client) lines := strings.Split(mainStr, "\n") var result []string injected := false @@ -247,10 +243,12 @@ func injectJobWorker(mainGoPath string, moduleName string) error { for _, line := range lines { result = append(result, line) - // Inject after the database initialization block - // Look for the line that closes the database error check (the closing brace after InitDB) - if !injected && strings.Contains(line, "defer database.CloseDB()") { - // Insert River setup after this line + // Inject after appCtx creation — River needs the context for Start/Stop + if !injected && strings.Contains(line, "appCtx, appCancel := context.WithCancel") { + // Find the next line (defer appCancel()) and include it + continue + } + if !injected && strings.Contains(line, "defer appCancel()") { riverSetup := []string{ "", "\t// Background job processing (River)", @@ -286,38 +284,23 @@ func injectJobWorker(mainGoPath string, moduleName string) error { } if !injected { - return fmt.Errorf("could not find injection point in main.go (expected 'defer 
database.CloseDB()')") + return fmt.Errorf("could not find injection point in main.go (expected 'appCtx, appCancel := context.WithCancel')") } - // Inject imports + // Inject imports using existing helper resultStr := strings.Join(result, "\n") - // Add River imports - riverImports := fmt.Sprintf("\t\"%s/app/jobs\"\n", moduleName) + - "\t\"github.com/riverqueue/river\"\n" + - "\t\"github.com/riverqueue/river/riverdriver/riversqlite\"" - - // Find the import block and add our imports - if idx := strings.Index(resultStr, "\"database/sql\""); idx != -1 { - // Insert after "database/sql" import - insertPos := idx + len("\"database/sql\"") - resultStr = resultStr[:insertPos] + "\n" + riverImports + resultStr[insertPos:] - } else if idx := strings.Index(resultStr, "_ \"modernc.org/sqlite\""); idx != -1 { - // Insert before the sqlite import - resultStr = resultStr[:idx] + riverImports + "\n\t" + resultStr[idx:] - } else { - // Fallback: find first import line and add after it - importIdx := strings.Index(resultStr, "import (") - if importIdx == -1 { - return fmt.Errorf("could not find import block in main.go") - } - // Find the next newline after "import (" - nextNewline := strings.Index(resultStr[importIdx:], "\n") - if nextNewline == -1 { - return fmt.Errorf("malformed import block in main.go") + imports := []string{ + fmt.Sprintf("\t\"database/sql\""), + fmt.Sprintf("\t\"%s/app/jobs\"", moduleName), + "\t\"github.com/riverqueue/river\"", + "\t\"github.com/riverqueue/river/riverdriver/riversqlite\"", + } + for _, imp := range imports { + resultStr, err = injectImport(resultStr, imp) + if err != nil { + return fmt.Errorf("failed to inject import %s: %w", imp, err) } - insertPos := importIdx + nextNewline + 1 - resultStr = resultStr[:insertPos] + "\t" + riverImports + "\n" + resultStr[insertPos:] } return os.WriteFile(mainGoPath, []byte(resultStr), 0644) diff --git a/internal/generator/types.go b/internal/generator/types.go index a267eda..5997041 100644 --- 
a/internal/generator/types.go +++ b/internal/generator/types.go @@ -108,6 +108,13 @@ func singularizeForTemplate(s string) string { return singularize(strings.ToLower(s)) } +// ToCamelCase converts snake_case to CamelCase following Go conventions. +// Common initialisms like ID, URL, HTTP are kept in all caps. +// Exported for use by CLI commands that need consistent naming. +func ToCamelCase(s string) string { + return toCamelCase(s) +} + // toCamelCase converts snake_case to CamelCase following Go conventions // Common initialisms like ID, URL, HTTP are kept in all caps func toCamelCase(s string) string { diff --git a/internal/kits/system/multi/templates/jobs/schema.sql.tmpl b/internal/kits/system/multi/templates/jobs/schema.sql.tmpl index eabc8b8..55e6a3e 100644 --- a/internal/kits/system/multi/templates/jobs/schema.sql.tmpl +++ b/internal/kits/system/multi/templates/jobs/schema.sql.tmpl @@ -1,8 +1,11 @@ --- River job queue tables +-- River job queue tables (SQLite) + CREATE TABLE IF NOT EXISTS river_migration ( line TEXT NOT NULL, version INTEGER NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), PRIMARY KEY (line, version) ); @@ -24,17 +27,40 @@ CREATE TABLE IF NOT EXISTS river_job ( scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, tags BLOB NOT NULL DEFAULT (json('[]')), unique_key BLOB, - unique_states INTEGER + unique_states INTEGER, + CONSTRAINT finalized_or_finalized_at_null CHECK ( + (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR + (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) + ), + CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), + CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128), + CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128), + CONSTRAINT state_valid CHECK (state 
IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled')) ); CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); +CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL; CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); +CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND CASE state + WHEN 'available' THEN unique_states & (1 << 0) + WHEN 'cancelled' THEN unique_states & (1 << 1) + WHEN 'completed' THEN unique_states & (1 << 2) + WHEN 'discarded' THEN unique_states & (1 << 3) + WHEN 'pending' THEN unique_states & (1 << 4) + WHEN 'retryable' THEN unique_states & (1 << 5) + WHEN 'running' THEN unique_states & (1 << 6) + WHEN 'scheduled' THEN unique_states & (1 << 7) + ELSE 0 + END >= 1; CREATE TABLE IF NOT EXISTS river_leader ( elected_at TIMESTAMP NOT NULL, expires_at TIMESTAMP NOT NULL, leader_id TEXT NOT NULL, - name TEXT PRIMARY KEY NOT NULL DEFAULT 'default', + name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'), CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) ); @@ -52,7 +78,8 @@ CREATE TABLE IF NOT EXISTS river_client ( created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, metadata BLOB NOT NULL DEFAULT (json('{}')), paused_at TIMESTAMP, - updated_at TIMESTAMP NOT NULL + updated_at TIMESTAMP NOT NULL, + CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128) ); CREATE TABLE IF NOT EXISTS river_client_queue ( @@ -64,7 +91,10 @@ CREATE TABLE IF NOT EXISTS river_client_queue ( num_jobs_completed INTEGER NOT NULL DEFAULT 0, num_jobs_running INTEGER NOT NULL DEFAULT 0, updated_at TIMESTAMP NOT NULL, - PRIMARY KEY 
(river_client_id, name) + PRIMARY KEY (river_client_id, name), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), + CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) ); INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1); diff --git a/internal/kits/system/single/templates/jobs/schema.sql.tmpl b/internal/kits/system/single/templates/jobs/schema.sql.tmpl index eabc8b8..55e6a3e 100644 --- a/internal/kits/system/single/templates/jobs/schema.sql.tmpl +++ b/internal/kits/system/single/templates/jobs/schema.sql.tmpl @@ -1,8 +1,11 @@ --- River job queue tables +-- River job queue tables (SQLite) + CREATE TABLE IF NOT EXISTS river_migration ( line TEXT NOT NULL, version INTEGER NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), PRIMARY KEY (line, version) ); @@ -24,17 +27,40 @@ CREATE TABLE IF NOT EXISTS river_job ( scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, tags BLOB NOT NULL DEFAULT (json('[]')), unique_key BLOB, - unique_states INTEGER + unique_states INTEGER, + CONSTRAINT finalized_or_finalized_at_null CHECK ( + (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR + (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) + ), + CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), + CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128), + CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128), + CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled')) ); CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind); +CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index 
ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL; CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id); +CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND CASE state + WHEN 'available' THEN unique_states & (1 << 0) + WHEN 'cancelled' THEN unique_states & (1 << 1) + WHEN 'completed' THEN unique_states & (1 << 2) + WHEN 'discarded' THEN unique_states & (1 << 3) + WHEN 'pending' THEN unique_states & (1 << 4) + WHEN 'retryable' THEN unique_states & (1 << 5) + WHEN 'running' THEN unique_states & (1 << 6) + WHEN 'scheduled' THEN unique_states & (1 << 7) + ELSE 0 + END >= 1; CREATE TABLE IF NOT EXISTS river_leader ( elected_at TIMESTAMP NOT NULL, expires_at TIMESTAMP NOT NULL, leader_id TEXT NOT NULL, - name TEXT PRIMARY KEY NOT NULL DEFAULT 'default', + name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'), CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) ); @@ -52,7 +78,8 @@ CREATE TABLE IF NOT EXISTS river_client ( created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, metadata BLOB NOT NULL DEFAULT (json('{}')), paused_at TIMESTAMP, - updated_at TIMESTAMP NOT NULL + updated_at TIMESTAMP NOT NULL, + CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128) ); CREATE TABLE IF NOT EXISTS river_client_queue ( @@ -64,7 +91,10 @@ CREATE TABLE IF NOT EXISTS river_client_queue ( num_jobs_completed INTEGER NOT NULL DEFAULT 0, num_jobs_running INTEGER NOT NULL DEFAULT 0, updated_at TIMESTAMP NOT NULL, - PRIMARY KEY (river_client_id, name) + PRIMARY KEY (river_client_id, name), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), + CONSTRAINT 
num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) ); INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1); From d404f62c0d9a6004e3a7539ff9cfbd331ac9716a Mon Sep 17 00:00:00 2001 From: Adnaan Date: Sat, 21 Mar 2026 06:40:28 +0100 Subject: [PATCH 3/3] fix: address Claude + Copilot review comments - Fix riverClient.Stop using cancelled appCtx: now uses fresh background context with 30s timeout for graceful drain - Make riverClient accessible to handlers via jobs.SetClient/Client() so jobs can actually be enqueued from HTTP handlers - Fix CLI output: migration filename now matches actual generated name - Fix test config: use .lvtrc format (key=value) not lvt.yaml - Add TestInjectJobWorker: tests main.go injection with real fixture, verifying imports, River setup code, and idempotency Co-Authored-By: Claude Opus 4.6 (1M context) --- commands/jobs.go | 4 +- internal/generator/jobs.go | 12 ++- internal/generator/jobs_test.go | 85 +++++++++++++++++-- .../multi/templates/jobs/worker_init.go.tmpl | 13 +++ .../single/templates/jobs/worker_init.go.tmpl | 13 +++ 5 files changed, 116 insertions(+), 11 deletions(-) diff --git a/commands/jobs.go b/commands/jobs.go index 776f2c8..7f3e26a 100644 --- a/commands/jobs.go +++ b/commands/jobs.go @@ -38,8 +38,8 @@ func GenQueue(args []string) error { fmt.Println("✅ Background job queue set up successfully!") fmt.Println() fmt.Println("Generated files:") - fmt.Println(" app/jobs/worker.go Job worker registration") - fmt.Println(" database/migrations/..._river.sql River queue tables") + fmt.Println(" app/jobs/worker.go Job worker registration") + fmt.Println(" database/migrations/..._setup_river_queue.sql River queue tables") fmt.Println() fmt.Println("Next steps:") fmt.Println(" 1. 
Run 'lvt gen job ' to create your first job handler") diff --git a/internal/generator/jobs.go b/internal/generator/jobs.go index 06f7e42..c18b15f 100644 --- a/internal/generator/jobs.go +++ b/internal/generator/jobs.go @@ -275,8 +275,13 @@ func injectJobWorker(mainGoPath string, moduleName string) error { "\t\tslog.Error(\"Failed to start job workers\", \"error\", err)", "\t\tos.Exit(1)", "\t}", - "\tdefer riverClient.Stop(appCtx)", - "\t_ = riverClient // Available for enqueueing jobs in handlers", + "\tdefer func() {", + "\t\tstopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)", + "\t\tdefer stopCancel()", + "\t\t_ = riverClient.Stop(stopCtx)", + "\t}()", + "\tjobs.SetClient(riverClient)", + "\tslog.Info(\"Background job worker started\")", } result = append(result, riverSetup...) injected = true @@ -291,7 +296,8 @@ func injectJobWorker(mainGoPath string, moduleName string) error { resultStr := strings.Join(result, "\n") imports := []string{ - fmt.Sprintf("\t\"database/sql\""), + "\t\"database/sql\"", + "\t\"time\"", fmt.Sprintf("\t\"%s/app/jobs\"", moduleName), "\t\"github.com/riverqueue/river\"", "\t\"github.com/riverqueue/river/riverdriver/riversqlite\"", diff --git a/internal/generator/jobs_test.go b/internal/generator/jobs_test.go index 64a7d84..5a1badf 100644 --- a/internal/generator/jobs_test.go +++ b/internal/generator/jobs_test.go @@ -216,6 +216,81 @@ func TestGenerateMultipleJobs(t *testing.T) { } } +func TestInjectJobWorker(t *testing.T) { + tmpDir := t.TempDir() + + // Create a minimal main.go that matches the generated template structure + mainGoContent := `package main + +import ( + "context" + "log/slog" + "os" + + "testmodule/database" +) + +func main() { + dbPath := "app.db" + _, err := database.InitDB(dbPath) + if err != nil { + slog.Error("Failed to initialize database", "error", err) + os.Exit(1) + } + defer database.CloseDB() + + appCtx, appCancel := context.WithCancel(context.Background()) + defer appCancel() + + 
// Routes go here
+	slog.Info("Server starting")
+}
+`
+	mainGoPath := filepath.Join(tmpDir, "main.go")
+	if err := os.WriteFile(mainGoPath, []byte(mainGoContent), 0644); err != nil {
+		t.Fatalf("Failed to create main.go: %v", err)
+	}
+
+	if err := injectJobWorker(mainGoPath, "testmodule"); err != nil {
+		t.Fatalf("injectJobWorker failed: %v", err)
+	}
+
+	result, err := os.ReadFile(mainGoPath)
+	if err != nil {
+		t.Fatalf("Failed to read modified main.go: %v", err)
+	}
+	resultStr := string(result)
+
+	// Verify River setup was injected
+	checks := []string{
+		"river.NewClient",
+		"riversqlite.New",
+		"jobs.SetupWorkers()",
+		"riverClient.Start(appCtx)",
+		"riverClient.Stop(stopCtx)",
+		"jobs.SetClient(riverClient)",
+		"\"database/sql\"",
+		"\"github.com/riverqueue/river\"",
+		"\"testmodule/app/jobs\"",
+	}
+	for _, check := range checks {
+		if !strings.Contains(resultStr, check) {
+			t.Errorf("main.go missing expected content: %s", check)
+		}
+	}
+
+	// Verify idempotency — second call should be no-op
+	if err := injectJobWorker(mainGoPath, "testmodule"); err != nil {
+		t.Fatalf("Second injectJobWorker call failed: %v", err)
+	}
+
+	result, _ = os.ReadFile(mainGoPath) // re-read after second call: verify no duplicate injection
+	count := strings.Count(string(result), "river.NewClient")
+	if count != 1 {
+		t.Errorf("Expected 1 river.NewClient occurrence, got %d", count)
+	}
+}
+
 // setupTestProject creates a minimal project structure for testing.
func setupTestProject(t *testing.T, dir string) { t.Helper() @@ -238,11 +313,9 @@ func setupTestProject(t *testing.T, dir string) { t.Fatalf("Failed to create schema.sql: %v", err) } - // Create lvt.yaml (project config) - lvtConfig := `kit: multi -module: testmodule -` - if err := os.WriteFile(filepath.Join(dir, "lvt.yaml"), []byte(lvtConfig), 0644); err != nil { - t.Fatalf("Failed to create lvt.yaml: %v", err) + // Create .lvtrc (project config) + lvtConfig := "kit=multi\nmodule=testmodule\n" + if err := os.WriteFile(filepath.Join(dir, ".lvtrc"), []byte(lvtConfig), 0644); err != nil { + t.Fatalf("Failed to create .lvtrc: %v", err) } } diff --git a/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl b/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl index a36df8e..b864713 100644 --- a/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl +++ b/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl @@ -4,6 +4,8 @@ import ( "github.com/riverqueue/river" ) +var client *river.Client[any] + // SetupWorkers registers all job workers with River. // New workers are added here by `lvt gen job`. func SetupWorkers() *river.Workers { @@ -13,3 +15,14 @@ func SetupWorkers() *river.Workers { return workers } + +// SetClient stores the River client for use by handlers via Client(). +func SetClient(c *river.Client[any]) { + client = c +} + +// Client returns the River client for enqueueing jobs. 
+// Call from HTTP handlers: jobs.Client().Insert(ctx, args, nil) +func Client() *river.Client[any] { + return client +} diff --git a/internal/kits/system/single/templates/jobs/worker_init.go.tmpl b/internal/kits/system/single/templates/jobs/worker_init.go.tmpl index a36df8e..b864713 100644 --- a/internal/kits/system/single/templates/jobs/worker_init.go.tmpl +++ b/internal/kits/system/single/templates/jobs/worker_init.go.tmpl @@ -4,6 +4,8 @@ import ( "github.com/riverqueue/river" ) +var client *river.Client[any] + // SetupWorkers registers all job workers with River. // New workers are added here by `lvt gen job`. func SetupWorkers() *river.Workers { @@ -13,3 +15,14 @@ func SetupWorkers() *river.Workers { return workers } + +// SetClient stores the River client for use by handlers via Client(). +func SetClient(c *river.Client[any]) { + client = c +} + +// Client returns the River client for enqueueing jobs. +// Call from HTTP handlers: jobs.Client().Insert(ctx, args, nil) +func Client() *river.Client[any] { + return client +}