Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions api/lib/functions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { batch } from '/api/lib/json_store.ts'
import { join } from '@std/path'
import { ensureDir } from '@std/fs'
import { DeploymentFunction } from '/api/schema.ts'

// Define the function signatures
export type FunctionContext = {
deploymentUrl: string
projectId: string
variables?: Record<string, unknown>
}

export type ReadTransformer<T = unknown> = (
row: T,
ctx: FunctionContext,
) => T | Promise<T>

export type ProjectFunctionModule = {
read?: ReadTransformer
config?: {
targets?: string[]
events?: string[]
}
}

export type LoadedFunction = {
name: string // filename
module: ProjectFunctionModule
}

// Map<projectSlug, List of loaded functions>
const functionsMap = new Map<string, LoadedFunction[]>()
let watcher: Deno.FsWatcher | null = null
const functionsDir = './db/functions'

export async function init() {
await ensureDir(functionsDir)
await loadAll()
startWatcher()
}

async function loadAll() {
console.info('Loading project functions...')
for await (const entry of Deno.readDir(functionsDir)) {
if (entry.isDirectory) {
await reloadProjectFunctions(entry.name)
}
}
}

async function reloadProjectFunctions(slug: string) {
const projectDir = join(functionsDir, slug)
const loaded: LoadedFunction[] = []

try {
await batch(5, Deno.readDir(projectDir), async (entry) => {
if (entry.isFile && entry.name.endsWith('.js')) {
const mainFile = join(projectDir, entry.name)
// Build a fresh import URL to bust cache
const importUrl = `file://${await Deno.realPath(
mainFile,
)}?t=${Date.now()}`
try {
const module = await import(importUrl)
// We expect a default export or specific named exports
const fns = module.default
if (fns && typeof fns === 'object') {
loaded.push({ name: entry.name, module: fns })
}
} catch (e) {
console.error(`Failed to import ${entry.name} for ${slug}:`, e)
}
}
})

// Sort by filename to ensure deterministic execution order
loaded.sort((a, b) => a.name.localeCompare(b.name))

if (loaded.length > 0) {
functionsMap.set(slug, loaded)
console.info(`Loaded ${loaded.length} functions for project: ${slug}`)
} else {
functionsMap.delete(slug)
}
} catch (err) {
if (!(err instanceof Deno.errors.NotFound)) {
console.error(`Failed to load functions for ${slug}:`, err)
}
functionsMap.delete(slug)
}
}

function startWatcher() {
if (watcher) return
console.info(`Starting function watcher on ${functionsDir}`)
watcher = Deno.watchFs(functionsDir, { recursive: true }) // Process events
;(async () => {
for await (const event of watcher!) {
if (['modify', 'create', 'remove'].includes(event.kind)) {
for (const path of event.paths) {
if (path.endsWith('.js')) {
const parts = path.split('/')
const fileName = parts.pop()
const slug = parts.pop()
if (fileName && slug) {
await reloadProjectFunctions(slug)
}
}
}
}
}
})()
}

export function getProjectFunctions(
slug: string,
): LoadedFunction[] | undefined {
return functionsMap.get(slug)
}

export function stopWatcher() {
if (watcher) {
watcher.close()
watcher = null
}
}

export async function applyReadTransformers<T>(
data: T,
projectId: string,
deploymentUrl: string,
tableName: string,
projectFunctions?: LoadedFunction[],
configMap?: Map<string, DeploymentFunction>,
): Promise<T> {
if (!projectFunctions || projectFunctions.length === 0) {
return data
}
let currentData = data
for (const { name, module } of projectFunctions) {
if (!module.read) continue
const config = configMap?.get(name)
if (!config) continue
if (module.config?.targets && !module.config.targets.includes(tableName)) {
continue
}
if (module.config?.events && !module.config.events.includes('read')) {
continue
}

const ctx: FunctionContext = {
deploymentUrl,
projectId,
variables: config.variables || {},
}

currentData = await module.read(currentData, ctx) as T
}

return currentData
}
146 changes: 146 additions & 0 deletions api/lib/functions_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { assertEquals } from '@std/assert'
import * as functions from './functions.ts'
import { join } from '@std/path'
import { ensureDir } from '@std/fs'
import { DeploymentFunctionsCollection } from '../schema.ts'

Deno.test('Functions Module - Pipeline & Config', async () => {
const testSlug = 'test-project-' + Date.now()
const functionsDir = './db/functions'
const projectDir = join(functionsDir, testSlug)
const file1 = join(projectDir, '01-first.js')
const file2 = join(projectDir, '02-second.js')

try {
await Deno.remove('./db_test/deployment_functions', { recursive: true })
await ensureDir('./db_test/deployment_functions')
} catch {
// Skipped
}

await ensureDir(projectDir)

// Initialize module
await functions.init()

// Define test row type
type TestRow = {
id: number
step1?: boolean
step2?: boolean
var1?: string
}

// 1. Create function files
const code1 = `
export default {
read: (row, ctx) => {
return { ...row, step1: true, var1: ctx.variables.var1 }
}
}
`
const code2 = `
export default {
read: (row) => {
return { ...row, step2: true }
}
}
`
await Deno.writeTextFile(file1, code1)
await Deno.writeTextFile(file2, code2)

// Give watcher time
await new Promise((r) => setTimeout(r, 1000))

// 2. Verify loading and sorting
const loaded = functions.getProjectFunctions(testSlug)
if (!loaded) throw new Error('Functions not loaded')
assertEquals(loaded.length, 2)
assertEquals(loaded[0].name, '01-first.js')
assertEquals(loaded[1].name, '02-second.js')

// 3. Mock Deployment Config
const deploymentUrl = 'test-pipeline-' + Date.now() + '.com'

// Config for 01-first.js (Enabled with variables)
await DeploymentFunctionsCollection.insert({
id: deploymentUrl + ':01-first.js',
deploymentUrl,
functionName: '01-first.js',
enabled: true,
variables: { var1: 'secret-value' },
})

// Config for 02-second.js (Disabled)
await DeploymentFunctionsCollection.insert({
id: deploymentUrl + ':02-second.js',
deploymentUrl,
functionName: '02-second.js',
enabled: false,
variables: {},
})

// 4. Simulate Pipeline Execution (Manually, echoing sql.ts logic)
// We can't import sql.ts functions easily here without mocking runSQL,
// so we re-implement the pipeline logic to verify the components work.

let row: TestRow = { id: 1 }
const configs = DeploymentFunctionsCollection.filter((c) =>
c.deploymentUrl === deploymentUrl && c.enabled
)
const configMap = new Map(configs.map((c) => [c.functionName, c]))

for (const { name, module } of loaded) {
const config = configMap.get(name)
if (!config || !module.read) continue

const ctx = {
deploymentUrl,
projectId: testSlug,
variables: config.variables || undefined,
}
row = await module.read(row, ctx) as TestRow
}

const result = row
assertEquals(result.step1, true)
assertEquals(result.var1, 'secret-value')
assertEquals(result.step2, undefined) // Should be skipped

// 5. Enable second function
await DeploymentFunctionsCollection.update(deploymentUrl + ':02-second.js', {
enabled: true,
})

// Rerun pipeline
row = { id: 1 }
const configs2 = DeploymentFunctionsCollection.filter((c) =>
c.deploymentUrl === deploymentUrl && c.enabled
)
const configMap2 = new Map(configs2.map((c) => [c.functionName, c]))

for (const { name, module } of loaded) {
const config = configMap2.get(name)
if (!config || !module.read) continue
const ctx = {
deploymentUrl,
projectId: testSlug,
variables: config.variables || undefined,
}
row = await module.read(row, ctx) as TestRow
}

const result2 = row
assertEquals(result2.step1, true)
assertEquals(result2.step2, true)

// Cleanup
await Deno.remove(projectDir, { recursive: true })
try {
await Deno.remove('./db_test/deployment_functions', { recursive: true })
} catch {
// Skipped
}
await new Promise((r) => setTimeout(r, 500))
functions.stopWatcher()
})
Loading
Loading