Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type WorkOSClient interface {
ListGlobalRoles(ctx context.Context) ([]workos.Role, error)
ListEvents(ctx context.Context, opts events.ListEventsOpts) (events.ListEventsResponse, error)
UpdateUserExternalID(ctx context.Context, workosUserID, externalID string) error
UpdateOrganizationExternalID(ctx context.Context, workosOrgID, externalID string) error
}

type BackfillWorkOSOrganizationParams struct {
Expand Down
39 changes: 39 additions & 0 deletions server/internal/background/activities/backfill_workos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,45 @@ func TestBackfillWorkOSOrganization_ExternalIDChangeDoesNotChangeOrganizationID(
require.ErrorIs(t, err, pgx.ErrNoRows)
}

func TestBackfillWorkOSOrganization_DoesNotClearDisabled(t *testing.T) {
t.Parallel()

ctx := context.Background()
conn := newOrgEventsTestConn(t, "workos_backfill_preserves_disabled")
logger := testenv.NewLogger(t)

const organizationID = "gram_org_backfill_disabled"
const workosOrgID = "org_01JBACKFILLDISABLED"

seedLinkedWorkOSOrganization(t, ctx, conn, organizationID, workosOrgID)
_, err := orgrepo.New(conn).DisableOrganizationByWorkosID(ctx, orgrepo.DisableOrganizationByWorkosIDParams{
WorkosID: conv.ToPGText(workosOrgID),
WorkosLastEventID: conv.ToPGText(""),
})
require.NoError(t, err)

workosClient := newWorkOSSnapshotClient(t, ctx,
workos.Organization{
ID: workosOrgID,
Name: "Backfill Still Disabled",
ExternalID: organizationID,
CreatedAt: "2026-05-07T11:00:00Z",
UpdatedAt: "2026-05-07T12:00:00Z",
},
nil,
nil,
)
activity := activities.NewBackfillWorkOSOrganization(logger, conn, workosClient)

err = activity.Do(ctx, activities.BackfillWorkOSOrganizationParams{WorkOSOrganizationID: workosOrgID})
require.NoError(t, err)

org, err := orgrepo.New(conn).GetOrganizationByWorkosID(ctx, conv.ToPGText(workosOrgID))
require.NoError(t, err)
require.Equal(t, "Backfill Still Disabled", org.Name)
require.True(t, org.DisabledAt.Valid)
}

func TestBackfillWorkOSOrganization_UnknownUserSyncsSingleRoleAssignment(t *testing.T) {
t.Parallel()

Expand Down
232 changes: 176 additions & 56 deletions server/internal/background/activities/process_workos_org_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (

accessrepo "github.com/speakeasy-api/gram/server/internal/access/repo"
"github.com/speakeasy-api/gram/server/internal/attr"
"github.com/speakeasy-api/gram/server/internal/auth/orgslug"
"github.com/speakeasy-api/gram/server/internal/conv"
"github.com/speakeasy-api/gram/server/internal/database"
"github.com/speakeasy-api/gram/server/internal/o11y"
"github.com/speakeasy-api/gram/server/internal/oops"
orgid "github.com/speakeasy-api/gram/server/internal/organizations/id"
orgrepo "github.com/speakeasy-api/gram/server/internal/organizations/repo"
"github.com/speakeasy-api/gram/server/internal/thirdparty/workos"
workosrepo "github.com/speakeasy-api/gram/server/internal/thirdparty/workos/repo"
Expand All @@ -42,10 +44,8 @@ type ProcessWorkOSOrganizationEventsResult struct {
}

// ProcessWorkOSOrganizationEvents pages through WorkOS organization-scoped events
// since the stored cursor, advancing the cursor as it goes. Event handling itself
// (upserting org metadata, role rows, etc.) is wired in a follow-up PR — for now
// the activity only advances the cursor so subsequent runs pick up where this
// left off.
// since the stored cursor, applying supported organization, role, and
// membership events in a transaction before advancing the cursor.
type ProcessWorkOSOrganizationEvents struct {
db *pgxpool.Pool
logger *slog.Logger
Expand Down Expand Up @@ -151,30 +151,23 @@ func (p *ProcessWorkOSOrganizationEvents) handlePage(ctx context.Context, logger
if err != nil {
return lastEventID, oops.E(oops.CodeUnexpected, err, "failed to handle WorkOS event").Log(ctx, eventLogger)
}
if eventID != "" {
lastEventID = eventID
}
lastEventID = eventID
}

return lastEventID, nil
}

// handleEvent will be implemented in a subsequent PR.
//
// Note: the cursor advances as soon as this returns nil, even though the
// dispatched handler is currently a no-op. If the workflow runs before the
// real handlers land, all in-flight WorkOS events will be consumed and the
// cursor will move past them — real handlers will only see events going
// forward. That is the intended design (sync handles forward, not history;
// historical state is reconciled by the future backfill workflow).
// handleEvent applies a single WorkOS event and advances the per-organization
// cursor in the same transaction.
func (p *ProcessWorkOSOrganizationEvents) handleEvent(ctx context.Context, logger *slog.Logger, workosOrgID string, event events.Event) (string, error) {
dbtx, err := p.db.Begin(ctx)
if err != nil {
return "", fmt.Errorf("begin transaction: %w", err)
}
defer o11y.NoLogDefer(func() error { return dbtx.Rollback(ctx) })

if err := handleOrganizationEvent(ctx, logger, dbtx, event); err != nil {
externalIDUpdate, err := handleOrganizationEvent(ctx, logger, dbtx, event)
if err != nil {
return "", err
}

Expand All @@ -189,29 +182,35 @@ func (p *ProcessWorkOSOrganizationEvents) handleEvent(ctx context.Context, logge
return "", fmt.Errorf("commit transaction: %w", err)
}

if externalIDUpdate != nil {
if err := p.workosClient.UpdateOrganizationExternalID(ctx, externalIDUpdate.workosOrgID, externalIDUpdate.externalID); err != nil {
logger.WarnContext(ctx, "failed to update WorkOS organization external ID", attr.SlogError(err))
}
}

return event.ID, nil
}

// handleOrganizationEvent dispatches a WorkOS event scoped to a specific
// organization to its handler. Each handler is responsible for the
// ShouldProcessEvent guard against duplicate apply.
func handleOrganizationEvent(ctx context.Context, logger *slog.Logger, dbtx database.DBTX, event events.Event) error {
func handleOrganizationEvent(ctx context.Context, logger *slog.Logger, dbtx database.DBTX, event events.Event) (*workosOrgExternalIDUpdate, error) {
switch event.Event {
case string(workos.EventKindOrganizationCreated), string(workos.EventKindOrganizationUpdated):
return handleOrganizationUpsert(ctx, logger, dbtx, event)
case string(workos.EventKindOrganizationDeleted):
return handleOrganizationDeleted(ctx, logger, dbtx, event)
return nil, handleOrganizationDeleted(ctx, logger, dbtx, event)
case string(workos.EventKindOrganizationRoleCreated), string(workos.EventKindOrganizationRoleUpdated):
return handleRoleUpsert(ctx, logger, dbtx, event)
return nil, handleRoleUpsert(ctx, logger, dbtx, event)
case string(workos.EventKindOrganizationRoleDeleted):
return handleRoleDeleted(ctx, logger, dbtx, event)
return nil, handleRoleDeleted(ctx, logger, dbtx, event)
case string(workos.EventKindOrganizationMembershipCreated), string(workos.EventKindOrganizationMembershipUpdated):
return handleOrganizationMembershipUpsert(ctx, logger, dbtx, event)
return nil, handleOrganizationMembershipUpsert(ctx, logger, dbtx, event)
case string(workos.EventKindOrganizationMembershipDeleted):
return handleOrganizationMembershipDeleted(ctx, logger, dbtx, event)
return nil, handleOrganizationMembershipDeleted(ctx, logger, dbtx, event)
}

return oops.Permanent(fmt.Errorf("unhandled workos organization event type: %s", event.Event))
return nil, oops.Permanent(fmt.Errorf("unhandled workos organization event type: %s", event.Event))
}

// workosOrganizationEventPayload is the relevant subset of an
Expand All @@ -223,66 +222,181 @@ type workosOrganizationEventPayload struct {
UpdatedAt time.Time `json:"updated_at"`
}

type workosOrgExternalIDUpdate struct {
workosOrgID string
externalID string
}

type resolvedWorkOSOrganization struct {
row orgrepo.OrganizationMetadatum
organizationID string
isNew bool
needsExternalIDUpdate bool
}

// handleOrganizationUpsert applies an organization.created or
// organization.updated event. Maps the WorkOS organization to a Gram org by
// looking up workos_id; falls back to the payload's external_id for orgs not
// yet linked.
func handleOrganizationUpsert(ctx context.Context, logger *slog.Logger, dbtx database.DBTX, event events.Event) error {
// organization.updated event. The mapping rules are:
//
// - workos_id is authoritative first: if a Gram org already points at this
// WorkOS org, update that row.
// - otherwise, WorkOS external_id is the Gram org ID to link/update.
// - if WorkOS external_id points at no Gram org, create/link that local row
// using WorkOS external_id as the Gram org ID.
// - if WorkOS has no external_id, derive a deterministic Gram org ID from the
// WorkOS org ID, create/link that local row, then write the derived ID back
// to WorkOS after the DB transaction commits.
//
// WorkOS owns name/workos_id/cursor metadata, but never updates an existing
// Gram slug. New org slugs are chosen once and uniqued locally.
func handleOrganizationUpsert(ctx context.Context, logger *slog.Logger, dbtx database.DBTX, event events.Event) (*workosOrgExternalIDUpdate, error) {
var payload workosOrganizationEventPayload
if err := json.Unmarshal(event.Data, &payload); err != nil {
return oops.Permanent(fmt.Errorf("unmarshal organization event payload: %w", err))
return nil, oops.Permanent(fmt.Errorf("unmarshal organization event payload: %w", err))
}

repo := orgrepo.New(dbtx)

resolved, err := resolveOrgForWorkOSEvent(ctx, repo, payload)
switch {
case err != nil:
return nil, err
case resolved.isNew:
if err := createOrganizationFromWorkOSEvent(ctx, repo, payload, event.ID, resolved.organizationID); err != nil {
return nil, err
}
default:
if err := updateOrganizationFromWorkOSEvent(ctx, repo, resolved.row, payload, event.ID); err != nil {
return nil, err
}
}

if resolved.needsExternalIDUpdate {
// The WorkOS write happens after commit in handleEvent so the remote
// external_id never points at an org row that failed to persist locally.
return &workosOrgExternalIDUpdate{workosOrgID: payload.ID, externalID: resolved.organizationID}, nil
}
return nil, nil
}

func resolveOrgForWorkOSEvent(ctx context.Context, repo *orgrepo.Queries, payload workosOrganizationEventPayload) (resolvedWorkOSOrganization, error) {
row, err := repo.GetOrganizationByWorkosID(ctx, conv.ToPGText(payload.ID))
switch {
case err == nil:
return resolvedWorkOSOrganization{
row: row,
organizationID: row.ID,
isNew: false,
needsExternalIDUpdate: payload.ExternalID == "",
}, nil
case errors.Is(err, pgx.ErrNoRows):
if payload.ExternalID == "" {
// Unlinked org with no external_id: skip + advance cursor so a
// later event in the same stream that does carry an external_id
// (or sets workos_id via another path) can still land. A
// permanent error here would stall the per-org workflow and
// block every subsequent event for this org.
logger.WarnContext(ctx, "skipping organization event for unlinked org with no external_id", attr.SlogWorkOSOrganizationID(payload.ID))
return nil
}
// Resolve below by external_id or by a deterministic ID derived from
// the WorkOS org ID.
case err != nil:
return fmt.Errorf("get organization by workos id %q: %w", payload.ID, err)
return resolvedWorkOSOrganization{}, fmt.Errorf("get organization for workos id %q: %w", payload.ID, err)
}

var lastEventID *string
if row.WorkosLastEventID.Valid {
lastEventID = &row.WorkosLastEventID.String
organizationID := payload.ExternalID
needsExternalIDUpdate := false
if organizationID == "" {
organizationID = orgid.FromWorkOSID(payload.ID)
needsExternalIDUpdate = true
}
var rowUpdatedAt *time.Time
if row.WorkosUpdatedAt.Valid {
rowUpdatedAt = &row.WorkosUpdatedAt.Time

row, err = repo.GetOrganizationMetadata(ctx, organizationID)
switch {
case errors.Is(err, pgx.ErrNoRows):
var newRow orgrepo.OrganizationMetadatum
return resolvedWorkOSOrganization{
row: newRow,
organizationID: organizationID,
isNew: true,
needsExternalIDUpdate: needsExternalIDUpdate,
}, nil
case err != nil:
return resolvedWorkOSOrganization{}, fmt.Errorf("get organization metadata %q: %w", organizationID, err)
default:
return resolvedWorkOSOrganization{
row: row,
organizationID: organizationID,
isNew: false,
needsExternalIDUpdate: needsExternalIDUpdate,
}, nil
}
if !ShouldProcessEvent(lastEventID, rowUpdatedAt, event.ID, payload.UpdatedAt) {
}

func createOrganizationFromWorkOSEvent(ctx context.Context, repo *orgrepo.Queries, payload workosOrganizationEventPayload, eventID string, organizationID string) error {
// A zero row makes the normal cursor check accept genuine creates while
// keeping the create path consistent if a row appears between resolve and
// insert.
var newRow orgrepo.OrganizationMetadatum
if !shouldApplyOrganizationEvent(newRow, payload, eventID) {
return nil
}

organizationID := payload.ExternalID
if err == nil {
organizationID = row.ID
slug := orgslug.Slugify(payload.Name)
if slug == "" {
return fmt.Errorf("slugify workos organization name %q: empty slug", payload.Name)
}
if err := repo.LockOrganizationSlug(ctx, slug); err != nil {
return fmt.Errorf("lock organization slug %q: %w", slug, err)
}
uniqueSlug, err := orgslug.FindUnique(ctx, repo, slug)
if err != nil {
return fmt.Errorf("find unique slug for workos organization %q: %w", payload.ID, err)
}

_, err = repo.UpsertOrganizationMetadataFromWorkOS(ctx, orgrepo.UpsertOrganizationMetadataFromWorkOSParams{
_, err = repo.CreateOrganizationMetadataFromWorkOS(ctx, orgrepo.CreateOrganizationMetadataFromWorkOSParams{
ID: organizationID,
Name: payload.Name,
Slug: conv.ToSlug(payload.Name),
Slug: uniqueSlug,
WorkosID: conv.ToPGText(payload.ID),
WorkosUpdatedAt: conv.ToPGTimestamptz(payload.UpdatedAt),
WorkosLastEventID: conv.ToPGText(event.ID),
WorkosLastEventID: conv.ToPGText(eventID),
})
if err != nil {
return fmt.Errorf("create organization %q from workos event: %w", payload.ID, err)
}
return nil
}

func updateOrganizationFromWorkOSEvent(ctx context.Context, repo *orgrepo.Queries, row orgrepo.OrganizationMetadatum, payload workosOrganizationEventPayload, eventID string) error {
if !shouldApplyOrganizationEvent(row, payload, eventID) {
return nil
}

_, err := repo.UpdateOrganizationMetadataFromWorkOS(ctx, orgrepo.UpdateOrganizationMetadataFromWorkOSParams{
ID: row.ID,
Name: payload.Name,
WorkosID: conv.ToPGText(payload.ID),
WorkosUpdatedAt: conv.ToPGTimestamptz(payload.UpdatedAt),
WorkosLastEventID: conv.ToPGText(eventID),
})
if err != nil {
return fmt.Errorf("upsert organization %q from workos event: %w", payload.ID, err)
return fmt.Errorf("update organization %q from workos event: %w", payload.ID, err)
}

return nil
}

func shouldApplyOrganizationEvent(row orgrepo.OrganizationMetadatum, payload workosOrganizationEventPayload, eventID string) bool {
sameWorkOSLink := !row.WorkosID.Valid || row.WorkosID.String == "" || row.WorkosID.String == payload.ID
if !sameWorkOSLink {
// The row is being relinked from another WorkOS org. Its stored cursor
// belongs to the previous WorkOS org, so this event must apply fresh.
return true
}

var lastEventID *string
if row.WorkosLastEventID.Valid {
lastEventID = &row.WorkosLastEventID.String
}
var rowUpdatedAt *time.Time
if row.WorkosUpdatedAt.Valid {
rowUpdatedAt = &row.WorkosUpdatedAt.Time
}
return ShouldProcessEvent(lastEventID, rowUpdatedAt, eventID, payload.UpdatedAt)
}

func handleOrganizationDeleted(ctx context.Context, logger *slog.Logger, dbtx database.DBTX, event events.Event) error {
var payload workosOrganizationEventPayload
if err := json.Unmarshal(event.Data, &payload); err != nil {
Expand Down Expand Up @@ -368,8 +482,11 @@ func upsertOrganizationRole(ctx context.Context, logger *slog.Logger, dbtx datab
if existing.WorkosLastEventID.Valid {
lastEventID = &existing.WorkosLastEventID.String
}
rowUpdatedAt := existing.WorkosUpdatedAt.Time
if !ShouldProcessEvent(lastEventID, &rowUpdatedAt, event.ID, payload.UpdatedAt) {
var rowUpdatedAt *time.Time
if existing.WorkosUpdatedAt.Valid {
rowUpdatedAt = &existing.WorkosUpdatedAt.Time
}
if !ShouldProcessEvent(lastEventID, rowUpdatedAt, event.ID, payload.UpdatedAt) {
return nil
}

Expand Down Expand Up @@ -424,8 +541,11 @@ func handleRoleDeleted(ctx context.Context, logger *slog.Logger, dbtx database.D
if existing.WorkosLastEventID.Valid {
lastEventID = &existing.WorkosLastEventID.String
}
rowUpdatedAt := existing.WorkosUpdatedAt.Time
if !ShouldProcessEvent(lastEventID, &rowUpdatedAt, event.ID, payload.UpdatedAt) {
var rowUpdatedAt *time.Time
if existing.WorkosUpdatedAt.Valid {
rowUpdatedAt = &existing.WorkosUpdatedAt.Time
}
if !ShouldProcessEvent(lastEventID, rowUpdatedAt, event.ID, payload.UpdatedAt) {
return nil
}

Expand Down
Loading
Loading