Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/__tests__/atlas-db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,56 @@ describe("Atlas DB helpers", () => {
expect(items.map((item) => item.key)).toEqual(["included"]);
});

it("does not strand rows whose updated_at has sub-millisecond precision", async () => {
  // Regression guard for the precision mismatch between state tokens and
  // stored timestamps: tokens pass through JS Dates (milliseconds) whereas
  // updated_at is a Postgres timestamptz (microseconds). A row stamped
  // ...28.786830 yields the token ...28.786Z, so the upper bound
  // (`updated_at <= token`) has to keep that row, and the lower bound
  // (`updated_at > token`) of the following run has to drop it — otherwise
  // it is either never indexed or re-embedded on every run.
  const rowKey = "micro";
  const source = "atlas";

  await upsertAtlasSeedCandidate({
    canonicalKey: rowKey,
    sourceName: source,
    title: "Micro",
    content: "Micro content",
    provenance: {},
    evidence: [],
  });
  await approveAtlasSeedEntry(rowKey, "reviewer");

  // Pin updated_at to a value carrying microseconds beyond the millisecond.
  await db.query(
    "UPDATE atlas_seed_entries SET updated_at = '2026-06-11T01:46:28.786830+00'::timestamptz WHERE canonical_key = $1",
    [rowKey],
  );

  const stateToken = await getAtlasStateToken(source);
  expect(stateToken).toBe("2026-06-11T01:46:28.786Z");

  // Fresh Date per bound, built from the token exactly as a caller would.
  const atToken = (): Date => new Date(stateToken!);

  // First incremental cycle (previous token < new token): row is emitted.
  const firstCycle = await listIndexableAtlasContent(source, {
    changedAfter: new Date("2026-06-11T01:00:00.000Z"),
    changedOnOrBefore: atToken(),
  });
  const firstCycleKeys = firstCycle.map((item) => item.key);
  expect(firstCycleKeys).toEqual([rowKey]);

  // Second incremental cycle: the window collapses to (token, token]. The
  // row has already been indexed, so nothing may come back; a lower bound
  // compared at microsecond precision would return it on every run.
  const secondCycle = await listIndexableAtlasContent(source, {
    changedAfter: atToken(),
    changedOnOrBefore: atToken(),
  });
  expect(secondCycle).toEqual([]);

  // Recovery path for rows already stranded in prod: clearing
  // last_commit_sha forces fullAcquire, which applies ONLY the upper bound,
  // and that bound must include the row.
  const fullAcquire = await listIndexableAtlasContent(source, {
    changedOnOrBefore: atToken(),
  });
  const fullAcquireKeys = fullAcquire.map((item) => item.key);
  expect(fullAcquireKeys).toEqual([rowKey]);
});

it("surfaces stale cache pages as removals and includes them in state tokens", async () => {
await upsertAtlasCachePage({
pageKey: "fresh",
Expand Down Expand Up @@ -516,4 +566,32 @@ describe("Atlas row-mapper robustness", () => {
expect(result).toBeInstanceOf(Date);
expect(result?.toISOString()).toBe(iso);
});

it("resolveAtlasStateToken throws on an unparseable non-null MAX instead of silently shrinking the window", () => {
  // A lone corrupt MAX must surface as an error that quotes the bad value.
  const resolveCorruptMax = () =>
    __testing.resolveAtlasStateToken(["garbage"]);

  expect(resolveCorruptMax).toThrowError(
    /getAtlasStateToken: unparseable MAX\(updated_at\): "garbage"/,
  );
});

it("resolveAtlasStateToken throws on a mixed null-plus-unparseable shape (one empty table, one corrupt MAX)", () => {
  // The null entry (empty table) is skipped; the corrupt entry still throws.
  const perTableMax = [null, "garbage"];
  const resolveMixed = () => __testing.resolveAtlasStateToken(perTableMax);

  expect(resolveMixed).toThrowError(
    /getAtlasStateToken: unparseable MAX\(updated_at\): "garbage"/,
  );
});

it("resolveAtlasStateToken returns null when every table MAX is null (empty tables)", () => {
  // Both "all tables empty" and "no tables at all" resolve to no token.
  const { resolveAtlasStateToken } = __testing;

  const allNull = resolveAtlasStateToken([null, null]);
  expect(allNull).toBeNull();

  const noTables = resolveAtlasStateToken([]);
  expect(noTables).toBeNull();
});

it("resolveAtlasStateToken returns the max of the per-table MAXes as a ms-precision ISO string", () => {
  // One MAX arrives as a Date, the other as an ISO string; the later wins.
  const earlierAsDate = new Date("2026-01-01T00:00:00.123Z");
  const laterAsString = "2026-01-02T00:00:00.456Z";

  const token = __testing.resolveAtlasStateToken([
    earlierAsDate,
    laterAsString,
  ]);

  expect(token).toBe("2026-01-02T00:00:00.456Z");
});
});
70 changes: 56 additions & 14 deletions src/db/atlas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,31 @@ function addUpdatedAtClauses(
params: unknown[],
): string[] {
const clauses: string[] = [];
// State tokens travel through JS Dates (millisecond precision) while
// updated_at is a Postgres timestamptz (microsecond precision). Compare at
// millisecond precision on BOTH bounds, otherwise the row whose updated_at
// produced the token (e.g. ...28.786830 → token ...28.786Z) falls in the
// sub-millisecond gap: it fails `updated_at <= token` in the run that
// generated the token, and every later run bounds with the same token
// (`> token AND <= token`), stranding the row forever un-indexed (until
// its updated_at next changes).
// Truncating the LOWER bound matters independently: an un-truncated
// `updated_at > token` would match the boundary row (...28.786830 > ...28.786)
// on every incremental run, re-indexing (and re-embedding) it forever.
// Residual: a write landing in the token's millisecond after the items query
// is not picked up by the strict lower bound — accepted sliver, see the
// µs-faithful-token follow-up.
if (query.changedAfter) {
params.push(query.changedAfter);
clauses.push(`${alias}.updated_at > $${params.length}`);
clauses.push(
`date_trunc('milliseconds', ${alias}.updated_at) > $${params.length}`,
);
}
if (query.changedOnOrBefore) {
params.push(query.changedOnOrBefore);
clauses.push(`${alias}.updated_at <= $${params.length}`);
clauses.push(
`date_trunc('milliseconds', ${alias}.updated_at) <= $${params.length}`,
);
}
return clauses;
}
Expand Down Expand Up @@ -752,6 +770,33 @@ export async function listRemovedAtlasContentIds(
];
}

/**
 * Renders a rejected state-token value for an error message.
 *
 * `JSON.stringify` alone is not enough here: it prints an invalid `Date` as
 * `null` (indistinguishable from an empty table), can return `undefined`, and
 * throws on circular structures — which would replace the context-bearing
 * error with an unrelated `TypeError`.
 */
function describeUnparseableStateToken(value: unknown): string {
  if (value instanceof Date) return "Invalid Date";
  try {
    return JSON.stringify(value) ?? String(value);
  } catch {
    return String(value);
  }
}

/**
 * Resolves the raw MAX(updated_at) values from the per-table state-token
 * queries into a single token.
 *
 * Nulls/undefined (empty tables) are skipped. An unparseable non-null MAX is
 * a hard error — silently dropping it would shrink the incremental window and
 * no-op the run while reporting success.
 *
 * @param raw Per-table MAX values; each is a `Date`, something `new Date(...)`
 *   can parse, or null/undefined.
 * @returns The latest timestamp as a millisecond-precision ISO-8601 string,
 *   or `null` when every entry is null/undefined (or `raw` is empty).
 * @throws Error when a non-null entry does not resolve to a valid date; the
 *   message names the offending value.
 */
function resolveAtlasStateToken(raw: readonly unknown[]): string | null {
  // Running maximum instead of Math.max(...spread): no intermediate arrays
  // and no dependence on the engine's argument-count limit.
  let latestMs: number | null = null;
  for (const value of raw) {
    if (value == null) continue;
    const date = value instanceof Date ? value : new Date(value as string);
    const ms = date.getTime();
    if (Number.isNaN(ms)) {
      throw new Error(
        `getAtlasStateToken: unparseable MAX(updated_at): ${describeUnparseableStateToken(value)}`,
      );
    }
    if (latestMs === null || ms > latestMs) latestMs = ms;
  }
  return latestMs === null ? null : new Date(latestMs).toISOString();
}

// Note: the returned token is implicitly ms-truncated by the Date round-trip
// in resolveAtlasStateToken (updated_at carries microseconds, JS Dates keep
// milliseconds); the query bounds compare ms-truncated updated_at to match
// (see addUpdatedAtClauses).
export async function getAtlasStateToken(
sourceName: string,
query: Pick<AtlasContentQuery, "repositories"> = {},
Expand Down Expand Up @@ -796,24 +841,21 @@ export async function getAtlasStateToken(
),
]);

const values = [
return resolveAtlasStateToken([
seedResult.rows[0]?.state_token,
cacheResult.rows[0]?.state_token,
]
.map((value) => toDate(value, "atlas state token"))
.filter((value): value is Date => value !== null);
if (values.length === 0) return null;
return new Date(
Math.max(...values.map((value) => value.getTime())),
).toISOString();
]);
}

// Test-only exports of the otherwise-private row mappers and timestamp parser.
// These are pure functions; exporting them lets us unit-test the robustness
// paths (malformed JSON → context-bearing error, invalid timestamp → null)
// directly without contriving a backing store that can hold malformed columns.
// Test-only exports of the otherwise-private row mappers, timestamp parser,
// and state-token resolver. These are pure functions; exporting them lets us
// unit-test the robustness paths (malformed JSON → context-bearing error,
// invalid timestamp → null in toDate, unparseable non-null MAX → throw with
// context and all-null/empty input → null in resolveAtlasStateToken) directly
// without contriving a backing store that can hold malformed columns.
// Test-only surface: bundles module-private helpers so unit tests can call
// them directly. Not intended for use by production callers.
export const __testing = {
// Row mappers (malformed JSON is expected to raise a context-bearing error).
mapSeedRow,
mapCacheRow,
// Timestamp parser (invalid timestamp is expected to yield null).
toDate,
// State-token resolver: throws on an unparseable non-null MAX, returns null
// for all-null or empty input.
resolveAtlasStateToken,
};