From d0bf7fe750da7035291988f63f6ea0262077fbd1 Mon Sep 17 00:00:00 2001 From: Alvaro Date: Mon, 20 Apr 2026 13:30:48 +0200 Subject: [PATCH 1/2] feat(core): support refreshable materialized views MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an optional `refresh` field to `materializedView()` covering all clauses ClickHouse supports for refreshable MVs (GA since 24.10): EVERY/AFTER, OFFSET, RANDOMIZE FOR, DEPENDS ON, SETTINGS, APPEND, and EMPTY. The planner emits `ALTER TABLE ... MODIFY REFRESH ...` for schedule-only changes and drop+recreate for structural ones. APPEND is treated as structural because ClickHouse rejects toggling it via ALTER, and MODIFY REFRESH always re-emits APPEND when present (the server rejects omitting it on an existing APPEND MV). Validator rejects combinations verified invalid against ClickHouse 25.12: missing or both `every`/`after`, malformed intervals, `DEPENDS ON` paired with `REFRESH AFTER`, and non-APPEND RMVs targeting `SharedMergeTree` / `Replicated*` tables in the same schema. `@chkit/plugin-pull` parses the REFRESH block out of `system.tables.create_table_query`, strips Cloud-injected `DEFINER` / `SQL SECURITY` clauses, and renders `refresh: { ... }` back into the generated schema file. Also converts `sql-validation.e2e.test.ts` from `test('setup'/'teardown')` to `beforeAll/afterAll` so tests nested under the top-level `describe` see an initialized client — pre-existing test-ordering bug. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/refreshable-materialized-views.md | 27 ++ .../src/content/docs/schema/dsl-reference.md | 15 + .../content/docs/schema/refreshable-views.md | 159 +++++++++ packages/cli/src/clickhouse-live.e2e.test.ts | 330 ++++++++++++++++++ packages/core/src/canonical.ts | 57 ++- packages/core/src/index.test.ts | 306 ++++++++++++++++ packages/core/src/model-types.ts | 18 + packages/core/src/planner.ts | 70 +++- packages/core/src/sql-validation.e2e.test.ts | 114 +++++- packages/core/src/sql.ts | 53 ++- packages/core/src/validate.ts | 84 +++++ packages/plugin-pull/src/index.test.ts | 98 ++++++ packages/plugin-pull/src/index.ts | 3 + packages/plugin-pull/src/pull.e2e.test.ts | 15 +- packages/plugin-pull/src/render-schema.ts | 40 ++- packages/plugin-pull/src/view-parser.ts | 139 +++++++- 16 files changed, 1508 insertions(+), 20 deletions(-) create mode 100644 .changeset/refreshable-materialized-views.md create mode 100644 apps/docs/src/content/docs/schema/refreshable-views.md diff --git a/.changeset/refreshable-materialized-views.md b/.changeset/refreshable-materialized-views.md new file mode 100644 index 0000000..10e0654 --- /dev/null +++ b/.changeset/refreshable-materialized-views.md @@ -0,0 +1,27 @@ +--- +"@chkit/core": patch +"@chkit/plugin-pull": patch +--- + +Add support for ClickHouse refreshable materialized views (RMVs), GA since ClickHouse 24.10. + +Define a refreshable MV by adding a `refresh` field to `materializedView()`: + +```ts +const dailyReport = materializedView({ + database: 'analytics', + name: 'daily_report_mv', + to: { database: 'analytics', name: 'daily_report' }, + refresh: { every: '1 DAY', offset: '2 HOUR' }, + as: 'SELECT toDate(ts) AS day, count() AS total FROM analytics.events GROUP BY day', +}) +``` + +Supported fields: `every`, `after`, `offset`, `randomize`, `dependsOn`, `settings`, `append`, `empty`. + +Highlights: +- `chkit generate` emits `ALTER TABLE ... MODIFY REFRESH` for schedule-only changes and `DROP ... SYNC` + `CREATE` for structural changes (added/removed refresh, toggled APPEND). +- `chkit pull` parses the REFRESH clause from `system.tables.create_table_query` and ignores the `DEFINER` / `SQL SECURITY` clauses that ClickHouse Cloud auto-injects. +- Validation catches: missing/both `every`/`after`, invalid interval formats, non-APPEND RMV pointing at a replicated (`SharedMergeTree` / `Replicated*`) target (ClickHouse rejects this), and `DEPENDS ON` paired with `REFRESH AFTER`. + +See the [Refreshable materialized views docs](https://chkit.obsessiondb.com/schema/refreshable-views/) for the full reference. diff --git a/apps/docs/src/content/docs/schema/dsl-reference.md b/apps/docs/src/content/docs/schema/dsl-reference.md index 47d13cc..0e8b327 100644 --- a/apps/docs/src/content/docs/schema/dsl-reference.md +++ b/apps/docs/src/content/docs/schema/dsl-reference.md @@ -230,6 +230,7 @@ materializedView(input: Omit): MaterializedV | `database` | `string` | yes | Database name | | `name` | `string` | yes | Materialized view name | | `to` | `{ database: string; name: string }` | yes | Target table for the view | +| `refresh` | `MaterializedViewRefresh` | no | Refresh schedule — see [Refreshable materialized views](/schema/refreshable-views/) | | `as` | `string` | yes | SELECT query | | `comment` | `string` | no | View comment | @@ -244,6 +245,20 @@ const eventCounts = materializedView({ }) ``` +For a refreshable (scheduled) materialized view, add the `refresh` field: + +```ts +const dailyReport = materializedView({ + database: 'analytics', + name: 'daily_report_mv', + to: { database: 'analytics', name: 'daily_report' }, + refresh: { every: '1 DAY', offset: '2 HOUR' }, + as: 'SELECT toDate(ts) AS day, count() AS total FROM analytics.events GROUP BY day', +}) +``` + +See [Refreshable materialized views](/schema/refreshable-views/) for the full `refresh` field reference, including APPEND mode, `DEPENDS ON`, and the ClickHouse rules that chkit validates. + ## Type system reference The [codegen plugin](/plugins/codegen/) maps ClickHouse types to TypeScript types using these rules: diff --git a/apps/docs/src/content/docs/schema/refreshable-views.md b/apps/docs/src/content/docs/schema/refreshable-views.md new file mode 100644 index 0000000..6a774c3 --- /dev/null +++ b/apps/docs/src/content/docs/schema/refreshable-views.md @@ -0,0 +1,159 @@ +--- +title: Refreshable Materialized Views +description: Schedule ClickHouse materialized views to refresh on a cron-like cadence, with APPEND mode, DEPENDS ON chains, and the three hard rules chkit enforces. +--- + +ClickHouse [refreshable materialized views](https://clickhouse.com/docs/materialized-view/refreshable-materialized-view) (RMVs) periodically re-execute a SELECT on a schedule, rather than firing per-INSERT like regular incremental MVs. They've been production-ready since ClickHouse 24.10. + +chkit models an RMV as a regular `materializedView()` with an extra `refresh` field: + +```ts +import { materializedView } from '@chkit/core' + +const dailyReport = materializedView({ + database: 'analytics', + name: 'daily_report_mv', + to: { database: 'analytics', name: 'daily_report' }, + refresh: { + every: '1 DAY', + offset: '2 HOUR', + }, + as: 'SELECT toDate(ts) AS day, count() AS total FROM analytics.events GROUP BY day', +}) +``` + +Without a `refresh` field, the definition is a plain incremental materialized view. + +## The `refresh` field + +| Field | Type | Description | +|-------|------|-------------| +| `every` | `string` | Calendar-aligned schedule (e.g., `'1 HOUR'`, `'30 SECOND'`, `'1 DAY'`). Mutually exclusive with `after`. | +| `after` | `string` | Relative schedule — `after` the previous refresh completed. Mutually exclusive with `every`. | +| `offset` | `string` | Shifts calendar-aligned time (e.g., `'2 HOUR'` with `every: '1 DAY'` refreshes at 02:00 UTC). | +| `randomize` | `string` | Adds jitter to avoid thundering herd. `'30 SECOND'` spreads refreshes over a 30-second window. | +| `dependsOn` | `Array<{ database, name }>` | Chain refreshes: run only after the listed MVs finish. Only valid with `every` — not `after`. | +| `settings` | `Record` | Refresh settings. Common keys: `refresh_retries`, `refresh_retry_initial_backoff_ms`, `refresh_retry_max_backoff_ms`. | +| `append` | `boolean` | When `true`, refreshes INSERT rather than REPLACE. See below. | +| `empty` | `boolean` | Suppress the initial refresh on creation. | + +Intervals use the ClickHouse form: ` ` where `` is `SECOND`, `MINUTE`, `HOUR`, `DAY`, `WEEK`, `MONTH`, or `YEAR`. chkit canonicalizes `'1 hour'` → `'1 HOUR'` automatically. + +## APPEND vs. default + +| Mode | Behavior | Atomicity | +|------|----------|-----------| +| Default (`append: false`, omitted) | **Truncate and replace** — the target table is atomically swapped with the refresh result | Atomic — readers always see a complete, consistent snapshot | +| `append: true` | **INSERT** — rows accumulate on each refresh | Non-atomic, like a regular `INSERT SELECT` | + +APPEND is useful for periodic snapshots where you want history, e.g.: + +```ts +const hourlySnapshot = materializedView({ + database: 'analytics', + name: 'hourly_snapshot_mv', + to: { database: 'analytics', name: 'hourly_snapshots' }, + refresh: { + every: '1 HOUR', + append: true, + }, + as: 'SELECT now() AS snapshot_ts, org_id, count() AS cnt FROM analytics.events GROUP BY org_id', +}) +``` + +## Chaining refreshes with `DEPENDS ON` + +Use `dependsOn` to ensure an RMV runs only after its upstream MVs have refreshed: + +```ts +const hourlyBase = materializedView({ + database: 'analytics', + name: 'hourly_base_mv', + to: { database: 'analytics', name: 'hourly_base' }, + refresh: { every: '1 HOUR' }, + as: 'SELECT ...', +}) + +const hourlyAggregate = materializedView({ + database: 'analytics', + name: 'hourly_aggregate_mv', + to: { database: 'analytics', name: 'hourly_aggregate' }, + refresh: { + every: '1 HOUR', + dependsOn: [{ database: 'analytics', name: 'hourly_base_mv' }], + }, + as: 'SELECT ... FROM analytics.hourly_base GROUP BY ...', +}) +``` + +`DEPENDS ON` is only supported with `REFRESH EVERY` — ClickHouse rejects it when paired with `REFRESH AFTER`. chkit enforces this at `generate` / `check` time (`refresh_depends_on_requires_every`). + +## Three hard rules chkit enforces + +These come from the ClickHouse server's actual behavior, not the docs. Each rule has a validation code you'll see in `chkit check --json`. + +### Rule 1 — `APPEND` is structural + +`ALTER TABLE … MODIFY REFRESH` cannot add or remove `APPEND`. Server returns: *"Adding or removing APPEND is not supported."* + +Any change to `refresh.append` between the old and new definition triggers **drop + recreate** (risk `caution`). Schedule-only changes stay as a single `MODIFY REFRESH`. + +### Rule 2 — Re-include `APPEND` on refresh-only changes + +Omitting `APPEND` in `MODIFY REFRESH` for an existing APPEND MV is interpreted as "remove APPEND" and rejected. chkit always re-emits `APPEND` in the generated `ALTER` when the MV has `append: true`, so you never need to think about it. + +### Rule 3 — Non-APPEND RMV + replicated target is rejected + +On ClickHouse Cloud and any `SharedMergeTree` / `Replicated*` target, a non-APPEND RMV is refused outright with: *"This combination doesn't work: refreshable materialized view, no APPEND, non-replicated database, replicated table. Each refresh would replace the replicated table locally, but other replicas wouldn't see it. Refusing to create."* + +chkit validates this at `generate` / `check` time whenever the target table is defined in the same schema. Validation code: `refresh_append_required_for_replicated_target`. Fix: set `refresh.append = true`, or target a non-replicated table. + +## Generated SQL + +For the `dailyReport` example at the top, `chkit generate` produces: + +```sql +CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.daily_report_mv +REFRESH EVERY 1 DAY OFFSET 2 HOUR TO analytics.daily_report AS +SELECT toDate(ts) AS day, count() AS total FROM analytics.events GROUP BY day; +``` + +## Migration behavior + +| Change | Migration emitted | Risk | +|--------|-------------------|------| +| Adding `refresh` to an existing non-refreshable MV | `DROP TABLE ... SYNC` + `CREATE MATERIALIZED VIEW` | `caution` | +| Removing `refresh` | `DROP TABLE ... SYNC` + `CREATE MATERIALIZED VIEW` | `caution` | +| Toggling `append` | `DROP TABLE ... SYNC` + `CREATE MATERIALIZED VIEW` (Rule 1) | `caution` | +| Changing `every` / `after` / `offset` / `randomize` / `dependsOn` / `settings` only | `ALTER TABLE ... MODIFY REFRESH ...` | `caution` | +| Changing `as`, `to`, or `comment` | `DROP TABLE ... SYNC` + `CREATE MATERIALIZED VIEW` | `caution` | +| No change | No operation | — | + +## Monitoring + +Check refresh status in `system.view_refreshes`: + +```sql +SELECT database, view, status, last_success_time, last_refresh_time, next_refresh_time, read_rows, written_rows +FROM system.view_refreshes +WHERE database = 'analytics'; +``` + +Manual control: + +```sql +SYSTEM REFRESH VIEW analytics.daily_report_mv; -- trigger immediate refresh +SYSTEM STOP VIEW analytics.daily_report_mv; -- pause scheduling +SYSTEM START VIEW analytics.daily_report_mv; -- resume scheduling +SYSTEM CANCEL VIEW analytics.daily_report_mv; -- cancel in-flight refresh +``` + +## Version requirements + +| ClickHouse version | Status | +|--------------------|--------| +| 23.12 – 24.9 | Experimental — required `allow_experimental_refreshable_materialized_view = 1` | +| 24.9 | APPEND mode added | +| 24.10 + | **Production-ready** — no flag required | + +chkit targets 24.10 and above for refreshable MV support. diff --git a/packages/cli/src/clickhouse-live.e2e.test.ts b/packages/cli/src/clickhouse-live.e2e.test.ts index 6f732be..8a4a9a2 100644 --- a/packages/cli/src/clickhouse-live.e2e.test.ts +++ b/packages/cli/src/clickhouse-live.e2e.test.ts @@ -344,6 +344,336 @@ export default schema(events, eventCounts, eventCountsMv) 240_000 ) + test( + 'refreshable materialized view lifecycle: create → modify schedule → recreate → remove', + async () => { + const executor = createLiveExecutor(liveEnv) + const database = liveEnv.clickhouseDatabase + const journalTable = createJournalTableName('rmv') + const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable } + const prefix = createPrefix('rmv') + const targetTable = `${prefix}target` + const rmvName = `${prefix}mv` + + const dir = await mkdtemp(join(tmpdir(), 'chkit-cli-e2e-rmv-')) + const schemaPath = join(dir, 'schema.ts') + const configPath = join(dir, 'clickhouse.config.ts') + const outDir = join(dir, 'chkit') + const migrationsDir = join(outDir, 'migrations') + const metaDir = join(outDir, 'meta') + + const { clickhouseUrl, clickhouseUser, clickhousePassword } = getRequiredEnv() + + const renderSchema = (options: { + includeRmv: boolean + every?: string + asQuery?: string + }): string => { + const targetDef = `const target = table({ + database: '${database}', + name: '${targetTable}', + columns: [ + { name: 'org_id', type: 'String' }, + { name: 'total', type: 'UInt64' }, + ], + engine: 'MergeTree()', + primaryKey: ['org_id'], + orderBy: ['org_id'], +})` + if (!options.includeRmv) { + return `import { schema, table } from '${CORE_ENTRY}'\n\n${targetDef}\n\nexport default schema(target)\n` + } + const every = options.every ?? '1 HOUR' + const asQuery = + options.asQuery ?? + `SELECT org_id, count() AS total FROM ${database}.${targetTable} GROUP BY org_id` + return `import { schema, table, materializedView } from '${CORE_ENTRY}' + +${targetDef} + +const rmv = materializedView({ + database: '${database}', + name: '${rmvName}', + to: { database: '${database}', name: '${targetTable}' }, + refresh: { + every: '${every}', + append: true, + }, + as: \`${asQuery}\`, +}) + +export default schema(target, rmv) +` + } + + const writeSchema = (options: Parameters[0]) => + writeFile(schemaPath, renderSchema(options), 'utf8') + + await writeSchema({ includeRmv: true, every: '1 HOUR' }) + await writeFile( + configPath, + `export default {\n schema: '${schemaPath}',\n outDir: '${outDir}',\n migrationsDir: '${migrationsDir}',\n metaDir: '${metaDir}',\n clickhouse: {\n url: '${clickhouseUrl}',\n username: '${clickhouseUser}',\n password: '${clickhousePassword}',\n database: '${database}',\n },\n}\n`, + 'utf8' + ) + + const queryCreate = async (): Promise => { + const rows = await executor.query<{ create_table_query: string }>( + `SELECT create_table_query FROM system.tables WHERE database = '${database}' AND name = '${rmvName}'` + ) + return rows[0]?.create_table_query ?? null + } + + const queryShowCreate = async (): Promise => { + try { + const rows = await executor.query<{ statement: string }>( + `SHOW CREATE TABLE ${database}.${rmvName}` + ) + return rows[0]?.statement ?? null + } catch { + return null + } + } + + // Both SHOW CREATE and system.tables.create_table_query can be stale on Cloud + // after MODIFY REFRESH / DROP+CREATE. Combine them and poll up to timeoutMs for + // the expected substring to appear in either. + const waitForRmvContent = async ( + expected: string, + timeoutMs = 60_000 + ): Promise<{ found: boolean; lastCombined: string }> => { + const deadline = Date.now() + timeoutMs + let lastCombined = '' + while (Date.now() < deadline) { + const [live, show] = await Promise.all([queryCreate(), queryShowCreate()]) + lastCombined = `${live ?? ''}\n---\n${show ?? ''}` + if (lastCombined.includes(expected)) return { found: true, lastCombined } + await new Promise((resolve) => setTimeout(resolve, 1000)) + } + return { found: false, lastCombined } + } + + // An RMV is considered "gone" when its create_table_query is absent or empty AND + // SHOW CREATE errors. system.tables row count alone is unreliable on Cloud — + // zombie rows with empty create_table_query can linger for a minute during + // SharedMergeTree replica convergence. + const queryGone = async (): Promise => { + const live = await queryCreate() + if (live && live.trim().length > 0) return false + const show = await queryShowCreate() + if (show && show.trim().length > 0) return false + return true + } + + try { + // -------- Step 1: create target table + refreshable MV -------- + const gen1 = runCli(dir, ['generate', '--config', configPath, '--json'], cliEnv) + if (gen1.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate step 1 failed', gen1)) + } + const gen1Payload = JSON.parse(gen1.stdout) as { + operationCount: number + migrationFile: string | null + } + expect(gen1Payload.operationCount).toBeGreaterThan(0) + expect(gen1Payload.migrationFile).toBeTruthy() + if (!gen1Payload.migrationFile) { + throw new Error('generate step 1 produced no migration file') + } + + const migration1Sql = await readFile( + gen1Payload.migrationFile.startsWith('/') + ? gen1Payload.migrationFile + : join(migrationsDir, gen1Payload.migrationFile), + 'utf8' + ) + expect(migration1Sql).toContain('CREATE MATERIALIZED VIEW IF NOT EXISTS') + expect(migration1Sql).toContain('REFRESH EVERY 1 HOUR') + expect(migration1Sql).toContain('APPEND') + + const exec1 = await runCliWithRetry( + dir, + ['migrate', '--config', configPath, '--execute', '--json'], + { extraEnv: cliEnv } + ) + if (exec1.exitCode !== 0) { + throw new Error(formatTestDiagnostic('migrate step 1 failed', exec1)) + } + + await waitForTable(executor, database, targetTable) + await waitForView(executor, database, rmvName) + + const createSqlAfterStep1 = await queryCreate() + expect(createSqlAfterStep1).toBeTruthy() + expect(createSqlAfterStep1).toContain('REFRESH EVERY 1 HOUR') + expect(createSqlAfterStep1).toContain('APPEND') + + // Refresh row exists in system.view_refreshes + const refreshRows = await executor.query<{ c: string }>( + `SELECT count() AS c FROM system.view_refreshes WHERE database = '${database}' AND view = '${rmvName}'` + ) + expect((refreshRows[0]?.c ?? '0') !== '0').toBe(true) + + // -------- Step 2: schedule-only change → MODIFY REFRESH (preserves APPEND) -------- + await writeSchema({ includeRmv: true, every: '30 MINUTE' }) + + // Dryrun first to assert the plan is exactly one MODIFY REFRESH op (Rules 1 & 2). + const plan2 = runCli(dir, ['generate', '--config', configPath, '--dryrun', '--json'], cliEnv) + if (plan2.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate --dryrun step 2 failed', plan2)) + } + const plan2Payload = JSON.parse(plan2.stdout) as { + operationCount: number + operations: Array<{ type: string; sql: string }> + } + expect(plan2Payload.operationCount).toBe(1) + expect(plan2Payload.operations[0]?.type).toBe('alter_materialized_view_modify_refresh') + // Rule 2: APPEND must be re-included in MODIFY REFRESH for an APPEND MV. + expect(plan2Payload.operations[0]?.sql).toContain('MODIFY REFRESH EVERY 30 MINUTE') + expect(plan2Payload.operations[0]?.sql).toContain('APPEND') + + const gen2 = runCli(dir, ['generate', '--config', configPath, '--json'], cliEnv) + if (gen2.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate step 2 failed', gen2)) + } + const gen2Payload = JSON.parse(gen2.stdout) as { + operationCount: number + migrationFile: string | null + } + expect(gen2Payload.operationCount).toBe(1) + + const exec2 = await runCliWithRetry( + dir, + ['migrate', '--config', configPath, '--execute', '--json'], + { extraEnv: cliEnv } + ) + if (exec2.exitCode !== 0) { + throw new Error(formatTestDiagnostic('migrate step 2 failed', exec2)) + } + + // SHOW CREATE and create_table_query can each be stale on Cloud for tens of + // seconds after MODIFY REFRESH; poll both until one reflects the new schedule. + const scheduleCheck = await waitForRmvContent('30 MINUTE') + if (!scheduleCheck.found) { + throw new Error( + `MODIFY REFRESH not reflected in server metadata within 60s.\n${scheduleCheck.lastCombined}` + ) + } + expect(scheduleCheck.lastCombined).toContain('APPEND') + + // -------- Step 3: query change → drop + recreate -------- + await writeSchema({ + includeRmv: true, + every: '30 MINUTE', + asQuery: `SELECT org_id, count() * 2 AS total FROM ${database}.${targetTable} GROUP BY org_id`, + }) + const plan3 = runCli(dir, ['generate', '--config', configPath, '--dryrun', '--json'], cliEnv) + if (plan3.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate --dryrun step 3 failed', plan3)) + } + const plan3Payload = JSON.parse(plan3.stdout) as { + operationCount: number + operations: Array<{ type: string }> + } + expect(plan3Payload.operationCount).toBe(2) + expect(plan3Payload.operations.map((op) => op.type)).toEqual([ + 'drop_materialized_view', + 'create_materialized_view', + ]) + + const gen3 = runCli(dir, ['generate', '--config', configPath, '--json'], cliEnv) + if (gen3.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate step 3 failed', gen3)) + } + + const exec3 = await runCliWithRetry( + dir, + ['migrate', '--config', configPath, '--execute', '--json'], + { extraEnv: cliEnv } + ) + if (exec3.exitCode !== 0) { + throw new Error(formatTestDiagnostic('migrate step 3 failed', exec3)) + } + + await waitForView(executor, database, rmvName) + // After drop+recreate, poll both metadata sources for the new query — Cloud + // caches may take tens of seconds to reflect the new CREATE across replicas. + const recreateCheck = await waitForRmvContent('count() * 2') + if (!recreateCheck.found) { + throw new Error( + `drop+recreate not reflected in server metadata within 60s.\n${recreateCheck.lastCombined}` + ) + } + expect(recreateCheck.lastCombined).toContain('REFRESH EVERY 30 MINUTE') + expect(recreateCheck.lastCombined).toContain('APPEND') + + // -------- Step 4: remove RMV from schema → drop -------- + await writeSchema({ includeRmv: false }) + const plan4 = runCli(dir, ['generate', '--config', configPath, '--dryrun', '--json'], cliEnv) + if (plan4.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate --dryrun step 4 failed', plan4)) + } + const plan4Payload = JSON.parse(plan4.stdout) as { + operationCount: number + operations: Array<{ type: string }> + } + expect(plan4Payload.operationCount).toBe(1) + expect(plan4Payload.operations[0]?.type).toBe('drop_materialized_view') + + const gen4 = runCli(dir, ['generate', '--config', configPath, '--json'], cliEnv) + if (gen4.exitCode !== 0) { + throw new Error(formatTestDiagnostic('generate step 4 failed', gen4)) + } + + const exec4 = await runCliWithRetry( + dir, + [ + 'migrate', + '--config', + configPath, + '--execute', + '--allow-destructive', + '--json', + ], + { extraEnv: cliEnv } + ) + if (exec4.exitCode !== 0) { + throw new Error(formatTestDiagnostic('migrate step 4 failed', exec4)) + } + + // Poll until both SHOW CREATE and system.tables.create_table_query confirm the + // RMV DDL is gone. Cloud DDL is eventually consistent; SharedMergeTree replicas + // can take 30s+ to converge after a DROP. + let gone = false + for (let attempt = 0; attempt < 120; attempt += 1) { + if (await queryGone()) { + gone = true + break + } + await new Promise((resolve) => setTimeout(resolve, 500)) + } + if (!gone) { + const lingering = await queryCreate() + throw new Error( + `RMV ${database}.${rmvName} DDL still present after drop + 60s polling.\nStep 4 stdout:\n${exec4.stdout}\nLingering create_table_query:\n${lingering}` + ) + } + } finally { + await rm(dir, { recursive: true, force: true }) + await executor.command( + `DROP TABLE IF EXISTS ${quoteIdent(database)}.${quoteIdent(rmvName)} SYNC` + ) + await executor.command( + `DROP TABLE IF EXISTS ${quoteIdent(database)}.${quoteIdent(targetTable)}` + ) + await executor.command( + `DROP TABLE IF EXISTS ${quoteIdent(database)}.${quoteIdent(journalTable)}` + ) + await executor.close() + } + }, + 240_000 + ) + // TODO: Stabilize this test — it's flaky in CI because `check` reports drift for // extra objects in the shared database that belong to other test runs. test.skipIf(new Date() < new Date('2026-06-01'))( diff --git a/packages/core/src/canonical.ts b/packages/core/src/canonical.ts index 90fa710..2c6d47f 100644 --- a/packages/core/src/canonical.ts +++ b/packages/core/src/canonical.ts @@ -1,6 +1,7 @@ import type { ColumnDefinition, MaterializedViewDefinition, + MaterializedViewRefresh, ProjectionDefinition, SchemaDefinition, SkipIndexDefinition, @@ -91,8 +92,55 @@ function canonicalizeView(def: ViewDefinition): ViewDefinition { } } +const INTERVAL_UNIT_PATTERN = /\b(second|minute|hour|day|week|month|year)s?\b/gi + +function canonicalizeInterval(value: string | undefined): string | undefined { + if (value === undefined) return undefined + return value + .trim() + .replace(/\s+/g, ' ') + .replace(INTERVAL_UNIT_PATTERN, (unit) => unit.toUpperCase().replace(/S$/, '')) +} + +function canonicalizeRefresh( + refresh: MaterializedViewRefresh | undefined +): MaterializedViewRefresh | undefined { + if (!refresh) return undefined + + const dependsOn = refresh.dependsOn + ? [...refresh.dependsOn] + .map((dep) => ({ database: dep.database.trim(), name: dep.name.trim() })) + .sort((a, b) => { + const aKey = `${a.database}.${a.name}` + const bKey = `${b.database}.${b.name}` + return aKey.localeCompare(bKey) + }) + : undefined + + const settings = refresh.settings + ? Object.fromEntries( + Object.entries(refresh.settings).sort(([a], [b]) => a.localeCompare(b)) + ) + : undefined + + const canonical: MaterializedViewRefresh = {} + const every = canonicalizeInterval(refresh.every) + const after = canonicalizeInterval(refresh.after) + const offset = canonicalizeInterval(refresh.offset) + const randomize = canonicalizeInterval(refresh.randomize) + if (every !== undefined) canonical.every = every + if (after !== undefined) canonical.after = after + if (offset !== undefined) canonical.offset = offset + if (randomize !== undefined) canonical.randomize = randomize + if (dependsOn && dependsOn.length > 0) canonical.dependsOn = dependsOn + if (settings && Object.keys(settings).length > 0) canonical.settings = settings + if (refresh.append) canonical.append = true + if (refresh.empty) canonical.empty = true + return canonical +} + function canonicalizeMaterializedView(def: MaterializedViewDefinition): MaterializedViewDefinition { - return { + const canonical: MaterializedViewDefinition = { ...def, database: def.database.trim(), name: def.name.trim(), @@ -103,6 +151,13 @@ function canonicalizeMaterializedView(def: MaterializedViewDefinition): Material as: normalizeSQLFragment(def.as), comment: def.comment?.trim(), } + const refresh = canonicalizeRefresh(def.refresh) + if (refresh) { + canonical.refresh = refresh + } else { + delete canonical.refresh + } + return canonical } export function canonicalizeDefinition(def: SchemaDefinition): SchemaDefinition { diff --git a/packages/core/src/index.test.ts b/packages/core/src/index.test.ts index b39a8b2..df2369e 100644 --- a/packages/core/src/index.test.ts +++ b/packages/core/src/index.test.ts @@ -860,3 +860,309 @@ describe('@chkit/core planner v1', () => { ]) }) }) + +describe('@chkit/core refreshable materialized views', () => { + const baseMv = { + database: 'analytics', + name: 'daily_mv', + to: { database: 'analytics', name: 'daily_rollup' }, + as: 'SELECT toDate(ts) AS day, count() AS total FROM analytics.events GROUP BY day', + } + + test('renders CREATE with REFRESH EVERY + TO', () => { + const mv = materializedView({ + ...baseMv, + refresh: { every: '1 HOUR' }, + }) + const sql = toCreateSQL(mv) + expect(sql).toContain('CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.daily_mv') + expect(sql).toContain('REFRESH EVERY 1 HOUR') + expect(sql).toContain('TO analytics.daily_rollup') + expect(sql).not.toContain('APPEND') + expect(sql).not.toContain('EMPTY') + }) + + test('renders CREATE with APPEND + OFFSET + RANDOMIZE + SETTINGS', () => { + const mv = materializedView({ + ...baseMv, + refresh: { + every: '1 DAY', + offset: '2 HOUR', + randomize: '5 MINUTE', + settings: { refresh_retries: 3 }, + append: true, + }, + }) + const sql = toCreateSQL(mv) + expect(sql).toContain('REFRESH EVERY 1 DAY OFFSET 2 HOUR RANDOMIZE FOR 5 MINUTE') + expect(sql).toContain('SETTINGS refresh_retries = 3') + expect(sql).toContain('APPEND') + expect(sql).toContain('TO analytics.daily_rollup') + }) + + test('renders CREATE with DEPENDS ON and EMPTY', () => { + const mv = materializedView({ + ...baseMv, + refresh: { + every: '1 HOUR', + dependsOn: [{ database: 'analytics', name: 'upstream_mv' }], + empty: true, + }, + }) + const sql = toCreateSQL(mv) + expect(sql).toContain('REFRESH EVERY 1 HOUR DEPENDS ON analytics.upstream_mv') + expect(sql).toContain(' EMPTY AS') + }) + + test('diff: adding refresh to an existing MV triggers drop+recreate (structural)', () => { + const oldDefs = [materializedView(baseMv)] + const newDefs = [materializedView({ ...baseMv, refresh: { every: '1 HOUR' } })] + const plan = planDiff(oldDefs, newDefs) + expect(plan.operations.map((op) => op.type)).toEqual([ + 'drop_materialized_view', + 'create_materialized_view', + ]) + }) + + test('diff: removing refresh triggers drop+recreate', () => { + const oldDefs = [materializedView({ ...baseMv, refresh: { every: '1 HOUR' } })] + const newDefs = [materializedView(baseMv)] + const plan = planDiff(oldDefs, newDefs) + expect(plan.operations.map((op) => op.type)).toEqual([ + 'drop_materialized_view', + 'create_materialized_view', + ]) + }) + + test('diff: toggling APPEND triggers drop+recreate (Rule 1)', () => { + const oldDefs = [ + materializedView({ ...baseMv, refresh: { every: '1 HOUR', append: true } }), + ] + const newDefs = [ + materializedView({ ...baseMv, refresh: { every: '1 HOUR' } }), + ] + const plan = planDiff(oldDefs, newDefs) + expect(plan.operations.map((op) => op.type)).toEqual([ + 'drop_materialized_view', + 'create_materialized_view', + ]) + }) + + test('diff: schedule-only change emits MODIFY REFRESH', () => { + const oldDefs = [materializedView({ ...baseMv, refresh: { every: '1 HOUR' } })] + const newDefs = [materializedView({ ...baseMv, refresh: { every: '30 MINUTE' } })] + const plan = planDiff(oldDefs, newDefs) + expect(plan.operations).toHaveLength(1) + const op = plan.operations[0] + expect(op?.type).toBe('alter_materialized_view_modify_refresh') + expect(op?.sql).toContain('ALTER TABLE analytics.daily_mv MODIFY REFRESH EVERY 30 MINUTE') + expect(op?.sql).not.toContain('APPEND') + }) + + test('diff: schedule-only change on APPEND MV preserves APPEND in MODIFY REFRESH (Rule 2)', () => { + const oldDefs = [ + materializedView({ ...baseMv, refresh: { every: '1 HOUR', append: true } }), + ] + const newDefs = [ + materializedView({ ...baseMv, refresh: { every: '30 SECOND', append: true } }), + ] + const plan = planDiff(oldDefs, newDefs) + expect(plan.operations).toHaveLength(1) + const op = plan.operations[0] + expect(op?.type).toBe('alter_materialized_view_modify_refresh') + expect(op?.sql).toContain('MODIFY REFRESH EVERY 30 SECOND') + expect(op?.sql).toContain('APPEND') + }) + + test('diff: randomize/dependsOn/settings changes emit MODIFY REFRESH', () => { + const oldDefs = [materializedView({ ...baseMv, refresh: { every: '1 HOUR' } })] + const newDefs = [ + materializedView({ + ...baseMv, + refresh: { + every: '1 HOUR', + randomize: '1 MINUTE', + dependsOn: [{ database: 'analytics', name: 'upstream' }], + settings: { refresh_retries: 5 }, + }, + }), + ] + const plan = planDiff(oldDefs, newDefs) + expect(plan.operations).toHaveLength(1) + const op = plan.operations[0] + expect(op?.type).toBe('alter_materialized_view_modify_refresh') + expect(op?.sql).toContain('RANDOMIZE FOR 1 MINUTE') + expect(op?.sql).toContain('DEPENDS ON analytics.upstream') + expect(op?.sql).toContain('SETTINGS refresh_retries = 5') + }) + + test('diff: equivalent refresh yields no ops', () => { + const defs = [ + materializedView({ + ...baseMv, + refresh: { every: '1 HOUR', append: true }, + }), + ] + const plan = planDiff(defs, defs) + expect(plan.operations).toEqual([]) + }) + + test('MODIFY REFRESH ranks with other alters', () => { + const oldDefs = [ + table({ + database: 'analytics', + name: 'daily_rollup', + columns: [{ name: 'day', type: 'Date' }], + engine: 'MergeTree()', + primaryKey: ['day'], + orderBy: ['day'], + }), + materializedView({ ...baseMv, refresh: { every: '1 HOUR' } }), + ] + const newDefs = [ + table({ + database: 'analytics', + name: 'daily_rollup', + columns: [ + { name: 'day', type: 'Date' }, + { name: 'total', type: 'UInt64' }, + ], + engine: 'MergeTree()', + primaryKey: ['day'], + orderBy: ['day'], + }), + materializedView({ ...baseMv, refresh: { every: '30 MINUTE' } }), + ] + const plan = planDiff(oldDefs, newDefs) + const types = plan.operations.map((op) => op.type) + // All alter ops come together (rank 1), neither before drops nor after creates. + const firstAlter = types.indexOf('alter_table_add_column') + const firstRefresh = types.indexOf('alter_materialized_view_modify_refresh') + expect(firstAlter).toBeGreaterThanOrEqual(0) + expect(firstRefresh).toBeGreaterThanOrEqual(0) + // No create_* follow the alters + const lastCreate = Math.max( + types.lastIndexOf('create_database'), + types.lastIndexOf('create_table'), + types.lastIndexOf('create_view'), + types.lastIndexOf('create_materialized_view') + ) + expect(Math.max(firstAlter, firstRefresh)).toBeLessThan( + lastCreate === -1 ? Number.POSITIVE_INFINITY : lastCreate + 1 + ) + }) + + test('canonicalization uppercases intervals and sorts dependsOn/settings', () => { + const defs = canonicalizeDefinitions([ + materializedView({ + ...baseMv, + refresh: { + every: '1 hour', + randomize: '30 seconds', + dependsOn: [ + { database: 'z', name: 'b' }, + { database: 'a', name: 'a' }, + ], + settings: { refresh_retries: 3, refresh_retry_initial_backoff_ms: 100 }, + }, + }), + ]) + const mv = defs[0] + expect(mv?.kind).toBe('materialized_view') + if (mv?.kind !== 'materialized_view' || !mv.refresh) throw new Error('expected refresh') + expect(mv.refresh.every).toBe('1 HOUR') + expect(mv.refresh.randomize).toBe('30 SECOND') + expect(mv.refresh.dependsOn).toEqual([ + { database: 'a', name: 'a' }, + { database: 'z', name: 'b' }, + ]) + expect(Object.keys(mv.refresh.settings ?? {})).toEqual([ + 'refresh_retries', + 'refresh_retry_initial_backoff_ms', + ]) + }) + + test('validates refresh requires exactly one of every/after', () => { + const missing = validateDefinitions([ + materializedView({ ...baseMv, refresh: {} }), + ]) + expect(missing.map((i) => i.code)).toContain('refresh_requires_every_or_after') + + const both = validateDefinitions([ + materializedView({ ...baseMv, refresh: { every: '1 HOUR', after: '10 MINUTE' } }), + ]) + expect(both.map((i) => i.code)).toContain('refresh_every_after_mutually_exclusive') + }) + + test('validates interval format', () => { + const issues = validateDefinitions([ + materializedView({ ...baseMv, refresh: { every: 'soonish' } }), + ]) + expect(issues.map((i) => i.code)).toContain('refresh_interval_format') + }) + + test('validates DEPENDS ON is only allowed with REFRESH EVERY', () => { + const withAfter = validateDefinitions([ + materializedView({ + ...baseMv, + refresh: { + after: '10 MINUTE', + dependsOn: [{ database: 'analytics', name: 'upstream' }], + }, + }), + ]) + expect(withAfter.map((i) => i.code)).toContain('refresh_depends_on_requires_every') + + const withEvery = validateDefinitions([ + materializedView({ + ...baseMv, + refresh: { + every: '1 HOUR', + dependsOn: [{ database: 'analytics', name: 'upstream' }], + }, + }), + ]) + expect(withEvery.some((i) => i.code === 'refresh_depends_on_requires_every')).toBe(false) + }) + + test('validates non-APPEND RMV with replicated target (Rule 3)', () => { + const issues = validateDefinitions([ + table({ + database: 'analytics', + name: 'daily_rollup', + columns: [{ name: 'day', type: 'Date' }], + engine: 'SharedMergeTree', + primaryKey: ['day'], + orderBy: ['day'], + }), + materializedView({ ...baseMv, refresh: { every: '1 HOUR' } }), + ]) + expect(issues.map((i) => i.code)).toContain('refresh_append_required_for_replicated_target') + }) + + test('no issue when APPEND RMV targets replicated table', () => { + const issues = validateDefinitions([ + table({ + database: 'analytics', + name: 'daily_rollup', + columns: [{ name: 'day', type: 'Date' }], + engine: 'SharedMergeTree', + primaryKey: ['day'], + orderBy: ['day'], + }), + materializedView({ ...baseMv, refresh: { every: '1 HOUR', append: true } }), + ]) + expect( + issues.some((i) => i.code === 'refresh_append_required_for_replicated_target') + ).toBe(false) + }) + + test('no issue when target table is not in the schema (external)', () => { + const issues = validateDefinitions([ + materializedView({ ...baseMv, refresh: { every: '1 HOUR' } }), + ]) + expect( + issues.some((i) => i.code === 'refresh_append_required_for_replicated_target') + ).toBe(false) + }) +}) diff --git a/packages/core/src/model-types.ts b/packages/core/src/model-types.ts index a489e1c..e1ff2a0 100644 --- a/packages/core/src/model-types.ts +++ b/packages/core/src/model-types.ts @@ -80,11 +80,23 @@ export interface ViewDefinition { comment?: string } +export interface MaterializedViewRefresh { + every?: string + after?: string + offset?: string + randomize?: string + dependsOn?: Array<{ database: string; name: string }> + settings?: Record + append?: boolean + empty?: boolean +} + export interface MaterializedViewDefinition { kind: 'materialized_view' database: string name: string to: { database: string; name: string } + refresh?: MaterializedViewRefresh as: string comment?: string } @@ -192,6 +204,7 @@ export type MigrationOperationType = | 'drop_view' | 'create_materialized_view' | 'drop_materialized_view' + | 'alter_materialized_view_modify_refresh' | 'alter_table_add_column' | 'alter_table_modify_column' | 'alter_table_drop_column' @@ -239,6 +252,11 @@ export type ValidationIssueCode = | 'primary_key_missing_column' | 'order_by_missing_column' | 'index_type_missing_args' + | 'refresh_requires_every_or_after' + | 'refresh_every_after_mutually_exclusive' + | 'refresh_interval_format' + | 'refresh_append_required_for_replicated_target' + | 'refresh_depends_on_requires_every' export interface ValidationIssue { code: ValidationIssueCode diff --git a/packages/core/src/planner.ts b/packages/core/src/planner.ts index e715aba..9eff2ec 100644 --- a/packages/core/src/planner.ts +++ b/packages/core/src/planner.ts @@ -3,6 +3,8 @@ import { diffByName, diffClauses, diffSettings } from './diff-primitives.js' import type { ColumnDefinition, ColumnRenameSuggestion, + MaterializedViewDefinition, + MaterializedViewRefresh, MigrationOperation, MigrationPlan, RiskLevel, @@ -17,6 +19,7 @@ import { renderAlterDropIndex, renderAlterDropProjection, renderAlterModifyColumn, + renderAlterModifyRefresh, renderAlterModifySetting, renderAlterModifyTTL, renderAlterResetSetting, @@ -188,6 +191,61 @@ function inferColumnRenameSuggestions( }) } +function refreshEqual( + oldRefresh: MaterializedViewRefresh | undefined, + newRefresh: MaterializedViewRefresh | undefined +): boolean { + return JSON.stringify(oldRefresh ?? null) === JSON.stringify(newRefresh ?? null) +} + +function diffMaterializedView( + oldDef: MaterializedViewDefinition, + newDef: MaterializedViewDefinition +): MigrationOperation[] { + const oldAppend = oldDef.refresh?.append === true + const newAppend = newDef.refresh?.append === true + const hasOldRefresh = oldDef.refresh !== undefined + const hasNewRefresh = newDef.refresh !== undefined + + const structuralChange = + newDef.as !== oldDef.as || + newDef.comment !== oldDef.comment || + newDef.to.database !== oldDef.to.database || + newDef.to.name !== oldDef.to.name || + hasOldRefresh !== hasNewRefresh || + oldAppend !== newAppend + + if (structuralChange) { + return [ + { + type: 'drop_materialized_view', + key: definitionKey(newDef), + risk: 'caution', + sql: `DROP TABLE IF EXISTS ${newDef.database}.${newDef.name} SYNC;`, + }, + { + type: 'create_materialized_view', + key: definitionKey(newDef), + risk: 'caution', + sql: toCreateSQL(newDef), + }, + ] + } + + if (hasNewRefresh && !refreshEqual(oldDef.refresh, newDef.refresh)) { + return [ + { + type: 'alter_materialized_view_modify_refresh', + key: `materialized_view:${newDef.database}.${newDef.name}:refresh`, + risk: 'caution', + sql: renderAlterModifyRefresh(newDef), + }, + ] + } + + return [] +} + function diffTables(oldDef: TableDefinition, newDef: TableDefinition): TableDiffResult { if (requiresTableRecreate(oldDef, newDef)) { return { @@ -391,15 +449,8 @@ export function planDiff(oldDefinitions: SchemaDefinition[], newDefinitions: Sch newDef.kind === 'materialized_view' && oldDef.kind === 'materialized_view' ) { - const changed = - newDef.as !== oldDef.as || - newDef.comment !== oldDef.comment || - newDef.to.database !== oldDef.to.database || - newDef.to.name !== oldDef.to.name - if (changed) { - pushDropOperation(operations, oldDef, 'caution') - pushCreateOperation(operations, newDef, 'caution') - } + const mvOps = diffMaterializedView(oldDef, newDef) + operations.push(...mvOps) continue } @@ -424,6 +475,7 @@ export function planDiff(oldDefinitions: SchemaDefinition[], newDefinitions: Sch operations.sort((a, b) => { const rank = (op: MigrationOperation): number => { if (op.type.startsWith('drop_')) return 0 + if (op.type === 'alter_materialized_view_modify_refresh') return 1 if (op.type.startsWith('alter_')) return 1 if (op.type === 'create_database') return 2 if (op.type === 'create_table') return 3 diff --git a/packages/core/src/sql-validation.e2e.test.ts b/packages/core/src/sql-validation.e2e.test.ts index 7163731..2ae3a6c 100644 --- a/packages/core/src/sql-validation.e2e.test.ts +++ b/packages/core/src/sql-validation.e2e.test.ts @@ -6,7 +6,7 @@ * No DDL is executed — only parsing via EXPLAIN AST. */ -import { describe, expect, test } from 'bun:test' +import { afterAll, beforeAll, describe, expect, test } from 'bun:test' import { createClient } from '@clickhouse/client' import type { ColumnDefinition, @@ -23,6 +23,7 @@ import { renderAlterDropIndex, renderAlterAddProjection, renderAlterDropProjection, + renderAlterModifyRefresh, renderAlterModifySetting, renderAlterResetSetting, renderAlterModifyTTL, @@ -116,7 +117,7 @@ function baseTable(overrides?: Partial>): TableDef describe('SQL validation via EXPLAIN AST', () => { let client: QueryClient - test('setup', () => { + beforeAll(() => { client = createQueryClient() }) @@ -699,6 +700,113 @@ ORDER BY (\`id\`, toDate(\`created_at\`))` }) await assertValidSQL(client, toCreateSQL(def)) }) + + test('refreshable MV — REFRESH EVERY + TO', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv', + to: { database: 'default', name: 'rmv_target' }, + refresh: { every: '1 HOUR' }, + as: 'SELECT id, count() AS cnt FROM default.source GROUP BY id', + }) + await assertValidSQL(client, toCreateSQL(def)) + }) + + test('refreshable MV — APPEND + OFFSET + RANDOMIZE + SETTINGS', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv_append', + to: { database: 'default', name: 'rmv_append_target' }, + refresh: { + every: '1 DAY', + offset: '2 HOUR', + randomize: '5 MINUTE', + settings: { refresh_retries: 3 }, + append: true, + }, + as: 'SELECT toDate(created_at) AS day, count() AS c FROM default.events GROUP BY day', + }) + await assertValidSQL(client, toCreateSQL(def)) + }) + + test('refreshable MV — REFRESH AFTER (no DEPENDS ON, per ClickHouse rule)', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv_after', + to: { database: 'default', name: 'rmv_after_target' }, + refresh: { after: '10 MINUTE' }, + as: 'SELECT id FROM default.source', + }) + await assertValidSQL(client, toCreateSQL(def)) + }) + + test('refreshable MV — REFRESH EVERY + DEPENDS ON', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv_deps', + to: { database: 'default', name: 'rmv_deps_target' }, + refresh: { + every: '1 HOUR', + dependsOn: [{ database: 'default', name: 'upstream_mv' }], + }, + as: 'SELECT id FROM default.source', + }) + await assertValidSQL(client, toCreateSQL(def)) + }) + + test('refreshable MV — EMPTY clause', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv_empty', + to: { database: 'default', name: 'rmv_empty_target' }, + refresh: { every: '1 HOUR', empty: true }, + as: 'SELECT id FROM default.source', + }) + await assertValidSQL(client, toCreateSQL(def)) + }) + }) + + // ========================================================================= + // ALTER TABLE — MODIFY REFRESH + // ========================================================================= + + describe('ALTER TABLE — MODIFY REFRESH', () => { + test('MODIFY REFRESH EVERY', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv', + to: { database: 'default', name: 'rmv_target' }, + refresh: { every: '30 MINUTE' }, + as: 'SELECT 1', + }) + await assertValidSQL(client, renderAlterModifyRefresh(def)) + }) + + test('MODIFY REFRESH with APPEND preserved (Rule 2)', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv', + to: { database: 'default', name: 'rmv_target' }, + refresh: { every: '30 SECOND', append: true }, + as: 'SELECT 1', + }) + await assertValidSQL(client, renderAlterModifyRefresh(def)) + }) + + test('MODIFY REFRESH AFTER + RANDOMIZE + SETTINGS', async () => { + const def = materializedView({ + database: 'default', + name: 'test_rmv', + to: { database: 'default', name: 'rmv_target' }, + refresh: { + after: '5 MINUTE', + randomize: '30 SECOND', + settings: { refresh_retries: 3 }, + }, + as: 'SELECT 1', + }) + await assertValidSQL(client, renderAlterModifyRefresh(def)) + }) }) // ========================================================================= @@ -1015,7 +1123,7 @@ ORDER BY (\`id\`, toDate(\`created_at\`))` // Cleanup // ========================================================================= - test('teardown', async () => { + afterAll(async () => { await client.close() }) }, { timeout: 60_000 }) diff --git a/packages/core/src/sql.ts b/packages/core/src/sql.ts index d1ee77d..e097c4f 100644 --- a/packages/core/src/sql.ts +++ b/packages/core/src/sql.ts @@ -1,6 +1,7 @@ import type { ColumnDefinition, MaterializedViewDefinition, + MaterializedViewRefresh, SchemaDefinition, SkipIndexDefinition, TableDefinition, @@ -69,8 +70,58 @@ function renderViewSQL(def: ViewDefinition): string { return `CREATE VIEW IF NOT EXISTS ${def.database}.${def.name} AS\n${def.as};` } +function renderRefreshSettings(settings: Record): string { + return Object.entries(settings) + .map(([k, v]) => `${k} = ${typeof v === 'string' ? `'${v.replace(/'/g, "''")}'` : v}`) + .join(', ') +} + +function renderDependsOn(dependsOn: Array<{ database: string; name: string }>): string { + return dependsOn.map((dep) => `${dep.database}.${dep.name}`).join(', ') +} + +/** + * Renders the REFRESH clause block for a refreshable materialized view. + * Grammar order: REFRESH EVERY|AFTER [OFFSET ] [RANDOMIZE FOR ] + * [DEPENDS ON ] [SETTINGS ] [APPEND] + * Note: EMPTY belongs after TO in CREATE, so it's NOT included here. + */ +export function renderRefreshClause(refresh: MaterializedViewRefresh): string { + const parts: string[] = [] + if (refresh.every) parts.push(`REFRESH EVERY ${refresh.every}`) + else if (refresh.after) parts.push(`REFRESH AFTER ${refresh.after}`) + if (refresh.offset) parts.push(`OFFSET ${refresh.offset}`) + if (refresh.randomize) parts.push(`RANDOMIZE FOR ${refresh.randomize}`) + if (refresh.dependsOn && refresh.dependsOn.length > 0) { + parts.push(`DEPENDS ON ${renderDependsOn(refresh.dependsOn)}`) + } + if (refresh.settings && Object.keys(refresh.settings).length > 0) { + parts.push(`SETTINGS ${renderRefreshSettings(refresh.settings)}`) + } + if (refresh.append) parts.push('APPEND') + return parts.join(' ') +} + function renderMaterializedViewSQL(def: MaterializedViewDefinition): string { - return `CREATE MATERIALIZED VIEW IF NOT EXISTS ${def.database}.${def.name} TO ${def.to.database}.${def.to.name} AS\n${def.as};` + const header = `CREATE MATERIALIZED VIEW IF NOT EXISTS ${def.database}.${def.name}` + const refreshBlock = def.refresh ? `\n${renderRefreshClause(def.refresh)}` : '' + const toClause = ` TO ${def.to.database}.${def.to.name}` + const emptyClause = def.refresh?.empty ? ' EMPTY' : '' + return `${header}${refreshBlock}${toClause}${emptyClause} AS\n${def.as};` +} + +/** + * Renders ALTER TABLE ... MODIFY REFRESH for a refresh-only change. + * Per live validation: APPEND must be re-included if present, because ClickHouse + * treats omitting APPEND as "remove APPEND" and rejects. + */ +export function renderAlterModifyRefresh(def: MaterializedViewDefinition): string { + if (!def.refresh) { + throw new Error( + `Cannot render MODIFY REFRESH for ${def.database}.${def.name}: refresh is not set` + ) + } + return `ALTER TABLE ${def.database}.${def.name} MODIFY ${renderRefreshClause(def.refresh)};` } export function toCreateSQL(def: SchemaDefinition): string { diff --git a/packages/core/src/validate.ts b/packages/core/src/validate.ts index bbef38a..371ae8a 100644 --- a/packages/core/src/validate.ts +++ b/packages/core/src/validate.ts @@ -1,6 +1,8 @@ import { definitionKey } from './canonical.js' import { normalizeKeyColumns } from './key-clause.js' import type { + MaterializedViewDefinition, + MaterializedViewRefresh, SchemaDefinition, TableDefinition, ValidationIssue, @@ -100,6 +102,86 @@ function validateTableDefinition(def: TableDefinition, issues: ValidationIssue[] } } +const INTERVAL_PATTERN = + /^\s*\d+\s+(SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR)(\s+\d+\s+(SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR))*\s*$/i + +const REPLICATED_ENGINE_PATTERN = /^(Shared|Replicated)/ + +function validateInterval( + def: MaterializedViewDefinition, + issues: ValidationIssue[], + field: keyof MaterializedViewRefresh, + value: string | undefined +): void { + if (value === undefined) return + if (!INTERVAL_PATTERN.test(value)) { + pushValidationIssue( + issues, + def, + 'refresh_interval_format', + `Materialized view ${def.database}.${def.name} refresh.${String(field)} "${value}" is not a valid interval (expected e.g. "1 HOUR", "30 SECOND")` + ) + } +} + +function validateMaterializedViewDefinition( + def: MaterializedViewDefinition, + issues: ValidationIssue[], + definitions: SchemaDefinition[] +): void { + const { refresh } = def + if (!refresh) return + + const hasEvery = refresh.every !== undefined && refresh.every.length > 0 + const hasAfter = refresh.after !== undefined && refresh.after.length > 0 + if (!hasEvery && !hasAfter) { + pushValidationIssue( + issues, + def, + 'refresh_requires_every_or_after', + `Materialized view ${def.database}.${def.name} refresh requires exactly one of "every" or "after"` + ) + } else if (hasEvery && hasAfter) { + pushValidationIssue( + issues, + def, + 'refresh_every_after_mutually_exclusive', + `Materialized view ${def.database}.${def.name} refresh specifies both "every" and "after"; choose one` + ) + } + + validateInterval(def, issues, 'every', refresh.every) + validateInterval(def, issues, 'after', refresh.after) + validateInterval(def, issues, 'offset', refresh.offset) + validateInterval(def, issues, 'randomize', refresh.randomize) + + if (refresh.dependsOn && refresh.dependsOn.length > 0 && hasAfter && !hasEvery) { + pushValidationIssue( + issues, + def, + 'refresh_depends_on_requires_every', + `Materialized view ${def.database}.${def.name} uses DEPENDS ON with REFRESH AFTER; ClickHouse only allows DEPENDS ON with REFRESH EVERY.` + ) + } + + if (!refresh.append) { + const target = definitions.find( + (other): other is TableDefinition => + other.kind === 'table' && + other.database === def.to.database && + other.name === def.to.name + ) + if (target && REPLICATED_ENGINE_PATTERN.test(target.engine)) { + pushValidationIssue( + issues, + def, + 'refresh_append_required_for_replicated_target', + `Materialized view ${def.database}.${def.name} refreshes a replicated target ${target.database}.${target.name} (${target.engine}) without APPEND. ClickHouse rejects this combination. Set refresh.append = true, or target a non-replicated table.` + ) + } + } +} + export function validateDefinitions(definitions: SchemaDefinition[]): ValidationIssue[] { const issues: ValidationIssue[] = [] const objectKeys = new Set() @@ -118,6 +200,8 @@ export function validateDefinitions(definitions: SchemaDefinition[]): Validation if (def.kind === 'table') { validateTableDefinition(def, issues) + } else if (def.kind === 'materialized_view') { + validateMaterializedViewDefinition(def, issues, definitions) } } diff --git a/packages/plugin-pull/src/index.test.ts b/packages/plugin-pull/src/index.test.ts index 026fcb0..28549c7 100644 --- a/packages/plugin-pull/src/index.test.ts +++ b/packages/plugin-pull/src/index.test.ts @@ -74,6 +74,33 @@ describe('@chkit/plugin-pull renderSchemaFile', () => { expect(content).toContain('to: { database: "app", name: "events_rollup" }') expect(content).toContain('export default schema(app_events_view, app_events_mv)') }) + + test('renders refreshable materialized view with full refresh block', () => { + const content = renderSchemaFile([ + { + kind: 'materialized_view', + database: 'analytics', + name: 'daily_mv', + to: { database: 'analytics', name: 'daily_rollup' }, + refresh: { + every: '1 HOUR', + randomize: '30 SECOND', + dependsOn: [{ database: 'analytics', name: 'upstream' }], + settings: { refresh_retries: 3 }, + append: true, + }, + as: 'SELECT 1', + }, + ]) + expect(content).toContain('refresh: {') + expect(content).toContain('every: "1 HOUR",') + expect(content).toContain('randomize: "30 SECOND",') + expect(content).toContain('dependsOn: [') + expect(content).toContain('{ database: "analytics", name: "upstream" },') + expect(content).toContain('settings: {') + expect(content).toContain('refresh_retries: 3,') + expect(content).toContain('append: true,') + }) }) describe('@chkit/plugin-pull schema command', () => { @@ -347,6 +374,77 @@ FROM app.users;`) }) expect(definition).toBeNull() }) + + test('parseRefreshClause returns null when no REFRESH present', () => { + const refresh = __testUtils.parseRefreshClause( + 'CREATE MATERIALIZED VIEW app.mv TO app.t AS SELECT 1' + ) + expect(refresh).toBeNull() + }) + + test('parseRefreshClause extracts REFRESH EVERY + APPEND', () => { + const refresh = __testUtils.parseRefreshClause( + 'CREATE MATERIALIZED VIEW app.mv REFRESH EVERY 30 SECOND APPEND TO app.t AS SELECT 1' + ) + expect(refresh).toEqual({ every: '30 SECOND', append: true }) + }) + + test('parseRefreshClause extracts AFTER + OFFSET + RANDOMIZE + SETTINGS', () => { + const refresh = __testUtils.parseRefreshClause( + `CREATE MATERIALIZED VIEW app.mv +REFRESH AFTER 10 MINUTE OFFSET 5 MINUTE RANDOMIZE FOR 30 SECOND SETTINGS refresh_retries = 3, refresh_retry_initial_backoff_ms = 100 +TO app.t AS SELECT 1` + ) + expect(refresh).toEqual({ + after: '10 MINUTE', + offset: '5 MINUTE', + randomize: '30 SECOND', + settings: { refresh_retries: 3, refresh_retry_initial_backoff_ms: 100 }, + }) + }) + + test('parseRefreshClause extracts DEPENDS ON list', () => { + const refresh = __testUtils.parseRefreshClause( + 'CREATE MATERIALIZED VIEW app.mv REFRESH EVERY 1 HOUR DEPENDS ON analytics.upstream_mv, other.dep TO app.t AS SELECT 1' + ) + expect(refresh?.dependsOn).toEqual([ + { database: 'analytics', name: 'upstream_mv' }, + { database: 'other', name: 'dep' }, + ]) + }) + + test('parseRefreshClause ignores DEFINER / SQL SECURITY noise that Cloud auto-injects', () => { + const cloudDdl = `CREATE MATERIALIZED VIEW app.mv +REFRESH EVERY 30 SECOND APPEND TO app.t +(\`day\` Date, \`total\` UInt64) +DEFINER = default SQL SECURITY DEFINER +AS SELECT today() AS day, toUInt64(1) AS total` + const refresh = __testUtils.parseRefreshClause(cloudDdl) + expect(refresh).toEqual({ every: '30 SECOND', append: true }) + // parseAsClause should also strip DEFINER/SQL SECURITY out of the AS body + const asClause = __testUtils.parseAsClause(cloudDdl) + expect(asClause).not.toContain('DEFINER') + expect(asClause).not.toContain('SQL SECURITY') + expect(asClause).toContain('SELECT today() AS day') + }) + + test('mapSystemTableRowToDefinition attaches parsed refresh metadata', () => { + const definition = __testUtils.mapSystemTableRowToDefinition({ + database: 'app', + name: 'mv', + engine: 'MaterializedView', + create_table_query: + 'CREATE MATERIALIZED VIEW app.mv REFRESH EVERY 1 HOUR APPEND TO app.t AS SELECT 1', + }) + expect(definition).toEqual({ + kind: 'materialized_view', + database: 'app', + name: 'mv', + to: { database: 'app', name: 't' }, + as: 'SELECT 1', + refresh: { every: '1 HOUR', append: true }, + }) + }) }) describe('@chkit/plugin-pull skipped objects summary', () => { diff --git a/packages/plugin-pull/src/index.ts b/packages/plugin-pull/src/index.ts index b27448e..b168be9 100644 --- a/packages/plugin-pull/src/index.ts +++ b/packages/plugin-pull/src/index.ts @@ -25,6 +25,7 @@ import { renderSchemaFile } from './render-schema.js' import { mapSystemTableRowToDefinition, parseAsClause, + parseRefreshClause, parseToClause, type IntrospectedObject, type SystemTableRow, @@ -319,6 +320,7 @@ function mapIntrospectedObjectToDefinition(introspected: IntrospectedObject): Sc name: introspected.name, to: introspected.to, as: introspected.as, + ...(introspected.refresh ? { refresh: introspected.refresh } : {}), } } return mapIntrospectedTableToDefinition(introspected) @@ -397,6 +399,7 @@ export const __testUtils = { summarizeSkippedObjects, parseAsClause, parseToClause, + parseRefreshClause, mapSystemTableRowToDefinition, } diff --git a/packages/plugin-pull/src/pull.e2e.test.ts b/packages/plugin-pull/src/pull.e2e.test.ts index 3fa1be6..7200830 100644 --- a/packages/plugin-pull/src/pull.e2e.test.ts +++ b/packages/plugin-pull/src/pull.e2e.test.ts @@ -86,6 +86,7 @@ describe('@chkit/plugin-pull live env e2e', () => { const eventsView = `${prefix}events_view` const eventsRollupTable = `${prefix}events_rollup` const eventsMaterializedView = `${prefix}events_mv` + const eventsRefreshableView = `${prefix}events_rmv` const noiseTable = `${prefix}noise_table` const dir = await mkdtemp(join(tmpdir(), 'chkit-plugin-pull-e2e-')) @@ -124,6 +125,11 @@ describe('@chkit/plugin-pull live env e2e', () => { await executor.command( `CREATE MATERIALIZED VIEW ${quoteIdent(targetDatabase)}.${quoteIdent(eventsMaterializedView)} TO ${quoteIdent(targetDatabase)}.${quoteIdent(eventsRollupTable)} AS SELECT id, count() AS c FROM ${quoteIdent(targetDatabase)}.${quoteIdent(eventsTable)} GROUP BY id` ) + // Refreshable MV — must use APPEND on replicated (SharedMergeTree) targets, + // per the ClickHouse Cloud constraint verified on customer-benchmark. + await executor.command( + `CREATE MATERIALIZED VIEW ${quoteIdent(targetDatabase)}.${quoteIdent(eventsRefreshableView)} REFRESH EVERY 1 HOUR APPEND TO ${quoteIdent(targetDatabase)}.${quoteIdent(eventsRollupTable)} AS SELECT id, count() AS c FROM ${quoteIdent(targetDatabase)}.${quoteIdent(eventsTable)} GROUP BY id` + ) await executor.command( `CREATE TABLE ${quoteIdent(noiseDatabase)}.${quoteIdent(noiseTable)} (id UInt64) ENGINE = MergeTree() ORDER BY (id)` ) @@ -159,7 +165,7 @@ describe('@chkit/plugin-pull live env e2e', () => { } expect(payload.ok).toBe(true) expect(payload.command).toBe('schema') - expect(payload.definitionCount).toBe(5) + expect(payload.definitionCount).toBe(6) expect(payload.tableCount).toBe(3) expect(payload.databases).toEqual([targetDatabase]) expect(payload.outFile).toBe(pulledSchemaPath) @@ -174,6 +180,13 @@ describe('@chkit/plugin-pull live env e2e', () => { expect(pulledSchema).toContain(`name: "${eventsView}"`) expect(pulledSchema).toContain(`name: "${eventsMaterializedView}"`) expect(pulledSchema).toContain('materializedView({') + // Refreshable MV — assert the refresh block was parsed and rendered + expect(pulledSchema).toContain(`name: "${eventsRefreshableView}"`) + expect(pulledSchema).toContain('refresh: {') + expect(pulledSchema).toContain('every: "1 HOUR",') + expect(pulledSchema).toContain('append: true,') + expect(pulledSchema).not.toContain('DEFINER') + expect(pulledSchema).not.toContain('SQL SECURITY') expect(pulledSchema).not.toContain(noiseDatabase) } finally { await rm(dir, { recursive: true, force: true }) diff --git a/packages/plugin-pull/src/render-schema.ts b/packages/plugin-pull/src/render-schema.ts index 6e5b214..7cb92cc 100644 --- a/packages/plugin-pull/src/render-schema.ts +++ b/packages/plugin-pull/src/render-schema.ts @@ -1,4 +1,8 @@ -import { canonicalizeDefinitions, type SchemaDefinition } from '@chkit/core' +import { + canonicalizeDefinitions, + type MaterializedViewRefresh, + type SchemaDefinition, +} from '@chkit/core' export function renderSchemaFile(definitions: SchemaDefinition[]): string { const canonical = canonicalizeDefinitions(definitions) @@ -97,6 +101,9 @@ export function renderSchemaFile(definitions: SchemaDefinition[]): string { lines.push(` database: ${renderString(definition.database)},`) lines.push(` name: ${renderString(definition.name)},`) lines.push(` to: { database: ${renderString(definition.to.database)}, name: ${renderString(definition.to.name)} },`) + if (definition.refresh) { + lines.push(...renderRefreshLines(definition.refresh)) + } lines.push(` as: ${renderString(definition.as)},`) lines.push('})') } @@ -139,6 +146,37 @@ function renderLiteral(value: string | number | boolean): string { return String(value) } +function renderRefreshLines(refresh: MaterializedViewRefresh): string[] { + const lines: string[] = [] + lines.push(' refresh: {') + if (refresh.every) lines.push(` every: ${renderString(refresh.every)},`) + if (refresh.after) lines.push(` after: ${renderString(refresh.after)},`) + if (refresh.offset) lines.push(` offset: ${renderString(refresh.offset)},`) + if (refresh.randomize) lines.push(` randomize: ${renderString(refresh.randomize)},`) + if (refresh.dependsOn && refresh.dependsOn.length > 0) { + lines.push(' dependsOn: [') + for (const dep of refresh.dependsOn) { + lines.push( + ` { database: ${renderString(dep.database)}, name: ${renderString(dep.name)} },` + ) + } + lines.push(' ],') + } + if (refresh.settings && Object.keys(refresh.settings).length > 0) { + lines.push(' settings: {') + for (const key of Object.keys(refresh.settings).sort()) { + const value = refresh.settings[key] + if (value === undefined) continue + lines.push(` ${renderKey(key)}: ${renderLiteral(value)},`) + } + lines.push(' },') + } + if (refresh.append) lines.push(' append: true,') + if (refresh.empty) lines.push(' empty: true,') + lines.push(' },') + return lines +} + function renderKey(value: string): string { if (/^[a-zA-Z_$][a-zA-Z0-9_$]*$/.test(value)) return value return renderString(value) diff --git a/packages/plugin-pull/src/view-parser.ts b/packages/plugin-pull/src/view-parser.ts index 72857e7..d33bedf 100644 --- a/packages/plugin-pull/src/view-parser.ts +++ b/packages/plugin-pull/src/view-parser.ts @@ -1,5 +1,9 @@ import { inferSchemaKindFromEngine, type IntrospectedTable } from '@chkit/clickhouse' -import type { MaterializedViewDefinition, ViewDefinition } from '@chkit/core' +import type { + MaterializedViewDefinition, + MaterializedViewRefresh, + ViewDefinition, +} from '@chkit/core' export interface SystemTableRow { database: string @@ -11,7 +15,10 @@ export interface SystemTableRow { export type IntrospectedObject = | ({ kind: 'table' } & IntrospectedTable) | ({ kind: 'view' } & Pick) - | ({ kind: 'materialized_view' } & Pick) + | ({ kind: 'materialized_view' } & Pick< + MaterializedViewDefinition, + 'database' | 'name' | 'to' | 'as' | 'refresh' + >) | IntrospectedTable export function mapSystemTableRowToDefinition( @@ -27,19 +34,143 @@ export function mapSystemTableRowToDefinition( const as = parseAsClause(row.create_table_query) const to = parseToClause(row.create_table_query, row.database) if (!as || !to) return null - return { kind: 'materialized_view', database: row.database, name: row.name, to, as } + const refresh = parseRefreshClause(row.create_table_query) + const base = { kind: 'materialized_view' as const, database: row.database, name: row.name, to, as } + return refresh ? { ...base, refresh } : base } return null } +/** + * Strips `DEFINER = ... SQL SECURITY ...` clauses that ClickHouse Cloud auto-injects + * into MV DDL. These are orthogonal to schema semantics and absent from user input. + */ +function stripDefinerClauses(query: string): string { + return query + .replace(/\bDEFINER\s*=\s*(?:CURRENT_USER|`[^`]+`|"[^"]+"|[A-Za-z0-9_]+)/gi, '') + .replace(/\bSQL\s+SECURITY\s+(?:DEFINER|INVOKER|NONE)/gi, '') +} + export function parseAsClause(query: string | undefined): string | null { if (!query) return null - const match = /\bAS\b([\s\S]*)$/i.exec(query) + const cleaned = stripDefinerClauses(query) + const match = /\bAS\b([\s\S]*)$/i.exec(cleaned) if (!match?.[1]) return null const asClause = match[1].trim().replace(/;$/, '').trim() return asClause.length > 0 ? asClause : null } +const INTERVAL_TOKEN = '\\d+\\s+(?:SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR)S?(?:\\s+\\d+\\s+(?:SECOND|MINUTE|HOUR|DAY|WEEK|MONTH|YEAR)S?)*' + +function normalizeInterval(value: string): string { + return value + .trim() + .replace(/\s+/g, ' ') + .replace(/\b(second|minute|hour|day|week|month|year)s?\b/gi, (unit) => + unit.toUpperCase().replace(/S$/, '') + ) +} + +function parseRefreshSettings(segment: string): Record { + const out: Record = {} + // Split on top-level commas (no parentheses nesting expected in SETTINGS). + for (const entry of segment.split(',')) { + const match = /^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+?)\s*$/.exec(entry) + if (!match?.[1] || match[2] === undefined) continue + const key = match[1] + const raw = match[2].trim() + if (/^'.*'$/.test(raw)) { + out[key] = raw.slice(1, -1).replace(/''/g, "'") + continue + } + const asNumber = Number(raw) + if (!Number.isNaN(asNumber) && raw !== '') { + out[key] = asNumber + continue + } + out[key] = raw + } + return out +} + +function parseDependsOn(segment: string): Array<{ database: string; name: string }> { + const out: Array<{ database: string; name: string }> = [] + for (const entry of segment.split(',')) { + const trimmed = entry.trim() + if (!trimmed) continue + const parts = trimmed.split('.').map((part) => part.replace(/^[`"]|[`"]$/g, '').trim()) + if (parts.length === 1 && parts[0]) { + out.push({ database: 'default', name: parts[0] }) + } else if (parts.length === 2 && parts[0] && parts[1]) { + out.push({ database: parts[0], name: parts[1] }) + } + } + return out +} + +/** + * Parses the REFRESH block from a CREATE MATERIALIZED VIEW statement. + * Returns null when the query contains no REFRESH clause (regular MV). + * + * Handles, in any order: + * REFRESH EVERY|AFTER + * OFFSET + * RANDOMIZE FOR + * DEPENDS ON + * SETTINGS + * APPEND + * EMPTY + * + * Strips DEFINER / SQL SECURITY auto-injected by ClickHouse Cloud before parsing. + */ +export function parseRefreshClause( + query: string | undefined +): MaterializedViewRefresh | null { + if (!query) return null + const cleaned = stripDefinerClauses(query) + const refreshMatch = new RegExp( + `\\bREFRESH\\s+(EVERY|AFTER)\\s+(${INTERVAL_TOKEN})`, + 'i' + ).exec(cleaned) + if (!refreshMatch) return null + + const refresh: MaterializedViewRefresh = {} + if (refreshMatch[1]?.toUpperCase() === 'EVERY') { + refresh.every = normalizeInterval(refreshMatch[2] ?? '') + } else { + refresh.after = normalizeInterval(refreshMatch[2] ?? '') + } + + const offsetMatch = new RegExp(`\\bOFFSET\\s+(${INTERVAL_TOKEN})`, 'i').exec(cleaned) + if (offsetMatch?.[1]) refresh.offset = normalizeInterval(offsetMatch[1]) + + const randomizeMatch = new RegExp(`\\bRANDOMIZE\\s+FOR\\s+(${INTERVAL_TOKEN})`, 'i').exec(cleaned) + if (randomizeMatch?.[1]) refresh.randomize = normalizeInterval(randomizeMatch[1]) + + // DEPENDS ON runs until the next known keyword or end of refresh block. + const dependsMatch = + /\bDEPENDS\s+ON\s+([\s\S]*?)(?=\bSETTINGS\b|\bAPPEND\b|\bTO\b|\bEMPTY\b|\bAS\b|$)/i.exec(cleaned) + if (dependsMatch?.[1]) { + const deps = parseDependsOn(dependsMatch[1]) + if (deps.length > 0) refresh.dependsOn = deps + } + + const settingsMatch = + /\bSETTINGS\s+([\s\S]*?)(?=\bAPPEND\b|\bTO\b|\bEMPTY\b|\bAS\b|$)/i.exec(cleaned) + if (settingsMatch?.[1]) { + const settings = parseRefreshSettings(settingsMatch[1]) + if (Object.keys(settings).length > 0) refresh.settings = settings + } + + // APPEND / EMPTY must appear between the TO/SETTINGS area and AS. + const afterRefresh = cleaned.slice(refreshMatch.index) + const beforeAs = afterRefresh.split(/\bAS\b/i)[0] ?? '' + if (/\bAPPEND\b/i.test(beforeAs)) refresh.append = true + if (/\bEMPTY\b/i.test(beforeAs)) refresh.empty = true + + return refresh +} + export function parseToClause( query: string | undefined, fallbackDatabase: string From 922d62d63734d6ea85590c0a305740bd3a403b62 Mon Sep 17 00:00:00 2001 From: Alvaro Date: Mon, 20 Apr 2026 14:08:44 +0200 Subject: [PATCH 2/2] chore: gitignore local thoughts/ notes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prevents ticket exports, research drafts, and other working notes from ever being tracked. Flagged in code review — Linear URLs, personal emails, and customer hostnames sometimes end up in these files. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 07402df..91c3ca4 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ coverage .vscode .idea .claude/settings.local.json +# Local research, ticket exports, and other working notes — never committed. +thoughts/