Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions api/events_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package api

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/frain-dev/convoy/api/testdb"
"github.com/frain-dev/convoy/database"
"github.com/frain-dev/convoy/database/postgres"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/internal/pkg/metrics"
"github.com/frain-dev/convoy/pkg/httpheader"
)

type EventsIntegrationTestSuite struct {
suite.Suite
DB database.Database
ConvoyApp *ApplicationHandler
DefaultProject *datastore.Project
DefaultUser *datastore.User
}

func (s *EventsIntegrationTestSuite) SetupSuite() {
s.ConvoyApp = buildServer(s.T())
}

func (s *EventsIntegrationTestSuite) SetupTest() {
var err error

s.DB = s.ConvoyApp.A.DB

// Seed default user
s.DefaultUser, err = testdb.SeedDefaultUser(s.DB)
require.NoError(s.T(), err)

// Seed default organisation
org, err := testdb.SeedDefaultOrganisation(s.DB, s.DefaultUser)
require.NoError(s.T(), err)

// Seed default project
s.DefaultProject, err = testdb.SeedDefaultProject(s.DB, org.UID)
require.NoError(s.T(), err)
}

func (s *EventsIntegrationTestSuite) TearDownTest() {
metrics.Reset()
}

// Test_LoadEventsPaged_WithoutEndpoints tests that events without endpoint associations
// are visible in the event log when no endpoint filter is applied.
func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() {
ctx := context.Background()
eventRepo := postgres.NewEventRepo(s.DB)

data := json.RawMessage([]byte(`{"test": "data"}`))

// Create an event with no endpoints (simulating an event ingested for a source with no subscriptions)
event := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-no-endpoints",
Endpoints: []string{}, // Empty endpoints array
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err := eventRepo.CreateEvent(ctx, event)
require.NoError(s.T(), err)

// Update status to Failure (events without subscriptions get Failure status)
err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus)
require.NoError(s.T(), err)

// Query without endpoint filter - should return the event
events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(events))
require.Equal(s.T(), event.UID, events[0].UID)
require.Equal(s.T(), datastore.FailureStatus, events[0].Status)
}

// Test_LoadEventsPaged_WithEndpointFilter tests that:
// 1. When filtering by endpoint, only events with that endpoint are returned
// 2. When not filtering, both events with and without endpoints are returned
func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() {
ctx := context.Background()
eventRepo := postgres.NewEventRepo(s.DB)

// Create an endpoint
endpoint, err := testdb.SeedEndpoint(s.DB, s.DefaultProject, "", "", "", false, datastore.ActiveEndpointStatus)
require.NoError(s.T(), err)

data := json.RawMessage([]byte(`{"test": "data"}`))

// Create event with endpoint
eventWithEndpoint := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-with-endpoint",
Endpoints: []string{endpoint.UID},
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err = eventRepo.CreateEvent(ctx, eventWithEndpoint)
require.NoError(s.T(), err)

// Create event without endpoint
eventWithoutEndpoint := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-without-endpoint",
Endpoints: []string{},
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint)
require.NoError(s.T(), err)

// Update status to Failure (events without subscriptions get Failure status)
err = eventRepo.UpdateEventStatus(ctx, eventWithoutEndpoint, datastore.FailureStatus)
require.NoError(s.T(), err)

// Query with endpoint filter - should only return event with matching endpoint
events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
EndpointID: endpoint.UID,
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(events))
require.Equal(s.T(), eventWithEndpoint.UID, events[0].UID)

// Query without endpoint filter - should return both events
events, _, err = eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 2, len(events))
}

// Test_LoadEventsPaged_SearchWithoutEndpoints tests that events without endpoints
// are included in search results
func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints() {
ctx := context.Background()
eventRepo := postgres.NewEventRepo(s.DB)

data := json.RawMessage([]byte(`{"unique_search_term": "test12345"}`))

// Create an event with no endpoints but searchable content
event := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-searchable",
Endpoints: []string{},
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err := eventRepo.CreateEvent(ctx, event)
require.NoError(s.T(), err)

// Update status to Failure (events without subscriptions get Failure status)
err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus)
require.NoError(s.T(), err)

// Copy to search table for text search
err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1)
require.NoError(s.T(), err)

// Search for the event - should find it despite no endpoints
events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
Query: "unique_search_term",
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(events))
require.Equal(s.T(), event.UID, events[0].UID)
// Note: events_search table doesn't have metadata or status columns,
// so we don't check status for search results
}

func TestEventsIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(EventsIntegrationTestSuite))
}
30 changes: 22 additions & 8 deletions database/postgres/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,24 @@ const (
baseEventsPaged = `
with endpoint_ids as (select id from convoy.endpoints where owner_id = :owner_id), events as (
SELECT ev.id, ev.project_id,
ev.id AS event_type, ev.is_duplicate_event,
ev.event_type, ev.is_duplicate_event,
COALESCE(ev.source_id, '') AS source_id,
ev.headers, ev.raw, ev.data, ev.created_at,
COALESCE(idempotency_key, '') AS idempotency_key,
COALESCE(url_query_params, '') AS url_query_params,
ev.updated_at, ev.deleted_at,ev.acknowledged_at,
ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status,
COALESCE(s.id, '') AS "source_metadata.id",
COALESCE(s.name, '') AS "source_metadata.name"
FROM convoy.events ev
LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id
JOIN endpoint_ids e ON e.id = ee.endpoint_id
LEFT JOIN endpoint_ids e ON e.id = ee.endpoint_id
LEFT JOIN convoy.sources s ON s.id = ev.source_id
WHERE ev.deleted_at IS NULL`

baseEventsSearch = `
with events as (
SELECT ev.id, ev.project_id,
ev.id AS event_type, ev.is_duplicate_event,
ev.event_type, ev.is_duplicate_event,
COALESCE(ev.source_id, '') AS source_id,
ev.headers, ev.raw, ev.data, ev.created_at,
COALESCE(idempotency_key, '') AS idempotency_key,
Expand All @@ -124,7 +125,7 @@ const (
FROM convoy.events_search ev
LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id
LEFT JOIN convoy.sources s ON s.id = ev.source_id
JOIN convoy.endpoints e ON e.id = ee.endpoint_id
LEFT JOIN convoy.endpoints e ON e.id = ee.endpoint_id
WHERE ev.deleted_at IS NULL`

baseEventsPagedForward = `
Expand Down Expand Up @@ -181,11 +182,11 @@ const (

// EXISTS path: no GROUP BY, uses idx_events_project_created_pagination when filter.Query is empty
baseEventsPagedExists = `
SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event,
SELECT ev.id, ev.project_id, ev.event_type, ev.is_duplicate_event,
COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at,
COALESCE(ev.idempotency_key, '') AS idempotency_key,
COALESCE(ev.url_query_params, '') AS url_query_params,
ev.updated_at, ev.deleted_at, ev.acknowledged_at,
ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status,
COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name"
FROM convoy.events ev
LEFT JOIN convoy.sources s ON s.id = ev.source_id
Expand Down Expand Up @@ -534,7 +535,14 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte
} else {
suffix = getExistsBackwardSuffix(sortOrder)
}
query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix

// If no EXISTS subquery, don't add EXISTS clause
if existsSubquery == "" {
// Remove " AND EXISTS (" from baseEventsPagedExists (13 characters)
query = baseEventsPagedExists[:len(baseEventsPagedExists)-13] + filterQueryNoEndpoint + suffix
} else {
query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix
}
} else {
// Search or legacy path: CTE + JOIN + GROUP BY.
base := baseEventsPaged
Expand Down Expand Up @@ -761,7 +769,13 @@ func getCountDeliveriesPrevRowQuery(sortOrder string) string {

// buildExistsSubquery returns the EXISTS inner query for the events list (no search).
// Caller must bind :owner_id and :endpoint_ids when present.
// Returns empty string when no filters are specified to include events without endpoint associations.
func buildExistsSubquery(ownerID string, endpointIDs []string) string {
// If no filters, don't require endpoint associations
if util.IsStringEmpty(ownerID) && len(endpointIDs) == 0 {
return ""
}

q := "SELECT 1 FROM convoy.events_endpoints ee JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ee.event_id = ev.id"
if !util.IsStringEmpty(ownerID) {
q += " AND e.owner_id = :owner_id"
Expand Down
2 changes: 1 addition & 1 deletion docs/docs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package docs Code generated by swaggo/swag at 2026-02-10 14:18:50.71968 +0100 CET m=+1.596704293. DO NOT EDIT
// Package docs Code generated by swaggo/swag at 2026-02-23 18:32:24.79121 +0100 CET m=+2.388581417. DO NOT EDIT
package docs

import "github.com/swaggo/swag"
Expand Down
Loading