diff --git a/apps/web/app/(app)/[organization]/connections/api.ts b/apps/web/app/(app)/[organization]/connections/api.ts index 6d5d5c64..b23443e0 100644 --- a/apps/web/app/(app)/[organization]/connections/api.ts +++ b/apps/web/app/(app)/[organization]/connections/api.ts @@ -1,15 +1,12 @@ import { isSuccess } from '@/lib/result'; import type { ResponseObject } from '@dory/shared'; import { ConnectionListItem, CreateConnectionPayload } from '@dory/shared/types/connections'; +import type { LocalFilesCreateRequest, LocalFilesInspectRequest, LocalFilesInspectResponse } from '@dory/shared/types/local-files'; import { authFetch } from '@/lib/client/auth-fetch'; import { translate } from '@dory/i18n/translate'; import { getClientLocale } from '@dory/i18n/client'; -async function fetchJsonResponse( - input: RequestInfo, - init: RequestInit, - errorMessage: string, -): Promise> { +async function fetchJsonResponse(input: RequestInfo, init: RequestInit, errorMessage: string): Promise> { const response = await authFetch(input, init); const result = await response.json().catch(e => { console.error('Failed to parse JSON response', e); @@ -30,7 +27,6 @@ function translateConnectionsApi(key: string) { return translate(getClientLocale(), key); } - export async function addConnection(params: CreateConnectionPayload): Promise> { const res = await fetchJsonResponse( '/api/connection', @@ -45,10 +41,7 @@ export async function addConnection(params: CreateConnectionPayload): Promise> { +export async function updateConnection(params: CreateConnectionPayload & { id?: string }): Promise> { const id = params.id ?? params.connection?.id; if (!id) { @@ -68,13 +61,8 @@ export async function updateConnection( return res; } - export async function getConnections(): Promise<{ data: ConnectionListItem[] }> { - const res = await fetchJsonResponse( - '/api/connection', - { method: 'GET' }, - translateConnectionsApi('Connections.Api.ListFailed'), - ); + const res = await fetchJsonResponse('/api/connection', { method: 'GET' }, translateConnectionsApi('Connections.Api.ListFailed')); if (!isSuccess(res)) { throw new Error(res.message || translateConnectionsApi('Connections.Api.ListFailed')); @@ -83,7 +71,6 @@ export async function getConnections(): Promise<{ data: ConnectionListItem[] }> return { data: res.data ?? [] }; } - export async function deleteConnection(id: string): Promise> { const res = await fetchJsonResponse( `/api/connection?id=${encodeURIComponent(id)}`, @@ -100,7 +87,6 @@ export async function deleteConnection(id: string): Promise return res; } - export async function getConnectionDetail(id: string): Promise<{ data: ConnectionListItem }> { const res = await fetchJsonResponse( `/api/connection?id=${encodeURIComponent(id)}`, @@ -120,10 +106,7 @@ export async function getConnectionDetail(id: string): Promise<{ data: Connectio return { data: detail }; } - -export async function testConnection( - params: CreateConnectionPayload & { timeout?: number }, -): Promise> { +export async function testConnection(params: CreateConnectionPayload & { timeout?: number }): Promise> { const res = await fetchJsonResponse( '/api/connection/test', { @@ -137,7 +120,6 @@ export async function testConnection( return res; } - export async function connectConnection(params: ConnectionListItem): Promise> { const res = await fetchJsonResponse( '/api/connection/connect', @@ -155,3 +137,27 @@ export async function connectConnection(params: ConnectionListItem): Promise> { + return fetchJsonResponse( + '/api/local-files/inspect', + { + method: 'POST', + body: JSON.stringify(params), + headers: { 'Content-Type': 'application/json' }, + }, + 'Failed to inspect local file', + ); +} + +export async function createLocalFiles(params: LocalFilesCreateRequest): Promise> { + return fetchJsonResponse( + '/api/local-files/create', + { + method: 'POST', + body: JSON.stringify(params), + headers: { 'Content-Type': 'application/json' }, + }, + 'Failed to create Local Files dataset', + ); +} diff --git a/apps/web/app/(app)/[organization]/connections/components/empty-state.tsx b/apps/web/app/(app)/[organization]/connections/components/empty-state.tsx index 91ee7072..9921c0ba 100644 --- a/apps/web/app/(app)/[organization]/connections/components/empty-state.tsx +++ b/apps/web/app/(app)/[organization]/connections/components/empty-state.tsx @@ -1,88 +1,66 @@ 'use client'; import { Button } from '@/registry/new-york-v4/ui/button'; -import { - Empty, - EmptyContent, - EmptyDescription, - EmptyHeader, - EmptyMedia, - EmptyTitle, -} from '@/registry/new-york-v4/ui/empty'; +import { Empty, EmptyContent, EmptyDescription, EmptyHeader, EmptyMedia, EmptyTitle } from '@/registry/new-york-v4/ui/empty'; import { Database } from 'lucide-react'; import { useTranslations } from 'next-intl'; type ConnectionsEmptyStateProps = { - searchQuery: string; - showSearchEmpty: boolean; - onAddConnection: () => void; - onLoadDemoData?: () => void; + searchQuery: string; + showSearchEmpty: boolean; + onAddConnection: () => void; + onAddLocalFiles?: () => void; + onLoadDemoData?: () => void; }; -export function ConnectionsEmptyState({ - searchQuery, - showSearchEmpty, - onAddConnection, - onLoadDemoData, -}: ConnectionsEmptyStateProps) { - const t = useTranslations('Connections'); - const trimmedQuery = searchQuery.trim(); +export function ConnectionsEmptyState({ searchQuery, showSearchEmpty, onAddConnection, onAddLocalFiles, onLoadDemoData }: ConnectionsEmptyStateProps) { + const t = useTranslations('Connections'); + const trimmedQuery = searchQuery.trim(); - const title = showSearchEmpty ? t('Search.emptyTitle') : t('Empty.title'); - const desc = showSearchEmpty - ? t('Search.emptyDescription', { query: trimmedQuery }) - : t('Empty.description'); + const title = showSearchEmpty ? t('Search.emptyTitle') : t('Empty.title'); + const desc = showSearchEmpty ? t('Search.emptyDescription', { query: trimmedQuery }) : t('Empty.description'); - return ( -
- - - - + return ( +
+ + + + - - - {title} - + + {title} - - {desc} - - + {desc} + - -
- + +
+ - {!showSearchEmpty && onLoadDemoData && ( - - )} -
+ {!showSearchEmpty && onAddLocalFiles && ( + + )} - {/* {!showSearchEmpty && ( + {!showSearchEmpty && onLoadDemoData && ( + + )} +
+ + {/* {!showSearchEmpty && (
{t('Empty.supportHint')}
)} */} - {showSearchEmpty && ( -
- {t('Search.emptyHint')} -
- )} -
-
-
- ); + {showSearchEmpty &&
{t('Search.emptyHint')}
} + +
+
+ ); } diff --git a/apps/web/app/(app)/[organization]/connections/components/local-files-dialog.tsx b/apps/web/app/(app)/[organization]/connections/components/local-files-dialog.tsx new file mode 100644 index 00000000..958bb6bd --- /dev/null +++ b/apps/web/app/(app)/[organization]/connections/components/local-files-dialog.tsx @@ -0,0 +1,213 @@ +'use client'; + +import * as React from 'react'; +import { useMemo, useState } from 'react'; +import { Database, FileSearch, Loader2 } from 'lucide-react'; +import { toast } from 'sonner'; + +import { Button } from '@/registry/new-york-v4/ui/button'; +import { Checkbox } from '@/registry/new-york-v4/ui/checkbox'; +import { Dialog, DialogContent, DialogFooter, DialogHeader, DialogTitle } from '@/registry/new-york-v4/ui/dialog'; +import { Input } from '@/registry/new-york-v4/ui/input'; +import { Label } from '@/registry/new-york-v4/ui/label'; +import { isSuccess } from '@/lib/result'; +import type { LocalFileRelationManifest, LocalFilesInspectResponse } from '@dory/shared/types/local-files'; + +import { createLocalFiles, inspectLocalFiles } from '../api'; + +type LocalFilesDialogProps = { + open: boolean; + onOpenChange: (open: boolean) => void; + onSuccess?: () => void; +}; + +function defaultDatasetName(filePath: string) { + const name = filePath + .split(/[\\/]/) + .filter(Boolean) + .pop() + ?.replace(/\.[^.]+$/, ''); + return name || 'Local Files'; +} + +export function LocalFilesDialog({ open, onOpenChange, onSuccess }: LocalFilesDialogProps) { + const [filePath, setFilePath] = useState(''); + const [datasetName, setDatasetName] = useState(''); + const [inspectResult, setInspectResult] = useState(null); + const [selectedRelations, setSelectedRelations] = useState>(new Set()); + const [inspecting, setInspecting] = useState(false); + const [creating, setCreating] = useState(false); + + const relations = inspectResult?.relations ?? []; + const canCreate = Boolean(inspectResult && datasetName.trim() && selectedRelations.size > 0); + + const selectedRelationList = useMemo(() => { + return relations.filter(relation => selectedRelations.has(relation.relationName)); + }, [relations, selectedRelations]); + + const reset = () => { + setFilePath(''); + setDatasetName(''); + setInspectResult(null); + setSelectedRelations(new Set()); + setInspecting(false); + setCreating(false); + }; + + const handleOpenChange = (nextOpen: boolean) => { + if (!nextOpen && !creating && !inspecting) { + reset(); + } + onOpenChange(nextOpen); + }; + + const handleInspect = async () => { + const trimmedPath = filePath.trim(); + if (!trimmedPath) { + toast.error('Enter a server-accessible file path.'); + return; + } + setInspecting(true); + try { + const result = await inspectLocalFiles({ + source: { + backend: 'serverPath', + filePath: trimmedPath, + }, + }); + if (!isSuccess(result) || !result.data) { + throw new Error(result.message || 'Failed to inspect local file'); + } + setInspectResult(result.data); + setDatasetName(current => current || defaultDatasetName(trimmedPath)); + setSelectedRelations(new Set(result.data.relations.map(relation => relation.relationName))); + } catch (error: any) { + toast.error(error?.message ?? 'Failed to inspect local file'); + } finally { + setInspecting(false); + } + }; + + const updateRelation = (relationName: string, checked: boolean) => { + setSelectedRelations(current => { + const next = new Set(current); + if (checked) { + next.add(relationName); + } else { + next.delete(relationName); + } + return next; + }); + }; + + const updateRelationName = (relation: LocalFileRelationManifest, relationName: string) => { + setInspectResult(current => { + if (!current) return current; + const nextRelations = current.relations.map(item => (item === relation ? { ...item, relationName } : item)); + const nextSelected = new Set(); + for (const item of nextRelations) { + const wasSelected = item === relation ? selectedRelations.has(relation.relationName) : selectedRelations.has(item.relationName); + if (wasSelected) nextSelected.add(item.relationName); + } + setSelectedRelations(nextSelected); + return { + ...current, + relations: nextRelations, + }; + }); + }; + + const handleCreate = async () => { + if (!inspectResult || !canCreate) return; + setCreating(true); + try { + const result = await createLocalFiles({ + name: datasetName.trim(), + source: { + backend: 'serverPath', + filePath: filePath.trim(), + }, + relations: selectedRelationList, + }); + if (!isSuccess(result)) { + throw new Error(result.message || 'Failed to create Local Files dataset'); + } + toast.success('Local Files dataset created.'); + onSuccess?.(); + handleOpenChange(false); + } catch (error: any) { + toast.error(error?.message ?? 'Failed to create Local Files dataset'); + } finally { + setCreating(false); + } + }; + + return ( + + + + + + Local Files + + + +
+
+ +
+ setFilePath(event.target.value)} /> + +
+
+ + {inspectResult ? ( + <> +
+ + setDatasetName(event.target.value)} /> +
+ +
+
+

Relations

+

+ Dataset source · {inspectResult.source.sourceType.toUpperCase()} · {inspectResult.source.sizeBytes.toLocaleString()} bytes +

+
+
+ {relations.map(relation => ( +
+ updateRelation(relation.relationName, checked === true)} + /> + + updateRelationName(relation, event.target.value)} /> +
+ ))} +
+
+ + ) : null} +
+ + + + + +
+
+ ); +} diff --git a/apps/web/app/(app)/[organization]/connections/page.tsx b/apps/web/app/(app)/[organization]/connections/page.tsx index 6f2b375d..a7f1f010 100644 --- a/apps/web/app/(app)/[organization]/connections/page.tsx +++ b/apps/web/app/(app)/[organization]/connections/page.tsx @@ -1,6 +1,6 @@ 'use client'; -import { useEffect } from 'react'; +import { useEffect, useState } from 'react'; import { useAtom, useAtomValue, useSetAtom } from 'jotai'; import { useTranslations } from 'next-intl'; import { toast } from 'sonner'; @@ -15,12 +15,14 @@ import { connectionDeleteAtom, connectionLoadingAtom, connectionOpenAtom, connec import { useConnectConnection } from './hooks/use-connect-connection'; import { useConnections, useDeleteConnection } from './hooks/use-connections'; import { DeleteDialog } from './components/delete-dialog'; +import { LocalFilesDialog } from './components/local-files-dialog'; import type { ConnectionListItem } from '@dory/shared/types/connections'; import { currentConnectionAtom } from '@/shared/stores/app.store'; export default function ConnectionsPage() { const t = useTranslations('Connections'); + const [localFilesOpen, setLocalFilesOpen] = useState(false); const connectLoadings = useAtomValue(connectionLoadingAtom); const setOpen = useSetAtom(connectionOpenAtom); @@ -48,7 +50,6 @@ export default function ConnectionsPage() { setOpen(true); }; - useEffect(() => { if (connectionsRes.data && connectionsRes.data.length > 0) { setSearchResult(connectionsRes.data); @@ -57,7 +58,6 @@ export default function ConnectionsPage() { } }, [connectionsRes.data, setSearchResult]); - function onConnect(payload: ConnectionListItem, navigateToConsole?: boolean) { connectMutation.mutate({ payload, navigateToConsole }); } @@ -86,9 +86,14 @@ export default function ConnectionsPage() { {!showEmptyState && ( - +
+ + +
)} @@ -96,12 +101,16 @@ export default function ConnectionsPage() { ) : ( showEmptyState && ( - + setLocalFilesOpen(true)} + /> ) )} - setDeleteOpen(false)} @@ -114,13 +123,14 @@ export default function ConnectionsPage() { return; } deleteConnectionMutation.mutateAsync(current.connection.id!); - + // deleteConnection(targetId) // .then(() => connectionsRes.refetch?.()) - + setCurrentConnection(null); }} /> + connectionsRes.refetch?.()} /> ); } diff --git a/apps/web/app/api/connection/[id]/databases/[database]/tables/[table]/columns/route.ts b/apps/web/app/api/connection/[id]/databases/[database]/tables/[table]/columns/route.ts index 9adcaee6..dd91c58d 100644 --- a/apps/web/app/api/connection/[id]/databases/[database]/tables/[table]/columns/route.ts +++ b/apps/web/app/api/connection/[id]/databases/[database]/tables/[table]/columns/route.ts @@ -16,7 +16,7 @@ const buildColumnSchema = (t: (key: string) => string) => }); export async function GET(req: NextRequest, context: { params: Promise<{ id: string; database: string; table: string }> }) { - return withUserAndOrganizationHandler(async ({ userId, organizationId }) => { + return withUserAndOrganizationHandler(async ({ db, userId, organizationId }) => { const locale = await getApiLocale(); const t = (key: string, values?: Record) => translateApi(key, values, locale); const columnSchema = buildColumnSchema(t); @@ -61,6 +61,19 @@ export async function GET(req: NextRequest, context: { params: Promise<{ id: str const { database, table } = parsed.data; try { + const snapshotColumns = await db.localFiles.getColumnSnapshotsForConnectionRelation(organizationId, datasourceId, table); + if (snapshotColumns?.length) { + return NextResponse.json( + ResponseUtil.success( + snapshotColumns.map(column => ({ + columnName: column.columnName, + columnType: column.columnType, + defaultExpression: null, + })), + ), + ); + } + const { entry } = await ensureConnectionPoolForUser(userId, organizationId, datasourceId, null); const metadata = entry.instance.capabilities.metadata; if (!hasMetadataCapability(metadata, 'getTableColumns')) { diff --git a/apps/web/app/api/local-files/create/route.ts b/apps/web/app/api/local-files/create/route.ts new file mode 100644 index 00000000..4001fade --- /dev/null +++ b/apps/web/app/api/local-files/create/route.ts @@ -0,0 +1,18 @@ +import { NextResponse } from 'next/server'; + +import { ResponseUtil } from '@/lib/result'; +import { handleApiError } from '@/app/api/utils/handle-error'; +import { withUserAndOrganizationHandler } from '@/app/api/utils/with-organization-handler'; +import { createLocalFilesDataset } from '@/lib/local-files/service'; + +export const runtime = 'nodejs'; + +export const POST = withUserAndOrganizationHandler(async ({ req, db, userId, organizationId }) => { + try { + const payload = await req.json(); + const result = await createLocalFilesDataset({ db, userId, organizationId }, payload); + return NextResponse.json(ResponseUtil.success(result), { status: 201 }); + } catch (err: any) { + return handleApiError(err); + } +}); diff --git a/apps/web/app/api/local-files/inspect/route.ts b/apps/web/app/api/local-files/inspect/route.ts new file mode 100644 index 00000000..f060153d --- /dev/null +++ b/apps/web/app/api/local-files/inspect/route.ts @@ -0,0 +1,18 @@ +import { NextResponse } from 'next/server'; + +import { ResponseUtil } from '@/lib/result'; +import { handleApiError } from '@/app/api/utils/handle-error'; +import { withUserAndOrganizationHandler } from '@/app/api/utils/with-organization-handler'; +import { inspectLocalFiles } from '@/lib/local-files/service'; + +export const runtime = 'nodejs'; + +export const POST = withUserAndOrganizationHandler(async ({ req, db, userId, organizationId }) => { + try { + const payload = await req.json(); + const result = await inspectLocalFiles({ db, userId, organizationId }, payload); + return NextResponse.json(ResponseUtil.success(result)); + } catch (err: any) { + return handleApiError(err); + } +}); diff --git a/apps/web/app/api/local-files/refresh/route.ts b/apps/web/app/api/local-files/refresh/route.ts new file mode 100644 index 00000000..794cd328 --- /dev/null +++ b/apps/web/app/api/local-files/refresh/route.ts @@ -0,0 +1,18 @@ +import { NextResponse } from 'next/server'; + +import { ResponseUtil } from '@/lib/result'; +import { handleApiError } from '@/app/api/utils/handle-error'; +import { withUserAndOrganizationHandler } from '@/app/api/utils/with-organization-handler'; +import { refreshLocalFilesDataset } from '@/lib/local-files/service'; + +export const runtime = 'nodejs'; + +export const POST = withUserAndOrganizationHandler(async ({ req, db, userId, organizationId }) => { + try { + const payload = await req.json(); + const result = await refreshLocalFilesDataset({ db, userId, organizationId }, payload); + return NextResponse.json(ResponseUtil.success(result)); + } catch (err: any) { + return handleApiError(err); + } +}); diff --git a/apps/web/hooks/use-tables.ts b/apps/web/hooks/use-tables.ts index 51717d46..820d4948 100644 --- a/apps/web/hooks/use-tables.ts +++ b/apps/web/hooks/use-tables.ts @@ -52,14 +52,34 @@ export function useTables(databases: string) { const encodedDb = encodeURIComponent(requestedDatabase); const request = (async () => { - const response = await authFetch(`/api/connection/${requestedConnectionId}/databases/${encodedDb}/tables`, { + const requestInit = { method: 'GET', headers: { 'X-Connection-ID': requestedConnectionId, }, - }); - const res = (await response.json()) as ResponseObject; - if (isSuccess(res)) { + }; + const [tablesResponse, viewsResponse] = await Promise.all([ + authFetch(`/api/connection/${requestedConnectionId}/databases/${encodedDb}/tables`, requestInit), + authFetch(`/api/connection/${requestedConnectionId}/databases/${encodedDb}/views`, requestInit), + ]); + const tablesResult = (await tablesResponse.json()) as ResponseObject; + const viewsResult = (await viewsResponse.json().catch(() => null)) as ResponseObject | null; + + if (isSuccess(tablesResult)) { + const merged = new Map(); + for (const item of tablesResult.data ?? []) { + const key = String(item?.value ?? item?.name ?? item?.label ?? ''); + if (key) merged.set(key, item); + } + if (viewsResult && isSuccess(viewsResult)) { + for (const item of viewsResult.data ?? []) { + const key = String(item?.value ?? item?.name ?? item?.label ?? ''); + if (key && !merged.has(key)) { + merged.set(key, item); + } + } + } + setTablesState(prev => { if (prev.connectionId !== requestedConnectionId || prev.database !== requestedDatabase) { return prev; @@ -68,7 +88,7 @@ export function useTables(databases: string) { return { connectionId: requestedConnectionId, database: requestedDatabase, - items: res.data ?? [], + items: [...merged.values()], }; }); } diff --git a/apps/web/lib/local-files/service.ts b/apps/web/lib/local-files/service.ts new file mode 100644 index 00000000..35063a0d --- /dev/null +++ b/apps/web/lib/local-files/service.ts @@ -0,0 +1,487 @@ +import fs from 'node:fs/promises'; +import path from 'node:path'; + +import { buildReadSql, fingerprintSourceStat, inspectSource, normalizeDatasetSchemaName, normalizeRelationName, statSource } from '@dory/files'; +import type { DBService } from '@dory/database'; +import type { BaseConnection } from '@dory/drivers/core'; +import type { + DatasetColumnSnapshot, + LocalFileRelationManifest, + LocalFileRelationMode, + LocalFilesCreateRequest, + LocalFilesInspectRequest, + LocalFilesRefreshRequest, +} from '@dory/shared/types/local-files'; +import { getRuntimeForServer, isDesktopRuntime } from '@dory/shared/runtime'; + +import { getOrCreateConnectionPool } from '@/lib/connection/connection-service'; + +type LocalFilesContext = { + db: DBService; + userId: string; + organizationId: string; +}; + +type DescribeRow = { + column_name?: string; + column_type?: string; + null?: string; + nullable?: string; +}; + +type RefreshRelationInput = { + id: string; + datasetId: string; + schemaName: string; + relationName: string; + previousSourceFingerprint?: string | null; + manifest: LocalFileRelationManifest; +}; + +const WORKSPACE_CONNECTION_OPTION_MODE = 'localFilesWorkspace'; + +function assertSelfHostedNodeRuntime() { + if (isDesktopRuntime()) { + throw new Error('Local Files are only available in self-hosted web runtime'); + } + const runtime = getRuntimeForServer(); + if (runtime && runtime !== 'web' && runtime !== 'docker') { + throw new Error('Local Files are only available in self-hosted web runtime'); + } +} + +function quoteIdentifier(value: string) { + return `"${value.replace(/"/g, '""')}"`; +} + +function qualifiedRelation(schemaName: string, relationName: string) { + return `${quoteIdentifier(schemaName)}.${quoteIdentifier(relationName)}`; +} + +function physicalTableName(relationName: string) { + return `_${normalizeRelationName(relationName)}_cache`; +} + +function getStorageRoot() { + return process.env.DORY_LOCAL_FILES_STORAGE_DIR?.trim() || path.join(process.cwd(), 'localdata', 'local-files'); +} + +function getWorkspacePath(organizationId: string) { + const safeOrganizationId = organizationId.replace(/[^a-zA-Z0-9_-]/g, '_'); + return path.join(getStorageRoot(), safeOrganizationId, 'workspace.duckdb'); +} + +function parseOptions(raw: unknown): Record { + if (!raw) return {}; + if (typeof raw === 'object' && !Array.isArray(raw)) return raw as Record; + if (typeof raw === 'string') { + try { + const parsed = JSON.parse(raw); + return parsed && typeof parsed === 'object' && !Array.isArray(parsed) ? (parsed as Record) : {}; + } catch { + return {}; + } + } + return {}; +} + +function safeJsonStringify(value: unknown) { + return JSON.stringify(value, (_key, item) => (typeof item === 'bigint' ? item.toString() : item)); +} + +function isLocalFilesWorkspaceConnection(connection: { type?: string | null; options?: string | null }) { + if (connection.type !== 'duckdb') return false; + return parseOptions(connection.options).mode === WORKSPACE_CONNECTION_OPTION_MODE; +} + +function inferSemantic(columnName: string, columnType: string): string | null { + const name = columnName.toLowerCase(); + const type = columnType.toLowerCase(); + if (/(^|_)id$/.test(name) || name.endsWith('_id')) return 'identifier'; + if (name.includes('email')) return 'email'; + if (name.includes('phone')) return 'phone'; + if (name.includes('url') || name.includes('uri')) return 'url'; + if (name.includes('date') || name.includes('time') || type.includes('date') || type.includes('time')) return 'temporal'; + if (name.includes('amount') || name.includes('price') || name.includes('cost') || name.includes('revenue')) return 'money'; + if (type.includes('int') || type.includes('double') || type.includes('decimal') || type.includes('float')) return 'numeric'; + return null; +} + +async function ensureWorkspaceConnection(ctx: LocalFilesContext) { + const existing = (await ctx.db.connections.list(ctx.organizationId)).find(item => isLocalFilesWorkspaceConnection(item.connection)); + if (existing) { + await fs.mkdir(path.dirname(existing.connection.path ?? getWorkspacePath(ctx.organizationId)), { recursive: true }); + return existing; + } + + const workspacePath = getWorkspacePath(ctx.organizationId); + await fs.mkdir(path.dirname(workspacePath), { recursive: true }); + + return ctx.db.connections.create(ctx.userId, ctx.organizationId, { + connection: { + type: 'duckdb', + engine: 'duckdb', + name: `Local Files Workspace ${ctx.organizationId.slice(0, 8)}`, + description: 'Dory-managed DuckDB workspace for Local Files datasets', + host: null, + port: null, + httpPort: null, + database: null, + path: workspacePath, + options: JSON.stringify({ + mode: WORKSPACE_CONNECTION_OPTION_MODE, + createIfMissing: true, + managedBy: 'local-files', + }), + status: 'Connected', + environment: 'local-files', + tags: 'local-files,managed', + }, + identities: [ + { + name: 'DuckDB', + username: 'duckdb', + role: null, + isDefault: true, + database: null, + }, + ], + ssh: { + enabled: false, + host: null, + port: null, + username: null, + authMethod: null, + }, + } as any); +} + +async function getWorkspaceConnection(ctx: LocalFilesContext, connectionId: string): Promise { + const poolEntry = await getOrCreateConnectionPool(ctx.organizationId, connectionId); + if (!poolEntry) { + throw new Error('Failed to initialize Local Files DuckDB workspace'); + } + return poolEntry.instance; +} + +async function loadDuckDbExtensions(connection: BaseConnection) { + try { + await connection.command('INSTALL excel'); + } catch { + // Extension may already be installed or unavailable in offline deployments. + } + try { + await connection.command('LOAD excel'); + } catch { + // CSV/JSON/Parquet still work without the Excel extension. + } +} + +async function createOrReplaceRelationViews(connection: BaseConnection, schemaName: string, relations: LocalFileRelationManifest[]) { + await connection.command(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)}`); + for (const relation of relations) { + await materializeRelation(connection, schemaName, relation); + } +} + +async function materializeRelation(connection: BaseConnection, schemaName: string, relation: LocalFileRelationManifest) { + const relationName = normalizeRelationName(relation.relationName); + if (relation.mode === 'materialized') { + throw new Error('Materialized Local Files relations are reserved for a future release'); + } + + await connection.command(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)}`); + + if (relation.mode === 'cached') { + const tableName = physicalTableName(relationName); + await connection.command(`CREATE OR REPLACE TABLE ${qualifiedRelation(schemaName, tableName)} AS ${buildReadSql({ ...relation, relationName })}`); + await connection.command(`CREATE OR REPLACE VIEW ${qualifiedRelation(schemaName, relationName)} AS SELECT * FROM ${qualifiedRelation(schemaName, tableName)}`); + return; + } + + await connection.command(`CREATE OR REPLACE VIEW ${qualifiedRelation(schemaName, relationName)} AS ${buildReadSql({ ...relation, relationName })}`); +} + +function parseNullable(value: string | undefined): boolean | null { + if (!value) return null; + const normalized = value.toLowerCase(); + if (normalized === 'yes' || normalized === 'true') return true; + if (normalized === 'no' || normalized === 'false') return false; + return null; +} + +async function collectColumnSnapshots(connection: BaseConnection, schemaName: string, relationName: string): Promise { + const qualified = qualifiedRelation(schemaName, relationName); + const describe = await connection.query(`DESCRIBE SELECT * FROM ${qualified}`); + const summarize = await connection.query>(`SUMMARIZE SELECT * FROM ${qualified}`); + const summaryByColumn = new Map>(); + for (const row of summarize.rows ?? []) { + const name = String(row.column_name ?? row.name ?? ''); + if (name) summaryByColumn.set(name, row); + } + + const snapshots: DatasetColumnSnapshot[] = []; + for (const row of describe.rows ?? []) { + const name = row.column_name; + if (!name) continue; + const type = row.column_type ?? 'UNKNOWN'; + const sampleResult = await connection.query<{ value: unknown }>( + `SELECT DISTINCT ${quoteIdentifier(name)} AS value FROM ${qualified} WHERE ${quoteIdentifier(name)} IS NOT NULL LIMIT 5`, + ); + snapshots.push({ + name, + type, + nullable: parseNullable(row.null ?? row.nullable), + detectedSemantic: inferSemantic(name, type), + sampleValues: (sampleResult.rows ?? []).map(sample => sample.value), + summary: summaryByColumn.get(name) ?? null, + }); + } + + return snapshots; +} + +async function refreshSnapshots(ctx: LocalFilesContext, connection: BaseConnection, datasetId: string, schemaName: string, relations: Array<{ id: string; relationName: string }>) { + for (const relation of relations) { + const snapshots = await collectColumnSnapshots(connection, schemaName, relation.relationName); + await ctx.db.localFiles.replaceColumnSnapshots( + relation.id, + snapshots.map((snapshot, index) => ({ + organizationId: ctx.organizationId, + datasetId, + relationId: relation.id, + columnName: snapshot.name, + columnType: snapshot.type, + nullable: snapshot.nullable == null ? null : snapshot.nullable ? 'true' : 'false', + detectedSemantic: snapshot.detectedSemantic, + sampleValues: safeJsonStringify(snapshot.sampleValues), + summary: safeJsonStringify(snapshot.summary ?? {}), + ordinalPosition: index, + })), + ); + } +} + +function columnSignature(columns: DatasetColumnSnapshot[]) { + return columns.map(column => `${column.name}:${column.type}:${column.nullable}`).join('|'); +} + +function existingColumnSignature(rows: Array<{ columnName: string; columnType: string; nullable: string | null }>) { + return rows.map(column => `${column.columnName}:${column.columnType}:${column.nullable == null ? 'null' : column.nullable === 'true'}`).join('|'); +} + +async function runRefreshPipeline(ctx: LocalFilesContext, connection: BaseConnection, relation: RefreshRelationInput, reason: string) { + const stat = await statSource(relation.manifest.source); + const sourceFingerprint = fingerprintSourceStat(stat); + const operation = await ctx.db.localFiles.createRefreshOperation({ + organizationId: ctx.organizationId, + datasetId: relation.datasetId, + relationId: relation.id, + reason, + sourceFingerprint, + previousSourceFingerprint: relation.previousSourceFingerprint ?? null, + }); + + try { + const manifest = { + ...relation.manifest, + sourceFingerprint, + }; + await materializeRelation(connection, relation.schemaName, manifest); + + const nextSnapshots = await collectColumnSnapshots(connection, relation.schemaName, relation.relationName); + const previousSnapshots = await ctx.db.localFiles.getColumnSnapshotsForRelation(relation.id); + const schemaDriftStatus = previousSnapshots?.length && existingColumnSignature(previousSnapshots) !== columnSignature(nextSnapshots) ? 'changed' : 'none'; + + await ctx.db.localFiles.replaceColumnSnapshots( + relation.id, + nextSnapshots.map((snapshot, index) => ({ + organizationId: ctx.organizationId, + datasetId: relation.datasetId, + relationId: relation.id, + columnName: snapshot.name, + columnType: snapshot.type, + nullable: snapshot.nullable == null ? null : snapshot.nullable ? 'true' : 'false', + detectedSemantic: snapshot.detectedSemantic, + sampleValues: safeJsonStringify(snapshot.sampleValues), + summary: safeJsonStringify(snapshot.summary ?? {}), + ordinalPosition: index, + })), + ); + await ctx.db.localFiles.updateRelationRefreshState(relation.id, { + status: 'ready', + sourceFingerprint, + lastSourceFingerprint: sourceFingerprint, + schemaDriftStatus, + }); + await ctx.db.localFiles.finishRefreshOperation(operation.id, { + status: 'success', + schemaDriftStatus, + sourceFingerprint, + previousSourceFingerprint: relation.previousSourceFingerprint ?? null, + }); + + return { schemaDriftStatus, sourceFingerprint }; + } catch (error: any) { + const message = error?.message ?? String(error); + await ctx.db.localFiles.updateRelationRefreshState(relation.id, { + status: 'error', + sourceFingerprint, + schemaDriftStatus: 'unknown', + error: message, + }); + await ctx.db.localFiles.finishRefreshOperation(operation.id, { + status: 'error', + schemaDriftStatus: 'unknown', + sourceFingerprint, + previousSourceFingerprint: relation.previousSourceFingerprint ?? null, + error: message, + }); + throw error; + } +} + +export async function inspectLocalFiles(ctx: LocalFilesContext, request: LocalFilesInspectRequest) { + assertSelfHostedNodeRuntime(); + const source = await statSource(request.source); + const relations = await inspectSource(request.source); + return { + source, + relations, + }; +} + +export async function createLocalFilesDataset(ctx: LocalFilesContext, request: LocalFilesCreateRequest) { + assertSelfHostedNodeRuntime(); + const source = await statSource(request.source); + const workspace = await ensureWorkspaceConnection(ctx); + const connection = await getWorkspaceConnection(ctx, workspace.connection.id); + await loadDuckDbExtensions(connection); + + const schemaName = normalizeDatasetSchemaName(request.schemaName || request.name); + const requestedMode: LocalFileRelationMode = request.mode ?? 'virtual'; + const relations = request.relations.map(relation => ({ + ...relation, + relationName: normalizeRelationName(relation.relationName), + mode: relation.mode ?? requestedMode, + })); + + const created = await ctx.db.localFiles.createDatasetWithRelations({ + fileAsset: { + organizationId: ctx.organizationId, + createdByUserId: ctx.userId, + backend: source.backend, + sourceType: source.sourceType, + path: source.path, + sizeBytes: String(source.sizeBytes), + mtimeMs: String(Math.round(source.mtimeMs)), + }, + dataset: { + organizationId: ctx.organizationId, + createdByUserId: ctx.userId, + connectionId: workspace.connection.id, + name: request.name.trim(), + schemaName, + }, + relations: relations.map(relation => ({ + organizationId: ctx.organizationId, + sourceType: relation.sourceType, + sheetName: relation.sheetName ?? null, + relationName: relation.relationName, + mode: relation.mode, + duckdbSchema: schemaName, + duckdbRelation: relation.relationName, + physicalTableName: relation.mode === 'cached' ? physicalTableName(relation.relationName) : null, + sourceFingerprint: relation.sourceFingerprint, + lastSourceFingerprint: null, + schemaDriftStatus: 'unknown', + refreshStrategy: 'manual', + readSql: buildReadSql(relation), + })), + }); + + for (const createdRelation of created.relations) { + const manifest = relations.find(relation => relation.relationName === createdRelation.relationName); + if (!manifest) continue; + await runRefreshPipeline( + ctx, + connection, + { + id: createdRelation.id, + datasetId: created.dataset.id, + schemaName, + relationName: createdRelation.relationName, + previousSourceFingerprint: null, + manifest, + }, + 'create', + ); + } + await ctx.db.localFiles.markDatasetRefresh(ctx.organizationId, created.dataset.id, { status: 'success' }); + await ctx.db.localFiles.markRelationsRefreshed( + created.relations.map(relation => relation.id), + { status: 'ready' }, + ); + + return { + connection: workspace, + dataset: created.dataset, + fileAsset: created.fileAsset, + relations: created.relations, + }; +} + +export async function refreshLocalFilesDataset(ctx: LocalFilesContext, request: LocalFilesRefreshRequest) { + assertSelfHostedNodeRuntime(); + const record = await ctx.db.localFiles.getDataset(ctx.organizationId, request.datasetId); + if (!record) { + throw new Error('Local Files dataset not found'); + } + + const connection = await getWorkspaceConnection(ctx, record.dataset.connectionId); + await loadDuckDbExtensions(connection); + + const relations = record.relations.map(relation => { + const asset = record.fileAssets.find(item => item.id === relation.fileAssetId); + return { + sourceType: relation.sourceType as LocalFileRelationManifest['sourceType'], + source: { + backend: 'serverPath' as const, + filePath: asset?.path ?? '', + }, + duckdbPath: asset?.path ?? '', + sheetName: relation.sheetName ?? undefined, + relationName: relation.relationName, + mode: relation.mode as LocalFileRelationMode, + sourceFingerprint: relation.sourceFingerprint ?? '', + previousSourceFingerprint: relation.lastSourceFingerprint, + id: relation.id, + }; + }); + if (relations.some(relation => !relation.duckdbPath)) { + throw new Error('Local Files source asset not found'); + } + + for (const relation of relations) { + await runRefreshPipeline( + ctx, + connection, + { + id: relation.id, + datasetId: record.dataset.id, + schemaName: record.dataset.schemaName, + relationName: relation.relationName, + previousSourceFingerprint: relation.previousSourceFingerprint, + manifest: relation, + }, + 'manual', + ); + } + await ctx.db.localFiles.markDatasetRefresh(ctx.organizationId, record.dataset.id, { status: 'success' }); + await ctx.db.localFiles.markRelationsRefreshed( + record.relations.map(relation => relation.id), + { status: 'ready' }, + ); + + return record; +} diff --git a/apps/web/package.json b/apps/web/package.json index 82be79d5..08c89eb4 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -56,6 +56,7 @@ "@dory/analysis": "0.1.0", "@dory/database": "0.1.0", "@dory/drivers": "0.1.0", + "@dory/files": "0.1.0", "@dory/i18n": "0.1.0", "@dory/shared": "0.1.0", "@dory/web-utils": "0.1.0", diff --git a/apps/web/tsconfig.json b/apps/web/tsconfig.json index e38657af..603ed4ed 100644 --- a/apps/web/tsconfig.json +++ b/apps/web/tsconfig.json @@ -29,6 +29,8 @@ "@dory/drivers": ["../../packages/drivers/src/index.ts"], "@dory/drivers/*": ["../../packages/drivers/src/*/index.ts", "../../packages/drivers/src/*"], "@dory/drivers/database/clickhouse/privileges": ["../../packages/drivers/src/database/clickhouse/capabilities/privileges.ts"], + "@dory/files": ["../../packages/files/src/index.ts"], + "@dory/files/*": ["../../packages/files/src/*"], "@dory/i18n": ["../../packages/i18n/src/index.ts"], "@dory/i18n/*": ["../../packages/i18n/src/*"], "@dory/shared": ["../../packages/shared/src/index.ts"], diff --git a/packages/database/src/index.ts b/packages/database/src/index.ts index a67a520d..2f72aeb8 100644 --- a/packages/database/src/index.ts +++ b/packages/database/src/index.ts @@ -11,6 +11,7 @@ import { PostgresAiUsageRepository } from './postgres/impl/ai-usage'; import { PostgresSyncOperationsRepository } from './postgres/impl/sync-operations'; import { PostgresBillingRepository } from './postgres/impl/billing'; import { PostgresMcpRepository } from './postgres/impl/mcp'; +import { PostgresLocalFilesRepository } from './postgres/impl/local-files'; import { translateDatabase } from './i18n'; import type { AiUsageRepository } from '@dory/shared'; @@ -31,6 +32,7 @@ export type PostgresDBService = { syncOperations: PostgresSyncOperationsRepository; billing: PostgresBillingRepository; mcp: PostgresMcpRepository; + localFiles: PostgresLocalFilesRepository; }; /** @@ -85,6 +87,9 @@ export async function getDBService(): Promise { const mcpRepo = new PostgresMcpRepository(); await mcpRepo.init(); + const localFilesRepo = new PostgresLocalFilesRepository(); + await localFilesRepo.init(); + instance = { tabState: tabStateRepo, chat: chatRepo, @@ -98,6 +103,7 @@ export async function getDBService(): Promise { syncOperations: syncOperationsRepo, billing: billingRepo, mcp: mcpRepo, + localFiles: localFilesRepo, }; break; } diff --git a/packages/database/src/pglite/migrations.json b/packages/database/src/pglite/migrations.json index 1c4a3e0d..f5735642 100644 --- a/packages/database/src/pglite/migrations.json +++ b/packages/database/src/pglite/migrations.json @@ -172,10 +172,39 @@ }, { "sql": [ - "CREATE INDEX \"idx_mcp_access_tokens_organization_user_created\" ON \"mcp_access_tokens\" USING btree (\"organization_id\",\"created_by_user_id\",\"created_at\");" + "CREATE INDEX \"idx_mcp_access_tokens_organization_user_created\" ON \"mcp_access_tokens\" USING btree (\"organization_id\",\"created_by_user_id\",\"created_at\");\n" ], "bps": true, "folderMillis": 1778574691122, "hash": "f6317b19edb35c0bd8f2c3a77f5552afa07b7451b3131e6c3dadeeca4acac7bd" + }, + { + "sql": [ + "CREATE TABLE \"file_assets\" (\n \"id\" text PRIMARY KEY NOT NULL,\n \"organization_id\" text NOT NULL,\n \"created_by_user_id\" text,\n \"backend\" text NOT NULL,\n \"source_type\" text NOT NULL,\n \"path\" text NOT NULL,\n \"storage_key\" text,\n \"size_bytes\" text,\n \"mtime_ms\" text,\n \"status\" text DEFAULT 'ready' NOT NULL,\n \"metadata\" text DEFAULT '{}' NOT NULL,\n \"created_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"updated_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"deleted_at\" timestamp with time zone\n);\n", + "\nCREATE TABLE \"datasets\" (\n \"id\" text PRIMARY KEY NOT NULL,\n \"organization_id\" text NOT NULL,\n \"created_by_user_id\" text,\n \"connection_id\" text NOT NULL,\n \"name\" text NOT NULL,\n \"schema_name\" text NOT NULL,\n \"status\" text DEFAULT 'ready' NOT NULL,\n \"refresh_status\" text DEFAULT 'idle' NOT NULL,\n \"last_refresh_at\" timestamp with time zone,\n \"last_refresh_error\" text,\n \"created_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"updated_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"deleted_at\" timestamp with time zone\n);\n", + "\nCREATE TABLE \"dataset_relations\" (\n \"id\" text PRIMARY KEY NOT NULL,\n \"organization_id\" text NOT NULL,\n \"dataset_id\" text NOT NULL,\n \"file_asset_id\" text NOT NULL,\n \"source_type\" text NOT NULL,\n \"sheet_name\" text,\n \"relation_name\" text NOT NULL,\n \"mode\" text DEFAULT 'virtual' NOT NULL,\n \"duckdb_schema\" text NOT NULL,\n \"duckdb_relation\" text NOT NULL,\n \"physical_table_name\" text,\n \"source_fingerprint\" text,\n \"last_source_fingerprint\" text,\n \"schema_drift_status\" text DEFAULT 'unknown' NOT NULL,\n \"refresh_strategy\" text DEFAULT 'manual' NOT NULL,\n \"read_sql\" text NOT NULL,\n \"status\" text DEFAULT 'ready' NOT NULL,\n \"last_refresh_at\" timestamp with time zone,\n \"last_refresh_error\" text,\n \"created_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"updated_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"deleted_at\" timestamp with time zone\n);\n", + "\nCREATE TABLE \"dataset_refresh_operations\" (\n \"id\" text PRIMARY KEY NOT NULL,\n \"organization_id\" text NOT NULL,\n \"dataset_id\" text NOT NULL,\n \"relation_id\" text,\n \"operation_type\" text DEFAULT 'refresh' NOT NULL,\n \"status\" text DEFAULT 'running' NOT NULL,\n \"reason\" text DEFAULT 'manual' NOT NULL,\n \"source_fingerprint\" text,\n \"previous_source_fingerprint\" text,\n \"schema_drift_status\" text DEFAULT 'unknown' NOT NULL,\n \"error\" text,\n \"started_at\" timestamp with time zone DEFAULT now() NOT NULL,\n \"finished_at\" timestamp with time zone,\n \"created_at\" timestamp with time zone DEFAULT now() NOT NULL\n);\n", + "\nCREATE TABLE \"dataset_relation_columns\" (\n \"id\" text PRIMARY KEY NOT NULL,\n \"organization_id\" text NOT NULL,\n \"dataset_id\" text NOT NULL,\n \"relation_id\" text NOT NULL,\n \"column_name\" text NOT NULL,\n \"column_type\" text NOT NULL,\n \"nullable\" text,\n \"detected_semantic\" text,\n \"sample_values\" text DEFAULT '[]' NOT NULL,\n \"summary\" text DEFAULT '{}' NOT NULL,\n \"ordinal_position\" integer DEFAULT 0 NOT NULL,\n \"refreshed_at\" timestamp with time zone DEFAULT now() NOT NULL\n);\n", + "\nCREATE INDEX \"idx_file_assets_organization\" ON \"file_assets\" USING btree (\"organization_id\");\n", + "\nCREATE INDEX \"idx_file_assets_org_backend_path\" ON \"file_assets\" USING btree (\"organization_id\",\"backend\",\"path\");\n", + "\nCREATE UNIQUE INDEX \"uniq_datasets_organization_schema\" ON \"datasets\" USING btree (\"organization_id\",\"schema_name\") WHERE \"datasets\".\"deleted_at\" IS NULL;\n", + "\nCREATE INDEX \"idx_datasets_organization\" ON \"datasets\" USING btree (\"organization_id\");\n", + "\nCREATE INDEX \"idx_datasets_connection\" ON \"datasets\" USING btree (\"connection_id\");\n", + "\nCREATE UNIQUE INDEX \"uniq_dataset_relations_name\" ON \"dataset_relations\" USING btree (\"dataset_id\",\"relation_name\") WHERE \"dataset_relations\".\"deleted_at\" IS NULL;\n", + "\nCREATE INDEX \"idx_dataset_relations_organization\" ON \"dataset_relations\" USING btree (\"organization_id\");\n", + "\nCREATE INDEX \"idx_dataset_relations_dataset\" ON \"dataset_relations\" USING btree (\"dataset_id\");\n", + "\nCREATE INDEX \"idx_dataset_relations_file_asset\" ON \"dataset_relations\" USING btree (\"file_asset_id\");\n", + "\nCREATE INDEX \"idx_dataset_refresh_ops_organization\" ON \"dataset_refresh_operations\" USING btree (\"organization_id\");\n", + "\nCREATE INDEX \"idx_dataset_refresh_ops_dataset\" ON \"dataset_refresh_operations\" USING btree (\"dataset_id\");\n", + "\nCREATE INDEX \"idx_dataset_refresh_ops_relation\" ON \"dataset_refresh_operations\" USING btree (\"relation_id\");\n", + "\nCREATE INDEX \"idx_dataset_refresh_ops_status\" ON \"dataset_refresh_operations\" USING btree (\"status\");\n", + "\nCREATE UNIQUE INDEX \"uniq_dataset_relation_columns_name\" ON \"dataset_relation_columns\" USING btree (\"relation_id\",\"column_name\");\n", + "\nCREATE INDEX \"idx_dataset_relation_columns_organization\" ON \"dataset_relation_columns\" USING btree (\"organization_id\");\n", + "\nCREATE INDEX \"idx_dataset_relation_columns_dataset\" ON \"dataset_relation_columns\" USING btree (\"dataset_id\");\n", + "\nCREATE INDEX \"idx_dataset_relation_columns_relation\" ON \"dataset_relation_columns\" USING btree (\"relation_id\");\n" + ], + "bps": true, + "folderMillis": 1778648400000, + "hash": "b58d96b8037473a03788135e50fab5420654d12b30617d9660243793fbe01138" } -] +] \ No newline at end of file diff --git a/packages/database/src/pglite/migrations/0010_local_files.sql b/packages/database/src/pglite/migrations/0010_local_files.sql new file mode 100644 index 00000000..b9af858b --- /dev/null +++ b/packages/database/src/pglite/migrations/0010_local_files.sql @@ -0,0 +1,123 @@ +CREATE TABLE "file_assets" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "created_by_user_id" text, + "backend" text NOT NULL, + "source_type" text NOT NULL, + "path" text NOT NULL, + "storage_key" text, + "size_bytes" text, + "mtime_ms" text, + "status" text DEFAULT 'ready' NOT NULL, + "metadata" text DEFAULT '{}' NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "deleted_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE "datasets" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "created_by_user_id" text, + "connection_id" text NOT NULL, + "name" text NOT NULL, + "schema_name" text NOT NULL, + "status" text DEFAULT 'ready' NOT NULL, + "refresh_status" text DEFAULT 'idle' NOT NULL, + "last_refresh_at" timestamp with time zone, + "last_refresh_error" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "deleted_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE "dataset_relations" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "dataset_id" text NOT NULL, + "file_asset_id" text NOT NULL, + "source_type" text NOT NULL, + "sheet_name" text, + "relation_name" text NOT NULL, + "mode" text DEFAULT 'virtual' NOT NULL, + "duckdb_schema" text NOT NULL, + "duckdb_relation" text NOT NULL, + "physical_table_name" text, + "source_fingerprint" text, + "last_source_fingerprint" text, + "schema_drift_status" text DEFAULT 'unknown' NOT NULL, + "refresh_strategy" text DEFAULT 'manual' NOT NULL, + "read_sql" text NOT NULL, + "status" text DEFAULT 'ready' NOT NULL, + "last_refresh_at" timestamp with time zone, + "last_refresh_error" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "deleted_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE "dataset_refresh_operations" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "dataset_id" text NOT NULL, + "relation_id" text, + "operation_type" text DEFAULT 'refresh' NOT NULL, + "status" text DEFAULT 'running' NOT NULL, + "reason" text DEFAULT 'manual' NOT NULL, + "source_fingerprint" text, + "previous_source_fingerprint" text, + "schema_drift_status" text DEFAULT 'unknown' NOT NULL, + "error" text, + "started_at" timestamp with time zone DEFAULT now() NOT NULL, + "finished_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE "dataset_relation_columns" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "dataset_id" text NOT NULL, + "relation_id" text NOT NULL, + "column_name" text NOT NULL, + "column_type" text NOT NULL, + "nullable" text, + "detected_semantic" text, + "sample_values" text DEFAULT '[]' NOT NULL, + "summary" text DEFAULT '{}' NOT NULL, + "ordinal_position" integer DEFAULT 0 NOT NULL, + "refreshed_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE INDEX "idx_file_assets_organization" ON "file_assets" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_file_assets_org_backend_path" ON "file_assets" USING btree ("organization_id","backend","path"); +--> statement-breakpoint +CREATE UNIQUE INDEX "uniq_datasets_organization_schema" ON "datasets" USING btree ("organization_id","schema_name") WHERE "datasets"."deleted_at" IS NULL; +--> statement-breakpoint +CREATE INDEX "idx_datasets_organization" ON "datasets" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_datasets_connection" ON "datasets" USING btree ("connection_id"); +--> statement-breakpoint +CREATE UNIQUE INDEX "uniq_dataset_relations_name" ON "dataset_relations" USING btree ("dataset_id","relation_name") WHERE "dataset_relations"."deleted_at" IS NULL; +--> statement-breakpoint +CREATE INDEX "idx_dataset_relations_organization" ON "dataset_relations" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relations_dataset" ON "dataset_relations" USING btree ("dataset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relations_file_asset" ON "dataset_relations" USING btree ("file_asset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_organization" ON "dataset_refresh_operations" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_dataset" ON "dataset_refresh_operations" USING btree ("dataset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_relation" ON "dataset_refresh_operations" USING btree ("relation_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_status" ON "dataset_refresh_operations" USING btree ("status"); +--> statement-breakpoint +CREATE UNIQUE INDEX "uniq_dataset_relation_columns_name" ON "dataset_relation_columns" USING btree ("relation_id","column_name"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relation_columns_organization" ON "dataset_relation_columns" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relation_columns_dataset" ON "dataset_relation_columns" USING btree ("dataset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relation_columns_relation" ON "dataset_relation_columns" USING btree ("relation_id"); diff --git a/packages/database/src/pglite/migrations/meta/_journal.json b/packages/database/src/pglite/migrations/meta/_journal.json index 89eaa2f9..b37f7699 100644 --- a/packages/database/src/pglite/migrations/meta/_journal.json +++ b/packages/database/src/pglite/migrations/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1778574691122, "tag": "0009_personal_mcp_tokens", "breakpoints": true + }, + { + "idx": 10, + "version": "7", + "when": 1778648400000, + "tag": "0010_local_files", + "breakpoints": true } ] } diff --git a/packages/database/src/postgres/impl/local-files/index.ts b/packages/database/src/postgres/impl/local-files/index.ts new file mode 100644 index 00000000..eaac0d41 --- /dev/null +++ b/packages/database/src/postgres/impl/local-files/index.ts @@ -0,0 +1,266 @@ +import { and, eq, inArray, isNull } from 'drizzle-orm'; + +import { datasetRefreshOperations, datasetRelationColumns, datasetRelations, datasets, fileAssets } from '@dory/database/postgres/schemas/local-files'; +import { DatabaseError } from '@dory/shared/errors/DatabaseError'; +import type { PostgresDBClient } from '@dory/shared'; +import { translateDatabase } from '@dory/database/i18n'; + +import { getClient } from '../../client'; + +export type LocalFileAssetCreateInput = { + organizationId: string; + createdByUserId: string; + backend: string; + sourceType: string; + path: string; + storageKey?: string | null; + sizeBytes?: string | null; + mtimeMs?: string | null; + metadata?: string; +}; + +export type DatasetCreateInput = { + organizationId: string; + createdByUserId: string; + connectionId: string; + name: string; + schemaName: string; +}; + +export type DatasetRelationCreateInput = { + organizationId: string; + datasetId: string; + fileAssetId: string; + sourceType: string; + sheetName?: string | null; + relationName: string; + mode: string; + duckdbSchema: string; + duckdbRelation: string; + physicalTableName?: string | null; + sourceFingerprint?: string | null; + lastSourceFingerprint?: string | null; + schemaDriftStatus?: string; + refreshStrategy?: string; + readSql: string; +}; + +export type DatasetRelationColumnSnapshotInput = { + organizationId: string; + datasetId: string; + relationId: string; + columnName: string; + columnType: string; + nullable?: string | null; + detectedSemantic?: string | null; + sampleValues?: string; + summary?: string; + ordinalPosition: number; +}; + +export class PostgresLocalFilesRepository { + private db!: PostgresDBClient; + + async init() { + try { + const client = await getClient(); + if (!client) { + throw new DatabaseError(translateDatabase('Database.Errors.ConnectionFailed'), 500); + } + this.db = client as PostgresDBClient; + } catch (e) { + console.error(translateDatabase('Database.Logs.InitFailed'), e); + throw new DatabaseError(translateDatabase('Database.Errors.InitFailed'), 500); + } + } + + async createDatasetWithRelations(input: { + fileAsset: LocalFileAssetCreateInput; + dataset: DatasetCreateInput; + relations: Omit[]; + }) { + return this.db.transaction(async tx => { + const [fileAsset] = await tx.insert(fileAssets).values(input.fileAsset).returning(); + const [dataset] = await tx.insert(datasets).values(input.dataset).returning(); + if (!fileAsset || !dataset) { + throw new DatabaseError('Failed to create local file dataset', 500); + } + + const relationRows = input.relations.length + ? await tx + .insert(datasetRelations) + .values( + input.relations.map(relation => ({ + ...relation, + datasetId: dataset.id, + fileAssetId: fileAsset.id, + })), + ) + .returning() + : []; + + return { + fileAsset, + dataset, + relations: relationRows, + }; + }); + } + + async getDataset(organizationId: string, datasetId: string) { + const [dataset] = await this.db + .select() + .from(datasets) + .where(and(eq(datasets.organizationId, organizationId), eq(datasets.id, datasetId), isNull(datasets.deletedAt))) + .limit(1); + if (!dataset) return null; + + const relations = await this.db + .select() + .from(datasetRelations) + .where(and(eq(datasetRelations.organizationId, organizationId), eq(datasetRelations.datasetId, datasetId), isNull(datasetRelations.deletedAt))); + + const assetIds = [...new Set(relations.map(relation => relation.fileAssetId))]; + const assets = assetIds.length ? await this.db.select().from(fileAssets).where(inArray(fileAssets.id, assetIds)) : []; + + return { + dataset, + relations, + fileAssets: assets, + }; + } + + async replaceColumnSnapshots(relationId: string, columns: DatasetRelationColumnSnapshotInput[]) { + await this.db.transaction(async tx => { + await tx.delete(datasetRelationColumns).where(eq(datasetRelationColumns.relationId, relationId)); + if (columns.length) { + await tx.insert(datasetRelationColumns).values(columns); + } + }); + } + + async markDatasetRefresh(organizationId: string, datasetId: string, input: { status: string; error?: string | null }) { + await this.db + .update(datasets) + .set({ + refreshStatus: input.status, + lastRefreshError: input.error ?? null, + lastRefreshAt: input.status === 'success' ? new Date() : undefined, + }) + .where(and(eq(datasets.organizationId, organizationId), eq(datasets.id, datasetId))); + } + + async markRelationsRefreshed(relationIds: string[], input: { status: string; error?: string | null }) { + if (!relationIds.length) return; + await this.db + .update(datasetRelations) + .set({ + status: input.status, + lastRefreshError: input.error ?? null, + lastRefreshAt: input.status === 'ready' ? new Date() : undefined, + }) + .where(inArray(datasetRelations.id, relationIds)); + } + + async updateRelationRefreshState( + relationId: string, + input: { + status: string; + sourceFingerprint?: string | null; + lastSourceFingerprint?: string | null; + schemaDriftStatus?: string; + error?: string | null; + }, + ) { + await this.db + .update(datasetRelations) + .set({ + status: input.status, + sourceFingerprint: input.sourceFingerprint ?? undefined, + lastSourceFingerprint: input.lastSourceFingerprint ?? undefined, + schemaDriftStatus: input.schemaDriftStatus ?? undefined, + lastRefreshError: input.error ?? null, + lastRefreshAt: input.status === 'ready' ? new Date() : undefined, + }) + .where(eq(datasetRelations.id, relationId)); + } + + async createRefreshOperation(input: { + organizationId: string; + datasetId: string; + relationId?: string | null; + reason: string; + sourceFingerprint?: string | null; + previousSourceFingerprint?: string | null; + }) { + const [operation] = await this.db.insert(datasetRefreshOperations).values(input).returning(); + if (!operation) { + throw new DatabaseError('Failed to create dataset refresh operation', 500); + } + return operation; + } + + async finishRefreshOperation( + operationId: string, + input: { + status: string; + schemaDriftStatus?: string; + sourceFingerprint?: string | null; + previousSourceFingerprint?: string | null; + error?: string | null; + }, + ) { + await this.db + .update(datasetRefreshOperations) + .set({ + status: input.status, + schemaDriftStatus: input.schemaDriftStatus ?? undefined, + sourceFingerprint: input.sourceFingerprint ?? undefined, + previousSourceFingerprint: input.previousSourceFingerprint ?? undefined, + error: input.error ?? null, + finishedAt: new Date(), + }) + .where(eq(datasetRefreshOperations.id, operationId)); + } + + async getColumnSnapshotsForConnectionRelation(organizationId: string, connectionId: string, tableRef: string) { + const [schemaName, relationName] = tableRef.includes('.') ? tableRef.split('.', 2) : [null, tableRef]; + const datasetRows = await this.db + .select() + .from(datasets) + .where( + and( + eq(datasets.organizationId, organizationId), + eq(datasets.connectionId, connectionId), + schemaName ? eq(datasets.schemaName, schemaName) : isNull(datasets.deletedAt), + isNull(datasets.deletedAt), + ), + ); + if (!datasetRows.length) return null; + + const relationRows = await this.db + .select() + .from(datasetRelations) + .where( + and( + eq(datasetRelations.organizationId, organizationId), + inArray( + datasetRelations.datasetId, + datasetRows.map(dataset => dataset.id), + ), + eq(datasetRelations.relationName, relationName), + isNull(datasetRelations.deletedAt), + ), + ) + .limit(1); + + const relation = relationRows[0]; + if (!relation) return null; + + return this.db.select().from(datasetRelationColumns).where(eq(datasetRelationColumns.relationId, relation.id)).orderBy(datasetRelationColumns.ordinalPosition); + } + + async getColumnSnapshotsForRelation(relationId: string) { + return this.db.select().from(datasetRelationColumns).where(eq(datasetRelationColumns.relationId, relationId)).orderBy(datasetRelationColumns.ordinalPosition); + } +} diff --git a/packages/database/src/postgres/migrations/0010_local_files.sql b/packages/database/src/postgres/migrations/0010_local_files.sql new file mode 100644 index 00000000..b9af858b --- /dev/null +++ b/packages/database/src/postgres/migrations/0010_local_files.sql @@ -0,0 +1,123 @@ +CREATE TABLE "file_assets" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "created_by_user_id" text, + "backend" text NOT NULL, + "source_type" text NOT NULL, + "path" text NOT NULL, + "storage_key" text, + "size_bytes" text, + "mtime_ms" text, + "status" text DEFAULT 'ready' NOT NULL, + "metadata" text DEFAULT '{}' NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "deleted_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE "datasets" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "created_by_user_id" text, + "connection_id" text NOT NULL, + "name" text NOT NULL, + "schema_name" text NOT NULL, + "status" text DEFAULT 'ready' NOT NULL, + "refresh_status" text DEFAULT 'idle' NOT NULL, + "last_refresh_at" timestamp with time zone, + "last_refresh_error" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "deleted_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE "dataset_relations" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "dataset_id" text NOT NULL, + "file_asset_id" text NOT NULL, + "source_type" text NOT NULL, + "sheet_name" text, + "relation_name" text NOT NULL, + "mode" text DEFAULT 'virtual' NOT NULL, + "duckdb_schema" text NOT NULL, + "duckdb_relation" text NOT NULL, + "physical_table_name" text, + "source_fingerprint" text, + "last_source_fingerprint" text, + "schema_drift_status" text DEFAULT 'unknown' NOT NULL, + "refresh_strategy" text DEFAULT 'manual' NOT NULL, + "read_sql" text NOT NULL, + "status" text DEFAULT 'ready' NOT NULL, + "last_refresh_at" timestamp with time zone, + "last_refresh_error" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "deleted_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE "dataset_refresh_operations" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "dataset_id" text NOT NULL, + "relation_id" text, + "operation_type" text DEFAULT 'refresh' NOT NULL, + "status" text DEFAULT 'running' NOT NULL, + "reason" text DEFAULT 'manual' NOT NULL, + "source_fingerprint" text, + "previous_source_fingerprint" text, + "schema_drift_status" text DEFAULT 'unknown' NOT NULL, + "error" text, + "started_at" timestamp with time zone DEFAULT now() NOT NULL, + "finished_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE "dataset_relation_columns" ( + "id" text PRIMARY KEY NOT NULL, + "organization_id" text NOT NULL, + "dataset_id" text NOT NULL, + "relation_id" text NOT NULL, + "column_name" text NOT NULL, + "column_type" text NOT NULL, + "nullable" text, + "detected_semantic" text, + "sample_values" text DEFAULT '[]' NOT NULL, + "summary" text DEFAULT '{}' NOT NULL, + "ordinal_position" integer DEFAULT 0 NOT NULL, + "refreshed_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE INDEX "idx_file_assets_organization" ON "file_assets" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_file_assets_org_backend_path" ON "file_assets" USING btree ("organization_id","backend","path"); +--> statement-breakpoint +CREATE UNIQUE INDEX "uniq_datasets_organization_schema" ON "datasets" USING btree ("organization_id","schema_name") WHERE "datasets"."deleted_at" IS NULL; +--> statement-breakpoint +CREATE INDEX "idx_datasets_organization" ON "datasets" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_datasets_connection" ON "datasets" USING btree ("connection_id"); +--> statement-breakpoint +CREATE UNIQUE INDEX "uniq_dataset_relations_name" ON "dataset_relations" USING btree ("dataset_id","relation_name") WHERE "dataset_relations"."deleted_at" IS NULL; +--> statement-breakpoint +CREATE INDEX "idx_dataset_relations_organization" ON "dataset_relations" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relations_dataset" ON "dataset_relations" USING btree ("dataset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relations_file_asset" ON "dataset_relations" USING btree ("file_asset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_organization" ON "dataset_refresh_operations" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_dataset" ON "dataset_refresh_operations" USING btree ("dataset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_relation" ON "dataset_refresh_operations" USING btree ("relation_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_refresh_ops_status" ON "dataset_refresh_operations" USING btree ("status"); +--> statement-breakpoint +CREATE UNIQUE INDEX "uniq_dataset_relation_columns_name" ON "dataset_relation_columns" USING btree ("relation_id","column_name"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relation_columns_organization" ON "dataset_relation_columns" USING btree ("organization_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relation_columns_dataset" ON "dataset_relation_columns" USING btree ("dataset_id"); +--> statement-breakpoint +CREATE INDEX "idx_dataset_relation_columns_relation" ON "dataset_relation_columns" USING btree ("relation_id"); diff --git a/packages/database/src/postgres/migrations/meta/_journal.json b/packages/database/src/postgres/migrations/meta/_journal.json index df27e9ea..ee02b91e 100644 --- a/packages/database/src/postgres/migrations/meta/_journal.json +++ b/packages/database/src/postgres/migrations/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1778574691122, "tag": "0009_personal_mcp_tokens", "breakpoints": true + }, + { + "idx": 10, + "version": "7", + "when": 1778648400000, + "tag": "0010_local_files", + "breakpoints": true } ] } diff --git a/packages/database/src/postgres/schemas/index.ts b/packages/database/src/postgres/schemas/index.ts index 4e842953..9ee88f66 100644 --- a/packages/database/src/postgres/schemas/index.ts +++ b/packages/database/src/postgres/schemas/index.ts @@ -12,6 +12,7 @@ import * as savedQueryFoldersSchema from './saved-query-folders'; import * as aiUsageSchema from './ai-usage'; import * as syncOperationsSchema from './sync-operations'; import * as mcpSchema from './mcp'; +import * as localFilesSchema from './local-files'; export * from './tabs'; export * from './auth-schema'; @@ -27,6 +28,7 @@ export * from './saved-query-folders'; export * from './ai-usage'; export * from './sync-operations'; export * from './mcp'; +export * from './local-files'; export const schema = { ...tabsSchema, @@ -43,6 +45,7 @@ export const schema = { ...aiUsageSchema, ...syncOperationsSchema, ...mcpSchema, + ...localFilesSchema, }; export type DBSchema = typeof schema; diff --git a/packages/database/src/postgres/schemas/local-files.ts b/packages/database/src/postgres/schemas/local-files.ts new file mode 100644 index 00000000..467b16ca --- /dev/null +++ b/packages/database/src/postgres/schemas/local-files.ts @@ -0,0 +1,152 @@ +import { integer, text, timestamp, pgTable, index, uniqueIndex } from 'drizzle-orm/pg-core'; +import { sql } from 'drizzle-orm'; +import { newEntityId } from '@dory/shared/id'; + +export const fileAssets = pgTable( + 'file_assets', + { + id: text('id') + .primaryKey() + .$defaultFn(() => newEntityId()), + organizationId: text('organization_id').notNull(), + createdByUserId: text('created_by_user_id'), + backend: text('backend').notNull(), + sourceType: text('source_type').notNull(), + path: text('path').notNull(), + storageKey: text('storage_key'), + sizeBytes: text('size_bytes'), + mtimeMs: text('mtime_ms'), + status: text('status').notNull().default('ready'), + metadata: text('metadata').notNull().default('{}'), + createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp('updated_at', { withTimezone: true }) + .notNull() + .$onUpdateFn(() => new Date()), + deletedAt: timestamp('deleted_at', { withTimezone: true }), + }, + t => [index('idx_file_assets_organization').on(t.organizationId), index('idx_file_assets_org_backend_path').on(t.organizationId, t.backend, t.path)], +); + +export const datasets = pgTable( + 'datasets', + { + id: text('id') + .primaryKey() + .$defaultFn(() => newEntityId()), + organizationId: text('organization_id').notNull(), + createdByUserId: text('created_by_user_id'), + connectionId: text('connection_id').notNull(), + name: text('name').notNull(), + schemaName: text('schema_name').notNull(), + status: text('status').notNull().default('ready'), + refreshStatus: text('refresh_status').notNull().default('idle'), + lastRefreshAt: timestamp('last_refresh_at', { withTimezone: true }), + lastRefreshError: text('last_refresh_error'), + createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp('updated_at', { withTimezone: true }) + .notNull() + .$onUpdateFn(() => new Date()), + deletedAt: timestamp('deleted_at', { withTimezone: true }), + }, + t => [ + uniqueIndex('uniq_datasets_organization_schema') + .on(t.organizationId, t.schemaName) + .where(sql`${t.deletedAt} IS NULL`), + index('idx_datasets_organization').on(t.organizationId), + index('idx_datasets_connection').on(t.connectionId), + ], +); + +export const datasetRelations = pgTable( + 'dataset_relations', + { + id: text('id') + .primaryKey() + .$defaultFn(() => newEntityId()), + organizationId: text('organization_id').notNull(), + datasetId: text('dataset_id').notNull(), + fileAssetId: text('file_asset_id').notNull(), + sourceType: text('source_type').notNull(), + sheetName: text('sheet_name'), + relationName: text('relation_name').notNull(), + mode: text('mode').notNull().default('virtual'), + duckdbSchema: text('duckdb_schema').notNull(), + duckdbRelation: text('duckdb_relation').notNull(), + physicalTableName: text('physical_table_name'), + sourceFingerprint: text('source_fingerprint'), + lastSourceFingerprint: text('last_source_fingerprint'), + schemaDriftStatus: text('schema_drift_status').notNull().default('unknown'), + refreshStrategy: text('refresh_strategy').notNull().default('manual'), + readSql: text('read_sql').notNull(), + status: text('status').notNull().default('ready'), + lastRefreshAt: timestamp('last_refresh_at', { withTimezone: true }), + lastRefreshError: text('last_refresh_error'), + createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp('updated_at', { withTimezone: true }) + .notNull() + .$onUpdateFn(() => new Date()), + deletedAt: timestamp('deleted_at', { withTimezone: true }), + }, + t => [ + uniqueIndex('uniq_dataset_relations_name') + .on(t.datasetId, t.relationName) + .where(sql`${t.deletedAt} IS NULL`), + index('idx_dataset_relations_organization').on(t.organizationId), + index('idx_dataset_relations_dataset').on(t.datasetId), + index('idx_dataset_relations_file_asset').on(t.fileAssetId), + ], +); + +export const datasetRefreshOperations = pgTable( + 'dataset_refresh_operations', + { + id: text('id') + .primaryKey() + .$defaultFn(() => newEntityId()), + organizationId: text('organization_id').notNull(), + datasetId: text('dataset_id').notNull(), + relationId: text('relation_id'), + operationType: text('operation_type').notNull().default('refresh'), + status: text('status').notNull().default('running'), + reason: text('reason').notNull().default('manual'), + sourceFingerprint: text('source_fingerprint'), + previousSourceFingerprint: text('previous_source_fingerprint'), + schemaDriftStatus: text('schema_drift_status').notNull().default('unknown'), + error: text('error'), + startedAt: timestamp('started_at', { withTimezone: true }).notNull().defaultNow(), + finishedAt: timestamp('finished_at', { withTimezone: true }), + createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(), + }, + t => [ + index('idx_dataset_refresh_ops_organization').on(t.organizationId), + index('idx_dataset_refresh_ops_dataset').on(t.datasetId), + index('idx_dataset_refresh_ops_relation').on(t.relationId), + index('idx_dataset_refresh_ops_status').on(t.status), + ], +); + +export const datasetRelationColumns = pgTable( + 'dataset_relation_columns', + { + id: text('id') + .primaryKey() + .$defaultFn(() => newEntityId()), + organizationId: text('organization_id').notNull(), + datasetId: text('dataset_id').notNull(), + relationId: text('relation_id').notNull(), + columnName: text('column_name').notNull(), + columnType: text('column_type').notNull(), + nullable: text('nullable'), + detectedSemantic: text('detected_semantic'), + sampleValues: text('sample_values').notNull().default('[]'), + summary: text('summary').notNull().default('{}'), + ordinalPosition: integer('ordinal_position').notNull().default(0), + refreshedAt: timestamp('refreshed_at', { withTimezone: true }).notNull().defaultNow(), + }, + t => [ + uniqueIndex('uniq_dataset_relation_columns_name').on(t.relationId, t.columnName), + index('idx_dataset_relation_columns_organization').on(t.organizationId), + index('idx_dataset_relation_columns_dataset').on(t.datasetId), + index('idx_dataset_relation_columns_relation').on(t.relationId), + ], +); diff --git a/packages/database/src/schema.ts b/packages/database/src/schema.ts index 1f1a22fe..9ff3d39e 100644 --- a/packages/database/src/schema.ts +++ b/packages/database/src/schema.ts @@ -18,5 +18,9 @@ export const subscription = activeSchemas.subscription; export const organizations = activeSchemas?.organizations; export const ai_schema_cache = activeSchemas?.aiSchemaCache; export const mcpAccessTokens = activeSchemas?.mcpAccessTokens; +export const fileAssets = activeSchemas?.fileAssets; +export const datasets = activeSchemas?.datasets; +export const datasetRelations = activeSchemas?.datasetRelations; +export const datasetRelationColumns = activeSchemas?.datasetRelationColumns; export type ActiveDBSchema = typeof activeSchemas.schema; diff --git a/packages/drivers/src/database/duckdb/duckdb-driver.ts b/packages/drivers/src/database/duckdb/duckdb-driver.ts index cef28b14..81efb7f8 100644 --- a/packages/drivers/src/database/duckdb/duckdb-driver.ts +++ b/packages/drivers/src/database/duckdb/duckdb-driver.ts @@ -29,7 +29,7 @@ export function isMotherDuckConfig(config: BaseConfig): boolean { return parseOptions(config).mode === 'motherduck'; } -function assertAbsoluteExistingPath(filePath?: string): string { +function assertAbsoluteExistingPath(filePath: string | undefined, options: Record): string { const normalized = filePath?.trim(); if (!normalized) { throw new Error('DuckDB path is required'); @@ -38,7 +38,11 @@ function assertAbsoluteExistingPath(filePath?: string): string { throw new Error('DuckDB path must be absolute'); } if (!fs.existsSync(normalized)) { - throw new Error('DuckDB file does not exist'); + if (options.createIfMissing === true) { + fs.mkdirSync(path.dirname(normalized), { recursive: true }); + } else { + throw new Error('DuckDB file does not exist'); + } } return normalized; } @@ -70,6 +74,7 @@ function resolveDuckDbInstanceOptions(config: BaseConfig): Record = { + '.csv': 'csv', + '.tsv': 'csv', + '.parquet': 'parquet', + '.json': 'json', + '.jsonl': 'json', + '.ndjson': 'json', + '.xlsx': 'excel', + '.xlsm': 'excel', +}; + +export type FileBackend = { + name: LocalFileSourceBackend; + statSource(source: LocalFileSourceDescriptor): Promise; + openReadable(source: LocalFileSourceDescriptor): Promise; + resolveDuckdbPath(source: LocalFileSourceDescriptor): Promise; +}; + +type ZipEntry = { + name: string; + compressionMethod: number; + compressedSize: number; + uncompressedSize: number; + localHeaderOffset: number; +}; + +const backends = new Map(); + +function quoteLiteral(value: string) { + return `'${value.replace(/'/g, "''")}'`; +} + +function normalizeIdentifier(value: string, fallback = 'relation') { + const normalized = value + .trim() + .toLowerCase() + .replace(/[^a-z0-9_]+/g, '_') + .replace(/^_+|_+$/g, '') + .replace(/_+/g, '_'); + const withFallback = normalized || fallback; + return /^[a-z_]/.test(withFallback) ? withFallback : `_${withFallback}`; +} + +function uniqueName(base: string, used: Set) { + let candidate = base; + let suffix = 2; + while (used.has(candidate)) { + candidate = `${base}_${suffix}`; + suffix += 1; + } + used.add(candidate); + return candidate; +} + +function decodeXml(value: string) { + return value + .replace(/"/g, '"') + .replace(/'/g, "'") + .replace(/</g, '<') + .replace(/>/g, '>') + .replace(/&/g, '&'); +} + +function readUInt32(buffer: Buffer, offset: number) { + return buffer.readUInt32LE(offset); +} + +function readUInt16(buffer: Buffer, offset: number) { + return buffer.readUInt16LE(offset); +} + +function sourceBackend(source: LocalFileSourceDescriptor): LocalFileSourceBackend { + return source.backend; +} + +function unsupportedBackend(name: LocalFileSourceBackend): FileBackend { + const fail = async () => { + throw new Error(`Unsupported file backend: ${name}`); + }; + return { + name, + statSource: fail, + openReadable: fail, + resolveDuckdbPath: fail, + }; +} + +function parseZipEntries(buffer: Buffer): ZipEntry[] { + let eocdOffset = -1; + for (let offset = buffer.length - 22; offset >= 0; offset -= 1) { + if (readUInt32(buffer, offset) === 0x06054b50) { + eocdOffset = offset; + break; + } + } + if (eocdOffset < 0) { + throw new Error('Invalid XLSX file: missing zip directory'); + } + + const entryCount = readUInt16(buffer, eocdOffset + 10); + let centralOffset = readUInt32(buffer, eocdOffset + 16); + const entries: ZipEntry[] = []; + + for (let i = 0; i < entryCount; i += 1) { + if (readUInt32(buffer, centralOffset) !== 0x02014b50) { + throw new Error('Invalid XLSX file: malformed zip directory'); + } + const compressionMethod = readUInt16(buffer, centralOffset + 10); + const compressedSize = readUInt32(buffer, centralOffset + 20); + const uncompressedSize = readUInt32(buffer, centralOffset + 24); + const fileNameLength = readUInt16(buffer, centralOffset + 28); + const extraLength = readUInt16(buffer, centralOffset + 30); + const commentLength = readUInt16(buffer, centralOffset + 32); + const localHeaderOffset = readUInt32(buffer, centralOffset + 42); + const name = buffer.subarray(centralOffset + 46, centralOffset + 46 + fileNameLength).toString('utf8'); + + entries.push({ + name, + compressionMethod, + compressedSize, + uncompressedSize, + localHeaderOffset, + }); + + centralOffset += 46 + fileNameLength + extraLength + commentLength; + } + + return entries; +} + +function readZipEntry(buffer: Buffer, entry: ZipEntry) { + const offset = entry.localHeaderOffset; + if (readUInt32(buffer, offset) !== 0x04034b50) { + throw new Error(`Invalid XLSX file: malformed local header for ${entry.name}`); + } + const fileNameLength = readUInt16(buffer, offset + 26); + const extraLength = readUInt16(buffer, offset + 28); + const dataStart = offset + 30 + fileNameLength + extraLength; + const compressed = buffer.subarray(dataStart, dataStart + entry.compressedSize); + + if (entry.compressionMethod === 0) { + return compressed.toString('utf8'); + } + if (entry.compressionMethod === 8) { + return inflateRawSync(compressed).toString('utf8'); + } + + throw new Error(`Unsupported XLSX compression method: ${entry.compressionMethod}`); +} + +function extractWorkbookSheets(workbookXml: string): string[] { + const sheets: string[] = []; + const sheetRegex = /]*\bname=(["'])(.*?)\1[^>]*\/?>/g; + for (const match of workbookXml.matchAll(sheetRegex)) { + if (match[2]) { + sheets.push(decodeXml(match[2])); + } + } + return sheets; +} + +async function inspectExcelSheets(filePath: string): Promise { + const buffer = await fs.readFile(filePath); + const entries = parseZipEntries(buffer); + const workbookEntry = entries.find(entry => entry.name === 'xl/workbook.xml'); + if (!workbookEntry) { + throw new Error('Invalid XLSX file: missing workbook.xml'); + } + return extractWorkbookSheets(readZipEntry(buffer, workbookEntry)); +} + +export function detectSourceType(filePath: string): LocalFileSourceType { + const extension = path.extname(filePath).toLowerCase(); + const type = SUPPORTED_EXTENSIONS[extension]; + if (!type) { + throw new Error(`Unsupported file type: ${extension || 'unknown'}`); + } + return type; +} + +export function normalizeRelationName(value: string, fallback = 'relation') { + return normalizeIdentifier(value, fallback); +} + +export function normalizeDatasetSchemaName(value: string, fallback = 'dataset') { + return normalizeIdentifier(value, fallback); +} + +export function fingerprintSourceStat(stat: LocalFileSourceStat) { + return `${stat.backend}:${stat.path}:${stat.sizeBytes}:${Math.round(stat.mtimeMs)}:${stat.sourceType}`; +} + +export const serverPathBackend: FileBackend = { + name: 'serverPath', + async statSource(source) { + if (source.backend !== 'serverPath') { + throw new Error(`serverPath backend cannot read ${source.backend} sources`); + } + const filePath = source.filePath.trim(); + if (!path.isAbsolute(filePath)) { + throw new Error('Local file path must be absolute'); + } + + const stat = await fs.stat(filePath); + if (!stat.isFile()) { + throw new Error('Local file path must point to a file'); + } + await fs.access(filePath, fsConstants.R_OK); + + return { + backend: 'serverPath', + path: filePath, + sizeBytes: stat.size, + mtimeMs: stat.mtimeMs, + sourceType: detectSourceType(filePath), + }; + }, + async openReadable(source): Promise { + const readPath = await this.resolveDuckdbPath(source); + return createReadStream(readPath); + }, + async resolveDuckdbPath(source) { + const stat = await this.statSource(source); + return stat.path; + }, +}; + +export function registerFileBackend(backend: FileBackend) { + backends.set(backend.name, backend); +} + +export function getFileBackend(name: LocalFileSourceBackend): FileBackend { + return backends.get(name) ?? unsupportedBackend(name); +} + +export function listFileBackends() { + return [...backends.keys()]; +} + +for (const backend of [serverPathBackend]) { + registerFileBackend(backend); +} + +export async function statSource(source: LocalFileSourceDescriptor): Promise { + return getFileBackend(sourceBackend(source)).statSource(source); +} + +export async function openReadable(source: LocalFileSourceDescriptor): Promise { + return getFileBackend(sourceBackend(source)).openReadable(source); +} + +export async function resolveDuckDbReadPath(source: LocalFileSourceDescriptor): Promise { + return getFileBackend(sourceBackend(source)).resolveDuckdbPath(source); +} + +export async function resolveDuckdbPath(source: LocalFileSourceDescriptor): Promise { + return resolveDuckDbReadPath(source); +} + +export async function inspectSource(source: LocalFileSourceDescriptor, options?: { mode?: LocalFileRelationMode }): Promise { + const stat = await statSource(source); + const duckdbPath = await resolveDuckDbReadPath(source); + const sourceFingerprint = fingerprintSourceStat(stat); + const mode = options?.mode ?? 'virtual'; + const used = new Set(); + + if (stat.sourceType === 'excel') { + const sheets = await inspectExcelSheets(duckdbPath); + if (!sheets.length) { + throw new Error('Excel workbook has no sheets'); + } + return sheets.map(sheetName => ({ + sourceType: 'excel', + source, + duckdbPath, + sheetName, + relationName: uniqueName(normalizeRelationName(sheetName, 'sheet'), used), + mode, + sourceFingerprint, + })); + } + + const parsed = path.parse(duckdbPath); + return [ + { + sourceType: stat.sourceType, + source, + duckdbPath, + relationName: uniqueName(normalizeRelationName('data', parsed.name || 'data'), used), + mode, + sourceFingerprint, + }, + ]; +} + +export function buildReadSql(relation: LocalFileRelationManifest): string { + const filePath = quoteLiteral(relation.duckdbPath); + switch (relation.sourceType) { + case 'excel': + if (!relation.sheetName) { + throw new Error('Excel relation requires a sheet name'); + } + return `SELECT * FROM read_xlsx(${filePath}, sheet = ${quoteLiteral(relation.sheetName)})`; + case 'csv': { + const extension = path.extname(relation.duckdbPath).toLowerCase(); + const delimiter = extension === '.tsv' ? `, delim = ${quoteLiteral('\t')}` : ''; + return `SELECT * FROM read_csv_auto(${filePath}${delimiter})`; + } + case 'parquet': + return `SELECT * FROM read_parquet(${filePath})`; + case 'json': + return `SELECT * FROM read_json_auto(${filePath})`; + default: + throw new Error(`Unsupported source type: ${(relation as LocalFileRelationManifest).sourceType}`); + } +} + +export async function sampleSource(_source: LocalFileSourceDescriptor): Promise[]> { + return []; +} diff --git a/packages/files/tsconfig.json b/packages/files/tsconfig.json new file mode 100644 index 00000000..9170ce79 --- /dev/null +++ b/packages/files/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2022", + "lib": ["ES2022"], + "module": "esnext", + "moduleResolution": "bundler", + "strict": true, + "noEmit": true, + "esModuleInterop": true, + "skipLibCheck": true, + "isolatedModules": true, + "resolveJsonModule": true, + "types": ["node"] + }, + "include": ["src/**/*.ts", "src/**/*.d.ts"], + "exclude": ["node_modules"] +} diff --git a/packages/shared/src/types/index.ts b/packages/shared/src/types/index.ts index b0bb6b54..fda16b44 100644 --- a/packages/shared/src/types/index.ts +++ b/packages/shared/src/types/index.ts @@ -10,3 +10,4 @@ export * from "./connector"; export * from "./assistant"; export * from "./ai-schema-cache"; export * from "./ai-usage"; +export * from "./local-files"; diff --git a/packages/shared/src/types/local-files.ts b/packages/shared/src/types/local-files.ts new file mode 100644 index 00000000..0f5fedbc --- /dev/null +++ b/packages/shared/src/types/local-files.ts @@ -0,0 +1,68 @@ +export type LocalFileSourceBackend = 'serverPath' | 'local' | 's3' | 'r2' | 'oss' | 'gcs'; + +export type LocalFileSourceType = 'excel' | 'csv' | 'parquet' | 'json'; + +export type LocalFileRelationMode = 'virtual' | 'cached' | 'materialized'; + +export type LocalFileSchemaDriftStatus = 'unknown' | 'none' | 'changed'; + +export type LocalFileRefreshStrategy = 'manual' | 'onChange' | 'scheduled'; + +export type LocalFileSourceDescriptor = + | { + backend: 'serverPath'; + filePath: string; + } + | { + backend: Exclude; + key: string; + bucket?: string; + }; + +export type LocalFileSourceStat = { + backend: LocalFileSourceBackend; + path: string; + sizeBytes: number; + mtimeMs: number; + sourceType: LocalFileSourceType; +}; + +export type LocalFileRelationManifest = { + sourceType: LocalFileSourceType; + source: LocalFileSourceDescriptor; + duckdbPath: string; + sheetName?: string; + relationName: string; + mode: LocalFileRelationMode; + sourceFingerprint: string; +}; + +export type LocalFilesInspectRequest = { + source: LocalFileSourceDescriptor; +}; + +export type LocalFilesInspectResponse = { + source: LocalFileSourceStat; + relations: LocalFileRelationManifest[]; +}; + +export type LocalFilesCreateRequest = { + name: string; + schemaName?: string; + source: LocalFileSourceDescriptor; + relations: LocalFileRelationManifest[]; + mode?: LocalFileRelationMode; +}; + +export type LocalFilesRefreshRequest = { + datasetId: string; +}; + +export type DatasetColumnSnapshot = { + name: string; + type: string; + nullable: boolean | null; + detectedSemantic: string | null; + sampleValues: unknown[]; + summary: Record | null; +}; diff --git a/yarn.lock b/yarn.lock index 9d81afbf..ac4fc971 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2340,6 +2340,16 @@ __metadata: languageName: unknown linkType: soft +"@dory/files@npm:0.1.0, @dory/files@workspace:packages/files": + version: 0.0.0-use.local + resolution: "@dory/files@workspace:packages/files" + dependencies: + "@dory/shared": "npm:0.1.0" + "@types/node": "npm:^25.5.2" + typescript: "npm:^6.0.2" + languageName: unknown + linkType: soft + "@dory/i18n@npm:0.1.0, @dory/i18n@workspace:packages/i18n": version: 0.0.0-use.local resolution: "@dory/i18n@workspace:packages/i18n" @@ -22707,6 +22717,7 @@ __metadata: "@dory/analysis": "npm:0.1.0" "@dory/database": "npm:0.1.0" "@dory/drivers": "npm:0.1.0" + "@dory/files": "npm:0.1.0" "@dory/i18n": "npm:0.1.0" "@dory/shared": "npm:0.1.0" "@dory/web-utils": "npm:0.1.0"