diff --git a/src/__tests__/atlas-db.test.ts b/src/__tests__/atlas-db.test.ts index dda0e1a..7c2e548 100644 --- a/src/__tests__/atlas-db.test.ts +++ b/src/__tests__/atlas-db.test.ts @@ -365,7 +365,7 @@ describe("Atlas DB helpers", () => { ); const items = await listIndexableAtlasContent("atlas", { - changedOnOrBefore: new Date("2026-01-01T12:00:00Z"), + changedOnOrBefore: "2026-01-01T12:00:00.000000Z", }); expect(items.map((item) => item.key)).toEqual(["included"]); @@ -396,10 +396,12 @@ describe("Atlas DB helpers", () => { ["stale", new Date("2026-01-02T00:00:00Z")], ); - expect(await getAtlasStateToken("atlas")).toBe("2026-01-02T00:00:00.000Z"); + expect(await getAtlasStateToken("atlas")).toBe( + "2026-01-02T00:00:00.000000Z", + ); expect( await listRemovedAtlasContentIds("atlas", { - changedAfter: new Date("2026-01-01T12:00:00Z"), + changedAfter: "2026-01-01T12:00:00.000000Z", }), ).toEqual(["atlas-cache:stale"]); }); @@ -517,3 +519,148 @@ describe("Atlas row-mapper robustness", () => { expect(result?.toISOString()).toBe(iso); }); }); + +describe("Atlas state-token microsecond precision (no ceil)", () => { + // The high-water mark comes from a TIMESTAMPTZ (microsecond precision). We + // return it as raw microsecond text and the acquire queries bind it as a + // `$N::timestamptz` text param, so the bounds compare at full microsecond + // precision. There is no millisecond ceil and no JS Date in the bind path, + // so a row whose true updated_at carries sub-millisecond digits (e.g. + // .123456) is included EXACTLY by `<= token` and excluded EXACTLY by the next + // run's `> token` — no drop, no double-fetch. + let db: PGlite; + + beforeAll(async () => { + db = new PGlite(); + await db.waitReady; + await db.exec(extractAtlasDdl()); + __setPoolForTesting(poolFromPglite(db)); + }); + + afterAll(async () => { + __resetPoolForTesting(); + await db.close(); + }); + + beforeEach(async () => { + await db.query("DELETE FROM atlas_cache_pages"); + await db.query("DELETE FROM atlas_seed_entries"); + }); + + // Bind-path proof — this is the DB-independent guarantee that microseconds + // survive the SQL bind. It spies on the pool to capture the params handed to + // the driver and asserts the microsecond TEXT token (not a truncating JS + // Date) reaches the `$N::timestamptz` cast in the generated SQL. + it("binds changedAfter/changedOnOrBefore as ::timestamptz TEXT params, not Date objects", async () => { + const microToken = "2026-01-01T00:00:00.123456Z"; + const calls: { text: string; params: unknown[] }[] = []; + __setPoolForTesting({ + query: (text: string, params?: unknown[]) => { + calls.push({ text, params: params ?? [] }); + return Promise.resolve({ rows: [] }); + }, + connect: async () => ({ + query: () => Promise.resolve({ rows: [] }), + release: () => {}, + }), + end: async () => {}, + }); + try { + await listIndexableAtlasContent("atlas", { + changedAfter: microToken, + changedOnOrBefore: microToken, + }); + } finally { + // Restore the PGlite-backed pool for the rest of the suite. + __setPoolForTesting(poolFromPglite(db)); + } + + // Every emitted bound must be a ::timestamptz cast and the bound param must + // be the raw microsecond text — never a Date that would truncate to ms. + const boundCalls = calls.filter((c) => /updated_at\s*[<>]/.test(c.text)); + expect(boundCalls.length).toBeGreaterThan(0); + for (const call of boundCalls) { + expect(call.text).toContain("::timestamptz"); + expect(call.text).toMatch(/updated_at > \$\d+::timestamptz/); + expect(call.text).toMatch(/updated_at <= \$\d+::timestamptz/); + for (const param of call.params) { + expect(param).not.toBeInstanceOf(Date); + } + expect(call.params).toContain(microToken); + } + }); + + it("returns the raw microsecond high-water text as the state token", async () => { + // Insert a sub-millisecond timestamp via a SQL literal (a JS Date insert + // would truncate to ms before it ever reaches the column). + await upsertAtlasSeedCandidate({ + canonicalKey: "micro", + sourceName: "atlas", + title: "Micro", + content: "Micro content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("micro", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = '2026-01-01T00:00:00.123456Z' WHERE canonical_key = $1", + ["micro"], + ); + + expect(await getAtlasStateToken("atlas")).toBe( + "2026-01-01T00:00:00.123456Z", + ); + }); + + it("includes the high-water row in run 1 and neither drops nor re-fetches it across run 2", async () => { + // Run 1: a row sits exactly at the high-water mark with sub-ms digits. + await upsertAtlasSeedCandidate({ + canonicalKey: "boundary", + sourceName: "atlas", + title: "Boundary", + content: "Boundary content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("boundary", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = '2026-01-01T00:00:00.123456Z' WHERE canonical_key = $1", + ["boundary"], + ); + + const token1 = await getAtlasStateToken("atlas"); + expect(token1).toBe("2026-01-01T00:00:00.123456Z"); + + // Run 1 window `<= token1` must INCLUDE the boundary row exactly. + const run1 = await listIndexableAtlasContent("atlas", { + changedOnOrBefore: token1!, + }); + expect(run1.map((i) => i.key)).toEqual(["boundary"]); + + // A new row lands one microsecond after the run-1 high-water mark. + await upsertAtlasSeedCandidate({ + canonicalKey: "after", + sourceName: "atlas", + title: "After", + content: "After content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("after", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = '2026-01-01T00:00:00.123457Z' WHERE canonical_key = $1", + ["after"], + ); + + const token2 = await getAtlasStateToken("atlas"); + expect(token2).toBe("2026-01-01T00:00:00.123457Z"); + + // Run 2 window `> token1 AND <= token2` must EXCLUDE the boundary row (no + // double-fetch) and INCLUDE only the strictly-later row (no drop). + const run2 = await listIndexableAtlasContent("atlas", { + changedAfter: token1!, + changedOnOrBefore: token2!, + }); + expect(run2.map((i) => i.key)).toEqual(["after"]); + }); +}); diff --git a/src/__tests__/atlas-provider.test.ts b/src/__tests__/atlas-provider.test.ts index c2f1f17..8548992 100644 --- a/src/__tests__/atlas-provider.test.ts +++ b/src/__tests__/atlas-provider.test.ts @@ -179,7 +179,7 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const stateToken = await provider.getCurrentStateToken(); - expect(stateToken).toBe("2026-01-01T00:00:00.000Z"); + expect(stateToken).toBe("2026-01-01T00:00:00.000000Z"); await upsertAtlasSeedCandidate({ canonicalKey: "new", @@ -305,14 +305,14 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const result = await provider.incrementalAcquire( - "2025-12-31T00:00:00.000Z", + "2025-12-31T00:00:00.000000Z", ); expect(result.items.map((item) => item.id)).toEqual([ "atlas-seed:included", "atlas-seed:future", ]); - expect(result.stateToken).toBe("2026-01-02T00:00:00.000Z"); + expect(result.stateToken).toBe("2026-01-02T00:00:00.000000Z"); }); it("bounds incremental acquisition to the token captured before listing rows", async () => { @@ -331,8 +331,8 @@ describe("AtlasDataProvider", () => { ); const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); - const capturedToken = "2026-01-01T00:00:00.000Z"; - const lateToken = "2026-01-02T00:00:00.000Z"; + const capturedToken = "2026-01-01T00:00:00.000000Z"; + const lateToken = "2026-01-02T00:00:00.000000Z"; const stateTokenSpy = vi .spyOn(atlasDb, "getAtlasStateToken") .mockImplementation(async () => { @@ -354,7 +354,7 @@ describe("AtlasDataProvider", () => { try { const result = await provider.incrementalAcquire( - "2025-12-31T00:00:00.000Z", + "2025-12-31T00:00:00.000000Z", ); expect(stateTokenSpy).toHaveBeenCalledTimes(1); @@ -388,7 +388,7 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const stateToken = await provider.getCurrentStateToken(); - expect(stateToken).toBe("2026-01-01T00:00:00.000Z"); + expect(stateToken).toBe("2026-01-01T00:00:00.000000Z"); await markAtlasCachePageStale("runtime/overview", "seed changed"); await db.query( @@ -396,12 +396,14 @@ describe("AtlasDataProvider", () => { ["runtime/overview", new Date("2026-01-02T00:00:00Z")], ); - expect(await getAtlasStateToken("atlas")).toBe("2026-01-02T00:00:00.000Z"); + expect(await getAtlasStateToken("atlas")).toBe( + "2026-01-02T00:00:00.000000Z", + ); const result = await provider.incrementalAcquire(stateToken ?? ""); expect(result.items).toEqual([]); expect(result.removedIds).toEqual(["atlas-cache:runtime/overview"]); - expect(result.stateToken).toBe("2026-01-02T00:00:00.000Z"); + expect(result.stateToken).toBe("2026-01-02T00:00:00.000000Z"); }); it("incrementally removes rejected seeds and empty cache pages", async () => { @@ -433,7 +435,7 @@ describe("AtlasDataProvider", () => { const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); const result = await provider.incrementalAcquire( - "2026-01-01T00:00:00.000Z", + "2026-01-01T00:00:00.000000Z", ); expect(result.items).toEqual([]); @@ -441,7 +443,7 @@ describe("AtlasDataProvider", () => { "atlas-seed:seed-to-reject", "atlas-cache:runtime/empty", ]); - expect(result.stateToken).toBe("2026-01-03T00:00:00.000Z"); + expect(result.stateToken).toBe("2026-01-03T00:00:00.000000Z"); }); it("provider registry resolves type atlas", () => { @@ -449,4 +451,146 @@ describe("AtlasDataProvider", () => { const provider = factory(atlasConfig, { cloneDir: "/tmp" }); expect(provider).toBeInstanceOf(AtlasDataProvider); }); + + it("skips loudly when the current state token is null instead of running an empty window", async () => { + // A null current token means the high-water read saw no rows (source empty + // or unreadable). Carrying lastStateToken forward would build the window + // `> T AND <= T`, which matches nothing — a silent no-op that masks a + // possibly-failed high-water read. The pass must be skipped LOUDLY and the + // guaranteed-empty acquire queries must NOT run. + const stateTokenSpy = vi + .spyOn(atlasDb, "getAtlasStateToken") + .mockResolvedValue(null); + const listSpy = vi.spyOn(atlasDb, "listIndexableAtlasContent"); + const removedSpy = vi.spyOn(atlasDb, "listRemovedAtlasContentIds"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + const lastToken = "2026-01-01T00:00:00.000000Z"; + const result = await provider.incrementalAcquire(lastToken); + + expect(result).toEqual({ + items: [], + removedIds: [], + stateToken: lastToken, + }); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("state token was null"), + ); + // Positive control: the state-token read MUST have fired. Without this a + // broken ESM-binding interception (so the real impl ran instead of the + // mock) would make the `.not.toHaveBeenCalled()` assertions below pass + // vacuously and hide the regression. + expect(stateTokenSpy).toHaveBeenCalled(); + // The empty-window acquire queries must never be issued. + expect(listSpy).not.toHaveBeenCalled(); + expect(removedSpy).not.toHaveBeenCalled(); + } finally { + stateTokenSpy.mockRestore(); + listSpy.mockRestore(); + removedSpy.mockRestore(); + warnSpy.mockRestore(); + } + }); + + it("fails loud on a non-empty but unparseable lastStateToken instead of binding garbage", async () => { + // A garbage checkpoint must never reach the `$N::timestamptz` bind. The + // guard throws a context-bearing error (source name + offending token) + // BEFORE any acquire query runs, so a corrupted checkpoint surfaces as a + // loud failure rather than an Invalid-Date coercion or a Postgres parse + // error with no source attribution. + await upsertAtlasSeedCandidate({ + canonicalKey: "present", + sourceName: "atlas", + title: "Present", + content: "Present content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("present", "reviewer"); + + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow(/lastStateToken is not a valid microsecond state token/); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow(/"atlas"/); + }); + + it("fails loud on a JS-parseable but non-microsecond-format lastStateToken", async () => { + // Strings like "2026" or "Jan 5 2026" satisfy `new Date(...)` but bind into + // `$N::timestamptz` with different / locale-dependent semantics than the + // fixed-width microsecond token getAtlasStateToken emits. The guard must + // require the EXACT emitted shape so these loose values fail loud (with the + // source name + offending token) instead of silently binding a different + // instant. + await upsertAtlasSeedCandidate({ + canonicalKey: "present", + sourceName: "atlas", + title: "Present", + content: "Present content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("present", "reviewer"); + + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + await expect(provider.incrementalAcquire("2026")).rejects.toThrow( + /lastStateToken is not a valid microsecond state token/, + ); + await expect(provider.incrementalAcquire("2026")).rejects.toThrow( + /"atlas"/, + ); + await expect(provider.incrementalAcquire("Jan 5 2026")).rejects.toThrow( + /lastStateToken is not a valid microsecond state token/, + ); + }); + + it("fails loud on a corrupt lastStateToken even when the source is empty", async () => { + // When the current state token is null (source empty/unreadable) the method + // returns early. A corrupt checkpoint must STILL fail loud in that path, + // otherwise garbage silently passes through and re-persists on every run of + // an empty source. Validation runs BEFORE the null-token early return. + const stateTokenSpy = vi + .spyOn(atlasDb, "getAtlasStateToken") + .mockResolvedValue(null); + try { + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow( + /lastStateToken is not a valid microsecond state token/, + ); + await expect( + provider.incrementalAcquire("not-a-timestamp"), + ).rejects.toThrow(/"atlas"/); + } finally { + stateTokenSpy.mockRestore(); + } + }); + + it("treats an empty lastStateToken as a first-run lower bound (no changedAfter)", async () => { + // An empty checkpoint is the legitimate first-run signal: index everything + // up to the current high-water mark with no `changedAfter` lower bound. + await upsertAtlasSeedCandidate({ + canonicalKey: "first", + sourceName: "atlas", + title: "First", + content: "First content", + provenance: {}, + evidence: [], + }); + await approveAtlasSeedEntry("first", "reviewer"); + await db.query( + "UPDATE atlas_seed_entries SET updated_at = $2 WHERE canonical_key = $1", + ["first", new Date("2026-01-01T00:00:00Z")], + ); + + const provider = new AtlasDataProvider(atlasConfig, { cloneDir: "/tmp" }); + const result = await provider.incrementalAcquire(""); + expect(result.items.map((item) => item.id)).toEqual(["atlas-seed:first"]); + expect(result.stateToken).toBe("2026-01-01T00:00:00.000000Z"); + }); }); diff --git a/src/db/atlas.ts b/src/db/atlas.ts index d803bfa..dd0389f 100644 --- a/src/db/atlas.ts +++ b/src/db/atlas.ts @@ -95,8 +95,14 @@ export interface AtlasRepositoryFilter { } export interface AtlasContentQuery { - changedAfter?: Date; - changedOnOrBefore?: Date; + // Microsecond-precision high-water TEXT (the `to_char(... 'US')` value of a + // row's `updated_at`, e.g. "2026-01-01T00:00:00.123456Z"). These bounds are + // bound into SQL as `$N::timestamptz` text params — NOT as JS Dates — so the + // sub-millisecond digits survive the round trip and `updated_at` is compared + // at full microsecond precision. A JS Date would truncate to milliseconds and + // either re-fetch (`<=`) or permanently drop (`>`) the boundary row. + changedAfter?: string; + changedOnOrBefore?: string; repositories?: AtlasRepositoryFilter[]; } @@ -237,13 +243,18 @@ function addUpdatedAtClauses( params: unknown[], ): string[] { const clauses: string[] = []; + // Bind the microsecond token as a TEXT param cast to timestamptz in SQL + // (`$N::timestamptz`) rather than a JS Date. Postgres parses the full + // microsecond text, so `updated_at > $token::timestamptz` excludes only the + // exact high-water row (no double-fetch next run) and `<= $token::timestamptz` + // includes it exactly — both at full precision, with no rounding. if (query.changedAfter) { params.push(query.changedAfter); - clauses.push(`${alias}.updated_at > $${params.length}`); + clauses.push(`${alias}.updated_at > $${params.length}::timestamptz`); } if (query.changedOnOrBefore) { params.push(query.changedOnOrBefore); - clauses.push(`${alias}.updated_at <= $${params.length}`); + clauses.push(`${alias}.updated_at <= $${params.length}::timestamptz`); } return clauses; } @@ -777,10 +788,18 @@ export async function getAtlasStateToken( ); if (cacheRepositoryClause) cacheClauses.push(cacheRepositoryClause); + // Select the high-water mark as microsecond TEXT (`to_char(... 'US')`) rather + // than letting the driver coerce it to a millisecond JS Date. The raw text + // preserves the sub-millisecond digits, and we hand it back verbatim as the + // state token: the acquire queries bind it as a `$N::timestamptz` text param, + // so `updated_at <= $token` includes the high-water row exactly and the next + // run's `updated_at > $token` excludes only that exact row — no rounding, no + // drop, no double-fetch. + const TOKEN_FORMAT = `'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'`; const [seedResult, cacheResult] = await Promise.all([ pool.query( ` - SELECT MAX(seed.updated_at) AS state_token + SELECT to_char(MAX(seed.updated_at) AT TIME ZONE 'UTC', ${TOKEN_FORMAT}) AS state_token FROM atlas_seed_entries seed WHERE ${seedClauses.join(" AND ")} `, @@ -788,7 +807,7 @@ export async function getAtlasStateToken( ), pool.query( ` - SELECT MAX(cache.updated_at) AS state_token + SELECT to_char(MAX(cache.updated_at) AT TIME ZONE 'UTC', ${TOKEN_FORMAT}) AS state_token FROM atlas_cache_pages cache WHERE ${cacheClauses.join(" AND ")} `, @@ -796,16 +815,16 @@ export async function getAtlasStateToken( ), ]); - const values = [ + const texts = [ seedResult.rows[0]?.state_token, cacheResult.rows[0]?.state_token, - ] - .map((value) => toDate(value, "atlas state token")) - .filter((value): value is Date => value !== null); - if (values.length === 0) return null; - return new Date( - Math.max(...values.map((value) => value.getTime())), - ).toISOString(); + ].filter((value): value is string => typeof value === "string"); + if (texts.length === 0) return null; + // Pick the larger of the two source high-water marks at full text precision + // and return it verbatim. The fixed-width `YYYY-MM-DDTHH:MM:SS.ffffffZ` text + // sorts lexicographically == chronologically, so string comparison preserves + // sub-millisecond ordering the driver's millisecond Date would have flattened. + return texts.reduce((a, b) => (a >= b ? a : b)); } // Test-only exports of the otherwise-private row mappers and timestamp parser. diff --git a/src/indexing/providers/atlas.ts b/src/indexing/providers/atlas.ts index ff5f838..8ce736a 100644 --- a/src/indexing/providers/atlas.ts +++ b/src/indexing/providers/atlas.ts @@ -24,10 +24,14 @@ export class AtlasDataProvider implements DataProvider { } async fullAcquire(): Promise { + // The state token is microsecond high-water TEXT (or the epoch fallback as + // microsecond text when the source is empty). It flows straight into the + // SQL bound as a `$N::timestamptz` text param — never wrapped in a JS Date, + // which would truncate the microseconds. const stateToken = - (await this.getCurrentStateToken()) ?? new Date(0).toISOString(); + (await this.getCurrentStateToken()) ?? "1970-01-01T00:00:00.000000Z"; const query = { - changedOnOrBefore: new Date(stateToken), + changedOnOrBefore: stateToken, repositories: this.repositoryFilters(), }; const [items, removedIds] = await Promise.all([ @@ -42,10 +46,39 @@ export class AtlasDataProvider implements DataProvider { } async incrementalAcquire(lastStateToken: string): Promise { - const stateToken = (await this.getCurrentStateToken()) ?? lastStateToken; + // Fail loud on a malformed checkpoint BEFORE any other branch. An + // empty/undefined lastStateToken is the legitimate first-run "from the + // beginning" signal (no `changedAfter` lower bound); anything else must be a + // real microsecond timestamp, or the `$N::timestamptz` bind would either + // throw deep in Postgres with no source context or — worse — silently + // coerce garbage. Validating here (not after the null-token early return + // below) ensures a corrupt checkpoint surfaces even when the source is empty + // — otherwise garbage would silently pass through and re-persist on every + // run of an empty source. + const changedAfter = this.parseLowerBound(lastStateToken); + const currentStateToken = await this.getCurrentStateToken(); + // A null current token means the high-water read found no rows (source + // empty or unreadable). Falling back to lastStateToken would build the + // window `changedAfter: T AND changedOnOrBefore: T` (i.e. `> T AND <= T`), + // which matches nothing — a silent no-op that masks the case where the + // state-token query failed to see rows it should have. Skip the pass + // LOUDLY instead of issuing a guaranteed-empty query, and keep the caller's + // (now-validated) checkpoint unchanged so the next run retries from the same + // point. + if (currentStateToken === null) { + console.warn( + `[atlas] Skipping incremental acquire for source "${this.config.name}": ` + + `the current state token was null (source empty or unreadable). ` + + `Carrying lastStateToken forward without running an empty window.`, + ); + return { items: [], removedIds: [], stateToken: lastStateToken }; + } + // currentStateToken is proven non-null by the early return above — bind the + // raw microsecond text directly (no dead `? ... : undefined` ternary, no + // `new Date()` wrap that would truncate the microseconds). const query = { - changedAfter: lastStateToken ? new Date(lastStateToken) : undefined, - changedOnOrBefore: stateToken ? new Date(stateToken) : undefined, + changedAfter, + changedOnOrBefore: currentStateToken, repositories: this.repositoryFilters(), }; const [items, removedIds] = await Promise.all([ @@ -55,10 +88,39 @@ export class AtlasDataProvider implements DataProvider { return { items, removedIds, - stateToken, + stateToken: currentStateToken, }; } + // The exact fixed-width microsecond shape getAtlasStateToken emits via + // `to_char(... 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')` — 6 fractional digits and a + // trailing Z. A bare `new Date(...)` probe is far looser than Postgres + // `::timestamptz`: "2026" or "Jan 5 2026" parse in JS but bind with + // different / locale-dependent semantics, defeating the fail-loud intent. We + // require this precise token so anything that did NOT come from our own + // state-token writer fails loud here instead of silently binding a different + // instant. + private static readonly STATE_TOKEN_PATTERN = + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z$/; + + // Validate the persisted checkpoint before it reaches the SQL bind. Empty or + // undefined means "first run, no lower bound"; any non-empty value must be the + // exact microsecond state-token shape. We keep the raw microsecond text (the + // regex is only a validity gate — we never reformat it) so the `> $token` + // bound runs at full precision. + private parseLowerBound(lastStateToken: string): string | undefined { + if (!lastStateToken) return undefined; + if (!AtlasDataProvider.STATE_TOKEN_PATTERN.test(lastStateToken)) { + throw new Error( + `[atlas] Refusing incremental acquire for source "${this.config.name}": ` + + `lastStateToken is not a valid microsecond state token ` + + `(expected YYYY-MM-DDTHH:MM:SS.ffffffZ): ` + + `${JSON.stringify(lastStateToken)}`, + ); + } + return lastStateToken; + } + async getCurrentStateToken(): Promise { return getAtlasStateToken(this.config.name, { repositories: this.repositoryFilters(), @@ -66,8 +128,8 @@ export class AtlasDataProvider implements DataProvider { } private async acquireItems(query: { - changedAfter?: Date; - changedOnOrBefore?: Date; + changedAfter?: string; + changedOnOrBefore?: string; repositories?: AtlasRepositoryFilter[]; }): Promise { const entries = await listIndexableAtlasContent(this.config.name, query);