Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d2eb78f
replace backfill chunking with smart planner
KeKs0r Apr 1, 2026
6d224b7
fix smart chunking review issues
KeKs0r Apr 1, 2026
ab1239e
Update Algo
KeKs0r Apr 2, 2026
3f99820
Fix CI
KeKs0r Apr 2, 2026
fbd3a19
Export backfill SDK helpers and obsessiondb service types
KeKs0r Apr 2, 2026
78742a7
Trace Queries with Performance.
KeKs0r Apr 3, 2026
2340608
oversample the equal width
KeKs0r Apr 3, 2026
916796d
Fix parallel replicas over-counting on SELECT count() queries
KeKs0r Apr 3, 2026
c5b3ea2
Disable parallel replicas on all chunking planning queries
KeKs0r Apr 3, 2026
1fa10d0
Move parallel replicas setting from SQL to query API
KeKs0r Apr 3, 2026
b5f7be8
Use uncompressed bytes for chunk size targeting
KeKs0r Apr 3, 2026
38dc5d8
Reduce equal-width oversampling from 5x to 3x
KeKs0r Apr 3, 2026
4c40203
Use EXPLAIN ESTIMATE first pass for string sort key splitting
KeKs0r Apr 3, 2026
e8e7b90
Use full GROUP BY key for hot key detection in string sub-ranges
KeKs0r Apr 3, 2026
cbfc9a6
Fix string boundary truncation in equal-width and quantile splitting
KeKs0r Apr 4, 2026
0b0f51f
Add multiple-hot-keys E2E scenario and rename seed script
KeKs0r Apr 4, 2026
064d387
Fix equal-width oversample precision and add changeset
KeKs0r Apr 5, 2026
f655fc3
Add playground benchmarks and smart chunking research notes
KeKs0r Apr 5, 2026
ca3cba4
Add structured logging to backfill smart chunking
KeKs0r Apr 14, 2026
fba1596
Document plugin-backfill SDK in package README
KeKs0r Apr 14, 2026
b2a755e
Remove playground/ benchmark scripts and drop SDK doc references
KeKs0r Apr 14, 2026
77d018a
Fixing cluster
KeKs0r Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/smart-chunking-rewrite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@chkit/plugin-backfill": patch
"@chkit/clickhouse": patch
"chkit": patch
---

Rewrite backfill chunk planning with multi-strategy smart chunking. The planner now introspects partition layout, sort key distribution, and row estimates to produce better-sized chunks using strategies like equal-width splitting, quantile ranges, temporal bucketing, string prefix splitting, and group-by-key splitting. Adds a dedicated `sdk` entry point for programmatic access to chunking internals.
6 changes: 6 additions & 0 deletions .changeset/structured-backfill-logging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chkit": patch
"@chkit/plugin-backfill": patch
---

Add structured logging to backfill chunk planning via `@logtape/logtape`. The smart chunking planner now logs introspection, partition planning, and per-strategy split decisions, and emits warnings when ClickHouse queries exceed 5s. Enable with `CHKIT_DEBUG=1`.
24 changes: 15 additions & 9 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"@chkit/clickhouse": "workspace:*",
"@chkit/codegen": "workspace:*",
"@chkit/core": "workspace:*",
"@logtape/logtape": "^2.0.5",
"fast-glob": "^3.3.2"
}
}
3 changes: 3 additions & 0 deletions packages/cli/src/bin/chkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { loadPluginRuntime } from './plugin-runtime.js'
import { getInternalPlugins } from './internal-plugins/index.js'
import { CLI_VERSION } from './version.js'
import { debug } from './debug.js'
import { configureCliLogging } from './logging.js'

const WELL_KNOWN_PLUGIN_COMMANDS: Record<string, string> = {
codegen: 'Codegen',
Expand Down Expand Up @@ -73,6 +74,8 @@ function collectPluginCommands(runtime: Awaited<ReturnType<typeof loadPluginRunt
}

async function main(): Promise<void> {
configureCliLogging()

const argv = process.argv.slice(2)
const commandName = argv[0]
debug('cli', `chkit ${CLI_VERSION} — argv: [${argv.join(', ')}]`)
Expand Down
23 changes: 8 additions & 15 deletions packages/cli/src/bin/debug.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
import process from 'node:process'
import { getLogger } from '@logtape/logtape'

const enabled = process.env.CHKIT_DEBUG === '1' || process.env.CHKIT_DEBUG === 'true'

function timestamp(): string {
const now = new Date()
return now.toISOString().slice(11, 23) // HH:mm:ss.SSS
}
import { isDebugEnabled } from './logging.js'

export function debug(category: string, message: string, detail?: unknown): void {
if (!enabled) return
const prefix = `[chkit:${category}]`
if (!isDebugEnabled()) return
const logger = getLogger(['chkit', category])
if (detail !== undefined) {
console.error(`${timestamp()} ${prefix} ${message}`, detail)
} else {
console.error(`${timestamp()} ${prefix} ${message}`)
logger.debug(message, { detail })
return
}
logger.debug(message)
}

export function isDebugEnabled(): boolean {
return enabled
}
export { isDebugEnabled } from './logging.js'
34 changes: 34 additions & 0 deletions packages/cli/src/bin/logging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import process from 'node:process'

import { configureSync, getConfig, getConsoleSink, getTextFormatter } from '@logtape/logtape'

const enabled = process.env.CHKIT_DEBUG === '1' || process.env.CHKIT_DEBUG === 'true'

export function configureCliLogging(): void {
if (!enabled || getConfig()) return

configureSync({
sinks: {
console: getConsoleSink({
formatter: getTextFormatter({ timestamp: 'time' }),
}),
},
loggers: [
{
category: 'chkit',
sinks: ['console'],
lowestLevel: 'debug',
},
{
category: 'logtape',
sinks: ['console'],
lowestLevel: 'error',
},
],
reset: true,
})
}

/**
 * Whether debug logging was requested via the CHKIT_DEBUG environment
 * variable. Evaluated once at module load, so changes to the environment
 * after startup have no effect.
 */
export function isDebugEnabled(): boolean {
  return enabled
}
2 changes: 1 addition & 1 deletion packages/cli/src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async function waitForParts(
database: string,
table: string,
expectedPartitions: number,
timeoutMs = 15_000,
timeoutMs = 60_000,
): Promise<void> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
Expand Down
1 change: 1 addition & 0 deletions packages/clickhouse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"dependencies": {
"@chkit/core": "workspace:*",
"@clickhouse/client": "^1.11.0",
"@logtape/logtape": "^2.0.5",
"p-retry": "^7.1.1"
}
}
75 changes: 64 additions & 11 deletions packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { createClient } from '@clickhouse/client'
import { createClient, type ClickHouseSettings } from '@clickhouse/client'
import {
normalizeSQLFragment,
type ChxConfig,
type ColumnDefinition,
type ProjectionDefinition,
type SkipIndexDefinition,
} from '@chkit/core'
import { getLogger } from '@logtape/logtape'
import {
parseEngineFromCreateTableQuery,
parseOrderByFromCreateTableQuery,
Expand All @@ -28,9 +29,11 @@ export interface QueryStatus {
error?: string
}

export type { ClickHouseSettings }

export interface ClickHouseExecutor {
command(sql: string): Promise<void>
query<T>(sql: string): Promise<T[]>
query<T>(sql: string, settings?: ClickHouseSettings): Promise<T[]>
insert<T extends Record<string, unknown>>(params: { table: string; values: T[] }): Promise<void>
listSchemaObjects(): Promise<SchemaObjectRef[]>
listTableDetails(databases: string[]): Promise<IntrospectedTable[]>
Expand Down Expand Up @@ -249,7 +252,54 @@ export {
waitForTableAbsent,
} from './ddl-propagation.js'

/**
 * Parse the `X-ClickHouse-Summary` response header into a structured object.
 *
 * ClickHouse encodes per-query statistics in this header as a JSON object of
 * decimal strings. Returns `undefined` when the header is absent, empty, not
 * valid JSON, or valid JSON that is not a plain object (callers read string
 * fields off the result, so `null`/numbers/arrays must not leak through).
 */
function parseSummaryFromHeaders(headers: Record<string, string | string[] | undefined>): {
  read_rows: string
  read_bytes: string
  written_rows: string
  written_bytes: string
  result_rows: string
  result_bytes: string
  elapsed_ns: string
} | undefined {
  const value = headers['x-clickhouse-summary']
  // Node surfaces repeated headers as an array; the summary header is sent
  // once per response, so the first entry is the one we want.
  const raw = Array.isArray(value) ? value[0] : value
  if (typeof raw !== 'string' || raw.length === 0) return undefined
  try {
    const parsed: unknown = JSON.parse(raw)
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return undefined
    return parsed as {
      read_rows: string
      read_bytes: string
      written_rows: string
      written_bytes: string
      result_rows: string
      result_bytes: string
      elapsed_ns: string
    }
  } catch {
    return undefined
  }
}

/**
 * Emit a trace-level record for a completed ClickHouse query.
 *
 * Summary counters arrive from the server as decimal strings; each is
 * coerced to a number (missing fields become 0). `elapsed_ns` is converted
 * to milliseconds for the log payload.
 */
function logProfiling(
  logger: ReturnType<typeof getLogger>,
  query: string,
  queryId: string,
  summary?: {
    read_rows: string
    read_bytes: string
    written_rows: string
    written_bytes: string
    result_rows?: string
    result_bytes?: string
    elapsed_ns: string
  },
): void {
  const asNumber = (raw: string | undefined): number => Number(raw ?? 0)

  logger.trace('Query completed: {query}', {
    query,
    queryId,
    readRows: asNumber(summary?.read_rows),
    readBytes: asNumber(summary?.read_bytes),
    writtenRows: asNumber(summary?.written_rows),
    writtenBytes: asNumber(summary?.written_bytes),
    elapsedMs: asNumber(summary?.elapsed_ns) / 1_000_000,
    resultRows: asNumber(summary?.result_rows),
    resultBytes: asNumber(summary?.result_bytes),
  })
}

export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhouse']>): ClickHouseExecutor {
const profiler = getLogger(['chkit', 'profiling'])

const client = createClient({
url: config.url,
username: config.username,
Expand All @@ -259,6 +309,7 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
clickhouse_settings: {
wait_end_of_query: 1,
async_insert: 0,
send_progress_in_http_headers: 1,
},
})

Expand All @@ -275,11 +326,10 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
return {
async command(sql: string): Promise<void> {
try {
await client.command({ query: sql, http_headers: { 'X-DDL': '1' } })
const result = await client.command({ query: sql, http_headers: { 'X-DDL': '1' } })
logProfiling(profiler, sql, result.query_id, result.summary)
} catch (error) {
if (isUnknownDatabaseError(error)) {
// The configured database doesn't exist yet. Retry without the
// session database so that CREATE DATABASE can succeed.
const fallback = createClient({
url: config.url,
username: config.username,
Expand All @@ -296,21 +346,24 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
wrapConnectionError(error, config.url)
}
},
async query<T>(sql: string): Promise<T[]> {
async query<T>(sql: string, settings?: ClickHouseSettings): Promise<T[]> {
try {
const result = await client.query({ query: sql, format: 'JSONEachRow', http_headers: { 'X-DDL': '1' } })
return result.json<T>()
const result = await client.query({ query: sql, format: 'JSONEachRow', http_headers: { 'X-DDL': '1' }, ...(settings ? { clickhouse_settings: settings } : {}) })
const rows = await result.json<T>()
logProfiling(profiler, sql, result.query_id, parseSummaryFromHeaders(result.response_headers))
return rows
} catch (error) {
wrapConnectionError(error, config.url)
}
},
async insert<T extends Record<string, unknown>>(params: { table: string; values: T[] }): Promise<void> {
try {
await client.insert({
const result = await client.insert({
table: params.table,
values: params.values,
format: 'JSONEachRow',
})
logProfiling(profiler, `INSERT INTO ${params.table}`, result.query_id, result.summary)
} catch (error) {
wrapConnectionError(error, config.url)
}
Expand All @@ -327,7 +380,7 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
async queryStatus(queryId: string, options?: { afterTime?: string }): Promise<QueryStatus> {
try {
const running = await client.query({
query: `SELECT read_rows, read_bytes, written_rows, written_bytes, elapsed FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE user = currentUser() AND query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`,
query: `SELECT read_rows, read_bytes, written_rows, written_bytes, elapsed FROM clusterAllReplicas('cluster', system.processes) WHERE user = currentUser() AND query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`,
query_params: { qid: queryId },
format: 'JSONEachRow',
})
Expand All @@ -353,7 +406,7 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
const afterTime = options?.afterTime ?? '1970-01-01T00:00:00Z'
const log = await client.query({
query: `SELECT type, written_rows, written_bytes, query_duration_ms, exception
FROM clusterAllReplicas('parallel_replicas', system.query_log)
FROM clusterAllReplicas('cluster', system.query_log)
WHERE user = currentUser()
AND query_id = {qid:String}
AND type IN ('QueryFinish', 'ExceptionWhileProcessing')
Expand Down
Loading
Loading