Merged
29 changes: 29 additions & 0 deletions backend/cmd/api/main.go
@@ -10,7 +10,10 @@ import (
"syscall"
"time"

"github.com/hibiken/asynq"

"backend/internal/bootstrap"
"backend/internal/infrastructure/queue"
"backend/internal/infrastructure/ws"
"backend/internal/server"
)
@@ -60,6 +63,23 @@ func main() {
hub := ws.NewHub()
go hub.Run(hubCtx)

var workerCancel context.CancelFunc
if app.Config.RedisURL != "" {
workerCtx, wCancel := context.WithCancel(context.Background())
workerCancel = wCancel
worker, err := queue.NewWorker(app.Config.RedisURL)
if err != nil {
fmt.Fprintf(os.Stderr, "startup failed: %v\n", err)
os.Exit(1)
}
worker.Register(queue.TypeWelcomeEmail, asynq.HandlerFunc(queue.HandleWelcomeEmail))
go func() {
if err := worker.Run(workerCtx); err != nil {
slog.Error("queue: worker error", "err", err)
}
}()
}

srv, err := server.NewServer(app, hub)
if err != nil {
fmt.Fprintf(os.Stderr, "startup failed: %v\n", err)
@@ -75,12 +95,21 @@ func main() {
}

<-done

if workerCancel != nil {
workerCancel() // stop worker before hub (in-flight jobs drain first)
}
hubCancel() // stop hub after all WS connections have been closed by server shutdown

if app.Cache != nil {
if err := app.Cache.Close(); err != nil {
slog.Error("cache close error", "error", err)
}
}
if app.Enqueuer != nil {
if err := app.Enqueuer.Close(); err != nil {
slog.Error("enqueuer close error", "error", err)
}
}
slog.Info("graceful shutdown complete")
}
2 changes: 2 additions & 0 deletions backend/docs/_index.md
@@ -16,3 +16,5 @@ The `docs` agent reads this index first to locate the right file before diving i
| Firebase Auth (token verification, middleware, MeHandler) | [auth.md](auth.md) | `internal/usecase/auth_usecase.go`, `internal/transport/middleware/auth.go`, `internal/transport/handlers/auth_handler.go`, `pkg/firebase/admin.go`, `internal/bootstrap/bootstrap.go` |
| Observability (Sentry error tracking) | [observability.md](observability.md) | `internal/transport/middleware/sentry.go`, `internal/bootstrap/bootstrap.go`, `internal/transport/handlers/routes.go` |
| WebSocket (Hub, client, GET /ws, auth, wiring) | [websocket.md](websocket.md) | `internal/infrastructure/ws/`, `internal/transport/handlers/ws_handler.go`, `internal/transport/handlers/routes.go`, `internal/server/server.go`, `cmd/api/main.go` |
| Background job queue (Asynq, task definitions, worker, Asynqmon UI) | [queue.md](queue.md) | `internal/usecase/enqueuer.go`, `internal/infrastructure/queue/tasks.go`, `internal/infrastructure/queue/client.go`, `internal/infrastructure/queue/worker.go`, `internal/infrastructure/queue/handlers.go`, `internal/transport/handlers/routes.go`, `cmd/api/main.go` |
| Redis Streams event fan-out (producer, consumer, consumer groups) | [streams.md](streams.md) | `internal/infrastructure/streams/events.go`, `internal/infrastructure/streams/producer.go`, `internal/infrastructure/streams/consumer.go` |
18 changes: 11 additions & 7 deletions backend/docs/middleware.md
@@ -15,22 +15,26 @@ All middleware lives in `internal/transport/middleware/` and follows the Gin `Ha
## Registration order

```go
// 1. Recovery + logger (debug: gin.Logger, release: middleware.Logger)
// 1. Sentry error reporting
r.Use(middleware.SentryMiddleware(sentryDSN))
// 2. Recovery + logger (debug: gin.Logger, release: middleware.Logger)
r.Use(gin.Recovery(), middleware.Logger())
// 2. Rate limiter (no-op when RPS <= 0)
// 3. Prometheus metrics collection
r.Use(middleware.PrometheusMiddleware())
// 4. Rate limiter (no-op when RPS <= 0)
r.Use(middleware.RateLimit(rps, burst))
// 3. CORS
// 5. CORS
r.Use(cors.New(...))

// Global routes (no auth):
r.GET("/", h.HelloWorldHandler)
r.GET("/health", h.HealthHandler)
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))

// Protected group — FirebaseAuth applied when verifier != nil:
// Protected group — FirebaseAuth applied when h.verifier != nil:
api := r.Group("/api/v1")
if verifier != nil {
api.Use(middleware.FirebaseAuth(verifier))
if h.verifier != nil {
api.Use(middleware.FirebaseAuth(h.verifier))
}
api.GET("/me", h.MeHandler)
```
@@ -71,7 +75,7 @@ val, _ := c.Get(middleware.FirebaseClaimsKey)
token, ok := val.(*usecase.FirebaseToken)
```

Pass `nil` as the `verifier` to `RegisterRoutes` to skip Firebase auth entirely (development without credentials).
Pass `nil` as the `verifier` to `NewHandler` to skip Firebase auth entirely (development without credentials). `RegisterRoutes` reads `h.verifier` from the struct — it is not a parameter of `RegisterRoutes`.

## Adding new middleware

206 changes: 206 additions & 0 deletions backend/docs/queue.md
@@ -0,0 +1,206 @@
---
topic: queue
last_verified: 2026-06-15
sources:
- internal/usecase/enqueuer.go
- internal/infrastructure/queue/tasks.go
- internal/infrastructure/queue/client.go
- internal/infrastructure/queue/worker.go
- internal/infrastructure/queue/handlers.go
- internal/transport/handlers/routes.go
- cmd/api/main.go
---

# Queue (Asynq)

## Overview

Background jobs are processed by [Asynq](https://github.com/hibiken/asynq), which uses
Redis as its broker. The same `REDIS_URL` env var used by the cache layer is reused — no
additional env vars are required. When `REDIS_URL` is not set, `app.Enqueuer` is `nil`
and no worker is started; the rest of the application continues normally.

Asynq handles discrete, retriable background jobs (welcome emails, notifications,
webhooks). For ordered event fan-out between services see `backend/docs/streams.md`.

## Task definitions

Task type constants and payload structs are co-located in
`internal/infrastructure/queue/tasks.go`:

```go
const (
TypeWelcomeEmail = "email:welcome"
)

type WelcomeEmailPayload struct {
UserID string `json:"user_id"`
Email string `json:"email"`
}
```

Both the enqueuer (caller side) and the handler (worker side) import these constants so
the string is never duplicated.
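
As a rough illustration of how the shared definitions keep both sides in sync, the sketch below round-trips a payload through JSON using only the standard library. The constant and struct mirror the ones above; `encodeWelcomeEmail` and `decodeWelcomeEmail` are hypothetical helper names introduced for this example and are not part of the codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TypeWelcomeEmail mirrors the constant in tasks.go.
const TypeWelcomeEmail = "email:welcome"

// WelcomeEmailPayload mirrors the struct in tasks.go.
type WelcomeEmailPayload struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
}

// encodeWelcomeEmail is a hypothetical caller-side helper: it builds the
// bytes that would be handed to Enqueuer.Enqueue.
func encodeWelcomeEmail(userID, email string) ([]byte, error) {
	return json.Marshal(WelcomeEmailPayload{UserID: userID, Email: email})
}

// decodeWelcomeEmail is a hypothetical worker-side helper: it does what a
// handler does with t.Payload().
func decodeWelcomeEmail(b []byte) (WelcomeEmailPayload, error) {
	var p WelcomeEmailPayload
	if err := json.Unmarshal(b, &p); err != nil {
		return WelcomeEmailPayload{}, fmt.Errorf("welcome email: unmarshal payload: %w", err)
	}
	return p, nil
}

func main() {
	b, err := encodeWelcomeEmail("u1", "a@b.com")
	if err != nil {
		panic(err)
	}
	fmt.Println(TypeWelcomeEmail, string(b))

	p, err := decodeWelcomeEmail(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.UserID, p.Email)
}
```

Because both helpers use the same struct, a renamed JSON tag changes the producer and the consumer together.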

## Enqueuer interface

`internal/usecase/enqueuer.go` defines the port that use cases and handlers depend on:

```go
type Enqueuer interface {
Enqueue(ctx context.Context, taskType string, payload []byte) error
Close() error
}
```

The concrete implementation is created with `queue.NewClient`:

```go
// internal/infrastructure/queue/client.go
func NewClient(redisURL string) (usecase.Enqueuer, error)
```

`NewClient` parses `redisURL` using `go-redis/v9`'s `ParseURL`, constructs an
`asynq.Client`, and returns it wrapped as a `usecase.Enqueuer`.

### Enqueueing from a use case or handler

```go
payload, err := json.Marshal(queue.WelcomeEmailPayload{
UserID: userID,
Email: email,
})
if err != nil {
return err
}
if app.Enqueuer != nil {
if err := app.Enqueuer.Enqueue(ctx, queue.TypeWelcomeEmail, payload); err != nil {
return err
}
}
```

Guard with `!= nil` because `app.Enqueuer` is nil when `REDIS_URL` is unset.

### Bootstrap wiring

`app.Enqueuer` is populated by `bootstrap.Run` when `REDIS_URL` is set (see
`internal/bootstrap/bootstrap.go`). After the HTTP server shuts down, `main.go` calls
`app.Enqueuer.Close()` to release the connection.

## Worker setup

`internal/infrastructure/queue/worker.go`

```go
func NewWorker(redisURL string) (*Worker, error)
func (w *Worker) Register(taskType string, h asynq.Handler)
func (w *Worker) Run(ctx context.Context) error // blocking; cancel ctx to stop
```

`NewWorker` creates an `asynq.Server` with `Concurrency: 10`. Failed tasks are logged via
`slog.Error` through the server's `ErrorHandler`.

`Run` starts the server in an inner goroutine and blocks until either `ctx` is cancelled
(clean shutdown via `w.server.Shutdown()`) or the server returns an error.

### Goroutine wiring in cmd/api/main.go

The worker is started only when `REDIS_URL` is set. It gets its own cancellable context,
derived from `context.Background()`, so it can be cancelled independently of the hub:

```go
var workerCancel context.CancelFunc
if app.Config.RedisURL != "" {
workerCtx, wCancel := context.WithCancel(context.Background())
workerCancel = wCancel
worker, err := queue.NewWorker(app.Config.RedisURL)
// ...
worker.Register(queue.TypeWelcomeEmail, asynq.HandlerFunc(queue.HandleWelcomeEmail))
go func() {
if err := worker.Run(workerCtx); err != nil {
slog.Error("queue: worker error", "err", err)
}
}()
}
```

Shutdown order after `<-done`:

```go
if workerCancel != nil {
workerCancel() // stop worker before hub (in-flight jobs drain first)
}
hubCancel()
```

The worker is cancelled before the hub so that any task handler that calls `hub.Publish`
can still reach the hub during drain.

## Task handlers

Handler functions have the signature `func(context.Context, *asynq.Task) error` and live
in `internal/infrastructure/queue/handlers.go`:

```go
func HandleWelcomeEmail(_ context.Context, t *asynq.Task) error {
var p WelcomeEmailPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("welcome email: unmarshal payload: %w", err)
}
slog.Info("queue: welcome email task received", "user_id", p.UserID, "email", p.Email)
return nil
}
```

Returning a non-nil error causes Asynq to retry the task (up to its configured retry limit).

## Adding a new task

1. Add a `Type<Name> = "<category>:<action>"` constant and `<Name>Payload` struct to
`internal/infrastructure/queue/tasks.go`.
2. Write a `Handle<Name>(ctx context.Context, t *asynq.Task) error` function in
`internal/infrastructure/queue/handlers.go`.
3. Register the handler in `cmd/api/main.go`:
```go
worker.Register(queue.Type<Name>, asynq.HandlerFunc(queue.Handle<Name>))
```
4. Enqueue from the relevant use case or handler using `app.Enqueuer.Enqueue(ctx, queue.Type<Name>, payload)`.

## Asynqmon UI

When running in `gin.DebugMode` and `REDIS_URL` is set, the Asynqmon job-monitoring UI is
available at:

```
http://localhost:8080/admin/queues
```

Routes are registered in `RegisterRoutes` only under these conditions:

```go
if gin.Mode() == gin.DebugMode && h.queueUI != nil {
r.GET("/admin/queues", gin.WrapH(h.queueUI))
r.Any("/admin/queues/*path", gin.WrapH(h.queueUI))
}
```

The UI is not mounted in staging or production (`gin.ReleaseMode`).

## Testing

**Unit tests** call handler functions directly with `asynq.NewTask` — no Redis required:

```go
payload, _ := json.Marshal(queue.WelcomeEmailPayload{UserID: "u1", Email: "a@b.com"})
task := asynq.NewTask(queue.TypeWelcomeEmail, payload)
err := queue.HandleWelcomeEmail(context.Background(), task)
// assert err == nil
```

**Integration tests** use Testcontainers Redis (same `TestMain` pattern as
`internal/infrastructure/cache/redis/cache_test.go`). The `TestMain` function skips
integration tests gracefully when Docker is unavailable so that unit tests still run.

Enqueuer integration tests construct a real `queue.NewClient`, enqueue a task, and use
`asynq.NewInspector` (via `queue.NewInspector`) to verify the task appears in the queue.
11 changes: 6 additions & 5 deletions backend/docs/routing.md
@@ -30,8 +30,8 @@ func NewHandler(healthUC usecase.HealthUseCase, verifier usecase.FirebaseTokenVe
The `Handler` struct holds use case interfaces and infrastructure dependencies — not `*sql.DB` directly. `verifier` is stored on the struct (not passed to `RegisterRoutes`) so the WebSocket handler can read it inline for query-param auth.

## Wiring (server.go)
`internal/server/server.go` contains `NewServer(app *bootstrap.App) *http.Server` — wiring only, no logic.
It receives the already-validated `*bootstrap.App` (which holds `*sql.DB`, `Cache`, `Firebase`, and `Config`), constructs the repository, use case, and handler in order, then returns a configured `*http.Server`. It does not read env vars or return an error.
`internal/server/server.go` contains `NewServer(app *bootstrap.App, hub *ws.Hub) (*http.Server, error)` — wiring only, no logic.
It receives the already-validated `*bootstrap.App` (which holds `*sql.DB`, `Cache`, `Firebase`, and `Config`) and a `*ws.Hub`, constructs the repository, use case, and handler in order, then returns a configured `*http.Server`. Errors from initialisation steps are returned to the caller.

```go
healthRepo := postgres.NewHealthRepository(app.DB)
@@ -44,13 +44,13 @@ return &http.Server{
IdleTimeout: time.Minute,
ReadTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
}
}, nil
```

## Route registration
All routes are registered in `RegisterRoutes()` on `*Handler`, which returns `http.Handler`.
`rps` and `burst` come from `bootstrap.Config` (env vars `RATE_LIMIT_RPS` / `RATE_LIMIT_BURST`); pass `rps=0` to disable.
`verifier` is a `usecase.FirebaseTokenVerifier`; pass `nil` to skip Firebase auth (development only — see [auth](auth.md)).
`h.verifier` (set via `NewHandler`) controls Firebase auth — the verifier is read from the struct, not passed to `RegisterRoutes`; a `nil` verifier skips Firebase auth (development only — see [auth](auth.md)).

```go
func (h *Handler) RegisterRoutes(rps float64, burst int, sentryDSN string) http.Handler {
@@ -109,13 +109,14 @@ Allowed methods: GET, POST, PUT, DELETE, OPTIONS, PATCH.
| GET | `/` | none | `HelloWorldHandler` — returns `{"message": "Hello World"}` | `hello_handler.go` |
| GET | `/health` | none | `HealthHandler` — returns `HealthStats`; 503 when DB is down | `health_handler.go` |
| GET | `/ws` | `?token=` query param | `WsHandler` — upgrades to WebSocket; 401 when token missing/invalid | `ws_handler.go` |
| GET | `/metrics` | `LocalNetworkOnly()` | Prometheus scrape endpoint; restricted to loopback/RFC 1918 in staging/production | `metrics_handler.go` |
| GET | `/api/v1/me` | FirebaseAuth header | `MeHandler` — returns verified `FirebaseToken` claims | `auth_handler.go` |

## Graceful shutdown
Wired in `cmd/api/main.go` via `signal.NotifyContext` for SIGINT/SIGTERM.
5-second shutdown timeout. Server notifies `done chan bool` when complete.
`main()` calls `bootstrap.Run(ctx)` first; on failure it writes to stderr and calls `os.Exit(1)`.
`server.NewServer` does not return an error — all fallible startup work is in bootstrap.
`server.NewServer` returns `(*http.Server, error)`; the caller in `cmd/api/main.go` checks the error and exits on failure.
Do not add shutdown logic to `internal/` — it belongs in `cmd/`.

## Adding a new route — checklist