From ba5049ff5020690462cf5b41f63693f7d7ddcfce Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 9 Jun 2026 10:17:54 -0700 Subject: [PATCH 1/2] Bind Atlas acquisition windows at full microsecond precision MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The state token is the microsecond high-water text of MAX(updated_at) (to_char(... 'US')), but the acquire queries bound results by turning that token into a JS Date, which only holds milliseconds. The bind truncated the sub-millisecond digits, so a row whose true updated_at landed past the truncated bound was either re-fetched or — once a ceiled token became the next run's strict-greater lower bound — permanently skipped. Return the raw microsecond high-water text verbatim (no millisecond ceil) and bind changedAfter/changedOnOrBefore as $N::timestamptz text params instead of JS Dates. Postgres parses the full microsecond text, so <= includes the high-water row exactly and the next run's > excludes only that exact row — no rounding, no double-fetch, no drop. AtlasContentQuery's changedAfter/changedOnOrBefore become string (microsecond text); ceilStateTokenToMillis and its __testing export are removed. Tests prove the bind path carries the microsecond text as a ::timestamptz cast (not a truncating Date), that the token round-trips at microsecond precision, and that a boundary row is included in run 1 and neither dropped nor re-fetched in run 2. 
--- src/__tests__/atlas-db.test.ts | 153 ++++++++++++++++++++++++++++++++- src/db/atlas.ts | 47 +++++++--- 2 files changed, 183 insertions(+), 17 deletions(-) diff --git a/src/__tests__/atlas-db.test.ts b/src/__tests__/atlas-db.test.ts index dda0e1a..7c2e548 100644 --- a/src/__tests__/atlas-db.test.ts +++ b/src/__tests__/atlas-db.test.ts @@ -365,7 +365,7 @@ describe("Atlas DB helpers", () => { ); const items = await listIndexableAtlasContent("atlas", { - changedOnOrBefore: new Date("2026-01-01T12:00:00Z"), + changedOnOrBefore: "2026-01-01T12:00:00.000000Z", }); expect(items.map((item) => item.key)).toEqual(["included"]); @@ -396,10 +396,12 @@ describe("Atlas DB helpers", () => { ["stale", new Date("2026-01-02T00:00:00Z")], ); - expect(await getAtlasStateToken("atlas")).toBe("2026-01-02T00:00:00.000Z"); + expect(await getAtlasStateToken("atlas")).toBe( + "2026-01-02T00:00:00.000000Z", + ); expect( await listRemovedAtlasContentIds("atlas", { - changedAfter: new Date("2026-01-01T12:00:00Z"), + changedAfter: "2026-01-01T12:00:00.000000Z", }), ).toEqual(["atlas-cache:stale"]); }); @@ -517,3 +519,148 @@ describe("Atlas row-mapper robustness", () => { expect(result?.toISOString()).toBe(iso); }); }); + +describe("Atlas state-token microsecond precision (no ceil)", () => { + // The high-water mark comes from a TIMESTAMPTZ (microsecond precision). We + // return it as raw microsecond text and the acquire queries bind it as a + // `$N::timestamptz` text param, so the bounds compare at full microsecond + // precision. There is no millisecond ceil and no JS Date in the bind path, + // so a row whose true updated_at carries sub-millisecond digits (e.g. + // .123456) is included EXACTLY by `<= token` and excluded EXACTLY by the next + // run's `> token` — no drop, no double-fetch. 
+ let db: PGlite; + + beforeAll(async () => { + db = new PGlite(); + await db.waitReady; + await db.exec(extractAtlasDdl()); + __setPoolForTesting(poolFromPglite(db)); + }); + + afterAll(async () => { + __resetPoolForTesting(); + await db.close(); + }); + + beforeEach(async () => { + await db.query("DELETE FROM atlas_cache_pages"); + await db.query("DELETE FROM atlas_seed_entries"); + }); + + // Bind-path proof — this is the DB-independent guarantee that microseconds + // survive the SQL bind. It spies on the pool to capture the params handed to + // the driver and asserts the microsecond TEXT token (not a truncating JS + // Date) reaches the `$N::timestamptz` cast in the generated SQL. + it("binds changedAfter/changedOnOrBefore as ::timestamptz TEXT params, not Date objects", async () => { + const microToken = "2026-01-01T00:00:00.123456Z"; + const calls: { text: string; params: unknown[] }[] = []; + __setPoolForTesting({ + query: (text: string, params?: unknown[]) => { + calls.push({ text, params: params ?? [] }); + return Promise.resolve({ rows: [] }); + }, + connect: async () => ({ + query: () => Promise.resolve({ rows: [] }), + release: () => {}, + }), + end: async () => {}, + }); + try { + await listIndexableAtlasContent("atlas", { + changedAfter: microToken, + changedOnOrBefore: microToken, + }); + } finally { + // Restore the PGlite-backed pool for the rest of the suite. + __setPoolForTesting(poolFromPglite(db)); + } + + // Every emitted bound must be a ::timestamptz cast and the bound param must + // be the raw microsecond text — never a Date that would truncate to ms. 
+ const boundCalls = calls.filter((c) => /updated_at\s*[<>]/.test(c.text)); + expect(boundCalls.length).toBeGreaterThan(0); + for (const call of boundCalls) { + expect(call.text).toContain("::timestamptz"); + expect(call.text).toMatch(/updated_at > \$\d+::timestamptz/); + expect(call.text).toMatch(/updated_at <= \$\d+::timestamptz/); + for (const param of call.params) { + expect(param).not.toBeInstanceOf(Date); + } + expect(call.params).toContain(microToken); + } + }); + + it("returns the raw microsecond high-water text as the state token", async () => { + // Insert a sub-millisecond timestamp via a SQL literal (a JS Date insert + // would truncate to ms before it ever reaches the column). + await upsertAtlasSeedCandidate({ + canonicalKey: "micro", + sourceName: "atlas", + title: "Micro", + content: "Micro content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("micro", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = '2026-01-01T00:00:00.123456Z' WHERE canonical_key = $1", + ["micro"], + ); + + expect(await getAtlasStateToken("atlas")).toBe( + "2026-01-01T00:00:00.123456Z", + ); + }); + + it("includes the high-water row in run 1 and neither drops nor re-fetches it across run 2", async () => { + // Run 1: a row sits exactly at the high-water mark with sub-ms digits. + await upsertAtlasSeedCandidate({ + canonicalKey: "boundary", + sourceName: "atlas", + title: "Boundary", + content: "Boundary content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("boundary", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = '2026-01-01T00:00:00.123456Z' WHERE canonical_key = $1", + ["boundary"], + ); + + const token1 = await getAtlasStateToken("atlas"); + expect(token1).toBe("2026-01-01T00:00:00.123456Z"); + + // Run 1 window `<= token1` must INCLUDE the boundary row exactly. 
+ const run1 = await listIndexableAtlasContent("atlas", { + changedOnOrBefore: token1!, + }); + expect(run1.map((i) => i.key)).toEqual(["boundary"]); + + // A new row lands one microsecond after the run-1 high-water mark. + await upsertAtlasSeedCandidate({ + canonicalKey: "after", + sourceName: "atlas", + title: "After", + content: "After content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("after", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = '2026-01-01T00:00:00.123457Z' WHERE canonical_key = $1", + ["after"], + ); + + const token2 = await getAtlasStateToken("atlas"); + expect(token2).toBe("2026-01-01T00:00:00.123457Z"); + + // Run 2 window `> token1 AND <= token2` must EXCLUDE the boundary row (no + // double-fetch) and INCLUDE only the strictly-later row (no drop). + const run2 = await listIndexableAtlasContent("atlas", { + changedAfter: token1!, + changedOnOrBefore: token2!, + }); + expect(run2.map((i) => i.key)).toEqual(["after"]); + }); +}); diff --git a/src/db/atlas.ts b/src/db/atlas.ts index d803bfa..dd0389f 100644 --- a/src/db/atlas.ts +++ b/src/db/atlas.ts @@ -95,8 +95,14 @@ export interface AtlasRepositoryFilter { } export interface AtlasContentQuery { - changedAfter?: Date; - changedOnOrBefore?: Date; + // Microsecond-precision high-water TEXT (the `to_char(... 'US')` value of a + // row's `updated_at`, e.g. "2026-01-01T00:00:00.123456Z"). These bounds are + // bound into SQL as `$N::timestamptz` text params — NOT as JS Dates — so the + // sub-millisecond digits survive the round trip and `updated_at` is compared + // at full microsecond precision. A JS Date would truncate to milliseconds and + // either re-fetch (`<=`) or permanently drop (`>`) the boundary row. 
+ changedAfter?: string; + changedOnOrBefore?: string; repositories?: AtlasRepositoryFilter[]; } @@ -237,13 +243,18 @@ function addUpdatedAtClauses( params: unknown[], ): string[] { const clauses: string[] = []; + // Bind the microsecond token as a TEXT param cast to timestamptz in SQL + // (`$N::timestamptz`) rather than a JS Date. Postgres parses the full + // microsecond text, so `updated_at > $token::timestamptz` excludes only the + // exact high-water row (no double-fetch next run) and `<= $token::timestamptz` + // includes it exactly — both at full precision, with no rounding. if (query.changedAfter) { params.push(query.changedAfter); - clauses.push(`${alias}.updated_at > $${params.length}`); + clauses.push(`${alias}.updated_at > $${params.length}::timestamptz`); } if (query.changedOnOrBefore) { params.push(query.changedOnOrBefore); - clauses.push(`${alias}.updated_at <= $${params.length}`); + clauses.push(`${alias}.updated_at <= $${params.length}::timestamptz`); } return clauses; } @@ -777,10 +788,18 @@ export async function getAtlasStateToken( ); if (cacheRepositoryClause) cacheClauses.push(cacheRepositoryClause); + // Select the high-water mark as microsecond TEXT (`to_char(... 'US')`) rather + // than letting the driver coerce it to a millisecond JS Date. The raw text + // preserves the sub-millisecond digits, and we hand it back verbatim as the + // state token: the acquire queries bind it as a `$N::timestamptz` text param, + // so `updated_at <= $token` includes the high-water row exactly and the next + // run's `updated_at > $token` excludes only that exact row — no rounding, no + // drop, no double-fetch. 
+ const TOKEN_FORMAT = `'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'`; const [seedResult, cacheResult] = await Promise.all([ pool.query( ` - SELECT MAX(seed.updated_at) AS state_token + SELECT to_char(MAX(seed.updated_at) AT TIME ZONE 'UTC', ${TOKEN_FORMAT}) AS state_token FROM atlas_seed_entries seed WHERE ${seedClauses.join(" AND ")} `, @@ -788,7 +807,7 @@ export async function getAtlasStateToken( ), pool.query( ` - SELECT MAX(cache.updated_at) AS state_token + SELECT to_char(MAX(cache.updated_at) AT TIME ZONE 'UTC', ${TOKEN_FORMAT}) AS state_token FROM atlas_cache_pages cache WHERE ${cacheClauses.join(" AND ")} `, @@ -796,16 +815,16 @@ export async function getAtlasStateToken( ), ]); - const values = [ + const texts = [ seedResult.rows[0]?.state_token, cacheResult.rows[0]?.state_token, - ] - .map((value) => toDate(value, "atlas state token")) - .filter((value): value is Date => value !== null); - if (values.length === 0) return null; - return new Date( - Math.max(...values.map((value) => value.getTime())), - ).toISOString(); + ].filter((value): value is string => typeof value === "string"); + if (texts.length === 0) return null; + // Pick the larger of the two source high-water marks at full text precision + // and return it verbatim. The fixed-width `YYYY-MM-DDTHH:MM:SS.ffffffZ` text + // sorts lexicographically == chronologically, so string comparison preserves + // sub-millisecond ordering the driver's millisecond Date would have flattened. + return texts.reduce((a, b) => (a >= b ? a : b)); } // Test-only exports of the otherwise-private row mappers and timestamp parser. 
From 22cc3251b7135397f752d81d0fdef032abed8c29 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 9 Jun 2026 10:18:08 -0700 Subject: [PATCH 2/2] Pass Atlas state tokens through the provider as microsecond text and fail loud on garbage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Carry the microsecond high-water text straight into the SQL bounds instead of wrapping it in new Date(), which would truncate the microseconds the db layer now preserves. The epoch fallback for an empty source is microsecond text too. Drop the dead "currentStateToken ? ... : undefined" ternary — currentStateToken is proven non-null by the null-token early return above. Guard a malformed lastStateToken before it can reach the $N::timestamptz bind: a non-empty token must match the EXACT microsecond shape getAtlasStateToken emits (YYYY-MM-DDTHH:MM:SS.ffffffZ), or it throws a context-bearing error (source name + offending token). A bare new Date() probe was too loose — "2026" or "Jan 5 2026" parse in JS but bind with different / locale-dependent semantics than Postgres ::timestamptz, defeating the fail-loud intent. An empty lastStateToken stays the legitimate first-run signal (no lower bound). Run that validation BEFORE the null-token early return so a corrupt checkpoint fails loud even when the source is empty, rather than silently passing through and re-persisting on every run. Tests cover the fail-loud guard (including JS-parseable-but-non-microsecond tokens and a corrupt token against an empty source) and the empty-first-run lower bound; the null-token loud-skip test is retained and gains a positive control asserting the state-token read actually fired so a broken ESM-binding interception can no longer pass the negative assertions vacuously. 
--- src/__tests__/atlas-provider.test.ts | 166 +++++++++++++++++++++++++-- src/indexing/providers/atlas.ts | 78 +++++++++++-- 2 files changed, 225 insertions(+), 19 deletions(-) diff --git a/src/__tests__/atlas-provider.test.ts b/src/__tests__/atlas-provider.test.ts index c2f1f17..8548992 100644 --- a/src/__tests__/atlas-provider.test.ts +++ b/src/__tests__/atlas-provider.test.ts @@ -179,7 +179,7 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const stateToken = await provider.getCurrentStateToken(); - expect(stateToken).toBe("2026-01-01T00:00:00.000Z"); + expect(stateToken).toBe("2026-01-01T00:00:00.000000Z"); await upsertAtlasSeedCandidate({ canonicalKey: "new", @@ -305,14 +305,14 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const result = await provider.incrementalAcquire( - "2025-12-31T00:00:00.000Z", + "2025-12-31T00:00:00.000000Z", ); expect(result.items.map((item) => item.id)).toEqual([ "atlas-seed:included", "atlas-seed:future", ]); - expect(result.stateToken).toBe("2026-01-02T00:00:00.000Z"); + expect(result.stateToken).toBe("2026-01-02T00:00:00.000000Z"); }); it("bounds incremental acquisition to the token captured before listing rows", async () => { @@ -331,8 +331,8 @@ describe("AtlasDataProvider", () => { ); const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); - const capturedToken = "2026-01-01T00:00:00.000Z"; - const lateToken = "2026-01-02T00:00:00.000Z"; + const capturedToken = "2026-01-01T00:00:00.000000Z"; + const lateToken = "2026-01-02T00:00:00.000000Z"; const stateTokenSpy = vi .spyOn(atlasDb, "getAtlasStateToken") .mockImplementation(async () => { @@ -354,7 +354,7 @@ describe("AtlasDataProvider", () => { try { const result = await provider.incrementalAcquire( - "2025-12-31T00:00:00.000Z", + "2025-12-31T00:00:00.000000Z", ); expect(stateTokenSpy).toHaveBeenCalledTimes(1); @@ -388,7 
+388,7 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const stateToken = await provider.getCurrentStateToken(); - expect(stateToken).toBe("2026-01-01T00:00:00.000Z"); + expect(stateToken).toBe("2026-01-01T00:00:00.000000Z"); await markAtlasCachePageStale("runtime/overview", "seed changed"); await db.query( @@ -396,12 +396,14 @@ describe("AtlasDataProvider", () => { ["runtime/overview", new Date("2026-01-02T00:00:00Z")], ); - expect(await getAtlasStateToken("atlas")).toBe("2026-01-02T00:00:00.000Z"); + expect(await getAtlasStateToken("atlas")).toBe( + "2026-01-02T00:00:00.000000Z", + ); const result = await provider.incrementalAcquire(stateToken ?? ""); expect(result.items).toEqual([]); expect(result.removedIds).toEqual(["atlas-cache:runtime/overview"]); - expect(result.stateToken).toBe("2026-01-02T00:00:00.000Z"); + expect(result.stateToken).toBe("2026-01-02T00:00:00.000000Z"); }); it("incrementally removes rejected seeds and empty cache pages", async () => { @@ -433,7 +435,7 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const result = await provider.incrementalAcquire( - "2026-01-01T00:00:00.000Z", + "2026-01-01T00:00:00.000000Z", ); expect(result.items).toEqual([]); @@ -441,7 +443,7 @@ describe("AtlasDataProvider", () => { "atlas-seed:seed-to-reject", "atlas-cache:runtime/empty", ]); - expect(result.stateToken).toBe("2026-01-03T00:00:00.000Z"); + expect(result.stateToken).toBe("2026-01-03T00:00:00.000000Z"); }); it("provider registry resolves type atlas", () => { @@ -449,4 +451,146 @@ describe("AtlasDataProvider", () => { const provider = factory(atlasConfig, { cloneDir: "/tmp" }); expect(provider).toBeInstanceOf(AtlasDataProvider); }); + + it("skips loudly when the current state token is null instead of running an empty window", async () => { + // A null current token means the high-water read saw no rows (source empty + // 
or unreadable). Carrying lastStateToken forward would build the window + // `> T AND <= T`, which matches nothing — a silent no-op that masks a + // possibly-failed high-water read. The pass must be skipped LOUDLY and the + // guaranteed-empty acquire queries must NOT run. + const stateTokenSpy = vi + .spyOn(atlasDb, "getAtlasStateToken") + .mockResolvedValue(null); + const listSpy = vi.spyOn(atlasDb, "listIndexableAtlasContent"); + const removedSpy = vi.spyOn(atlasDb, "listRemovedAtlasContentIds"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + const lastToken = "2026-01-01T00:00:00.000000Z"; + const result = await provider.incrementalAcquire(lastToken); + + expect(result).toEqual({ + items: [], + removedIds: [], + stateToken: lastToken, + }); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("state token was null"), + ); + // Positive control: the state-token read MUST have fired. Without this a + // broken ESM-binding interception (so the real impl ran instead of the + // mock) would make the `.not.toHaveBeenCalled()` assertions below pass + // vacuously and hide the regression. + expect(stateTokenSpy).toHaveBeenCalled(); + // The empty-window acquire queries must never be issued. + expect(listSpy).not.toHaveBeenCalled(); + expect(removedSpy).not.toHaveBeenCalled(); + } finally { + stateTokenSpy.mockRestore(); + listSpy.mockRestore(); + removedSpy.mockRestore(); + warnSpy.mockRestore(); + } + }); + + it("fails loud on a non-empty but unparseable lastStateToken instead of binding garbage", async () => { + // A garbage checkpoint must never reach the `$N::timestamptz` bind. 
The + // guard throws a context-bearing error (source name + offending token) + // BEFORE any acquire query runs, so a corrupted checkpoint surfaces as a + // loud failure rather than an Invalid-Date coercion or a Postgres parse + // error with no source attribution. + await upsertAtlasSeedCandidate({ + canonicalKey: "present", + sourceName: "atlas", + title: "Present", + content: "Present content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("present", "reviewer"); + + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow(/lastStateToken is not a valid microsecond state token/); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow(/"atlas"/); + }); + + it("fails loud on a JS-parseable but non-microsecond-format lastStateToken", async () => { + // Strings like "2026" or "Jan 5 2026" satisfy `new Date(...)` but bind into + // `$N::timestamptz` with different / locale-dependent semantics than the + // fixed-width microsecond token getAtlasStateToken emits. The guard must + // require the EXACT emitted shape so these loose values fail loud (with the + // source name + offending token) instead of silently binding a different + // instant. 
+ await upsertAtlasSeedCandidate({ + canonicalKey: "present", + sourceName: "atlas", + title: "Present", + content: "Present content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("present", "reviewer"); + + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + await expect(provider.incrementalAcquire("2026")).rejects.toThrow( + /lastStateToken is not a valid microsecond state token/, + ); + await expect(provider.incrementalAcquire("2026")).rejects.toThrow( + /"atlas"/, + ); + await expect(provider.incrementalAcquire("Jan 5 2026")).rejects.toThrow( + /lastStateToken is not a valid microsecond state token/, + ); + }); + + it("fails loud on a corrupt lastStateToken even when the source is empty", async () => { + // When the current state token is null (source empty/unreadable) the method + // returns early. A corrupt checkpoint must STILL fail loud in that path, + // otherwise garbage silently passes through and re-persists on every run of + // an empty source. Validation runs BEFORE the null-token early return. + const stateTokenSpy = vi + .spyOn(atlasDb, "getAtlasStateToken") + .mockResolvedValue(null); + try { + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow( + /lastStateToken is not a valid microsecond state token/, + ); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow(/"atlas"/); + } finally { + stateTokenSpy.mockRestore(); + } + }); + + it("treats an empty lastStateToken as a first-run lower bound (no changedAfter)", async () => { + // An empty checkpoint is the legitimate first-run signal: index everything + // up to the current high-water mark with no `changedAfter` lower bound. 
+ await upsertAtlasSeedCandidate({ + canonicalKey: "first", + sourceName: "atlas", + title: "First", + content: "First content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("first", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = $2 WHERE canonical_key = $1", + ["first", new Date("2026-01-01T00:00:00Z")], + ); + + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + const result = await provider.incrementalAcquire(""); + expect(result.items.map((item) => item.id)).toEqual(["atlas-seed:first"]); + expect(result.stateToken).toBe("2026-01-01T00:00:00.000000Z"); + }); }); diff --git a/src/indexing/providers/atlas.ts b/src/indexing/providers/atlas.ts index ff5f838..8ce736a 100644 --- a/src/indexing/providers/atlas.ts +++ b/src/indexing/providers/atlas.ts @@ -24,10 +24,14 @@ export class AtlasDataProvider implements DataProvider { } async fullAcquire(): Promise { + // The state token is microsecond high-water TEXT (or the epoch fallback as + // microsecond text when the source is empty). It flows straight into the + // SQL bound as a `$N::timestamptz` text param — never wrapped in a JS Date, + // which would truncate the microseconds. const stateToken = - (await this.getCurrentStateToken()) ?? new Date(0).toISOString(); + (await this.getCurrentStateToken()) ?? "1970-01-01T00:00:00.000000Z"; const query = { - changedOnOrBefore: new Date(stateToken), + changedOnOrBefore: stateToken, repositories: this.repositoryFilters(), }; const [items, removedIds] = await Promise.all([ @@ -42,10 +46,39 @@ export class AtlasDataProvider implements DataProvider { } async incrementalAcquire(lastStateToken: string): Promise { - const stateToken = (await this.getCurrentStateToken()) ?? lastStateToken; + // Fail loud on a malformed checkpoint BEFORE any other branch. 
An + // empty/undefined lastStateToken is the legitimate first-run "from the + // beginning" signal (no `changedAfter` lower bound); anything else must be a + // real microsecond timestamp, or the `$N::timestamptz` bind would either + // throw deep in Postgres with no source context or — worse — silently + // coerce garbage. Validating here (not after the null-token early return + // below) ensures a corrupt checkpoint surfaces even when the source is empty + // — otherwise garbage would silently pass through and re-persist on every + // run of an empty source. + const changedAfter = this.parseLowerBound(lastStateToken); + const currentStateToken = await this.getCurrentStateToken(); + // A null current token means the high-water read found no rows (source + // empty or unreadable). Falling back to lastStateToken would build the + // window `changedAfter: T AND changedOnOrBefore: T` (i.e. `> T AND <= T`), + // which matches nothing — a silent no-op that masks the case where the + // state-token query failed to see rows it should have. Skip the pass + // LOUDLY instead of issuing a guaranteed-empty query, and keep the caller's + // (now-validated) checkpoint unchanged so the next run retries from the same + // point. + if (currentStateToken === null) { + console.warn( + `[atlas] Skipping incremental acquire for source "${this.config.name}": ` + + `the current state token was null (source empty or unreadable). ` + + `Carrying lastStateToken forward without running an empty window.`, + ); + return { items: [], removedIds: [], stateToken: lastStateToken }; + } + // currentStateToken is proven non-null by the early return above — bind the + // raw microsecond text directly (no dead `? ... : undefined` ternary, no + // `new Date()` wrap that would truncate the microseconds). const query = { - changedAfter: lastStateToken ? new Date(lastStateToken) : undefined, - changedOnOrBefore: stateToken ? 
new Date(stateToken) : undefined, + changedAfter, + changedOnOrBefore: currentStateToken, repositories: this.repositoryFilters(), }; const [items, removedIds] = await Promise.all([ @@ -55,10 +88,39 @@ export class AtlasDataProvider implements DataProvider { return { items, removedIds, - stateToken, + stateToken: currentStateToken, }; } + // The exact fixed-width microsecond shape getAtlasStateToken emits via + // `to_char(... 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')` — 6 fractional digits and a + // trailing Z. A bare `new Date(...)` probe is far looser than Postgres + // `::timestamptz`: "2026" or "Jan 5 2026" parse in JS but bind with + // different / locale-dependent semantics, defeating the fail-loud intent. We + // require this precise token so anything that did NOT come from our own + // state-token writer fails loud here instead of silently binding a different + // instant. + private static readonly STATE_TOKEN_PATTERN = + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z$/; + + // Validate the persisted checkpoint before it reaches the SQL bind. Empty or + // undefined means "first run, no lower bound"; any non-empty value must be the + // exact microsecond state-token shape. We keep the raw microsecond text (the + // regex is only a validity gate — we never reformat it) so the `> $token` + // bound runs at full precision. 
+ private parseLowerBound(lastStateToken: string): string | undefined { + if (!lastStateToken) return undefined; + if (!AtlasDataProvider.STATE_TOKEN_PATTERN.test(lastStateToken)) { + throw new Error( + `[atlas] Refusing incremental acquire for source "${this.config.name}": ` + + `lastStateToken is not a valid microsecond state token ` + + `(expected YYYY-MM-DDTHH:MM:SS.ffffffZ): ` + + `${JSON.stringify(lastStateToken)}`, + ); + } + return lastStateToken; + } + async getCurrentStateToken(): Promise { return getAtlasStateToken(this.config.name, { repositories: this.repositoryFilters(), @@ -66,8 +128,8 @@ export class AtlasDataProvider implements DataProvider { } private async acquireItems(query: { - changedAfter?: Date; - changedOnOrBefore?: Date; + changedAfter?: string; + changedOnOrBefore?: string; repositories?: AtlasRepositoryFilter[]; }): Promise { const entries = await listIndexableAtlasContent(this.config.name, query);