From 344bd2a613cd62ea3908654921bb563eada43c35 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 23 Feb 2026 15:42:59 +0100 Subject: [PATCH 1/3] Add integration tests for event pagination and update event queries to handle missing endpoints. --- api/events_integration_test.go | 230 +++++++++++++++++++++++++++++++++ database/postgres/event.go | 19 ++- docs/docs.go | 2 +- 3 files changed, 247 insertions(+), 4 deletions(-) create mode 100644 api/events_integration_test.go diff --git a/api/events_integration_test.go b/api/events_integration_test.go new file mode 100644 index 0000000000..83c10ecfcb --- /dev/null +++ b/api/events_integration_test.go @@ -0,0 +1,230 @@ +package api + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/frain-dev/convoy/api/testdb" + "github.com/frain-dev/convoy/database" + "github.com/frain-dev/convoy/database/postgres" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/internal/pkg/metrics" + "github.com/frain-dev/convoy/pkg/httpheader" +) + +type EventsIntegrationTestSuite struct { + suite.Suite + DB database.Database + ConvoyApp *ApplicationHandler + DefaultProject *datastore.Project + DefaultUser *datastore.User +} + +func (s *EventsIntegrationTestSuite) SetupSuite() { + s.ConvoyApp = buildServer(s.T()) +} + +func (s *EventsIntegrationTestSuite) SetupTest() { + var err error + + s.DB = s.ConvoyApp.A.DB + + // Seed default user + s.DefaultUser, err = testdb.SeedDefaultUser(s.DB) + require.NoError(s.T(), err) + + // Seed default organisation + org, err := testdb.SeedDefaultOrganisation(s.DB, s.DefaultUser) + require.NoError(s.T(), err) + + // Seed default project + s.DefaultProject, err = testdb.SeedDefaultProject(s.DB, org.UID) + require.NoError(s.T(), err) +} + +func (s *EventsIntegrationTestSuite) TearDownTest() { + metrics.Reset() +} + +// Test_LoadEventsPaged_WithoutEndpoints tests that events without endpoint associations +// are visible in the event log when no endpoint filter is applied. +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + data := json.RawMessage([]byte(`{"test": "data"}`)) + + // Create an event with no endpoints (simulating an event ingested for a source with no subscriptions) + event := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-no-endpoints", + Endpoints: []string{}, // Empty endpoints array + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + Status: datastore.FailureStatus, // Events without subscriptions get Failure status + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := eventRepo.CreateEvent(ctx, event) + require.NoError(s.T(), err) + + // Query without endpoint filter - should return the event + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), event.UID, events[0].UID) + require.Equal(s.T(), datastore.FailureStatus, events[0].Status) +} + +// Test_LoadEventsPaged_WithEndpointFilter tests that: +// 1. When filtering by endpoint, only events with that endpoint are returned +// 2. When not filtering, both events with and without endpoints are returned +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + // Create an endpoint + endpoint, err := testdb.SeedEndpoint(s.DB, s.DefaultProject, "", "", "", false, datastore.ActiveEndpointStatus) + require.NoError(s.T(), err) + + data := json.RawMessage([]byte(`{"test": "data"}`)) + + // Create event with endpoint + eventWithEndpoint := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-with-endpoint", + Endpoints: []string{endpoint.UID}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = eventRepo.CreateEvent(ctx, eventWithEndpoint) + require.NoError(s.T(), err) + + // Create event without endpoint + eventWithoutEndpoint := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-without-endpoint", + Endpoints: []string{}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + Status: datastore.FailureStatus, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint) + require.NoError(s.T(), err) + + // Query with endpoint filter - should only return event with matching endpoint + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + EndpointID: endpoint.UID, + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), eventWithEndpoint.UID, events[0].UID) + + // Query without endpoint filter - should return both events + events, _, err = eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 2, len(events)) +} + +// Test_LoadEventsPaged_SearchWithoutEndpoints tests that events without endpoints +// are included in search results +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + data := json.RawMessage([]byte(`{"unique_search_term": "test12345"}`)) + + // Create an event with no endpoints but searchable content + event := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-searchable", + Endpoints: []string{}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + Status: datastore.FailureStatus, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := eventRepo.CreateEvent(ctx, event) + require.NoError(s.T(), err) + + // Copy to search table for text search + err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1) + require.NoError(s.T(), err) + + // Search for the event - should find it despite no endpoints + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + Query: "unique_search_term", + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), event.UID, events[0].UID) +} + +func TestEventsIntegrationTestSuite(t *testing.T) { + suite.Run(t, new(EventsIntegrationTestSuite)) +} diff --git a/database/postgres/event.go b/database/postgres/event.go index ca6bc23072..52ac8469db 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -107,7 +107,7 @@ const ( COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id - JOIN endpoint_ids e ON e.id = ee.endpoint_id + LEFT JOIN endpoint_ids e ON e.id = ee.endpoint_id LEFT JOIN convoy.sources s ON s.id = ev.source_id WHERE ev.deleted_at IS NULL` @@ -124,7 +124,7 @@ const ( FROM convoy.events_search ev LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id LEFT JOIN convoy.sources s ON s.id = ev.source_id - JOIN convoy.endpoints e ON e.id = ee.endpoint_id + LEFT JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ev.deleted_at IS NULL` baseEventsPagedForward = ` @@ -534,7 +534,14 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte } else { suffix = getExistsBackwardSuffix(sortOrder) } - query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix + + // If no EXISTS subquery, don't add EXISTS clause + if existsSubquery == "" { + // Remove " AND EXISTS (" from baseEventsPagedExists (13 characters) + query = baseEventsPagedExists[:len(baseEventsPagedExists)-13] + filterQueryNoEndpoint + suffix + } else { + query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix + } } else { // Search or legacy path: CTE + JOIN + GROUP BY. base := baseEventsPaged @@ -761,7 +768,13 @@ func getCountDeliveriesPrevRowQuery(sortOrder string) string { // buildExistsSubquery returns the EXISTS inner query for the events list (no search). // Caller must bind :owner_id and :endpoint_ids when present. +// Returns empty string when no filters are specified to include events without endpoint associations. func buildExistsSubquery(ownerID string, endpointIDs []string) string { + // If no filters, don't require endpoint associations + if util.IsStringEmpty(ownerID) && len(endpointIDs) == 0 { + return "" + } + q := "SELECT 1 FROM convoy.events_endpoints ee JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ee.event_id = ev.id" if !util.IsStringEmpty(ownerID) { q += " AND e.owner_id = :owner_id" diff --git a/docs/docs.go b/docs/docs.go index ba9adc0843..9e10eccf8a 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-10 14:18:50.71968 +0100 CET m=+1.596704293. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-02-23 15:43:01.720952 +0100 CET m=+2.017875126. DO NOT EDIT package docs import "github.com/swaggo/swag" From de03053d5ac8dd7029622cee01b05560f95e8db7 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 23 Feb 2026 18:32:22 +0100 Subject: [PATCH 2/3] Add integration tests for event status updates and enhance queries to include status and metadata fields. --- api/events_integration_test.go | 17 ++++++++++++++--- database/postgres/event.go | 5 +++-- docs/docs.go | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/api/events_integration_test.go b/api/events_integration_test.go index 83c10ecfcb..ceec1deef1 100644 --- a/api/events_integration_test.go +++ b/api/events_integration_test.go @@ -69,7 +69,6 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { Headers: httpheader.HTTPHeader{}, Raw: string(data), Data: data, - Status: datastore.FailureStatus, // Events without subscriptions get Failure status CreatedAt: time.Now(), UpdatedAt: time.Now(), } @@ -77,6 +76,10 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { err := eventRepo.CreateEvent(ctx, event) require.NoError(s.T(), err) + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus) + require.NoError(s.T(), err) + // Query without endpoint filter - should return the event events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ SearchParams: datastore.SearchParams{ @@ -134,7 +137,6 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { Headers: httpheader.HTTPHeader{}, Raw: string(data), Data: data, - Status: datastore.FailureStatus, CreatedAt: time.Now(), UpdatedAt: time.Now(), } @@ -142,6 +144,10 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint) require.NoError(s.T(), err) + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, eventWithoutEndpoint, datastore.FailureStatus) + require.NoError(s.T(), err) + // Query with endpoint filter - should only return event with matching endpoint events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ EndpointID: endpoint.UID, @@ -194,7 +200,6 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints Headers: httpheader.HTTPHeader{}, Raw: string(data), Data: data, - Status: datastore.FailureStatus, CreatedAt: time.Now(), UpdatedAt: time.Now(), } @@ -202,6 +207,10 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints err := eventRepo.CreateEvent(ctx, event) require.NoError(s.T(), err) + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus) + require.NoError(s.T(), err) + // Copy to search table for text search err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1) require.NoError(s.T(), err) @@ -223,6 +232,8 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints require.NoError(s.T(), err) require.Equal(s.T(), 1, len(events)) require.Equal(s.T(), event.UID, events[0].UID) + // Note: events_search table doesn't have metadata or status columns, + // so we don't check status for search results } func TestEventsIntegrationTestSuite(t *testing.T) { diff --git a/database/postgres/event.go b/database/postgres/event.go index 52ac8469db..b242f06ad6 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -102,7 +102,7 @@ const ( ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at,ev.acknowledged_at, + ev.updated_at, ev.deleted_at,ev.acknowledged_at,ev.metadata,ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev @@ -112,6 +112,7 @@ const ( WHERE ev.deleted_at IS NULL` baseEventsSearch = ` + with events as ( SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, @@ -185,7 +186,7 @@ const ( COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(ev.idempotency_key, '') AS idempotency_key, COALESCE(ev.url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at, ev.acknowledged_at, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev LEFT JOIN convoy.sources s ON s.id = ev.source_id diff --git a/docs/docs.go b/docs/docs.go index 9e10eccf8a..3475833ea5 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-23 15:43:01.720952 +0100 CET m=+2.017875126. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-02-23 18:32:24.79121 +0100 CET m=+2.388581417. DO NOT EDIT package docs import "github.com/swaggo/swag" From 2ec796bdb9f529d95b96a3aac24f03df181d7c88 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 23 Feb 2026 22:30:11 +0100 Subject: [PATCH 3/3] Fix event queries by correcting `event_type` field alias and formatting inconsistencies. --- database/postgres/event.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/database/postgres/event.go b/database/postgres/event.go index b242f06ad6..aa2ba8b088 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -97,12 +97,12 @@ const ( baseEventsPaged = ` with endpoint_ids as (select id from convoy.endpoints where owner_id = :owner_id), events as ( SELECT ev.id, ev.project_id, - ev.id AS event_type, ev.is_duplicate_event, + ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at,ev.acknowledged_at,ev.metadata,ev.status, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev @@ -114,7 +114,7 @@ const ( baseEventsSearch = ` with events as ( SELECT ev.id, ev.project_id, - ev.id AS event_type, ev.is_duplicate_event, + ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, @@ -182,7 +182,7 @@ const ( // EXISTS path: no GROUP BY, uses idx_events_project_created_pagination when filter.Query is empty baseEventsPagedExists = ` - SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event, + SELECT ev.id, ev.project_id, ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(ev.idempotency_key, '') AS idempotency_key, COALESCE(ev.url_query_params, '') AS url_query_params,