diff --git a/src/config/server.env.ts b/src/config/server.env.ts index cd2d54d..22eb962 100644 --- a/src/config/server.env.ts +++ b/src/config/server.env.ts @@ -13,6 +13,7 @@ const ServerEnvSchema = z.object({ DROPBOX_API_URL: z.url(), TRIGGER_MACHINE: TriggerMachineSchema, CRON_SECRET: z.string().min(1), + WEBHOOK_CATCHUP_CRON: z.string().min(1), }) const env = ServerEnvSchema.parse(process.env) diff --git a/src/db/migrations/20260306080842_add_webhook_debounce_columns_to_dropbox_connections.sql b/src/db/migrations/20260306080842_add_webhook_debounce_columns_to_dropbox_connections.sql new file mode 100644 index 0000000..996a474 --- /dev/null +++ b/src/db/migrations/20260306080842_add_webhook_debounce_columns_to_dropbox_connections.sql @@ -0,0 +1 @@ +ALTER TABLE "dropbox_connections" ADD COLUMN "pending_webhook" boolean DEFAULT false NOT NULL, ADD COLUMN "last_webhook_synced_at" timestamp with time zone; \ No newline at end of file diff --git a/src/db/migrations/meta/20260306080842_snapshot.json b/src/db/migrations/meta/20260306080842_snapshot.json new file mode 100644 index 0000000..07446d8 --- /dev/null +++ b/src/db/migrations/meta/20260306080842_snapshot.json @@ -0,0 +1,516 @@ +{ + "id": "958423be-b57a-492f-b6da-10abc12a8cf3", + "prevId": "782a75df-3185-4302-9333-9ee92f0cde09", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.assembly_webhook_records": { + "name": "assembly_webhook_records", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "action": { + "name": "action", + "type": "assembly_webhook_events_enum", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "assembly_channel_id": { + "name": "assembly_channel_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "file_id": { + 
"name": "file_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "triggered_at": { + "name": "triggered_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "uq_all_columns_combined": { + "name": "uq_all_columns_combined", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "assembly_channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "file_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "triggered_at", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "action", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.channel_sync": { + "name": "channel_sync", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "dbx_account_id": { + "name": "dbx_account_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": true + }, + "assembly_channel_id": { + "name": "assembly_channel_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "dbx_root_path": { + "name": "dbx_root_path", + "type": "varchar", + "primaryKey": 
false, + "notNull": true + }, + "dbx_root_id": { + "name": "dbx_root_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "dbx_cursor": { + "name": "dbx_cursor", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + }, + "total_files_count": { + "name": "total_files_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 0 + }, + "synced_files_count": { + "name": "synced_files_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 0 + }, + "last_synced_at": { + "name": "last_synced_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "idx_channel_sync_portal_id_dbxAccount_id_deleted_at": { + "name": "idx_channel_sync_portal_id_dbxAccount_id_deleted_at", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "dbx_account_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "deleted_at", + "isExpression": false, + "asc": true, + "nulls": "first" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "uq_channel_sync__channel_id_dbx_root_path": { + "name": "uq_channel_sync__channel_id_dbx_root_path", + "columns": [ + { + "expression": "assembly_channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": 
"dbx_root_path", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "where": "\"channel_sync\".\"deleted_at\" is null", + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_channel_sync_portal_id_deleted_at_created_at": { + "name": "idx_channel_sync_portal_id_deleted_at_created_at", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "created_at", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "deleted_at", + "isExpression": false, + "asc": true, + "nulls": "first" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.dropbox_connections": { + "name": "dropbox_connections", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "account_id": { + "name": "account_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false + }, + "refresh_token": { + "name": "refresh_token", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "root_namespace_id": { + "name": "root_namespace_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "pending_webhook": { + "name": "pending_webhook", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "last_webhook_synced_at": { + "name": "last_webhook_synced_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + 
"initiated_by": { + "name": "initiated_by", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "uq_dropbox_connections_portal_id": { + "name": "uq_dropbox_connections_portal_id", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.file_folder_sync": { + "name": "file_folder_sync", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "channel_sync_id": { + "name": "channel_sync_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "item_path": { + "name": "item_path", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "object": { + "name": "object", + "type": "object_types", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'file'" + }, + "content_hash": { + "name": "content_hash", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "dbx_file_id": { + "name": "dbx_file_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "assembly_file_id": { + "name": "assembly_file_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + 
"primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "file_folder_sync_channel_sync_id_channel_sync_id_fk": { + "name": "file_folder_sync_channel_sync_id_channel_sync_id_fk", + "tableFrom": "file_folder_sync", + "tableTo": "channel_sync", + "columnsFrom": ["channel_sync_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "cascade" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.assembly_webhook_events_enum": { + "name": "assembly_webhook_events_enum", + "schema": "public", + "values": [ + "file.created", + "file.updated", + "file.deleted", + "folder.created", + "folder.updated", + "folder.deleted" + ] + }, + "public.object_types": { + "name": "object_types", + "schema": "public", + "values": ["file", "folder"] + } + }, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/src/db/migrations/meta/_journal.json b/src/db/migrations/meta/_journal.json index 6c48860..ff1b5e9 100644 --- a/src/db/migrations/meta/_journal.json +++ b/src/db/migrations/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1770117966182, "tag": "20260203112606_add-uq-to-channel-sync", "breakpoints": false + }, + { + "idx": 10, + "version": "7", + "when": 1772784522115, + "tag": "20260306080842_add_webhook_debounce_columns_to_dropbox_connections", + "breakpoints": false } ] } diff --git a/src/db/schema/dropboxConnections.schema.ts b/src/db/schema/dropboxConnections.schema.ts index 6ab3f0b..065c4ee 100644 --- 
a/src/db/schema/dropboxConnections.schema.ts +++ b/src/db/schema/dropboxConnections.schema.ts @@ -1,4 +1,4 @@ -import { boolean, pgTable, uniqueIndex, uuid, varchar } from 'drizzle-orm/pg-core' +import { boolean, pgTable, timestamp, uniqueIndex, uuid, varchar } from 'drizzle-orm/pg-core' import { createInsertSchema, createSelectSchema, createUpdateSchema } from 'drizzle-zod' import type z from 'zod' import { timestamps } from '@/db/db.helpers' @@ -23,6 +23,12 @@ export const dropboxConnections = pgTable( // Connection status status: boolean().notNull().default(false), + // Flag to indicate a Dropbox webhook was deferred (debounced) and needs processing by cron + pendingWebhook: boolean().default(false).notNull(), + + // Timestamp of last completed webhook-triggered sync for this account + lastWebhookSyncedAt: timestamp({ withTimezone: true, mode: 'date' }), + // Copilot internalUserId that initiated the connection initiatedBy: uuid().notNull(), diff --git a/src/features/webhook/dropbox/api/webhook.controller.ts b/src/features/webhook/dropbox/api/webhook.controller.ts index 12d19f2..ec8cbec 100644 --- a/src/features/webhook/dropbox/api/webhook.controller.ts +++ b/src/features/webhook/dropbox/api/webhook.controller.ts @@ -2,7 +2,7 @@ import crypto from 'node:crypto' import status from 'http-status' import { type NextRequest, NextResponse } from 'next/server' import env from '@/config/server.env' -import { processDropboxChanges } from '@/trigger/processFileSync' +import { DropboxWebhook } from '@/features/webhook/dropbox/lib/webhook.service' import { sleep } from '@/utils/sleep' export const handleWebhookUrlVerification = (req: NextRequest) => { @@ -48,13 +48,8 @@ export const handleWebhookEvents = async (req: NextRequest) => { const { list_folder } = JSON.parse(body) const accounts = list_folder?.accounts ?? 
[] - await Promise.all( - accounts.map((account: string) => - processDropboxChanges.trigger(account, { - concurrencyKey: account, - }), - ), - ) + const dropboxWebhook = new DropboxWebhook() + await dropboxWebhook.handleDropboxEvents(accounts) // Dropbox expects a 200 OK with plain text body return new NextResponse('', { 
diff --git a/src/features/webhook/dropbox/lib/webhook.service.ts b/src/features/webhook/dropbox/lib/webhook.service.ts
index f3fc5d4..02a9515 100644
--- a/src/features/webhook/dropbox/lib/webhook.service.ts
+++ b/src/features/webhook/dropbox/lib/webhook.service.ts
@@ -5,7 +5,10 @@ import z from 'zod'
 import env from '@/config/server.env'
 import db from '@/db'
 import { type ChannelSyncSelectType, channelSync } from '@/db/schema/channelSync.schema'
-import type { DropboxConnectionTokens } from '@/db/schema/dropboxConnections.schema'
+import {
+  type DropboxConnectionTokens,
+  dropboxConnections,
+} from '@/db/schema/dropboxConnections.schema'
 import APIError from '@/errors/APIError'
 import { MapFilesService } from '@/features/sync/lib/MapFiles.service'
 import type { DropboxFileListFolderResultEntries } from '@/features/sync/types'
@@ -14,9 +17,64 @@ import { generateToken } from '@/lib/copilot/generateToken'
 import User from '@/lib/copilot/models/User.model'
 import { DropboxClient } from '@/lib/dropbox/DropboxClient'
 import logger from '@/lib/logger'
-import { handleChannelFileChanges } from '@/trigger/processFileSync'
+import { handleChannelFileChanges, processDropboxChanges } from '@/trigger/processFileSync'
+
+const DEBOUNCE_WINDOW_MS = 5 * 60 * 1000 // 5 minutes
 
 export class DropboxWebhook {
+  /**
+   * Handles the account ids carried by a Dropbox webhook notification.
+   *
+   * Each distinct account is handled concurrently so the webhook request is
+   * not held open for one database round trip per account in sequence.
+   * Rejects if handling any account fails (lookup, update or trigger).
+   *
+   * @param accounts Account ids taken from the webhook payload
+   */
+  async handleDropboxEvents(accounts: string[]) {
+    // Handling a repeated id concurrently would trigger the same sync twice
+    const uniqueAccounts = [...new Set(accounts)]
+
+    await Promise.all(uniqueAccounts.map((account) => this.handleAccountEvent(account)))
+  }
+
+  /**
+   * Debounce decision for one account: ignore it when it has no connection with
+   * `status = true`, skip it when a deferred webhook is already pending, mark it
+   * pending when its last webhook sync finished within DEBOUNCE_WINDOW_MS, and
+   * otherwise trigger `processDropboxChanges` right away.
+   */
+  private async handleAccountEvent(account: string) {
+    const connection = await db.query.dropboxConnections.findFirst({
+      where: (t, { eq, and }) => and(eq(t.accountId, account), eq(t.status, true)),
+      columns: { id: true, pendingWebhook: true, lastWebhookSyncedAt: true },
+    })
+
+    if (!connection) return
+
+    // Skip if already pending — cron will handle it
+    if (connection.pendingWebhook) {
+      console.info(`Webhook skipped for account ${account}, already has pending webhook`)
+      return
+    }
+
+    // Debounce: if the account was synced recently, defer to cron
+    const debounceThreshold = new Date(Date.now() - DEBOUNCE_WINDOW_MS)
+    const recentlySynced =
+      connection.lastWebhookSyncedAt && connection.lastWebhookSyncedAt >= debounceThreshold
+
+    if (recentlySynced) {
+      await db
+        .update(dropboxConnections)
+        .set({ pendingWebhook: true })
+        .where(eq(dropboxConnections.id, connection.id))
+      console.info(`Webhook debounced for account ${account}, marked as pending`)
+      return
+    }
+
+    await processDropboxChanges.trigger(account, { concurrencyKey: account })
+  }
+
   async fetchDropBoxChanges(accountId: string) {
     const connection = await this.getActiveConnection(accountId)
 
@@ -58,6 +116,17 @@ export class DropboxWebhook {
         connectionToken,
       ))
     }
+
+    // Clear pending webhook flag after all channels for this account are processed
+    await db
+      .update(dropboxConnections)
+      .set({ pendingWebhook: false, lastWebhookSyncedAt: new Date() })
+      .where(
+        and(
+          eq(dropboxConnections.accountId, accountId),
+          eq(dropboxConnections.portalId, connection.portalId),
+        ),
+      )
   }
 
   async getDropboxFileMetadata(filePath: string, dbxClient: Dropbox) {
@@ -166,7 +235,7 @@ export class DropboxWebhook {
         mapFilesService,
         channelSyncId,
       )
-      if (!dbxChanges) continue
+      if (!dbxChanges) break
 
       const { entries, newCursor, hasMore: more } = dbxChanges
 
@@ -176,7 +245,7 @@ export class DropboxWebhook {
     }
 
     if (allChanges.length > 0) {
-      await handleChannelFileChanges.trigger({
+      await handleChannelFileChanges.triggerAndWait({
         files: allChanges,
         channelSyncId,
         dbxRootPath,
diff --git a/src/trigger/processFileSync.ts b/src/trigger/processFileSync.ts index dadf47e..81abf23 100644 --- a/src/trigger/processFileSync.ts +++ b/src/trigger/processFileSync.ts @@ -261,7 +261,7 @@ export const handleChannelFileChanges = task({ const 
existingFile = mappedFiles.find((item) => item.dbxFileId === file.id) if (existingFile?.contentHash && existingFile.contentHash !== file.content_hash) { - await updateDropboxFileInAssembly.trigger({ + await updateDropboxFileInAssembly.triggerAndWait({ opts: { dbxRootPath, assemblyChannelId, 
diff --git a/src/trigger/scheduledTasks.ts b/src/trigger/scheduledTasks.ts
new file mode 100644
index 0000000..d0531df
--- /dev/null
+++ b/src/trigger/scheduledTasks.ts
@@ -0,0 +1,67 @@
+import { schedules } from '@trigger.dev/sdk/v3'
+import { and, eq } from 'drizzle-orm'
+import env from '@/config/server.env'
+import db from '@/db'
+import { dropboxConnections } from '@/db/schema/dropboxConnections.schema'
+import { processDropboxChanges } from '@/trigger/processFileSync'
+
+/**
+ * Scheduled catch-up for debounced Dropbox webhooks.
+ *
+ * Runs on the `WEBHOOK_CATCHUP_CRON` schedule, selects connections that have
+ * both `pendingWebhook` and `status` set, and triggers `processDropboxChanges`
+ * once per distinct account. Every trigger is attempted even if some fail; the
+ * run then throws so the failure is reported. The flag is not modified here.
+ */
+export const pendingWebhookCatchUp = schedules.task({
+  id: 'pending-webhook-catch-up',
+  cron: env.WEBHOOK_CATCHUP_CRON,
+  run: async () => {
+    console.info('Catch-up cron: triggering sync for all pending webhooks')
+
+    const pendingConnections = await db
+      .select({ accountId: dropboxConnections.accountId })
+      .from(dropboxConnections)
+      .where(and(eq(dropboxConnections.pendingWebhook, true), eq(dropboxConnections.status, true)))
+
+    // account_id is nullable and only portal_id is unique, so drop rows without
+    // an account and trigger each account once
+    const accountIds = [
+      ...new Set(
+        pendingConnections
+          .map((c) => c.accountId)
+          .filter((accountId): accountId is string => accountId !== null),
+      ),
+    ]
+
+    if (accountIds.length === 0) return
+
+    console.info(
+      `Catch-up cron: triggering sync for ${accountIds.length} account(s) with pending webhooks`,
+    )
+
+    // allSettled: one failed trigger must not hide or cut short the others
+    const results = await Promise.allSettled(
+      accountIds.map((accountId) =>
+        processDropboxChanges.trigger(accountId, { concurrencyKey: accountId }),
+      ),
+    )
+
+    let failedCount = 0
+    results.forEach((result, index) => {
+      if (result.status === 'rejected') {
+        failedCount += 1
+        console.error(
+          `Catch-up cron: failed to trigger sync for account ${accountIds[index]}`,
+          result.reason,
+        )
+      }
+    })
+
+    if (failedCount > 0) {
+      throw new Error(
+        `Catch-up cron: failed to trigger sync for ${failedCount} of ${accountIds.length} account(s)`,
+      )
+    }
+  },
+})
+