diff --git a/commands/gen.go b/commands/gen.go
index a64de88..9097a68 100644
--- a/commands/gen.go
+++ b/commands/gen.go
@@ -47,8 +47,12 @@ func Gen(args []string) error {
return Auth(args[1:])
case "stack":
return GenStack(args[1:])
+ case "queue":
+ return GenQueue(args[1:])
+ case "job":
+ return GenJob(args[1:])
default:
- return fmt.Errorf("unknown subcommand: %s\n\nAvailable subcommands:\n resource Generate full CRUD resource with database\n view Generate view-only handler (no database)\n schema Generate database schema only\n auth Generate authentication system\n stack Generate deployment stack configuration\n\nRun 'lvt gen' for interactive mode", subcommand)
+ return fmt.Errorf("unknown subcommand: %s\n\nAvailable subcommands:\n resource Generate full CRUD resource with database\n view Generate view-only handler (no database)\n schema Generate database schema only\n auth Generate authentication system\n stack Generate deployment stack configuration\n queue Set up background job processing (River)\n job Scaffold a new background job handler\n\nRun 'lvt gen' for interactive mode", subcommand)
}
}
@@ -61,6 +65,8 @@ func interactiveGen() error {
fmt.Println(" schema
... Generate database schema only")
fmt.Println(" auth [StructName] [table_name] Generate authentication system")
fmt.Println(" stack Generate deployment stack configuration")
+ fmt.Println(" queue Set up background job processing (River)")
+ fmt.Println(" job Scaffold a new background job handler")
fmt.Println()
fmt.Println("Examples:")
fmt.Println(" lvt gen resource posts title content:text published:bool")
diff --git a/commands/jobs.go b/commands/jobs.go
new file mode 100644
index 0000000..7f3e26a
--- /dev/null
+++ b/commands/jobs.go
@@ -0,0 +1,149 @@
+package commands
+
+import (
+ "fmt"
+ "os"
+ "strings"
+
+ "github.com/livetemplate/lvt/internal/config"
+ "github.com/livetemplate/lvt/internal/generator"
+)
+
+// GenQueue sets up background job infrastructure using River.
+func GenQueue(args []string) error {
+ if ShowHelpIfRequested(args, printGenQueueHelp) {
+ return nil
+ }
+
+ cwd, err := os.Getwd()
+ if err != nil {
+ return fmt.Errorf("failed to get working directory: %w", err)
+ }
+
+ projectConfig, err := config.LoadProjectConfig(cwd)
+ if err != nil {
+ return fmt.Errorf("failed to load project config: %w", err)
+ }
+
+ moduleName := projectConfig.Module
+ if moduleName == "" {
+ return fmt.Errorf("could not determine module name from project config")
+ }
+
+ if err := generator.GenerateQueue(cwd, moduleName); err != nil {
+ return err
+ }
+
+ fmt.Println()
+ fmt.Println("✅ Background job queue set up successfully!")
+ fmt.Println()
+ fmt.Println("Generated files:")
+ fmt.Println(" app/jobs/worker.go Job worker registration")
+ fmt.Println(" database/migrations/..._setup_river_queue.sql River queue tables")
+ fmt.Println()
+ fmt.Println("Next steps:")
+ fmt.Println(" 1. Run 'lvt gen job <name>' to create your first job handler")
+ fmt.Println(" 2. Run 'go mod tidy' to fetch River dependencies")
+ fmt.Println(" 3. Run 'lvt migration up' to create River tables")
+ fmt.Println(" 4. Start your app — the job worker will start automatically")
+ fmt.Println()
+ fmt.Println("Example:")
+ fmt.Println(" lvt gen job send_email")
+
+ return nil
+}
+
+// GenJob scaffolds a new background job handler.
+func GenJob(args []string) error {
+ if ShowHelpIfRequested(args, printGenJobHelp) {
+ return nil
+ }
+
+ if len(args) < 1 {
+ return fmt.Errorf("job name required\n\nUsage: lvt gen job <name>\n\nExamples:\n lvt gen job send_email\n lvt gen job process_payment\n lvt gen job generate_report")
+ }
+
+ jobName := strings.TrimSpace(args[0])
+ if jobName == "" {
+ return fmt.Errorf("job name cannot be empty")
+ }
+
+ if err := ValidatePositionalArg(jobName, "job name"); err != nil {
+ return err
+ }
+
+ // Normalize to snake_case
+ jobName = strings.ToLower(jobName)
+
+ cwd, err := os.Getwd()
+ if err != nil {
+ return fmt.Errorf("failed to get working directory: %w", err)
+ }
+
+ projectConfig, err := config.LoadProjectConfig(cwd)
+ if err != nil {
+ return fmt.Errorf("failed to load project config: %w", err)
+ }
+
+ moduleName := projectConfig.Module
+ if moduleName == "" {
+ return fmt.Errorf("could not determine module name from project config")
+ }
+
+ if err := generator.GenerateJob(cwd, moduleName, jobName); err != nil {
+ return err
+ }
+
+ fmt.Println()
+ fmt.Printf("✅ Job '%s' created successfully!\n", jobName)
+ fmt.Println()
+ fmt.Println("Generated files:")
+ fmt.Printf(" app/jobs/%s.go Job handler\n", jobName)
+ fmt.Println()
+ fmt.Println("Next steps:")
+ fmt.Printf(" 1. Edit app/jobs/%s.go to define your payload and logic\n", jobName)
+ fmt.Println(" 2. Enqueue jobs from your handlers:")
+ fmt.Println()
+ fmt.Printf(" // In any handler with access to riverClient:\n")
+ fmt.Printf(" riverClient.Insert(ctx, jobs.%sArgs{...}, nil)\n", generator.ToCamelCase(jobName))
+
+ return nil
+}
+
+func printGenQueueHelp() {
+ fmt.Println("Usage: lvt gen queue")
+ fmt.Println()
+ fmt.Println("Set up background job processing infrastructure using River.")
+ fmt.Println("This is a one-time setup command that creates:")
+ fmt.Println()
+ fmt.Println(" - Database migration for River queue tables")
+ fmt.Println(" - Worker registration file (app/jobs/worker.go)")
+ fmt.Println(" - River client setup in main.go")
+ fmt.Println()
+ fmt.Println("River (https://riverqueue.com) provides:")
+ fmt.Println(" - Worker pool with configurable concurrency")
+ fmt.Println(" - Retry with exponential backoff")
+ fmt.Println(" - Scheduled and periodic jobs")
+ fmt.Println(" - Dead letter queue (discarded jobs)")
+ fmt.Println(" - Unique/deduplicated jobs")
+ fmt.Println(" - Graceful shutdown")
+ fmt.Println(" - SQLite and PostgreSQL support")
+}
+
+func printGenJobHelp() {
+ fmt.Println("Usage: lvt gen job <name>")
+ fmt.Println()
+ fmt.Println("Scaffold a new background job handler.")
+ fmt.Println()
+ fmt.Println("Arguments:")
+ fmt.Println(" name Job name in snake_case (e.g., send_email, process_payment)")
+ fmt.Println()
+ fmt.Println("Examples:")
+ fmt.Println(" lvt gen job send_email")
+ fmt.Println(" lvt gen job process_payment")
+ fmt.Println(" lvt gen job generate_report")
+ fmt.Println(" lvt gen job cleanup_expired_sessions")
+ fmt.Println()
+ fmt.Println("Prerequisites:")
+ fmt.Println(" Run 'lvt gen queue' first to set up the job infrastructure.")
+}
diff --git a/internal/generator/jobs.go b/internal/generator/jobs.go
new file mode 100644
index 0000000..c18b15f
--- /dev/null
+++ b/internal/generator/jobs.go
@@ -0,0 +1,343 @@
+package generator
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "text/template"
+ "time"
+
+ "github.com/livetemplate/lvt/internal/config"
+ "github.com/livetemplate/lvt/internal/kits"
+)
+
+// JobsConfig holds configuration for generating the queue infrastructure.
+type JobsConfig struct {
+ ModuleName string
+}
+
+// JobConfig holds configuration for generating a single job handler.
+type JobConfig struct {
+ ModuleName string
+ JobName string // snake_case, e.g. "send_email"
+ JobNameCamel string // CamelCase, e.g. "SendEmail"
+}
+
+// GenerateQueue sets up the background job infrastructure using River.
+// It creates the migration, schema, worker init file, and injects setup into main.go.
+func GenerateQueue(projectRoot string, moduleName string) error {
+ projectConfig, err := config.LoadProjectConfig(projectRoot)
+ if err != nil {
+ return fmt.Errorf("failed to load project config: %w", err)
+ }
+ kitName := projectConfig.GetKit()
+ kitLoader := kits.DefaultLoader()
+
+ // Check if queue already set up
+ workerPath := filepath.Join(projectRoot, "app", "jobs", "worker.go")
+ if _, err := os.Stat(workerPath); err == nil {
+ return fmt.Errorf("job queue already set up (app/jobs/worker.go exists)")
+ }
+
+ // 1. Create migration file
+ migrationsDir := filepath.Join(projectRoot, "database", "migrations")
+ if err := os.MkdirAll(migrationsDir, 0755); err != nil {
+ return fmt.Errorf("failed to create migrations directory: %w", err)
+ }
+
+ timestamp := time.Now()
+ var migrationPath string
+ for i := 0; i < 3600; i++ {
+ timestampStr := timestamp.Format("20060102150405")
+ migrationFile := fmt.Sprintf("%s_setup_river_queue.sql", timestampStr)
+ migrationPath = filepath.Join(migrationsDir, migrationFile)
+
+ matches, err := filepath.Glob(filepath.Join(migrationsDir, timestampStr+"_*"))
+ if err != nil {
+ return fmt.Errorf("failed to check for existing migrations: %w", err)
+ }
+ if len(matches) == 0 {
+ break
+ }
+ timestamp = timestamp.Add(1 * time.Second)
+ if i == 3599 {
+ return fmt.Errorf("failed to generate unique migration timestamp")
+ }
+ }
+
+ if err := writeTemplateFile(kitLoader, kitName, "jobs/migration.sql.tmpl", migrationPath, nil); err != nil {
+ return fmt.Errorf("failed to generate migration: %w", err)
+ }
+
+ // 2. Append to schema.sql
+ schemaPath := filepath.Join(projectRoot, "database", "schema.sql")
+ if err := appendTemplateFile(kitLoader, kitName, "jobs/schema.sql.tmpl", schemaPath, nil); err != nil {
+ return fmt.Errorf("failed to append to schema.sql: %w", err)
+ }
+
+ // 3. Create app/jobs/worker.go
+ jobsDir := filepath.Join(projectRoot, "app", "jobs")
+ if err := os.MkdirAll(jobsDir, 0755); err != nil {
+ return fmt.Errorf("failed to create app/jobs directory: %w", err)
+ }
+
+ if err := writeTemplateFile(kitLoader, kitName, "jobs/worker_init.go.tmpl", workerPath, nil); err != nil {
+ return fmt.Errorf("failed to generate worker.go: %w", err)
+ }
+
+ // 4. Add River dependencies
+ goModPath := filepath.Join(projectRoot, "go.mod")
+ if _, err := os.Stat(goModPath); err == nil {
+ dependencies := []string{
+ "github.com/riverqueue/river@latest",
+ "github.com/riverqueue/river/riverdriver/riversqlite@latest",
+ }
+ args := append([]string{"get"}, dependencies...)
+ cmd := exec.Command("go", args...)
+ cmd.Dir = projectRoot
+ if output, err := cmd.CombinedOutput(); err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: could not fetch River dependencies (run 'go mod tidy' in %s to resolve):\n%s\n", projectRoot, string(output))
+ }
+ }
+
+ // 5. Inject River client setup into main.go
+ mainGoPath := findMainGo(projectRoot)
+ if mainGoPath != "" {
+ if err := injectJobWorker(mainGoPath, moduleName); err != nil {
+ return fmt.Errorf("failed to inject job worker into main.go: %w", err)
+ }
+ }
+
+ return nil
+}
+
+// GenerateJob scaffolds a new job handler and registers it with the worker.
+func GenerateJob(projectRoot string, moduleName string, jobName string) error {
+ // Validate queue is set up
+ workerPath := filepath.Join(projectRoot, "app", "jobs", "worker.go")
+ if _, err := os.Stat(workerPath); os.IsNotExist(err) {
+ return fmt.Errorf("job queue not set up yet. Run 'lvt gen queue' first")
+ }
+
+ projectConfig, err := config.LoadProjectConfig(projectRoot)
+ if err != nil {
+ return fmt.Errorf("failed to load project config: %w", err)
+ }
+ kitName := projectConfig.GetKit()
+ kitLoader := kits.DefaultLoader()
+
+ jobNameCamel := toCamelCase(jobName)
+
+ jobConfig := &JobConfig{
+ ModuleName: moduleName,
+ JobName: jobName,
+ JobNameCamel: jobNameCamel,
+ }
+
+ // Check if job already exists
+ jobPath := filepath.Join(projectRoot, "app", "jobs", jobName+".go")
+ if _, err := os.Stat(jobPath); err == nil {
+ return fmt.Errorf("job '%s' already exists (app/jobs/%s.go)", jobName, jobName)
+ }
+
+ // 1. Generate job handler file
+ templateContent, err := kitLoader.LoadKitTemplate(kitName, "jobs/handler.go.tmpl")
+ if err != nil {
+ return fmt.Errorf("failed to load handler template: %w", err)
+ }
+
+ tmpl, err := template.New("job_handler").Delims("<<", ">>").Parse(string(templateContent))
+ if err != nil {
+ return fmt.Errorf("failed to parse handler template: %w", err)
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, jobConfig); err != nil {
+ return fmt.Errorf("failed to execute handler template: %w", err)
+ }
+ if err := os.WriteFile(jobPath, buf.Bytes(), 0644); err != nil {
+ return fmt.Errorf("failed to write %s.go: %w", jobName, err)
+ }
+
+ // 2. Register worker in app/jobs/worker.go
+ if err := injectWorkerRegistration(workerPath, jobNameCamel); err != nil {
+ return fmt.Errorf("failed to register worker: %w", err)
+ }
+
+ return nil
+}
+
+// writeTemplateFile loads a kit template, executes it, and writes the result atomically.
+func writeTemplateFile(kitLoader *kits.KitLoader, kitName, templatePath, outputPath string, data interface{}) error {
+ content, err := kitLoader.LoadKitTemplate(kitName, templatePath)
+ if err != nil {
+ return err
+ }
+
+ tmpl, err := template.New(filepath.Base(templatePath)).Parse(string(content))
+ if err != nil {
+ return err
+ }
+
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, data); err != nil {
+ return err
+ }
+
+ return os.WriteFile(outputPath, buf.Bytes(), 0644)
+}
+
+// appendTemplateFile loads a kit template and appends it to the output file.
+func appendTemplateFile(kitLoader *kits.KitLoader, kitName, templatePath, outputPath string, data interface{}) error {
+ content, err := kitLoader.LoadKitTemplate(kitName, templatePath)
+ if err != nil {
+ return err
+ }
+
+ tmpl, err := template.New(filepath.Base(templatePath)).Parse(string(content))
+ if err != nil {
+ return err
+ }
+
+ file, err := os.OpenFile(outputPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+
+ stat, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ if stat.Size() > 0 {
+ if _, err := file.WriteString("\n"); err != nil {
+ return err
+ }
+ }
+
+ return tmpl.Execute(file, data)
+}
+
+// injectJobWorker injects River client setup into main.go.
+//
+// Idempotent: if main.go already contains "river.NewClient" the file is left
+// untouched. The setup block is inserted immediately after the line containing
+// "defer appCancel()", because the River client needs appCtx (created on the
+// line directly above it) for Start and for the deferred Stop.
+func injectJobWorker(mainGoPath string, moduleName string) error {
+	content, err := os.ReadFile(mainGoPath)
+	if err != nil {
+		return fmt.Errorf("failed to read main.go: %w", err)
+	}
+
+	mainStr := string(content)
+
+	// Check if already injected
+	if strings.Contains(mainStr, "river.NewClient") {
+		return nil // Already injected
+	}
+
+	lines := strings.Split(mainStr, "\n")
+	var result []string
+	injected := false
+
+	for _, line := range lines {
+		result = append(result, line)
+
+		// Inject after "defer appCancel()" — by that point appCtx exists,
+		// which River needs for Start/Stop.
+		if !injected && strings.Contains(line, "defer appCancel()") {
+			riverSetup := []string{
+				"",
+				"\t// Background job processing (River)",
+				"\triverDB, err := sql.Open(\"sqlite\", dbPath+\"?_pragma=journal_mode(WAL)\")",
+				"\tif err != nil {",
+				"\t\tslog.Error(\"Failed to open River database\", \"error\", err)",
+				"\t\tos.Exit(1)",
+				"\t}",
+				"\triverDB.SetMaxOpenConns(1)",
+				"\tdefer riverDB.Close()",
+				"",
+				"\tjobWorkers := jobs.SetupWorkers()",
+				"\triverClient, err := river.NewClient(riversqlite.New(riverDB), &river.Config{",
+				"\t\tQueues: map[string]river.QueueConfig{",
+				"\t\t\triver.QueueDefault: {MaxWorkers: 100},",
+				"\t\t},",
+				"\t\tWorkers: jobWorkers,",
+				"\t})",
+				"\tif err != nil {",
+				"\t\tslog.Error(\"Failed to create River client\", \"error\", err)",
+				"\t\tos.Exit(1)",
+				"\t}",
+				"\tif err := riverClient.Start(appCtx); err != nil {",
+				"\t\tslog.Error(\"Failed to start job workers\", \"error\", err)",
+				"\t\tos.Exit(1)",
+				"\t}",
+				"\tdefer func() {",
+				"\t\tstopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)",
+				"\t\tdefer stopCancel()",
+				"\t\t_ = riverClient.Stop(stopCtx)",
+				"\t}()",
+				"\tjobs.SetClient(riverClient)",
+				"\tslog.Info(\"Background job worker started\")",
+			}
+			result = append(result, riverSetup...)
+			injected = true
+		}
+	}
+
+	if !injected {
+		// Error message names the actual anchor searched above, not the
+		// context.WithCancel line (which is never matched directly).
+		return fmt.Errorf("could not find injection point in main.go (expected 'defer appCancel()')")
+	}
+
+	// Inject imports using existing helper
+	resultStr := strings.Join(result, "\n")
+
+	imports := []string{
+		"\t\"database/sql\"",
+		"\t\"time\"",
+		fmt.Sprintf("\t\"%s/app/jobs\"", moduleName),
+		"\t\"github.com/riverqueue/river\"",
+		"\t\"github.com/riverqueue/river/riverdriver/riversqlite\"",
+	}
+	for _, imp := range imports {
+		resultStr, err = injectImport(resultStr, imp)
+		if err != nil {
+			return fmt.Errorf("failed to inject import %s: %w", imp, err)
+		}
+	}
+
+	return os.WriteFile(mainGoPath, []byte(resultStr), 0644)
+}
+
+// injectWorkerRegistration adds a river.AddWorker call to app/jobs/worker.go.
+func injectWorkerRegistration(workerPath string, jobNameCamel string) error {
+ content, err := os.ReadFile(workerPath)
+ if err != nil {
+ return fmt.Errorf("failed to read worker.go: %w", err)
+ }
+
+ workerStr := string(content)
+
+ // Check if already registered
+ registrationLine := fmt.Sprintf("river.AddWorker(workers, &%sWorker{})", jobNameCamel)
+ if strings.Contains(workerStr, registrationLine) {
+ return nil // Already registered
+ }
+
+ // Find the marker comment and insert after it
+ marker := "// Register job workers below (added by `lvt gen job`)"
+ idx := strings.Index(workerStr, marker)
+ if idx == -1 {
+ return fmt.Errorf("could not find registration marker in worker.go")
+ }
+
+ insertPos := idx + len(marker)
+ registration := "\n\t" + registrationLine
+
+ workerStr = workerStr[:insertPos] + registration + workerStr[insertPos:]
+
+ return os.WriteFile(workerPath, []byte(workerStr), 0644)
+}
diff --git a/internal/generator/jobs_test.go b/internal/generator/jobs_test.go
new file mode 100644
index 0000000..5a1badf
--- /dev/null
+++ b/internal/generator/jobs_test.go
@@ -0,0 +1,321 @@
+package generator
+
+import (
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+)
+
+func TestGenerateQueue(t *testing.T) {
+ tmpDir := t.TempDir()
+
+ // Set up minimal project structure
+ setupTestProject(t, tmpDir)
+
+ err := GenerateQueue(tmpDir, "testmodule")
+ if err != nil {
+ t.Fatalf("GenerateQueue failed: %v", err)
+ }
+
+ // Verify worker.go was created
+ workerPath := filepath.Join(tmpDir, "app", "jobs", "worker.go")
+ if _, err := os.Stat(workerPath); os.IsNotExist(err) {
+ t.Error("app/jobs/worker.go was not created")
+ }
+
+ workerContent, err := os.ReadFile(workerPath)
+ if err != nil {
+ t.Fatalf("Failed to read worker.go: %v", err)
+ }
+ if !strings.Contains(string(workerContent), "river.NewWorkers()") {
+ t.Error("worker.go missing river.NewWorkers() call")
+ }
+ if !strings.Contains(string(workerContent), "Register job workers below") {
+ t.Error("worker.go missing registration marker comment")
+ }
+
+ // Verify migration was created
+ migrationsDir := filepath.Join(tmpDir, "database", "migrations")
+ entries, err := os.ReadDir(migrationsDir)
+ if err != nil {
+ t.Fatalf("Failed to read migrations dir: %v", err)
+ }
+
+ var migrationFound bool
+ for _, entry := range entries {
+ if strings.Contains(entry.Name(), "setup_river_queue") {
+ migrationFound = true
+ // Read and verify content
+ content, err := os.ReadFile(filepath.Join(migrationsDir, entry.Name()))
+ if err != nil {
+ t.Fatalf("Failed to read migration: %v", err)
+ }
+ if !strings.Contains(string(content), "river_job") {
+ t.Error("Migration missing river_job table")
+ }
+ if !strings.Contains(string(content), "river_leader") {
+ t.Error("Migration missing river_leader table")
+ }
+ if !strings.Contains(string(content), "+goose Up") {
+ t.Error("Migration missing goose Up marker")
+ }
+ }
+ }
+ if !migrationFound {
+ t.Error("River migration file was not created")
+ }
+
+ // Verify schema.sql was appended
+ schemaPath := filepath.Join(tmpDir, "database", "schema.sql")
+ schemaContent, err := os.ReadFile(schemaPath)
+ if err != nil {
+ t.Fatalf("Failed to read schema.sql: %v", err)
+ }
+ if !strings.Contains(string(schemaContent), "river_job") {
+ t.Error("schema.sql missing river_job table")
+ }
+}
+
+func TestGenerateQueueIdempotent(t *testing.T) {
+ tmpDir := t.TempDir()
+ setupTestProject(t, tmpDir)
+
+ // First call should succeed
+ if err := GenerateQueue(tmpDir, "testmodule"); err != nil {
+ t.Fatalf("First GenerateQueue failed: %v", err)
+ }
+
+ // Second call should fail (already set up)
+ err := GenerateQueue(tmpDir, "testmodule")
+ if err == nil {
+ t.Error("Expected error on second GenerateQueue call")
+ }
+ if err != nil && !strings.Contains(err.Error(), "already set up") {
+ t.Errorf("Expected 'already set up' error, got: %v", err)
+ }
+}
+
+func TestGenerateJob(t *testing.T) {
+ tmpDir := t.TempDir()
+ setupTestProject(t, tmpDir)
+
+ // Set up queue first
+ if err := GenerateQueue(tmpDir, "testmodule"); err != nil {
+ t.Fatalf("GenerateQueue failed: %v", err)
+ }
+
+ // Generate a job
+ if err := GenerateJob(tmpDir, "testmodule", "send_email"); err != nil {
+ t.Fatalf("GenerateJob failed: %v", err)
+ }
+
+ // Verify job handler was created
+ jobPath := filepath.Join(tmpDir, "app", "jobs", "send_email.go")
+ if _, err := os.Stat(jobPath); os.IsNotExist(err) {
+ t.Error("app/jobs/send_email.go was not created")
+ }
+
+ jobContent, err := os.ReadFile(jobPath)
+ if err != nil {
+ t.Fatalf("Failed to read send_email.go: %v", err)
+ }
+
+ // Check for expected content
+ checks := []string{
+ "SendEmailArgs",
+ "SendEmailWorker",
+ `Kind() string { return "send_email" }`,
+ "river.WorkerDefaults[SendEmailArgs]",
+ "func (w *SendEmailWorker) Work(",
+ }
+ for _, check := range checks {
+ if !strings.Contains(string(jobContent), check) {
+ t.Errorf("send_email.go missing expected content: %s", check)
+ }
+ }
+
+ // Verify worker registration was injected
+ workerPath := filepath.Join(tmpDir, "app", "jobs", "worker.go")
+ workerContent, err := os.ReadFile(workerPath)
+ if err != nil {
+ t.Fatalf("Failed to read worker.go: %v", err)
+ }
+ if !strings.Contains(string(workerContent), "river.AddWorker(workers, &SendEmailWorker{})") {
+ t.Error("worker.go missing SendEmailWorker registration")
+ }
+}
+
+func TestGenerateJobWithoutQueue(t *testing.T) {
+ tmpDir := t.TempDir()
+ setupTestProject(t, tmpDir)
+
+ // Try to generate job without queue setup
+ err := GenerateJob(tmpDir, "testmodule", "send_email")
+ if err == nil {
+ t.Error("Expected error when generating job without queue")
+ }
+ if err != nil && !strings.Contains(err.Error(), "Run 'lvt gen queue' first") {
+ t.Errorf("Expected 'Run lvt gen queue first' error, got: %v", err)
+ }
+}
+
+func TestGenerateJobDuplicate(t *testing.T) {
+ tmpDir := t.TempDir()
+ setupTestProject(t, tmpDir)
+
+ if err := GenerateQueue(tmpDir, "testmodule"); err != nil {
+ t.Fatalf("GenerateQueue failed: %v", err)
+ }
+
+ // First job should succeed
+ if err := GenerateJob(tmpDir, "testmodule", "send_email"); err != nil {
+ t.Fatalf("First GenerateJob failed: %v", err)
+ }
+
+ // Duplicate should fail
+ err := GenerateJob(tmpDir, "testmodule", "send_email")
+ if err == nil {
+ t.Error("Expected error on duplicate job")
+ }
+ if err != nil && !strings.Contains(err.Error(), "already exists") {
+ t.Errorf("Expected 'already exists' error, got: %v", err)
+ }
+}
+
+func TestGenerateMultipleJobs(t *testing.T) {
+ tmpDir := t.TempDir()
+ setupTestProject(t, tmpDir)
+
+ if err := GenerateQueue(tmpDir, "testmodule"); err != nil {
+ t.Fatalf("GenerateQueue failed: %v", err)
+ }
+
+ jobs := []string{"send_email", "process_payment", "generate_report"}
+ for _, job := range jobs {
+ if err := GenerateJob(tmpDir, "testmodule", job); err != nil {
+ t.Fatalf("GenerateJob(%s) failed: %v", job, err)
+ }
+ }
+
+ // Verify all jobs registered in worker.go
+ workerContent, err := os.ReadFile(filepath.Join(tmpDir, "app", "jobs", "worker.go"))
+ if err != nil {
+ t.Fatalf("Failed to read worker.go: %v", err)
+ }
+
+ expectedRegistrations := []string{
+ "river.AddWorker(workers, &SendEmailWorker{})",
+ "river.AddWorker(workers, &ProcessPaymentWorker{})",
+ "river.AddWorker(workers, &GenerateReportWorker{})",
+ }
+ for _, reg := range expectedRegistrations {
+ if !strings.Contains(string(workerContent), reg) {
+ t.Errorf("worker.go missing registration: %s", reg)
+ }
+ }
+}
+
+func TestInjectJobWorker(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	// Create a minimal main.go that matches the generated template structure
+	mainGoContent := `package main
+
+import (
+	"context"
+	"log/slog"
+	"os"
+
+	"testmodule/database"
+)
+
+func main() {
+	dbPath := "app.db"
+	_, err := database.InitDB(dbPath)
+	if err != nil {
+		slog.Error("Failed to initialize database", "error", err)
+		os.Exit(1)
+	}
+	defer database.CloseDB()
+
+	appCtx, appCancel := context.WithCancel(context.Background())
+	defer appCancel()
+
+	// Routes go here
+	slog.Info("Server starting")
+}
+`
+	mainGoPath := filepath.Join(tmpDir, "main.go")
+	if err := os.WriteFile(mainGoPath, []byte(mainGoContent), 0644); err != nil {
+		t.Fatalf("Failed to create main.go: %v", err)
+	}
+
+	if err := injectJobWorker(mainGoPath, "testmodule"); err != nil {
+		t.Fatalf("injectJobWorker failed: %v", err)
+	}
+
+	result, err := os.ReadFile(mainGoPath)
+	if err != nil {
+		t.Fatalf("Failed to read modified main.go: %v", err)
+	}
+	resultStr := string(result)
+
+	// Verify River setup was injected
+	checks := []string{
+		"river.NewClient",
+		"riversqlite.New",
+		"jobs.SetupWorkers()",
+		"riverClient.Start(appCtx)",
+		"riverClient.Stop(stopCtx)",
+		"jobs.SetClient(riverClient)",
+		"\"database/sql\"",
+		"\"github.com/riverqueue/river\"",
+		"\"testmodule/app/jobs\"",
+	}
+	for _, check := range checks {
+		if !strings.Contains(resultStr, check) {
+			t.Errorf("main.go missing expected content: %s", check)
+		}
+	}
+
+	// Verify idempotency — second call should be no-op
+	if err := injectJobWorker(mainGoPath, "testmodule"); err != nil {
+		t.Fatalf("Second injectJobWorker call failed: %v", err)
+	}
+
+	// Verify no duplicate injection. Re-read the file AFTER the second call:
+	// counting in the stale pre-call contents would make this check vacuous.
+	result, err = os.ReadFile(mainGoPath)
+	if err != nil {
+		t.Fatalf("Failed to re-read main.go: %v", err)
+	}
+	if count := strings.Count(string(result), "river.NewClient"); count != 1 {
+		t.Errorf("Expected 1 river.NewClient occurrence, got %d", count)
+	}
+}
+
+// setupTestProject creates a minimal project structure for testing.
+func setupTestProject(t *testing.T, dir string) {
+ t.Helper()
+
+ // Create database directory
+ dbDir := filepath.Join(dir, "database")
+ if err := os.MkdirAll(dbDir, 0755); err != nil {
+ t.Fatalf("Failed to create database directory: %v", err)
+ }
+
+ // Create migrations directory
+ migrationsDir := filepath.Join(dbDir, "migrations")
+ if err := os.MkdirAll(migrationsDir, 0755); err != nil {
+ t.Fatalf("Failed to create migrations directory: %v", err)
+ }
+
+ // Create schema.sql
+ schemaPath := filepath.Join(dbDir, "schema.sql")
+ if err := os.WriteFile(schemaPath, []byte("-- existing schema\n"), 0644); err != nil {
+ t.Fatalf("Failed to create schema.sql: %v", err)
+ }
+
+ // Create .lvtrc (project config)
+ lvtConfig := "kit=multi\nmodule=testmodule\n"
+ if err := os.WriteFile(filepath.Join(dir, ".lvtrc"), []byte(lvtConfig), 0644); err != nil {
+ t.Fatalf("Failed to create .lvtrc: %v", err)
+ }
+}
diff --git a/internal/generator/types.go b/internal/generator/types.go
index a267eda..5997041 100644
--- a/internal/generator/types.go
+++ b/internal/generator/types.go
@@ -108,6 +108,13 @@ func singularizeForTemplate(s string) string {
return singularize(strings.ToLower(s))
}
+// ToCamelCase converts snake_case to CamelCase following Go conventions.
+// Common initialisms like ID, URL, HTTP are kept in all caps.
+// Exported for use by CLI commands that need consistent naming.
+func ToCamelCase(s string) string {
+ return toCamelCase(s)
+}
+
// toCamelCase converts snake_case to CamelCase following Go conventions
// Common initialisms like ID, URL, HTTP are kept in all caps
func toCamelCase(s string) string {
diff --git a/internal/kits/system/multi/templates/jobs/handler.go.tmpl b/internal/kits/system/multi/templates/jobs/handler.go.tmpl
new file mode 100644
index 0000000..972348c
--- /dev/null
+++ b/internal/kits/system/multi/templates/jobs/handler.go.tmpl
@@ -0,0 +1,37 @@
+package jobs
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/riverqueue/river"
+)
+
+// <<.JobNameCamel>>Args defines the payload for <<.JobName>> jobs.
+type <<.JobNameCamel>>Args struct {
+ // TODO: Define your job payload fields here.
+ // Example:
+ // To string `json:"to"`
+ // Subject string `json:"subject"`
+}
+
+// Kind returns the unique job type identifier used by River.
+func (<<.JobNameCamel>>Args) Kind() string { return "<<.JobName>>" }
+
+// <<.JobNameCamel>>Worker processes <<.JobName>> jobs.
+type <<.JobNameCamel>>Worker struct {
+ river.WorkerDefaults[<<.JobNameCamel>>Args]
+}
+
+// Work executes the <<.JobName>> job.
+func (w *<<.JobNameCamel>>Worker) Work(ctx context.Context, job *river.Job[<<.JobNameCamel>>Args]) error {
+ slog.Info("Processing <<.JobName>> job", "job_id", job.ID)
+
+ _ = job.Args // TODO: use job.Args fields
+
+ // TODO: Implement your job logic here.
+ // Return nil on success, or an error to trigger a retry.
+ // After max attempts are exhausted, the job moves to "discarded" state.
+
+ return nil
+}
diff --git a/internal/kits/system/multi/templates/jobs/migration.sql.tmpl b/internal/kits/system/multi/templates/jobs/migration.sql.tmpl
new file mode 100644
index 0000000..d2adad5
--- /dev/null
+++ b/internal/kits/system/multi/templates/jobs/migration.sql.tmpl
@@ -0,0 +1,123 @@
+-- +goose Up
+-- +goose StatementBegin
+
+-- River job queue tables (SQLite)
+-- Generated by: river migrate-get --line main --up --all --database-url "sqlite://"
+-- River version: v0.26.x
+
+CREATE TABLE IF NOT EXISTS river_migration (
+ line TEXT NOT NULL,
+ version INTEGER NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128),
+ CONSTRAINT version_gte_1 CHECK (version >= 1),
+ PRIMARY KEY (line, version)
+);
+
+CREATE TABLE IF NOT EXISTS river_job (
+ id INTEGER PRIMARY KEY,
+ args BLOB NOT NULL DEFAULT '{}',
+ attempt INTEGER NOT NULL DEFAULT 0,
+ attempted_at TIMESTAMP,
+ attempted_by BLOB,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ errors BLOB,
+ finalized_at TIMESTAMP,
+ kind TEXT NOT NULL,
+ max_attempts INTEGER NOT NULL,
+ metadata BLOB NOT NULL DEFAULT (json('{}')),
+ priority INTEGER NOT NULL DEFAULT 1,
+ queue TEXT NOT NULL DEFAULT 'default',
+ state TEXT NOT NULL DEFAULT 'available',
+ scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ tags BLOB NOT NULL DEFAULT (json('[]')),
+ unique_key BLOB,
+ unique_states INTEGER,
+ CONSTRAINT finalized_or_finalized_at_null CHECK (
+ (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR
+ (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded'))
+ ),
+ CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4),
+ CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128),
+ CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128),
+ CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled'))
+);
+
+CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind);
+CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL;
+CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
+CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key)
+ WHERE unique_key IS NOT NULL
+ AND unique_states IS NOT NULL
+ AND CASE state
+ WHEN 'available' THEN unique_states & (1 << 0)
+ WHEN 'cancelled' THEN unique_states & (1 << 1)
+ WHEN 'completed' THEN unique_states & (1 << 2)
+ WHEN 'discarded' THEN unique_states & (1 << 3)
+ WHEN 'pending' THEN unique_states & (1 << 4)
+ WHEN 'retryable' THEN unique_states & (1 << 5)
+ WHEN 'running' THEN unique_states & (1 << 6)
+ WHEN 'scheduled' THEN unique_states & (1 << 7)
+ ELSE 0
+ END >= 1;
+
+CREATE TABLE IF NOT EXISTS river_leader (
+ elected_at TIMESTAMP NOT NULL,
+ expires_at TIMESTAMP NOT NULL,
+ leader_id TEXT NOT NULL,
+ name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'),
+ CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+ CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_queue (
+ name TEXT PRIMARY KEY NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ metadata BLOB NOT NULL DEFAULT (json('{}')),
+ paused_at TIMESTAMP,
+ updated_at TIMESTAMP NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS river_client (
+ id TEXT PRIMARY KEY NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ metadata BLOB NOT NULL DEFAULT (json('{}')),
+ paused_at TIMESTAMP,
+ updated_at TIMESTAMP NOT NULL,
+ CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_client_queue (
+ river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE,
+ name TEXT NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ max_workers INTEGER NOT NULL DEFAULT 0,
+ metadata BLOB NOT NULL DEFAULT (json('{}')),
+ num_jobs_completed INTEGER NOT NULL DEFAULT 0,
+ num_jobs_running INTEGER NOT NULL DEFAULT 0,
+ updated_at TIMESTAMP NOT NULL,
+ PRIMARY KEY (river_client_id, name),
+ CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+ CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0),
+ CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0)
+);
+
+-- Record that all River migrations have been applied
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6);
+
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+DROP TABLE IF EXISTS river_client_queue;
+DROP TABLE IF EXISTS river_client;
+DROP TABLE IF EXISTS river_queue;
+DROP TABLE IF EXISTS river_leader;
+DROP TABLE IF EXISTS river_job;
+DROP TABLE IF EXISTS river_migration;
+-- +goose StatementEnd
diff --git a/internal/kits/system/multi/templates/jobs/schema.sql.tmpl b/internal/kits/system/multi/templates/jobs/schema.sql.tmpl
new file mode 100644
index 0000000..55e6a3e
--- /dev/null
+++ b/internal/kits/system/multi/templates/jobs/schema.sql.tmpl
@@ -0,0 +1,107 @@
+-- River job queue tables (SQLite)
+-- Same DDL as the goose migration template; source:
+-- river migrate-get --line main --up --all --database-url "sqlite://" (River v0.26.x)
+
+CREATE TABLE IF NOT EXISTS river_migration (
+    line TEXT NOT NULL,
+    version INTEGER NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128),
+    CONSTRAINT version_gte_1 CHECK (version >= 1),
+    PRIMARY KEY (line, version)
+);
+
+CREATE TABLE IF NOT EXISTS river_job (
+    id INTEGER PRIMARY KEY,
+    args BLOB NOT NULL DEFAULT '{}',
+    attempt INTEGER NOT NULL DEFAULT 0,
+    attempted_at TIMESTAMP,
+    attempted_by BLOB,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    errors BLOB,
+    finalized_at TIMESTAMP,
+    kind TEXT NOT NULL,
+    max_attempts INTEGER NOT NULL,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    priority INTEGER NOT NULL DEFAULT 1,
+    queue TEXT NOT NULL DEFAULT 'default',
+    state TEXT NOT NULL DEFAULT 'available',
+    scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    tags BLOB NOT NULL DEFAULT (json('[]')),
+    unique_key BLOB,
+    unique_states INTEGER,
+    CONSTRAINT finalized_or_finalized_at_null CHECK (
+        (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR
+        (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded'))
+    ),
+    CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4),
+    CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128),
+    CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128),
+    CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled'))
+);
+
+CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind);
+CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL;
+CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
+CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key)
+    WHERE unique_key IS NOT NULL
+    AND unique_states IS NOT NULL
+    AND CASE state
+        WHEN 'available' THEN unique_states & (1 << 0)
+        WHEN 'cancelled' THEN unique_states & (1 << 1)
+        WHEN 'completed' THEN unique_states & (1 << 2)
+        WHEN 'discarded' THEN unique_states & (1 << 3)
+        WHEN 'pending' THEN unique_states & (1 << 4)
+        WHEN 'retryable' THEN unique_states & (1 << 5)
+        WHEN 'running' THEN unique_states & (1 << 6)
+        WHEN 'scheduled' THEN unique_states & (1 << 7)
+        ELSE 0
+    END >= 1;
+
+CREATE TABLE IF NOT EXISTS river_leader (
+    elected_at TIMESTAMP NOT NULL,
+    expires_at TIMESTAMP NOT NULL,
+    leader_id TEXT NOT NULL,
+    name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'),
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_queue (
+    name TEXT PRIMARY KEY NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    paused_at TIMESTAMP,
+    updated_at TIMESTAMP NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS river_client (
+    id TEXT PRIMARY KEY NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    paused_at TIMESTAMP,
+    updated_at TIMESTAMP NOT NULL,
+    CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_client_queue (
+    river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE,
+    name TEXT NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    max_workers INTEGER NOT NULL DEFAULT 0,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    num_jobs_completed INTEGER NOT NULL DEFAULT 0,
+    num_jobs_running INTEGER NOT NULL DEFAULT 0,
+    updated_at TIMESTAMP NOT NULL,
+    PRIMARY KEY (river_client_id, name),
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0),
+    CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0)
+);
+
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6);
diff --git a/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl b/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl
new file mode 100644
index 0000000..b864713
--- /dev/null
+++ b/internal/kits/system/multi/templates/jobs/worker_init.go.tmpl
@@ -0,0 +1,34 @@
+// Package jobs wires River background job processing into the app.
+package jobs
+
+import (
+	"database/sql"
+
+	"github.com/riverqueue/river"
+)
+
+// client is the process-wide River client, set once at startup via SetClient.
+// The transaction type is *sql.Tx to match River's database/sql-based SQLite driver.
+var client *river.Client[*sql.Tx]
+
+// SetupWorkers registers all job workers with River.
+// New workers are added here by `lvt gen job`.
+func SetupWorkers() *river.Workers {
+	workers := river.NewWorkers()
+
+	// Register job workers below (added by `lvt gen job`)
+
+	return workers
+}
+
+// SetClient stores the River client for use by handlers via Client().
+func SetClient(c *river.Client[*sql.Tx]) {
+	client = c
+}
+
+// Client returns the River client for enqueueing jobs.
+// Call from HTTP handlers: jobs.Client().Insert(ctx, args, nil)
+// It returns nil until SetClient has been called during startup.
+func Client() *river.Client[*sql.Tx] {
+	return client
+}
diff --git a/internal/kits/system/single/templates/jobs/handler.go.tmpl b/internal/kits/system/single/templates/jobs/handler.go.tmpl
new file mode 100644
index 0000000..972348c
--- /dev/null
+++ b/internal/kits/system/single/templates/jobs/handler.go.tmpl
@@ -0,0 +1,41 @@
+// Scaffold for a River background job handler, generated by `lvt gen job`.
+// `<<` and `>>` are the generator's template delimiters, filled in at generation time.
+package jobs
+
+import (
+	"context"
+	"log/slog"
+
+	"github.com/riverqueue/river"
+)
+
+// <<.JobNameCamel>>Args defines the payload for <<.JobName>> jobs.
+type <<.JobNameCamel>>Args struct {
+	// TODO: Define your job payload fields here.
+	// Example:
+	//   To      string `json:"to"`
+	//   Subject string `json:"subject"`
+}
+
+// Kind returns the unique job type identifier used by River.
+// This string is stored with every job row, so keep it stable once jobs have been enqueued.
+func (<<.JobNameCamel>>Args) Kind() string { return "<<.JobName>>" }
+
+// <<.JobNameCamel>>Worker processes <<.JobName>> jobs.
+type <<.JobNameCamel>>Worker struct {
+	// WorkerDefaults supplies no-op defaults for optional river.Worker methods.
+	river.WorkerDefaults[<<.JobNameCamel>>Args]
+}
+
+// Work executes the <<.JobName>> job.
+func (w *<<.JobNameCamel>>Worker) Work(ctx context.Context, job *river.Job[<<.JobNameCamel>>Args]) error {
+	slog.Info("Processing <<.JobName>> job", "job_id", job.ID)
+
+	_ = job.Args // TODO: use job.Args fields
+
+	// TODO: Implement your job logic here.
+	// Return nil on success, or an error to trigger a retry.
+	// After max attempts are exhausted, the job moves to "discarded" state.
+
+	return nil
+}
diff --git a/internal/kits/system/single/templates/jobs/migration.sql.tmpl b/internal/kits/system/single/templates/jobs/migration.sql.tmpl
new file mode 100644
index 0000000..d2adad5
--- /dev/null
+++ b/internal/kits/system/single/templates/jobs/migration.sql.tmpl
@@ -0,0 +1,124 @@
+-- +goose Up
+-- +goose StatementBegin
+
+-- River job queue tables (SQLite)
+-- Generated by: river migrate-get --line main --up --all --database-url "sqlite://"
+-- River version: v0.26.x
+
+CREATE TABLE IF NOT EXISTS river_migration (
+    line TEXT NOT NULL,
+    version INTEGER NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128),
+    CONSTRAINT version_gte_1 CHECK (version >= 1),
+    PRIMARY KEY (line, version)
+);
+
+CREATE TABLE IF NOT EXISTS river_job (
+    id INTEGER PRIMARY KEY,
+    args BLOB NOT NULL DEFAULT '{}',
+    attempt INTEGER NOT NULL DEFAULT 0,
+    attempted_at TIMESTAMP,
+    attempted_by BLOB,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    errors BLOB,
+    finalized_at TIMESTAMP,
+    kind TEXT NOT NULL,
+    max_attempts INTEGER NOT NULL,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    priority INTEGER NOT NULL DEFAULT 1,
+    queue TEXT NOT NULL DEFAULT 'default',
+    state TEXT NOT NULL DEFAULT 'available',
+    scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    tags BLOB NOT NULL DEFAULT (json('[]')),
+    unique_key BLOB,
+    unique_states INTEGER,
+    CONSTRAINT finalized_or_finalized_at_null CHECK (
+        (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR
+        (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded'))
+    ),
+    CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4),
+    CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128),
+    CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128),
+    CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled'))
+);
+
+CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind);
+CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL;
+CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
+CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key)
+    WHERE unique_key IS NOT NULL
+    AND unique_states IS NOT NULL
+    AND CASE state
+        WHEN 'available' THEN unique_states & (1 << 0)
+        WHEN 'cancelled' THEN unique_states & (1 << 1)
+        WHEN 'completed' THEN unique_states & (1 << 2)
+        WHEN 'discarded' THEN unique_states & (1 << 3)
+        WHEN 'pending' THEN unique_states & (1 << 4)
+        WHEN 'retryable' THEN unique_states & (1 << 5)
+        WHEN 'running' THEN unique_states & (1 << 6)
+        WHEN 'scheduled' THEN unique_states & (1 << 7)
+        ELSE 0
+    END >= 1;
+
+CREATE TABLE IF NOT EXISTS river_leader (
+    elected_at TIMESTAMP NOT NULL,
+    expires_at TIMESTAMP NOT NULL,
+    leader_id TEXT NOT NULL,
+    name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'),
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_queue (
+    name TEXT PRIMARY KEY NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    paused_at TIMESTAMP,
+    updated_at TIMESTAMP NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS river_client (
+    id TEXT PRIMARY KEY NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    paused_at TIMESTAMP,
+    updated_at TIMESTAMP NOT NULL,
+    CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_client_queue (
+    river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE,
+    name TEXT NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    max_workers INTEGER NOT NULL DEFAULT 0,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    num_jobs_completed INTEGER NOT NULL DEFAULT 0,
+    num_jobs_running INTEGER NOT NULL DEFAULT 0,
+    updated_at TIMESTAMP NOT NULL,
+    PRIMARY KEY (river_client_id, name),
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0),
+    CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0)
+);
+
+-- Record that all River migrations have been applied
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6);
+
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+-- WARNING: rolling back removes all River tables and any queued or completed job data.
+DROP TABLE IF EXISTS river_client_queue;
+DROP TABLE IF EXISTS river_client;
+DROP TABLE IF EXISTS river_queue;
+DROP TABLE IF EXISTS river_leader;
+DROP TABLE IF EXISTS river_job;
+DROP TABLE IF EXISTS river_migration;
+-- +goose StatementEnd
diff --git a/internal/kits/system/single/templates/jobs/schema.sql.tmpl b/internal/kits/system/single/templates/jobs/schema.sql.tmpl
new file mode 100644
index 0000000..55e6a3e
--- /dev/null
+++ b/internal/kits/system/single/templates/jobs/schema.sql.tmpl
@@ -0,0 +1,107 @@
+-- River job queue tables (SQLite)
+-- Same DDL as the goose migration template; source:
+-- river migrate-get --line main --up --all --database-url "sqlite://" (River v0.26.x)
+
+CREATE TABLE IF NOT EXISTS river_migration (
+    line TEXT NOT NULL,
+    version INTEGER NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    CONSTRAINT line_length CHECK (length(line) > 0 AND length(line) < 128),
+    CONSTRAINT version_gte_1 CHECK (version >= 1),
+    PRIMARY KEY (line, version)
+);
+
+CREATE TABLE IF NOT EXISTS river_job (
+    id INTEGER PRIMARY KEY,
+    args BLOB NOT NULL DEFAULT '{}',
+    attempt INTEGER NOT NULL DEFAULT 0,
+    attempted_at TIMESTAMP,
+    attempted_by BLOB,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    errors BLOB,
+    finalized_at TIMESTAMP,
+    kind TEXT NOT NULL,
+    max_attempts INTEGER NOT NULL,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    priority INTEGER NOT NULL DEFAULT 1,
+    queue TEXT NOT NULL DEFAULT 'default',
+    state TEXT NOT NULL DEFAULT 'available',
+    scheduled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    tags BLOB NOT NULL DEFAULT (json('[]')),
+    unique_key BLOB,
+    unique_states INTEGER,
+    CONSTRAINT finalized_or_finalized_at_null CHECK (
+        (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR
+        (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded'))
+    ),
+    CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4),
+    CONSTRAINT queue_length CHECK (length(queue) > 0 AND length(queue) < 128),
+    CONSTRAINT kind_length CHECK (length(kind) > 0 AND length(kind) < 128),
+    CONSTRAINT state_valid CHECK (state IN ('available', 'cancelled', 'completed', 'discarded', 'pending', 'retryable', 'running', 'scheduled'))
+);
+
+CREATE INDEX IF NOT EXISTS river_job_kind ON river_job (kind);
+CREATE INDEX IF NOT EXISTS river_job_state_and_finalized_at_index ON river_job (state, finalized_at) WHERE finalized_at IS NOT NULL;
+CREATE INDEX IF NOT EXISTS river_job_prioritized_fetching_index ON river_job (state, queue, priority, scheduled_at, id);
+CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key)
+    WHERE unique_key IS NOT NULL
+    AND unique_states IS NOT NULL
+    AND CASE state
+        WHEN 'available' THEN unique_states & (1 << 0)
+        WHEN 'cancelled' THEN unique_states & (1 << 1)
+        WHEN 'completed' THEN unique_states & (1 << 2)
+        WHEN 'discarded' THEN unique_states & (1 << 3)
+        WHEN 'pending' THEN unique_states & (1 << 4)
+        WHEN 'retryable' THEN unique_states & (1 << 5)
+        WHEN 'running' THEN unique_states & (1 << 6)
+        WHEN 'scheduled' THEN unique_states & (1 << 7)
+        ELSE 0
+    END >= 1;
+
+CREATE TABLE IF NOT EXISTS river_leader (
+    elected_at TIMESTAMP NOT NULL,
+    expires_at TIMESTAMP NOT NULL,
+    leader_id TEXT NOT NULL,
+    name TEXT PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'),
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_queue (
+    name TEXT PRIMARY KEY NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    paused_at TIMESTAMP,
+    updated_at TIMESTAMP NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS river_client (
+    id TEXT PRIMARY KEY NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    paused_at TIMESTAMP,
+    updated_at TIMESTAMP NOT NULL,
+    CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128)
+);
+
+CREATE TABLE IF NOT EXISTS river_client_queue (
+    river_client_id TEXT NOT NULL REFERENCES river_client (id) ON DELETE CASCADE,
+    name TEXT NOT NULL,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    max_workers INTEGER NOT NULL DEFAULT 0,
+    metadata BLOB NOT NULL DEFAULT (json('{}')),
+    num_jobs_completed INTEGER NOT NULL DEFAULT 0,
+    num_jobs_running INTEGER NOT NULL DEFAULT 0,
+    updated_at TIMESTAMP NOT NULL,
+    PRIMARY KEY (river_client_id, name),
+    CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128),
+    CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0),
+    CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0)
+);
+
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 1);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 2);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 3);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 4);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 5);
+INSERT OR IGNORE INTO river_migration (line, version) VALUES ('main', 6);
diff --git a/internal/kits/system/single/templates/jobs/worker_init.go.tmpl b/internal/kits/system/single/templates/jobs/worker_init.go.tmpl
new file mode 100644
index 0000000..b864713
--- /dev/null
+++ b/internal/kits/system/single/templates/jobs/worker_init.go.tmpl
@@ -0,0 +1,34 @@
+// Package jobs wires River background job processing into the app.
+package jobs
+
+import (
+	"database/sql"
+
+	"github.com/riverqueue/river"
+)
+
+// client is the process-wide River client, set once at startup via SetClient.
+// The transaction type is *sql.Tx to match River's database/sql-based SQLite driver.
+var client *river.Client[*sql.Tx]
+
+// SetupWorkers registers all job workers with River.
+// New workers are added here by `lvt gen job`.
+func SetupWorkers() *river.Workers {
+	workers := river.NewWorkers()
+
+	// Register job workers below (added by `lvt gen job`)
+
+	return workers
+}
+
+// SetClient stores the River client for use by handlers via Client().
+func SetClient(c *river.Client[*sql.Tx]) {
+	client = c
+}
+
+// Client returns the River client for enqueueing jobs.
+// Call from HTTP handlers: jobs.Client().Insert(ctx, args, nil)
+// It returns nil until SetClient has been called during startup.
+func Client() *river.Client[*sql.Tx] {
+	return client
+}