Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/db/migrations/postgres/00019_digest_sent_column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +goose Up
ALTER TABLE notification_digest_queue ADD COLUMN sent BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE notification_digest_queue ADD COLUMN sent_at TIMESTAMP;

-- +goose Down
ALTER TABLE notification_digest_queue DROP COLUMN IF EXISTS sent_at;
ALTER TABLE notification_digest_queue DROP COLUMN IF EXISTS sent;
6 changes: 6 additions & 0 deletions internal/db/migrations/sqlite/00019_digest_sent_column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- +goose Up
ALTER TABLE notification_digest_queue ADD COLUMN sent INTEGER NOT NULL DEFAULT 0;
ALTER TABLE notification_digest_queue ADD COLUMN sent_at DATETIME;

-- +goose Down
-- SQLite does not support DROP COLUMN in versions prior to 3.35; column additions are left in place on rollback.
31 changes: 16 additions & 15 deletions internal/db/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,21 @@ func (s *Store) seed() error {
// allowedResetTables is a whitelist of table names that can be dropped during reset.
// SECURITY: This prevents potential SQL injection if table names were ever derived from user input.
var allowedResetTables = map[string]bool{
"user_status_pages": true,
"users": true,
"sessions": true,
"groups": true,
"monitors": true,
"monitor_checks": true,
"monitor_events": true,
"status_pages": true,
"api_keys": true,
"settings": true,
"monitor_outages": true,
"notification_channels": true,
"incidents": true,
"goose_db_version": true,
"user_status_pages": true,
"users": true,
"sessions": true,
"groups": true,
"monitors": true,
"monitor_checks": true,
"monitor_events": true,
"status_pages": true,
"api_keys": true,
"settings": true,
"monitor_outages": true,
"notification_channels": true,
"notification_digest_queue": true,
"incidents": true,
"goose_db_version": true,
}

// isValidTableName checks if a table name is in the allowed whitelist.
Expand All @@ -232,7 +233,7 @@ func (s *Store) Reset() error {
"user_status_pages",
"users", "sessions", "groups", "monitors", "monitor_checks",
"monitor_events", "status_pages", "api_keys", "settings", "monitor_outages",
"notification_channels", "incidents",
"notification_channels", "notification_digest_queue", "incidents",
"goose_db_version", // Goose migration tracking table
}

Expand Down
68 changes: 53 additions & 15 deletions internal/db/store_settings.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package db

import (
"fmt"
"log"
"strings"
"time"
)

Expand Down Expand Up @@ -179,15 +181,10 @@ func (s *Store) InsertDigestEvent(monitorID, monitorName, monitorURL, eventType,
return err
}

// GetAndClearDigestEvents retrieves all queued digest events and deletes them atomically.
func (s *Store) GetAndClearDigestEvents() ([]DigestEvent, error) {
tx, err := s.db.Begin()
if err != nil {
return nil, err
}
defer func() { _ = tx.Rollback() }()

rows, err := tx.Query("SELECT id, monitor_id, monitor_name, monitor_url, event_type, message, event_time FROM notification_digest_queue ORDER BY event_time ASC")
// GetUnsentDigestEvents retrieves all queued digest events that have not yet been sent.
// Events are NOT deleted; call MarkDigestEventsSent after a successful send.
func (s *Store) GetUnsentDigestEvents() ([]DigestEvent, error) {
rows, err := s.db.Query(s.rebind("SELECT id, monitor_id, monitor_name, monitor_url, event_type, message, event_time FROM notification_digest_queue WHERE sent = ? ORDER BY event_time ASC"), false)
if err != nil {
return nil, err
}
Expand All @@ -201,18 +198,59 @@ func (s *Store) GetAndClearDigestEvents() ([]DigestEvent, error) {
}
events = append(events, e)
}
if err := rows.Err(); err != nil {
return nil, err
}
return events, nil
}

if len(events) > 0 {
if _, err = tx.Exec("DELETE FROM notification_digest_queue"); err != nil {
return nil, err
// MarkDigestEventsSent marks the given digest event IDs as sent so they are not
// re-dispatched on the next digest run. Old sent events are cleaned up by PruneDigestEvents.
func (s *Store) MarkDigestEventsSent(ids []int64) error {
if len(ids) == 0 {
return nil
}

args := make([]interface{}, len(ids))
for i, id := range ids {
args[i] = id
}

var query string
if s.IsPostgres() {
placeholders := make([]string, len(ids))
for i := range ids {
placeholders[i] = fmt.Sprintf("$%d", i+1)
}
query = fmt.Sprintf("UPDATE notification_digest_queue SET sent = TRUE, sent_at = NOW() WHERE id IN (%s)",
strings.Join(placeholders, ", "))
} else {
placeholders := make([]string, len(ids))
for i := range ids {
placeholders[i] = "?"
}
query = fmt.Sprintf("UPDATE notification_digest_queue SET sent = 1, sent_at = datetime('now') WHERE id IN (%s)",
strings.Join(placeholders, ", "))
}

if err := tx.Commit(); err != nil {
return nil, err
_, err := s.db.Exec(query, args...)
return err
}

// PruneDigestEvents deletes sent digest events older than the given number of days.
// This is called by the retention worker alongside PruneMonitorChecks.
func (s *Store) PruneDigestEvents(days int) error {
if days < 1 || days > 3650 {
return fmt.Errorf("invalid retention days: must be between 1 and 3650")
}

return events, nil
var err error
if s.IsPostgres() {
_, err = s.db.Exec("DELETE FROM notification_digest_queue WHERE sent = TRUE AND sent_at < NOW() - MAKE_INTERVAL(days => $1)", days)
} else {
_, err = s.db.Exec("DELETE FROM notification_digest_queue WHERE sent = 1 AND sent_at < datetime('now', '-' || ? || ' days')", days)
}
return err
}

func (s *Store) GetDBSize() (int64, error) {
Expand Down
229 changes: 229 additions & 0 deletions internal/db/store_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"testing"
"time"
)

func TestSettingsResult(t *testing.T) {
Expand Down Expand Up @@ -45,3 +46,231 @@ func TestSystemStats(t *testing.T) {
t.Logf("Total monitors: %d", stats.TotalMonitors)
}
}

// --- Digest queue tests ---

func insertTestDigestEvent(t *testing.T, s *Store, monitorID, eventType string, eventTime time.Time) {
t.Helper()
err := s.InsertDigestEvent(monitorID, "Monitor "+monitorID, "https://"+monitorID+".example.com", eventType, "test message", eventTime)
if err != nil {
t.Fatalf("InsertDigestEvent failed: %v", err)
}
}

func TestDigestQueue_InsertAndGetUnsent(t *testing.T) {
RunTestWithBothDBs(t, "InsertAndGetUnsent", func(t *testing.T, s *Store) {
// Empty queue should return empty slice, not error.
events, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents on empty queue: %v", err)
}
if len(events) != 0 {
t.Errorf("expected 0 events, got %d", len(events))
}

// Insert two events.
t0 := time.Date(2026, 3, 25, 8, 0, 0, 0, time.UTC)
insertTestDigestEvent(t, s, "m1", "down", t0)
insertTestDigestEvent(t, s, "m2", "degraded", t0.Add(time.Hour))

events, err = s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents: %v", err)
}
if len(events) != 2 {
t.Fatalf("expected 2 events, got %d", len(events))
}

// Events should be ordered by event_time ASC.
if events[0].MonitorID != "m1" {
t.Errorf("expected first event from m1, got %s", events[0].MonitorID)
}
if events[1].MonitorID != "m2" {
t.Errorf("expected second event from m2, got %s", events[1].MonitorID)
}

// Verify fields are populated correctly.
if events[0].EventType != "down" {
t.Errorf("expected event_type 'down', got %s", events[0].EventType)
}
if events[0].MonitorName != "Monitor m1" {
t.Errorf("expected monitor name 'Monitor m1', got %s", events[0].MonitorName)
}
})
}

func TestDigestQueue_MarkSentRemovesFromUnsent(t *testing.T) {
RunTestWithBothDBs(t, "MarkSentRemovesFromUnsent", func(t *testing.T, s *Store) {
t0 := time.Date(2026, 3, 25, 8, 0, 0, 0, time.UTC)
insertTestDigestEvent(t, s, "m1", "down", t0)
insertTestDigestEvent(t, s, "m2", "degraded", t0.Add(time.Hour))

events, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents: %v", err)
}
if len(events) != 2 {
t.Fatalf("expected 2 unsent events, got %d", len(events))
}

// Mark only the first event as sent.
if err := s.MarkDigestEventsSent([]int64{events[0].ID}); err != nil {
t.Fatalf("MarkDigestEventsSent: %v", err)
}

// Only one event should remain unsent.
remaining, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents after mark: %v", err)
}
if len(remaining) != 1 {
t.Fatalf("expected 1 unsent event after marking, got %d", len(remaining))
}
if remaining[0].MonitorID != "m2" {
t.Errorf("expected remaining event from m2, got %s", remaining[0].MonitorID)
}

// Mark the second event as sent.
if err := s.MarkDigestEventsSent([]int64{remaining[0].ID}); err != nil {
t.Fatalf("MarkDigestEventsSent second: %v", err)
}

// Queue should now be empty.
final, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents after all sent: %v", err)
}
if len(final) != 0 {
t.Errorf("expected 0 unsent events after all marked, got %d", len(final))
}
})
}

func TestDigestQueue_MarkSentEmptySliceIsNoOp(t *testing.T) {
RunTestWithBothDBs(t, "MarkSentEmptySlice", func(t *testing.T, s *Store) {
// Should not error on empty ID slice.
if err := s.MarkDigestEventsSent([]int64{}); err != nil {
t.Errorf("MarkDigestEventsSent(empty) should be a no-op, got: %v", err)
}
})
}

func TestDigestQueue_MarkSentAllAtOnce(t *testing.T) {
RunTestWithBothDBs(t, "MarkSentAllAtOnce", func(t *testing.T, s *Store) {
t0 := time.Date(2026, 3, 25, 8, 0, 0, 0, time.UTC)
for i := 0; i < 5; i++ {
insertTestDigestEvent(t, s, "m1", "down", t0.Add(time.Duration(i)*time.Minute))
}

events, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents: %v", err)
}
if len(events) != 5 {
t.Fatalf("expected 5 events, got %d", len(events))
}

ids := make([]int64, len(events))
for i, e := range events {
ids[i] = e.ID
}

if err := s.MarkDigestEventsSent(ids); err != nil {
t.Fatalf("MarkDigestEventsSent: %v", err)
}

remaining, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents after bulk mark: %v", err)
}
if len(remaining) != 0 {
t.Errorf("expected 0 remaining unsent, got %d", len(remaining))
}
})
}

func TestDigestQueue_PruneDeletesOldSentEvents(t *testing.T) {
RunTestWithBothDBs(t, "PruneDeletesOldSent", func(t *testing.T, s *Store) {
t0 := time.Date(2026, 3, 25, 8, 0, 0, 0, time.UTC)
insertTestDigestEvent(t, s, "m1", "down", t0)

events, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 event, got %d", len(events))
}

// Mark it as sent (sent_at = now in DB).
if err := s.MarkDigestEventsSent([]int64{events[0].ID}); err != nil {
t.Fatalf("MarkDigestEventsSent: %v", err)
}

// Pruning with 365 days should NOT delete it (just marked sent moments ago).
if err := s.PruneDigestEvents(365); err != nil {
t.Fatalf("PruneDigestEvents(365): %v", err)
}

// Manually set sent_at far in the past to simulate an old record.
if s.IsPostgres() {
_, _ = s.db.Exec("UPDATE notification_digest_queue SET sent_at = NOW() - INTERVAL '400 days'")
} else {
_, _ = s.db.Exec("UPDATE notification_digest_queue SET sent_at = datetime('now', '-400 days')")
}

// Pruning with 365 days should now delete the old record.
if err := s.PruneDigestEvents(365); err != nil {
t.Fatalf("PruneDigestEvents(365) after backdating: %v", err)
}

// Verify the record is gone.
var count int
_ = s.db.QueryRow("SELECT COUNT(*) FROM notification_digest_queue").Scan(&count)
if count != 0 {
t.Errorf("expected 0 rows after pruning old sent event, got %d", count)
}
})
}

func TestDigestQueue_PruneDoesNotDeleteUnsentEvents(t *testing.T) {
RunTestWithBothDBs(t, "PruneDoesNotDeleteUnsent", func(t *testing.T, s *Store) {
t0 := time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC) // very old event time
insertTestDigestEvent(t, s, "m1", "down", t0)

// Pruning should NOT delete unsent events regardless of event_time age,
// because PruneDigestEvents only removes rows where sent = true.
if err := s.PruneDigestEvents(1); err != nil {
t.Fatalf("PruneDigestEvents: %v", err)
}

events, err := s.GetUnsentDigestEvents()
if err != nil {
t.Fatalf("GetUnsentDigestEvents: %v", err)
}
if len(events) != 1 {
t.Errorf("expected unsent event to survive pruning, got %d events", len(events))
}
})
}

func TestDigestQueue_PruneInvalidDays(t *testing.T) {
s := newTestStore(t)

if err := s.PruneDigestEvents(0); err == nil {
t.Error("PruneDigestEvents(0) should return error")
}
if err := s.PruneDigestEvents(-1); err == nil {
t.Error("PruneDigestEvents(-1) should return error")
}
if err := s.PruneDigestEvents(3651); err == nil {
t.Error("PruneDigestEvents(3651) should return error")
}
// Boundary values should succeed.
if err := s.PruneDigestEvents(1); err != nil {
t.Errorf("PruneDigestEvents(1) should succeed, got: %v", err)
}
if err := s.PruneDigestEvents(3650); err != nil {
t.Errorf("PruneDigestEvents(3650) should succeed, got: %v", err)
}
}
Loading
Loading