Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/cli/src/clickhouse-live.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
createJournalTableName,
createLiveExecutor,
createPrefix,
createStatelessLiveExecutor,
formatTestDiagnostic,
getRequiredEnv,
quoteIdent,
Expand Down Expand Up @@ -348,6 +349,10 @@ export default schema(events, eventCounts, eventCountsMv)
'refreshable materialized view lifecycle: create → modify schedule → recreate → remove',
async () => {
const executor = createLiveExecutor(liveEnv)
// Metadata polling below intentionally queries SHOW CREATE and system.tables
// in parallel; use a stateless executor so those reads do not share one
// ClickHouse HTTP session.
const metadataExecutor = createStatelessLiveExecutor(liveEnv)
const database = liveEnv.clickhouseDatabase
const journalTable = createJournalTableName('rmv')
const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable }
Expand Down Expand Up @@ -417,15 +422,15 @@ export default schema(target, rmv)
)

const queryCreate = async (): Promise<string | null> => {
const rows = await executor.query<{ create_table_query: string }>(
const rows = await metadataExecutor.query<{ create_table_query: string }>(
`SELECT create_table_query FROM system.tables WHERE database = '${database}' AND name = '${rmvName}'`
)
return rows[0]?.create_table_query ?? null
}

const queryShowCreate = async (): Promise<string | null> => {
try {
const rows = await executor.query<{ statement: string }>(
const rows = await metadataExecutor.query<{ statement: string }>(
`SHOW CREATE TABLE ${database}.${rmvName}`
)
return rows[0]?.statement ?? null
Expand Down Expand Up @@ -668,6 +673,7 @@ export default schema(target, rmv)
await executor.command(
`DROP TABLE IF EXISTS ${quoteIdent(database)}.${quoteIdent(journalTable)}`
)
await metadataExecutor.close()
await executor.close()
}
},
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/e2e-testkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export {
type LiveEnv,
getRequiredEnv,
createLiveExecutor,
createStatelessLiveExecutor,
quoteIdent,
createRunTag,
createPrefix,
Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/migration-scenario.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { describe, expect, test } from 'bun:test'
import { describe, expect, test as bunTest } from 'bun:test'
import { rm, writeFile } from 'node:fs/promises'

import { CORE_ENTRY, createFixture, renderUsersSchema, runCli } from './testkit.test'

// These scenarios shell out several times. Keep them serial even when the
// package test script runs with --concurrent.
const test = bunTest.serial

describe('@chkit/cli migration scenario flows', () => {
test('start state + schema changes produce expected diff and migrate destructive gate', async () => {
const fixture = await createFixture()
Expand Down
20 changes: 19 additions & 1 deletion packages/clickhouse/src/e2e-testkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
* Uses ClickHouseExecutor so any package that depends on @chkit/clickhouse can import this.
*/

import { createClickHouseExecutor, type ClickHouseExecutor } from './index.js'
import {
createClickHouseExecutor,
createStatelessClickHouseExecutor,
type ClickHouseExecutor,
} from './index.js'

// ---------------------------------------------------------------------------
// Environment
Expand Down Expand Up @@ -57,6 +61,20 @@ export function createLiveExecutor(env: LiveEnv): ClickHouseExecutor {
})
}

/**
 * Builds a live-test executor whose requests carry no shared ClickHouse
 * session, so one executor instance may safely issue queries in parallel.
 * For ordinary sequential DDL workflows prefer the default (session-bound)
 * live executor.
 */
export function createStatelessLiveExecutor(env: LiveEnv): ClickHouseExecutor {
  const { clickhouseUrl, clickhouseUser, clickhousePassword, clickhouseDatabase } = env
  return createStatelessClickHouseExecutor({
    url: clickhouseUrl,
    username: clickhouseUser,
    password: clickhousePassword,
    database: clickhouseDatabase,
  })
}

/** Wraps an identifier in backticks, doubling any embedded backtick so it stays escaped. */
export function quoteIdent(value: string): string {
  const escaped = value.split('`').join('``')
  return '`' + escaped + '`'
}
Expand Down
28 changes: 28 additions & 0 deletions packages/clickhouse/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { describe, expect, test } from 'bun:test'

import {
createClickHouseExecutor,
createSessionClickHouseClient,
createStatelessClickHouseExecutor,
createStatelessClickHouseClient,
inferSchemaKindFromEngine,
parseEngineFromCreateTableQuery,
parseOrderByFromCreateTableQuery,
Expand All @@ -25,6 +28,31 @@ describe('@chkit/clickhouse smoke', () => {
expect(typeof executor.listSchemaObjects).toBe('function')
})

test('creates stateless executor with command/query methods', () => {
const executor = createStatelessClickHouseExecutor({
url: 'http://localhost:8123',
database: 'default',
})

expect(typeof executor.command).toBe('function')
expect(typeof executor.query).toBe('function')
expect(typeof executor.listSchemaObjects).toBe('function')
})

test('exposes stateless and session-bound client constructors', async () => {
const config = {
url: 'http://localhost:8123',
database: 'default',
}
const stateless = createStatelessClickHouseClient(config)
const session = createSessionClickHouseClient(config)

expect(typeof stateless.query).toBe('function')
expect(typeof session.query).toBe('function')

await Promise.all([stateless.close(), session.close()])
})

test('infers schema kind from ClickHouse engine', () => {
expect(inferSchemaKindFromEngine('MergeTree')).toBe('table')
expect(inferSchemaKindFromEngine('View')).toBe('view')
Expand Down
58 changes: 45 additions & 13 deletions packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ export interface IntrospectedTable {
ttl?: string
}

// Concrete client type as produced by createClient, without re-exporting the
// driver's own type names.
type ClickHouseClient = ReturnType<typeof createClient>
// The required (non-null) `clickhouse` section of the chkit config.
type ClickHouseConfig = NonNullable<ChxConfig['clickhouse']>

export {
parseEngineFromCreateTableQuery,
parseOrderByFromCreateTableQuery,
Expand Down Expand Up @@ -335,31 +338,50 @@ function logProfiling(
})
}

export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhouse']>): ClickHouseExecutor {
const profiler = getLogger(['chkit', 'profiling'])
// Settings applied to every client unless a caller overrides them:
// wait_end_of_query=1 makes the server hold the HTTP response until the query
// fully completes, async_insert=0 keeps inserts synchronous, and progress
// headers keep long-running requests alive.
const DEFAULT_CLICKHOUSE_SETTINGS: ClickHouseSettings = {
wait_end_of_query: 1,
async_insert: 0,
send_progress_in_http_headers: 1,
}

const client = createClient({
/**
 * Creates a ClickHouse client that sends no session_id, so every request is an
 * independent HTTP call. Safe for parallel queries issued through one client;
 * cannot use session state (temporary tables, session-level settings).
 *
 * NOTE(review): the pasted diff interleaved the deleted lines here — a stray
 * `session_id: crypto.randomUUID()` and a duplicate inline `clickhouse_settings`
 * object. A stateless client must not pin a session, and the settings must come
 * from the `clickhouseSettings` parameter; this is the reconstructed new code.
 */
export function createStatelessClickHouseClient(
  config: ClickHouseConfig,
  clickhouseSettings: ClickHouseSettings = DEFAULT_CLICKHOUSE_SETTINGS,
): ClickHouseClient {
  return createClient({
    url: config.url,
    username: config.username,
    password: config.password,
    database: config.database,
    clickhouse_settings: clickhouseSettings,
  })
}

const fireAndForgetClient = createClient({
/**
 * Creates a ClickHouse client that sends one session_id with every request.
 * Use only for workflows that need session state, such as temporary tables or
 * session-level settings. ClickHouse allows only one in-flight query per HTTP
 * session, so callers must serialize all requests made through this client.
 *
 * NOTE(review): the pasted diff interleaved the deleted fire-and-forget
 * client's inline `clickhouse_settings: { wait_end_of_query: 0 }` into this
 * body, producing a duplicate key. The settings must come solely from the
 * `clickhouseSettings` parameter; this is the reconstructed new code.
 */
export function createSessionClickHouseClient(
  config: ClickHouseConfig,
  clickhouseSettings: ClickHouseSettings = DEFAULT_CLICKHOUSE_SETTINGS,
  sessionId = crypto.randomUUID(),
): ClickHouseClient {
  return createClient({
    url: config.url,
    username: config.username,
    password: config.password,
    database: config.database,
    session_id: sessionId,
    clickhouse_settings: clickhouseSettings,
  })
}

function createExecutorWithClient(config: ClickHouseConfig, client: ClickHouseClient): ClickHouseExecutor {
const profiler = getLogger(['chkit', 'profiling'])

const fireAndForgetClient = createStatelessClickHouseClient(config, { wait_end_of_query: 0 })

return {
async command(sql: string): Promise<void> {
Expand Down Expand Up @@ -536,3 +558,13 @@ WHERE database IN (${quotedDatabases})`
},
}
}

/**
 * Default executor factory. Session-bound so DDL-heavy workflows flow through
 * a single ClickHouse HTTP session; callers must not issue concurrent queries
 * through the returned executor.
 */
export function createClickHouseExecutor(config: ClickHouseConfig): ClickHouseExecutor {
  const sessionClient = createSessionClickHouseClient(config)
  return createExecutorWithClient(config, sessionClient)
}

/** Executor whose requests carry no session, so parallel queries through it are safe. */
export function createStatelessClickHouseExecutor(config: ClickHouseConfig): ClickHouseExecutor {
  const statelessClient = createStatelessClickHouseClient(config)
  return createExecutorWithClient(config, statelessClient)
}
51 changes: 51 additions & 0 deletions packages/codegen/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,57 @@ describe('@chkit/codegen smoke', () => {
}
})

test('does not overwrite an existing migration generated in the same second', async () => {
const workdir = await mkdtemp(join(tmpdir(), 'chkit-codegen-test-'))

try {
const migrationsDir = join(workdir, 'migrations')
const metaDir = join(workdir, 'meta')
const now = new Date('2026-01-02T03:04:05.678Z')
const usersTable = table({
database: 'app',
name: 'users',
columns: [{ name: 'id', type: 'UInt64' }],
engine: 'MergeTree()',
primaryKey: ['id'],
orderBy: ['id'],
})
const eventsTable = table({
database: 'app',
name: 'events',
columns: [{ name: 'id', type: 'UInt64' }],
engine: 'MergeTree()',
primaryKey: ['id'],
orderBy: ['id'],
})

const first = await generateArtifacts({
definitions: [usersTable],
plan: planDiff([], [usersTable]),
migrationsDir,
metaDir,
now,
})
if (!first.migrationFile) throw new Error('expected first migration file')
const originalFirstSql = await readFile(first.migrationFile, 'utf8')

const second = await generateArtifacts({
definitions: [eventsTable],
plan: planDiff([usersTable], [eventsTable]),
migrationsDir,
metaDir,
now,
})

expect(first.migrationFile.endsWith('20260102030405_auto.sql')).toBe(true)
expect(second.migrationFile?.endsWith('20260102030405_auto_001.sql')).toBe(true)
expect(await readFile(first.migrationFile, 'utf8')).toBe(originalFirstSql)
expect(await readFile(second.migrationFile ?? '', 'utf8')).toContain('CREATE TABLE IF NOT EXISTS app.events')
} finally {
await rm(workdir, { recursive: true, force: true })
}
})

test('renders rename suggestion hints in migration header comments', async () => {
const workdir = await mkdtemp(join(tmpdir(), 'chkit-codegen-test-'))
try {
Expand Down
52 changes: 48 additions & 4 deletions packages/codegen/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,48 @@ function safeMigrationId(id: string): string {
return id.replace(/[^a-zA-Z0-9_-]/g, '')
}

/**
 * Builds the on-disk path for a migration file. A collisionIndex of 0 yields
 * the plain `<timestamp>_<name>.sql`; any other index appends a zero-padded
 * 3-digit suffix (e.g. `_001`) to disambiguate same-second migrations.
 */
function migrationFilePath(
  migrationsDir: string,
  timestamp: string,
  migrationName: string,
  collisionIndex: number
): string {
  let fileName = `${timestamp}_${migrationName}`
  if (collisionIndex !== 0) {
    fileName += `_${String(collisionIndex).padStart(3, '0')}`
  }
  return join(migrationsDir, `${fileName}.sql`)
}

/** True when the value is an errno-style object whose `code` is 'EEXIST'. */
function isFileExistsError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false
  }
  if (!('code' in error)) {
    return false
  }
  return (error as { code?: unknown }).code === 'EEXIST'
}

/**
 * Writes a brand-new migration file without clobbering an existing one. The
 * 'wx' flag makes writeFile fail with EEXIST when the path already exists; on
 * that failure the next collision-suffixed candidate path is tried until a
 * free name is found. Any other error propagates.
 */
async function writeNewMigrationFile(input: {
  migrationsDir: string
  timestamp: string
  migrationName: string
  sql: string
}): Promise<string> {
  let collisionIndex = 0
  while (true) {
    const candidate = migrationFilePath(
      input.migrationsDir,
      input.timestamp,
      input.migrationName,
      collisionIndex
    )
    try {
      await writeFile(candidate, input.sql, { encoding: 'utf8', flag: 'wx' })
      return candidate
    } catch (error) {
      if (!isFileExistsError(error)) throw error
      collisionIndex += 1
    }
  }
}

function buildMigrationContent(input: {
generatedAt: string
cliVersion: string
Expand Down Expand Up @@ -84,14 +126,16 @@ export async function generateArtifacts(input: GenerateArtifactsInput): Promise<
})
const migrationFile =
input.plan.operations.length > 0
? join(input.migrationsDir, `${timestamp}_${migrationName}.sql`)
? await writeNewMigrationFile({
migrationsDir: input.migrationsDir,
timestamp,
migrationName,
sql,
})
: null
const snapshotFile = join(input.metaDir, 'snapshot.json')
const snapshot = createSnapshot(input.definitions)

if (migrationFile) {
await writeFile(migrationFile, sql, 'utf8')
}
await writeFile(snapshotFile, `${JSON.stringify(snapshot, null, 2)}\n`, 'utf8')

return {
Expand Down
Loading
Loading