diff --git a/src/app/api/workers/move-files/route.ts b/src/app/api/workers/move-files/route.ts new file mode 100644 index 0000000..f057bd1 --- /dev/null +++ b/src/app/api/workers/move-files/route.ts @@ -0,0 +1,6 @@ +import { moveFilesToCorrectPath } from '@/features/workers/move-files/api/moveFiles.controller' +import { withErrorHandler } from '@/utils/withErrorHandler' + +export const maxDuration = 300 + +export const GET = withErrorHandler(moveFilesToCorrectPath) diff --git a/src/app/api/workers/move-folders/route.ts b/src/app/api/workers/move-folders/route.ts new file mode 100644 index 0000000..bde1991 --- /dev/null +++ b/src/app/api/workers/move-folders/route.ts @@ -0,0 +1,6 @@ +import { moveFoldersToCorrectPath } from '@/features/workers/move-files/api/moveFiles.controller' +import { withErrorHandler } from '@/utils/withErrorHandler' + +export const maxDuration = 300 + +export const GET = withErrorHandler(moveFoldersToCorrectPath) diff --git a/src/config/server.env.ts b/src/config/server.env.ts index cd2d54d..4b176b1 100644 --- a/src/config/server.env.ts +++ b/src/config/server.env.ts @@ -12,6 +12,7 @@ const ServerEnvSchema = z.object({ DROPBOX_SCOPES: z.string().min(1), DROPBOX_API_URL: z.url(), TRIGGER_MACHINE: TriggerMachineSchema, + FILE_MIGRATION_PORTAL_ID: z.string(), CRON_SECRET: z.string().min(1), }) diff --git a/src/db/migrations/20260218105316_add_root_namespace_id_in_connection_table.sql b/src/db/migrations/20260218105316_add_root_namespace_id_in_connection_table.sql new file mode 100644 index 0000000..39af421 --- /dev/null +++ b/src/db/migrations/20260218105316_add_root_namespace_id_in_connection_table.sql @@ -0,0 +1,9 @@ +CREATE TABLE "incorrect_path_files" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "portal_id" varchar(32) NOT NULL, + "file_ids" jsonb DEFAULT '[]'::jsonb NOT NULL, + "is_move_complete" boolean DEFAULT false NOT NULL, + "channel_id" varchar(255) NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT 
NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); diff --git a/src/db/migrations/meta/20260218105316_snapshot.json b/src/db/migrations/meta/20260218105316_snapshot.json new file mode 100644 index 0000000..7fc2d14 --- /dev/null +++ b/src/db/migrations/meta/20260218105316_snapshot.json @@ -0,0 +1,563 @@ +{ + "id": "93803165-3714-4678-8542-25bd2240ff7a", + "prevId": "782a75df-3185-4302-9333-9ee92f0cde09", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.assembly_webhook_records": { + "name": "assembly_webhook_records", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "action": { + "name": "action", + "type": "assembly_webhook_events_enum", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "assembly_channel_id": { + "name": "assembly_channel_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "file_id": { + "name": "file_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "triggered_at": { + "name": "triggered_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "uq_all_columns_combined": { + "name": "uq_all_columns_combined", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "assembly_channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "file_id", + "isExpression": false, + 
"asc": true, + "nulls": "last" + }, + { + "expression": "triggered_at", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "action", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.channel_sync": { + "name": "channel_sync", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "dbx_account_id": { + "name": "dbx_account_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": true + }, + "assembly_channel_id": { + "name": "assembly_channel_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "dbx_root_path": { + "name": "dbx_root_path", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "dbx_root_id": { + "name": "dbx_root_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "dbx_cursor": { + "name": "dbx_cursor", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + }, + "total_files_count": { + "name": "total_files_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 0 + }, + "synced_files_count": { + "name": "synced_files_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 0 + }, + "last_synced_at": { + "name": "last_synced_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + 
"primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "idx_channel_sync_portal_id_dbxAccount_id_deleted_at": { + "name": "idx_channel_sync_portal_id_dbxAccount_id_deleted_at", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "dbx_account_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "deleted_at", + "isExpression": false, + "asc": true, + "nulls": "first" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "uq_channel_sync__channel_id_dbx_root_path": { + "name": "uq_channel_sync__channel_id_dbx_root_path", + "columns": [ + { + "expression": "assembly_channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "dbx_root_path", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "where": "\"channel_sync\".\"deleted_at\" is null", + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_channel_sync_portal_id_deleted_at_created_at": { + "name": "idx_channel_sync_portal_id_deleted_at_created_at", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "created_at", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "deleted_at", + "isExpression": false, + "asc": true, + "nulls": "first" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + 
"isRLSEnabled": false + }, + "public.dropbox_connections": { + "name": "dropbox_connections", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + }, + "account_id": { + "name": "account_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false + }, + "refresh_token": { + "name": "refresh_token", + "type": "varchar(255)", + "primaryKey": false, + "notNull": false + }, + "root_namespace_id": { + "name": "root_namespace_id", + "type": "varchar(100)", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "initiated_by": { + "name": "initiated_by", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "uq_dropbox_connections_portal_id": { + "name": "uq_dropbox_connections_portal_id", + "columns": [ + { + "expression": "portal_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.file_folder_sync": { + "name": "file_folder_sync", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + 
"primaryKey": false, + "notNull": true + }, + "channel_sync_id": { + "name": "channel_sync_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "item_path": { + "name": "item_path", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "object": { + "name": "object", + "type": "object_types", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'file'" + }, + "content_hash": { + "name": "content_hash", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "dbx_file_id": { + "name": "dbx_file_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "assembly_file_id": { + "name": "assembly_file_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "file_folder_sync_channel_sync_id_channel_sync_id_fk": { + "name": "file_folder_sync_channel_sync_id_channel_sync_id_fk", + "tableFrom": "file_folder_sync", + "tableTo": "channel_sync", + "columnsFrom": ["channel_sync_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "cascade" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.incorrect_path_files": { + "name": "incorrect_path_files", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "portal_id": { + "name": "portal_id", + "type": "varchar(32)", + "primaryKey": false, + "notNull": true + 
}, + "file_ids": { + "name": "file_ids", + "type": "jsonb", + "primaryKey": false, + "notNull": true, + "default": "'[]'::jsonb" + }, + "is_move_complete": { + "name": "is_move_complete", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "channel_id": { + "name": "channel_id", + "type": "varchar(255)", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.assembly_webhook_events_enum": { + "name": "assembly_webhook_events_enum", + "schema": "public", + "values": [ + "file.created", + "file.updated", + "file.deleted", + "folder.created", + "folder.updated", + "folder.deleted" + ] + }, + "public.object_types": { + "name": "object_types", + "schema": "public", + "values": ["file", "folder"] + } + }, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/src/db/migrations/meta/_journal.json b/src/db/migrations/meta/_journal.json index 6c48860..ee37d7c 100644 --- a/src/db/migrations/meta/_journal.json +++ b/src/db/migrations/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1770117966182, "tag": "20260203112606_add-uq-to-channel-sync", "breakpoints": false + }, + { + "idx": 10, + "version": "7", + "when": 1771411996739, + "tag": "20260218105316_add_root_namespace_id_in_connection_table", + "breakpoints": false } ] } diff --git a/src/db/schema/incorrectPathFiles.schema.ts b/src/db/schema/incorrectPathFiles.schema.ts new file mode 100644 index 
/**
 * Bookkeeping table for the incorrect-path migration: one row per
 * (portal, channel) pair that records which `file_folder_sync` row ids have
 * already been handled and whether the channel is fully processed.
 */
export const incorrectPathFiles = pgTable('incorrect_path_files', {
  id: uuid().primaryKey().notNull().defaultRandom(),
  portalId: varchar({ length: 32 }).notNull(), // Workspace ID / Portal ID in Copilot
  // Ids of file_folder_sync rows already handled for this channel. Typed explicitly,
  // otherwise the jsonb column is inferred as `unknown` and cannot be pushed to.
  fileIds: jsonb().$type<string[]>().notNull().default(sql`'[]'::jsonb`),
  isMoveComplete: boolean().notNull().default(false),
  channelId: varchar({ length: 255 }).notNull(),
  ...timestamps,
})

/** Zod schema validating an insert into `incorrect_path_files`. */
export const IncorrectPathCreateSchema = createInsertSchema(incorrectPathFiles)
/** Row shape accepted by `db.insert(incorrectPathFiles)`. */
export type IncorrectPathFilesCreateType = InferInsertModel<typeof incorrectPathFiles>
/** Row shape returned when selecting from `incorrect_path_files`. */
export type IncorrectPathFilesSelectType = InferSelectModel<typeof incorrectPathFiles>

/**
 * Zod schema for partial updates. `id` and `portalId` are omitted so an update
 * payload can never re-key a row.
 */
export const IncorrectPathUpdatePayloadSchema = createUpdateSchema(incorrectPathFiles).omit({
  id: true,
  portalId: true,
})
/** Payload type accepted by the update helper for `incorrect_path_files`. */
export type IncorrectPathUpdatePayload = z.infer<typeof IncorrectPathUpdatePayloadSchema>
/**
 * Rejects the request unless it carries the cron bearer secret.
 *
 * @param request Incoming request whose `authorization` header is checked.
 * @throws APIError with status 401 when the header does not equal `Bearer <CRON_SECRET>`.
 */
const assertCronAuthorized = (request: NextRequest): void => {
  const authHeader = request.headers.get('authorization')
  if (authHeader !== `Bearer ${env.CRON_SECRET}`) {
    throw new APIError('Unauthorized', httpStatus.UNAUTHORIZED)
  }
}

/**
 * Cron endpoint handler that starts the file-move process.
 *
 * @param request Incoming cron request; must carry the cron bearer secret.
 * @returns The service's own error response when it reports a failure
 *   (e.g. the migration portal id is not configured), otherwise a success message.
 * @throws APIError with status 401 when the request is not authorized.
 */
export const moveFilesToCorrectPath = async (request: NextRequest) => {
  assertCronAuthorized(request)

  const moveFilesService = new MoveFilesService()
  const result = await moveFilesService.initiateFileMove()
  // The service signals misconfiguration with a non-2xx response; do not mask it as success.
  if (!result.ok) return result

  return NextResponse.json({ message: 'Successfully triggered the file-move process.' })
}

/**
 * Cron endpoint handler that starts the folder-move process.
 *
 * @param request Incoming cron request; must carry the cron bearer secret.
 * @returns The service's own error response when it reports a failure,
 *   otherwise a success message.
 * @throws APIError with status 401 when the request is not authorized.
 */
export const moveFoldersToCorrectPath = async (request: NextRequest) => {
  assertCronAuthorized(request)

  const moveFilesService = new MoveFilesService()
  const result = await moveFilesService.initiateFolderMove()
  // The service signals misconfiguration with a non-2xx response; do not mask it as success.
  if (!result.ok) return result

  return NextResponse.json({ message: 'Successfully triggered the folder-move process.' })
}
// Rows whose `updatedAt` is after `targetDate` (and, for folders, before `targetDate2`)
// are the ones this migration treats as faulty.
const targetDate = new Date('2026-02-04T01:21:00Z')
const targetDate2 = new Date('2026-02-19T00:00:00Z')
// The single portal this migration operates on.
const portalId = env.FILE_MIGRATION_PORTAL_ID

// channel_sync row that is deliberately left out of the file move.
const EXCLUDED_CHANNEL_SYNC_ID = '4267a14d-b2f1-4ae3-8111-2e634675982e'
// Custom page size used when listing a channel's files from Assembly.
const ASSEMBLY_LIST_LIMIT = 1500
// Number of file_folder_sync rows handed to one background task.
const FILE_BATCH_SIZE = 100
// Upper bound on batches triggered for a channel in a single invocation.
const MAX_BATCHES_PER_RUN = 5

/**
 * One-off migration service that recreates folders and files stored under a
 * faulty path in Assembly at their correct path, and records progress in the
 * `incorrect_path_files` table.
 */
export class MoveFilesService {
  /**
   * Loads the Dropbox connection row for a portal.
   *
   * @throws APIError with status 404 when the portal has no connection row.
   */
  private async getPortalInfo(portalId: string) {
    const portalInfo = await db.query.dropboxConnections.findFirst({
      where: (dropboxConnections, { eq }) => eq(dropboxConnections.portalId, portalId),
    })
    if (!portalInfo) throw new APIError('Portal not found', httpStatus.NOT_FOUND)
    return portalInfo
  }

  /**
   * Builds an API client (and the token backing it) that acts as the internal
   * user who initiated the portal's Dropbox connection.
   *
   * @throws APIError with status 404 when the portal is unknown, or 500 when no token is produced.
   */
  private async createCopilotApi(portalId: string) {
    const portalInfo = await this.getPortalInfo(portalId)
    const token = generateToken(env.COPILOT_API_KEY, {
      workspaceId: portalId,
      internalUserId: portalInfo.initiatedBy,
    })

    if (!token) throw new APIError('Failed to generate token', httpStatus.INTERNAL_SERVER_ERROR)
    return { token, copilotApi: new CopilotAPI(token) }
  }

  /**
   * Returns the progress row for a (portal, channel) pair, inserting a fresh
   * "not complete" row when none exists yet.
   */
  private async getOrCreateMoveFilesForPortal(portalId: string, channelId: string) {
    const record = await db.query.incorrectPathFiles.findFirst({
      where: (incorrectPathFiles, { eq, and }) =>
        and(eq(incorrectPathFiles.portalId, portalId), eq(incorrectPathFiles.channelId, channelId)),
    })

    if (record) return record

    const [newRecord] = await db
      .insert(incorrectPathFiles)
      .values({ portalId, channelId, isMoveComplete: false })
      .returning()

    return newRecord
  }

  /**
   * Recreates every faulty folder of the migration portal at its stored
   * `itemPath`, unless a file/folder with that path already exists in the
   * channel's Assembly listing.
   *
   * @returns A 400 response when the migration portal id is not configured,
   *   otherwise `{ success: true }`.
   * @throws APIError when the portal is unknown or no token can be generated.
   */
  async initiateFolderMove() {
    if (!portalId) {
      console.error('Portal id is not set')
      return NextResponse.json({ error: 'Portal id is not set' }, { status: 400 })
    }

    // 1. get all faulty folders
    const faultyFolders = await db.query.fileFolderSync.findMany({
      where: (fileFolderSync, { eq, and, gt, lt }) =>
        and(
          eq(fileFolderSync.portalId, portalId),
          eq(fileFolderSync.object, ObjectType.FOLDER),
          gt(fileFolderSync.updatedAt, targetDate),
          lt(fileFolderSync.updatedAt, targetDate2),
        ),
      with: {
        channel: {
          columns: {
            id: true,
            assemblyChannelId: true,
          },
        },
      },
      orderBy: [asc(fileFolderSync.createdAt)],
    })
    const faultyChannelIds = [
      ...new Set(faultyFolders.map((folder) => folder.channel.assemblyChannelId)),
    ]

    // An empty IN-list can match nothing (and may be rejected by the query builder,
    // depending on the drizzle-orm version), so skip the query entirely in that case.
    const channelSyncs = faultyChannelIds.length
      ? await db.query.channelSync.findMany({
          where: (channelSync, { eq, and, inArray }) =>
            and(
              eq(channelSync.portalId, portalId),
              eq(channelSync.status, true),
              inArray(channelSync.assemblyChannelId, faultyChannelIds),
            ),
        })
      : []
    console.info({
      faultyFolders,
      channelSyncsCount: channelSyncs.length,
    })

    // 2. get all files from assembly
    const { copilotApi } = await this.createCopilotApi(portalId)

    const allFilesPath: string[] = []
    await Promise.all(
      channelSyncs.map((channel) =>
        copilotBottleneck.schedule(() =>
          this.getFilesForChannelFromAssembly(channel.assemblyChannelId, copilotApi, allFilesPath),
        ),
      ),
    )
    console.info(`all paths count: ${allFilesPath.length}`)

    // Set lookup instead of scanning the whole listing once per folder.
    const existingPaths = new Set(allFilesPath)

    // check if folder exists, if not create
    const folderPromises: Promise<void>[] = []
    for (const faultyFolder of faultyFolders) {
      if (!faultyFolder.itemPath) continue

      const channelId = faultyFolder.channel.assemblyChannelId
      if (existingPaths.has(`${faultyFolder.itemPath}+${channelId}`)) continue

      console.info(`Processing for folder: ${faultyFolder.itemPath}`)

      folderPromises.push(
        copilotBottleneck.schedule(() =>
          this.checkAndCreateFolder(faultyFolder, channelId, copilotApi),
        ),
      )
    }
    await Promise.all(folderPromises)

    return NextResponse.json({ success: true })
  }

  /**
   * Creates a folder at `faultyFolder.itemPath`, deletes the previously
   * recorded Assembly folder, and stores the new Assembly id on the sync row.
   * Errors are logged and swallowed so one folder cannot abort the whole batch.
   */
  private async checkAndCreateFolder(
    faultyFolder: FileSyncSelectType,
    channelId: string,
    copilotApi: CopilotAPI,
  ) {
    // Guard before use: without a path there is nothing to derive or create.
    if (!faultyFolder.itemPath) return

    const faultyFolderName = getFaultyPath(faultyFolder.itemPath)
    console.info(
      `\n\n checkAndCreateFolder. Folder path: ${faultyFolder.itemPath}. Folder id: ${faultyFolder.assemblyFileId}. Faulty folder name: ${faultyFolderName} \n\n`,
    )

    if (!faultyFolder.assemblyFileId) return

    // next step: create folder in assembly
    try {
      const folderCreateResponse = await copilotApi.createFile(
        faultyFolder.itemPath,
        channelId,
        ObjectType.FOLDER,
      )
      console.info(`checkAndCreateFolder. Folder created. Folder ID: ${folderCreateResponse.id}`)

      console.info(
        `checkAndCreateFolder. Deleting faulty folder. Folder ID: ${faultyFolder.assemblyFileId}\n`,
      )
      await copilotApi.deleteFile(faultyFolder.assemblyFileId)

      // update folder id in db
      await db
        .update(fileFolderSync)
        .set({ assemblyFileId: folderCreateResponse.id })
        .where(eq(fileFolderSync.id, faultyFolder.id))
    } catch (error: unknown) {
      console.error(error)
      console.error({ err: JSON.stringify(error) })
      // Narrow with instanceof (as moveFileToCorrectPath does) and read `body` defensively,
      // so an unrelated error cannot throw again from inside this handler.
      if (
        error instanceof ApiError &&
        error.status === 400 &&
        error.body?.message === 'Folder already exists'
      ) {
        console.info('Folder already exists. Attempt to delete the older one')
        // await copilotApi.deleteFile(faultyFolder.assemblyFileId)
        return
      }
      console.info('Exists')
    }
  }

  /**
   * Lists a channel's files from Assembly and appends one `/<path>+<channelId>`
   * key per file to the shared `allFilesPath` accumulator.
   *
   * @returns The same accumulator that was passed in.
   */
  private async getFilesForChannelFromAssembly(
    channelId: string,
    copilotApi: CopilotAPI,
    allFilesPath: string[],
  ) {
    console.info(`getFilesForChannelFromAssembly. Channel ID: ${channelId}`)
    const files = await copilotApi.listFiles(channelId, undefined, ASSEMBLY_LIST_LIMIT)
    for (const file of files.data) {
      allFilesPath.push(`/${file.path}+${channelId}`)
    }
    return allFilesPath
  }

  /**
   * Picks the first not-yet-complete channel of the migration portal (ordered by
   * synced file count) and triggers background tasks that move its files in
   * batches. Only one channel is handled per invocation.
   *
   * @returns A 400 response when the migration portal id is not configured,
   *   otherwise `{ success: true }`.
   * @throws APIError when the portal is unknown or no token can be generated.
   */
  async initiateFileMove() {
    if (!portalId) {
      console.error('Portal id is not set')
      return NextResponse.json({ error: 'Portal id is not set' }, { status: 400 })
    }

    const { copilotApi, token } = await this.createCopilotApi(portalId)

    const processedChannels = await db.query.incorrectPathFiles.findMany({
      where: (incorrectPathFiles, { eq, and }) =>
        and(eq(incorrectPathFiles.portalId, portalId), eq(incorrectPathFiles.isMoveComplete, true)),
    })
    const processedChannelIds = processedChannels.map((channel) => channel.channelId)

    const channelsToProcess = await db.query.channelSync.findMany({
      where: (channelSync, { eq, and, notInArray }) =>
        and(
          gte(channelSync.updatedAt, targetDate),
          eq(channelSync.portalId, portalId),
          // Nothing to exclude on the first run; `and` ignores undefined conditions.
          processedChannelIds.length
            ? notInArray(channelSync.assemblyChannelId, processedChannelIds)
            : undefined,
          ne(channelSync.id, EXCLUDED_CHANNEL_SYNC_ID),
        ),
      orderBy: [asc(channelSync.syncedFilesCount)],
    })

    for (const channel of channelsToProcess) {
      console.info(`\n\n\n ##### New channel ${channel.assemblyChannelId} #####\n`)
      const movedFiles = await this.getOrCreateMoveFilesForPortal(
        portalId,
        channel.assemblyChannelId,
      )
      if (movedFiles.isMoveComplete) continue

      console.info({ channelKoRecord: channel })

      // NOTE(review): this aliases movedFiles.fileIds, so every triggered payload also
      // carries the ids of the batches selected so far. Concurrent tasks each overwrite
      // the stored list from their own snapshot — confirm this is the intended bookkeeping.
      const tempidArray = movedFiles.fileIds
      for (let remaining = MAX_BATCHES_PER_RUN; remaining > 0; remaining--) {
        const mapFiles = await db.query.fileFolderSync.findMany({
          where: (fileFolderSync, { eq, and, isNull, notInArray }) =>
            and(
              gte(fileFolderSync.updatedAt, targetDate),
              eq(fileFolderSync.channelSyncId, channel.id),
              isNull(fileFolderSync.deletedAt),
              // Skip the exclusion while no ids have been recorded yet.
              tempidArray.length ? notInArray(fileFolderSync.id, tempidArray) : undefined,
              eq(fileFolderSync.object, ObjectType.FILE),
            ),
          orderBy: [asc(fileFolderSync.createdAt)],
          limit: FILE_BATCH_SIZE,
        })

        if (!mapFiles.length) {
          await this.updateIncorrectPathFilesTable(portalId, channel.assemblyChannelId, {
            isMoveComplete: true,
          })
          break
        }

        tempidArray.push(...mapFiles.map((file) => file.id))

        console.info({ mapFiles, tempidArray })

        // actual sync process
        const allFilesForChannel = await copilotApi.listFiles(
          channel.assemblyChannelId,
          undefined,
          ASSEMBLY_LIST_LIMIT, // custom limit
        )

        // trigger function to batch process files; awaited so a failed enqueue surfaces
        // here instead of becoming an unhandled rejection after the request has returned
        await startFileMoveProcess.trigger({
          channel,
          allFilesForChannel,
          mapFiles,
          token,
          movedFiles,
        })
      }
      console.info(`\n\n\n ##### Compelte${channel.assemblyChannelId} #####\n`)

      // Only the first incomplete channel is handled per invocation.
      break
    }

    return NextResponse.json({ success: true })
  }

  /**
   * Applies a partial update to the progress row of a (portal, channel) pair.
   */
  async updateIncorrectPathFilesTable(
    portalId: string,
    channelId: string,
    payload: IncorrectPathUpdatePayload,
  ) {
    await db
      .update(incorrectPathFiles)
      .set(payload)
      .where(
        and(eq(incorrectPathFiles.portalId, portalId), eq(incorrectPathFiles.channelId, channelId)),
      )
  }

  /**
   * Re-creates `existingFile` at `pathToUpload`, copies its content across,
   * points the sync row at the new Assembly file, records the row as processed
   * and finally deletes the old file. Never throws: failures are logged.
   *
   * @param pathToUpload Destination path for the new file.
   * @param existingFile The file currently stored under the faulty path.
   * @param copilotApi API client used for create/delete calls.
   * @param fileFolderid Id of the `file_folder_sync` row being migrated.
   * @param movedFiles Progress row whose `fileIds` list is extended in place.
   */
  async moveFileToCorrectPath({
    pathToUpload,
    existingFile,
    copilotApi,
    fileFolderid,
    movedFiles,
  }: {
    pathToUpload: string
    existingFile: CopilotFileRetrieve
    copilotApi: CopilotAPI
    fileFolderid: string
    movedFiles: IncorrectPathFilesSelectType
  }) {
    // Nothing can be copied without a download url.
    if (!existingFile.downloadUrl) return

    const existingIds = movedFiles.fileIds
    // create file/folder
    try {
      const fileCreateResponse = await copilotApi.createFile(
        pathToUpload,
        existingFile.channelId,
        ObjectType.FILE,
      )
      if (!fileCreateResponse.uploadUrl) {
        console.error('\nFailed to upload file to Assembly. No upload url\n')
        return
      }

      // Rollback scope: only the copy and the sync-row update. Once the sync row points
      // at the new file, a later failure must NOT delete that new file.
      try {
        // transfer file
        const success = await this.transferFile(
          existingFile.downloadUrl,
          fileCreateResponse.uploadUrl,
        )
        console.info(
          `\n\n\nmoveFilesService#movefileToCorrectPath. File upload success: ${success}. Uploaded file to path: ${fileCreateResponse.path}. New id: ${fileCreateResponse.id}`,
        )

        console.info(
          `updating file in db with assembly id: ${fileCreateResponse.id}. Map file id: ${fileFolderid}. Older file id: ${existingFile.id}`,
        )
        await db
          .update(fileFolderSync)
          .set({ assemblyFileId: fileCreateResponse.id })
          .where(eq(fileFolderSync.id, fileFolderid))
      } catch (error: unknown) {
        console.error(error)
        console.error({ err: JSON.stringify(error) })

        // delete new file created if file transfer fails
        await copilotApi.deleteFile(fileCreateResponse.id)
        return
      }

      // Record the row as processed only after the move actually succeeded, so a failed
      // transfer is not persisted as "done" by a later file's update.
      existingIds.push(fileFolderid)
      await this.updateIncorrectPathFilesTable(movedFiles.portalId, movedFiles.channelId, {
        fileIds: existingIds,
      })

      console.info(
        `moveFilesService#movefileToCorrectPath. Deleting file. File ID: ${existingFile.id}`,
      )
      await copilotApi.deleteFile(existingFile.id)

      console.info(
        `\nmoveFilesService#movefileToCorrectPath. New file Id: ${fileCreateResponse.id}. Type: ${ObjectType.FILE}. File ID: ${fileFolderid} \n\n\n`,
      )
    } catch (error: unknown) {
      if (
        error instanceof ApiError &&
        error.status === 400 &&
        error.body?.message === 'Parent folder does not exist'
      ) {
        console.error(
          `\nParent folder does not exist. Skipping file with map id: ${fileFolderid}\n`,
        )
        // Deliberate skip: mark as processed so the row is not picked up again.
        existingIds.push(fileFolderid)
        await this.updateIncorrectPathFilesTable(movedFiles.portalId, movedFiles.channelId, {
          fileIds: existingIds,
        })
      }
      console.error('Error occured while moving file', error)
      console.error({ err: JSON.stringify(error) })
    }
  }

  /**
   * Streams the content at `downloadUrl` into `uploadUrl` with a PUT request.
   *
   * @returns `true` when both requests succeed.
   * @throws Error when the download fails, the download has no content-length,
   *   or the upload fails.
   */
  private async _transferFile(downloadUrl: string, uploadUrl: string) {
    // Step 1: download
    const downloadRes = await fetch(downloadUrl)

    if (!downloadRes.ok) {
      const body = await downloadRes.text()
      console.error({ error: body })
      throw new Error('Download failed')
    }

    const contentLength = downloadRes.headers.get('content-length')

    if (!contentLength) {
      throw new Error('Missing content-length')
    }

    // Step 2: upload
    const uploadRes = await fetch(uploadUrl, {
      method: 'PUT', // presigned upload URLs use PUT
      body: downloadRes.body,
      headers: {
        'Content-Type': downloadRes.headers.get('content-type') || 'application/octet-stream',
        'Content-Length': contentLength,
      },
    })

    if (!uploadRes.ok) {
      const body = await uploadRes.text()
      console.error({ error: body })
      throw new Error('Upload failed')
    }

    return true
  }

  /**
   * Wraps an instance method so each call goes through `withRetry`, with `this`
   * bound to the service instance.
   */
  private wrapWithRetry<Args extends unknown[], R>(
    fn: (...args: Args) => Promise<R>,
  ): (...args: Args) => Promise<R> {
    return (...args: Args): Promise<R> => withRetry(fn.bind(this), args)
  }

  /** Retrying variant of `_transferFile`. */
  transferFile = this.wrapWithRetry(this._transferFile)
}
/**
 * Background task that processes one batch of `file_folder_sync` rows for a channel.
 *
 * For each row it either records the row as processed (no path, already at the
 * correct path, or the Assembly file is missing/pending) or schedules a
 * rate-limited move of the file to its correct path, then waits for all
 * scheduled moves.
 */
export const startFileMoveProcess = task({
  id: 'incorrect-path-file-move-process',
  machine: env.TRIGGER_MACHINE,
  queue: {
    name: 'incorrect-path-file-move-process',
    concurrencyLimit: 5,
  },
  run: async (payload: {
    channel: ChannelSyncSelectType
    allFilesForChannel: CopilotFileList
    mapFiles: FileSyncSelectType[]
    token: string
    movedFiles: IncorrectPathFilesSelectType
  }) => {
    const { channel, allFilesForChannel, mapFiles, token, movedFiles } = payload
    console.info(
      `moveIncorrectPathFiles#startFileMoveProcess, Process start for channel ID: ${channel.assemblyChannelId}`,
    )

    // Index the channel listing once instead of scanning it for every row.
    const existingPaths = new Set(allFilesForChannel.data.map((file) => `/${file.path}`))
    const filesById = new Map<string, CopilotFileList['data'][number]>()
    for (const file of allFilesForChannel.data) {
      // First occurrence wins, matching a linear `find`.
      if (!filesById.has(file.id)) filesById.set(file.id, file)
    }

    const copilotApi = new CopilotAPI(token)
    const moveFileService = new MoveFilesService()

    // Appends the row id to the shared progress list and persists the list.
    const markProcessed = async (fileSyncId: string) => {
      movedFiles.fileIds.push(fileSyncId)
      await moveFileService.updateIncorrectPathFilesTable(
        movedFiles.portalId,
        movedFiles.channelId,
        { fileIds: movedFiles.fileIds },
      )
    }

    const filePromises = []
    for (const mapFile of mapFiles) {
      // Captured in a local so the narrowed `string` type survives into the closure below.
      const { itemPath } = mapFile

      if (!itemPath) {
        console.info('Skipping file. No path:', mapFile.id, '\n')
        await markProcessed(mapFile.id)
        continue
      }

      if (existingPaths.has(sanitizeFileNameForAssembly(itemPath))) {
        console.info('Skipping file. Already in correct path:', itemPath, '\n')
        await markProcessed(mapFile.id)
        continue
      }

      const fileDetail = mapFile.assemblyFileId ? filesById.get(mapFile.assemblyFileId) : undefined

      if (!fileDetail || fileDetail.status === 'pending') {
        console.error('File not found in Copilot. Skipping file:', itemPath, '\n')

        // logic to upload in pending status and update file folder sync table
        await markProcessed(mapFile.id)
        continue
      }

      logger.info(
        `file to transfer: id: ${mapFile.id} dbx_id: ${mapFile.dbxFileId} contentHash: ${mapFile.contentHash} assembly id: ${mapFile.assemblyFileId} \n`,
        mapFile,
      )
      console.info(`file to transfer: ${itemPath}\n`)

      filePromises.push(
        copilotBottleneck.schedule(() =>
          moveFileService.moveFileToCorrectPath({
            pathToUpload: itemPath,
            existingFile: fileDetail,
            copilotApi,
            fileFolderid: mapFile.id,
            movedFiles,
          }),
        ),
      )
    }
    await Promise.all(filePromises)
  },
})