From ce2dd8dfc755d5f807a30d7fb51360ee41b4f3e1 Mon Sep 17 00:00:00 2001 From: mike-inkeep Date: Wed, 3 Jun 2026 22:54:03 -0400 Subject: [PATCH] =?UTF-8?q?fix(open-knowledge):=20MCP=20writes=20reflect?= =?UTF-8?q?=20on-disk=20truth=20=E2=80=94=20reconcile=20out-of-band=20edit?= =?UTF-8?q?s=20(PRD-6832)=20(#1591)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(open-knowledge): MCP writes reflect on-disk truth — reconcile out-of-band edits (PRD-6832) PRD-6832 β: a stale loaded CRDT could silently clobber a newer out-of-band disk edit on the next MCP write, making disk itself stale (the "silent stale read"). This makes disk authoritative on the write/reconcile path. - L1 reconcile-before-apply (write/edit/frontmatter + the rename spine): ingest a divergent disk edit via applyExternalChange before the agent edit lands; both survive. A doc that's mid-conflict is never reconciled (the refusal owns it). - L3 store-time backstop: on the residual TOCTOU, abort the overwrite (disk wins) and return urn:ok:error:disk-divergence (409); retry re-applies once. Gated to agent-write stores via a pre-executeNow marker — Hocuspocus passes a null transaction origin for agent DirectConnection writes, so origin can't gate. - FR3: success-path disk-edit-reconciled warning (structuredContent.contentDivergence + text nudge) so the agent re-reads the combined result. Rename serializes the loaded CRDT and bypasses storeDocumentNow, so it needed its own spine reconcile (L3 can't guard it). FR5 (replace recovery checkpoint) deferred; ContentDivergenceWarningSchema is a stop-gap for the unmerged #1270. Spec: specs/2026-05-30-ok-mcp-read-reflects-disk/. * test(open-knowledge): give rollback-fault test CI-git headroom (pollUntil 25s + 60s timeout) The version-rollback fault test waits on two shadow-repo checkpoint commits. Shadow-repo git commits are slow on loaded CI runners — the pollUntil(12000) budget and the inherited bun 5s per-test default are both too tight there (the checkpoints land fine in isolation). Size the pollUntil budgets to 25s each and the per-test timeout to 60s; pollUntil returns as soon as a checkpoint lands, so the fast path is unchanged — this only buys headroom for slow-git CI runs. * fix(open-knowledge): wire L1 reconcile into legacy agent-write; note no-op/L3 residual (PR review) Addresses pullfrog review on #1591: - agent-write (legacy /api/agent-write content handler) had the L3 backstop + the AgentWriteSuccessSchema warning field but no L1 reconcile, so the common case (disk diverged before the call) clobbered. Wire reconcileDiskBeforeAgentWrite in for symmetry with the other content handlers; the warning is now producible. - Document the no-op-skip-precedes-L3 residual for the L1-less handlers (undo/rollback) in the spec as-built notes: a no-op store skip never overwrites, so there's no data loss — only an absent signal for a write that applied no bytes. * Revert rollback-fault test budget bump — β should not touch this test The version-rollback fault test fails under merge-queue I/O contention on a hardcoded internal pollUntil(…, 12000) waiting for a shadow-repo checkpoint commit. This is a known, general merge-queue flake (it has ejected #1270, #1411, #1590 — PRs unrelated to this diff) and is owned by the deflake work in #1588, which also added a suite-wide 30s --timeout that makes a per-test bump redundant. A bigger budget doesn't fix it (the internal poll is the killer), and a proper fix overlaps #1588 + may surface a real shadow-repo-commit-under-load perf signal that shouldn't be masked inside a disk-authority PR. Restore the file to main. * fix(open-knowledge): guard L3 disk-wins ingest against false-success 200 (PRD-6832) Address the Major finding from code review: the L3 store-time backstop's disk-wins applyDiskContent transact sat outside any failure-recording try/catch (the atomic-write catch is below it; the withSpan callback opens no outer try), so an ingest-throw would propagate to a false-success 200 — agent bytes on neither disk nor re-ingested CRDT. Mirror the two already- guarded identical applyDiskContent calls (tripwire-reset + atomic-write): on throw, record a store failure + rethrow so the awaiting handler surfaces a storage error via the already-tested takeStoreFailure path. The base stays at currentBase so a retry re-reconciles. Also fold in two review cosmetics in agent-write.ts (both insertion artifacts of this PR): add the missing `satisfies StandardSchemaV1` to ContentDivergenceWarningSchema, and restore the AgentWriteSuccessSchema JSDoc displaced by that insertion. Document the intentional resolveEmbed omission at the rename reconcile site (content-bytes only; ref round-trips). * test+harden(open-knowledge): close /review-local gaps on the L3 disk-authority surface (PRD-6832 β) Address the 6 findings from the local adversarial review (no impl bug found; all test-coverage + consistency/observability): - Major: add an undo-handler L3 test — undo's ONLY disk-authority guard (no L1). Asserts 409 + urn:ok:error:disk-divergence + native survives + undo not applied; recovery via a forward agent-write-md (undo's post-revert UM-stack semantics are intentionally out of scope). - Major: add a gate-exclusion test — an unmarked human/client store is never reverted by L3 (presence assertion sidesteps the seam write/rename race). Fix the phantom citation to a nonexistent store-divergence-gate.test.ts. - Minor: DiskEditReconciledWarning byte fields use Buffer.byteLength (UTF-8) to match the sibling ContentDivergenceWarning on the shared WriteWarningSchema union (was string.length / UTF-16). - Minor: surface the L3 disk-read fail-open with a structured log.warn (terminal guard; log-only rather than shoehorn the external-change-namespaced counter). - Consider: version.ts MCP tool parses WriteWarningSchema for symmetry with the 3 write/edit tools (rollback emits only content-divergence today; future-proof). - Consider: document the setReconciledBase synchronous base-advance as the file-watcher double-ingest prevention invariant. Verified: typecheck clean; new tests 3x deterministic. --------- GitOrigin-RevId: 4b588cb1d0135626c7397c0018e9034f263e08bf --- .changeset/prd-6832-beta-disk-authority.md | 16 ++ .../disk-authority-reconcile.test.ts | 125 +++++++++++++++ .../disk-divergence-backstop.test.ts | 150 ++++++++++++++++++ packages/core/src/handoff/urn-ipc-registry.ts | 1 + packages/core/src/index.ts | 4 + packages/core/src/schemas/api/_envelope.ts | 1 + packages/core/src/schemas/api/agent-write.ts | 23 ++- packages/server/src/api-extension.ts | 147 +++++++++++++++-- packages/server/src/external-change.ts | 70 +++++++- .../server/src/mcp/tools/edit-document.ts | 22 +-- .../server/src/mcp/tools/edit-frontmatter.ts | 20 ++- packages/server/src/mcp/tools/version.ts | 16 +- .../server/src/mcp/tools/write-document.ts | 29 ++-- packages/server/src/persistence.ts | 66 ++++++++ packages/server/src/server-factory.ts | 2 + 15 files changed, 638 insertions(+), 54 deletions(-) create mode 100644 .changeset/prd-6832-beta-disk-authority.md create mode 100644 packages/app/tests/integration/disk-authority-reconcile.test.ts create mode 100644 packages/app/tests/integration/disk-divergence-backstop.test.ts diff --git a/.changeset/prd-6832-beta-disk-authority.md b/.changeset/prd-6832-beta-disk-authority.md new file mode 100644 index 00000000..26ee266d --- /dev/null +++ b/.changeset/prd-6832-beta-disk-authority.md @@ -0,0 +1,16 @@ +--- +"@inkeep/open-knowledge-server": patch +"@inkeep/open-knowledge": patch +--- + +fix(open-knowledge): MCP writes reflect on-disk truth — reconcile out-of-band edits instead of silently clobbering them (PRD-6832) + +An OK MCP read could authoritatively return content older than the bytes on disk, with no warning. The root cause was upstream of the read: a doc loaded in the server holds stale in-memory CRDT state, an out-of-band edit (a script, `git pull`, manual edit) makes disk newer, and the next MCP write serializes the stale CRDT over the newer file — making disk itself stale, so the next read faithfully returns bad bytes. In one incident this gave an agent a stale spec scope and cost a multi-file revert. + +This change makes disk authoritative on the write/reconcile path: + +- **Reconcile-before-apply (L1).** Before a content write (`write_document` / `edit_document` / `edit_frontmatter`) or a `rename` applies, the server compares the on-disk bytes to the last-synced base and, on divergence, ingests the disk edit first (through the existing sanctioned file-watcher path) so the agent's edit lands on top of current reality. Both edits survive. +- **Store-time backstop (L3).** For the residual few-millisecond window where disk changes between the reconcile check and the store, the store re-checks disk before overwriting. On divergence it aborts the overwrite (disk wins) and the handler returns a hard `urn:ok:error:disk-divergence` (409) — the agent's edit was NOT applied; re-read and retry (a retry re-applies exactly once). This is the only guard for `undo` / `rollback`, which have no L1. +- **Heads-up on reconcile.** When a write reconciles an out-of-band edit, the success response carries a `disk-edit-reconciled` warning (`structuredContent.contentDivergence`) so the agent re-reads to see the combined result. Observational — the write still landed and both edits are on disk. + +Human-editor (browser) writes are unaffected: the backstop only fires on agent-triggered stores, so in-progress typing is never reverted. diff --git a/packages/app/tests/integration/disk-authority-reconcile.test.ts b/packages/app/tests/integration/disk-authority-reconcile.test.ts new file mode 100644 index 00000000..1aa90256 --- /dev/null +++ b/packages/app/tests/integration/disk-authority-reconcile.test.ts @@ -0,0 +1,125 @@ +import { afterEach, describe, expect, test } from 'bun:test'; +import { randomUUID } from 'node:crypto'; +import { writeFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { + agentPatch, + agentWriteMd, + createTestServer, + pollUntil, + readTestDoc, + type TestServer, +} from './test-harness.ts'; + +let server: TestServer | undefined; + +afterEach(async () => { + if (server) { + await server.cleanup(); + server = undefined; + } +}); + +async function frontmatterPatch(port: number, docName: string, patch: Record) { + return fetch(`http://localhost:${port}/api/frontmatter-patch`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ docName, patch }), + }); +} + +async function renamePath(port: number, fromPath: string, toPath: string) { + return fetch(`http://localhost:${port}/api/rename-path`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ kind: 'file', fromPath, toPath }), + }); +} + +describe('PRD-6832 β L1: agent write reconciles a newer out-of-band disk edit', () => { + test('write_document append: the native edit is NOT clobbered + FR3 warning fires', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const docName = `reconcile-append-${randomUUID()}`; + const filePath = join(contentDir, `${docName}.md`); + + await agentWriteMd(port, '# V1 from agent\n\nbody-v1\n', { docName, position: 'replace' }); + await pollUntil(() => readTestDoc(contentDir, docName).includes('body-v1')); + + writeFileSync(filePath, '# V2 NATIVE OUT-OF-BAND EDIT\n\nbody-v2-native\n', 'utf-8'); + + const res = await fetch(`http://localhost:${port}/api/agent-write-md`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + docName, + markdown: 'appended-by-agent-still-on-v1\n', + position: 'append', + }), + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { warning?: { kind?: string } }; + expect(body.warning?.kind).toBe('disk-edit-reconciled'); + + const after = readTestDoc(contentDir, docName); + expect(after).toContain('body-v2-native'); // out-of-band edit preserved (no clobber) + expect(after).toContain('appended-by-agent-still-on-v1'); // agent edit applied on top + }); + + test('edit_document find/replace: runs against the live (disk-reflecting) content', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const docName = `reconcile-patch-${randomUUID()}`; + const filePath = join(contentDir, `${docName}.md`); + + await agentWriteMd(port, '# Doc\n\nBANANA here\n', { docName, position: 'replace' }); + await pollUntil(() => readTestDoc(contentDir, docName).includes('BANANA')); + + writeFileSync(filePath, '# Doc\n\nBANANA here\n\nnative-extra-line\n', 'utf-8'); + + await agentPatch(port, 'BANANA', 'CHERRY', docName); + + const after = readTestDoc(contentDir, docName); + expect(after).toContain('CHERRY'); // patch applied against the reconciled content + expect(after).not.toContain('BANANA'); // the find target was replaced + expect(after).toContain('native-extra-line'); // out-of-band edit preserved + }); + + test('edit_frontmatter: the native body edit is preserved while the FM patch applies', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const docName = `reconcile-fm-${randomUUID()}`; + const filePath = join(contentDir, `${docName}.md`); + + await agentWriteMd(port, '# Doc\n\nbody-original\n', { docName, position: 'replace' }); + await pollUntil(() => readTestDoc(contentDir, docName).includes('body-original')); + + writeFileSync(filePath, '# Doc\n\nbody-original\n\nnative-body-line\n', 'utf-8'); + + const res = await frontmatterPatch(port, docName, { title: 'New Title' }); + expect(res.status).toBe(200); + + const after = readTestDoc(contentDir, docName); + expect(after).toContain('New Title'); // FM patch applied + expect(after).toContain('native-body-line'); // out-of-band body edit preserved + }); + + test('rename: the renamed doc carries the newer out-of-band disk content', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const fromDoc = `reconcile-rename-from-${randomUUID()}`; + const toDoc = `reconcile-rename-to-${randomUUID()}`; + const fromPath = join(contentDir, `${fromDoc}.md`); + + await agentWriteMd(port, '# V1\n\nbody-v1\n', { docName: fromDoc, position: 'replace' }); + await pollUntil(() => readTestDoc(contentDir, fromDoc).includes('body-v1')); + + writeFileSync(fromPath, '# V2 NATIVE OUT-OF-BAND EDIT\n\nbody-v2-native\n', 'utf-8'); + + const res = await renamePath(port, `${fromDoc}.md`, `${toDoc}.md`); + expect(res.status).toBe(200); + + const after = readTestDoc(contentDir, toDoc); + expect(after).toContain('body-v2-native'); // the newer disk edit moved with the rename + }); +}); diff --git a/packages/app/tests/integration/disk-divergence-backstop.test.ts b/packages/app/tests/integration/disk-divergence-backstop.test.ts new file mode 100644 index 00000000..aec274f5 --- /dev/null +++ b/packages/app/tests/integration/disk-divergence-backstop.test.ts @@ -0,0 +1,150 @@ +import { afterEach, describe, expect, test } from 'bun:test'; +import { randomUUID } from 'node:crypto'; +import { + agentWriteMd, + awaitDocQuiescence, + createTestClient, + createTestServer, + pollUntil, + readTestDoc, + type TestServer, +} from './test-harness.ts'; + +const INJECTED_MARKER = 'native-divergence-injected'; + +let server: TestServer | undefined; + +afterEach(async () => { + delete process.env.OK_TEST_STORE_DIVERGENCE; + if (server) { + await server.cleanup(); + server = undefined; + } +}); + +async function writeMd( + port: number, + markdown: string, + opts: { docName: string; position: 'append' | 'prepend' | 'replace' }, +) { + return fetch(`http://localhost:${port}/api/agent-write-md`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ markdown, ...opts }), + }); +} + +async function agentUndoRaw( + port: number, + opts: { docName: string; connectionId: string; scope?: 'last' | 'session' }, +) { + return fetch(`http://localhost:${port}/api/agent-undo`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + docName: opts.docName, + connectionId: opts.connectionId, + scope: opts.scope ?? 'last', + }), + }); +} + +describe('PRD-6832 β L3: store-time divergence backstop', () => { + test('reverts on TOCTOU divergence (409); disk wins; retry re-applies exactly once', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const docName = `l3-content-${randomUUID()}`; + + const seed = await writeMd(port, '# V1\n\nbody-v1\n', { docName, position: 'replace' }); + expect(seed.status).toBe(200); + await pollUntil(() => readTestDoc(contentDir, docName).includes('body-v1')); + + process.env.OK_TEST_STORE_DIVERGENCE = docName; + + const attempt1 = await writeMd(port, 'AGENT-APPEND-XYZ\n', { + docName, + position: 'append', + }); + expect(attempt1.status).toBe(409); + const body1 = (await attempt1.json()) as { type?: string }; + expect(body1.type).toBe('urn:ok:error:disk-divergence'); + + const afterRevert = readTestDoc(contentDir, docName); + expect(afterRevert).toContain(INJECTED_MARKER); + expect(afterRevert).not.toContain('AGENT-APPEND-XYZ'); + + delete process.env.OK_TEST_STORE_DIVERGENCE; + const attempt2 = await writeMd(port, 'AGENT-APPEND-XYZ\n', { + docName, + position: 'append', + }); + expect(attempt2.status).toBe(200); + + const afterRetry = readTestDoc(contentDir, docName); + expect(afterRetry).toContain(INJECTED_MARKER); + expect(afterRetry).toContain('AGENT-APPEND-XYZ'); + expect(afterRetry.split('AGENT-APPEND-XYZ').length - 1).toBe(1); + }); + + test('undo: L3 reverts on TOCTOU divergence (409); native survives; undo NOT applied', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const docName = `l3-undo-${randomUUID()}`; + + await agentWriteMd(port, '# Base\n\nbase-body\n', { + docName, + position: 'replace', + agentId: 'u1', + }); + await pollUntil(() => readTestDoc(contentDir, docName).includes('base-body')); + await new Promise((r) => setTimeout(r, 700)); + await agentWriteMd(port, 'UNDO-ME-LINE\n', { docName, position: 'append', agentId: 'u1' }); + await pollUntil(() => readTestDoc(contentDir, docName).includes('UNDO-ME-LINE')); + + process.env.OK_TEST_STORE_DIVERGENCE = docName; + + const undoRes = await agentUndoRaw(port, { docName, connectionId: 'agent-u1', scope: 'last' }); + expect(undoRes.status).toBe(409); + const body = (await undoRes.json()) as { type?: string }; + expect(body.type).toBe('urn:ok:error:disk-divergence'); + + const afterRevert = readTestDoc(contentDir, docName); + expect(afterRevert).toContain(INJECTED_MARKER); + expect(afterRevert).not.toContain('base-body'); + + delete process.env.OK_TEST_STORE_DIVERGENCE; + await agentWriteMd(port, 'RECOVERY-LINE\n', { docName, position: 'append', agentId: 'u1' }); + await pollUntil(() => readTestDoc(contentDir, docName).includes('RECOVERY-LINE')); + const afterRecovery = readTestDoc(contentDir, docName); + expect(afterRecovery).toContain(INJECTED_MARKER); + expect(afterRecovery).toContain('RECOVERY-LINE'); + }); + + test('gate: an unmarked human/client store is NEVER reverted by L3', async () => { + server = await createTestServer({ debounce: 50, maxDebounce: 200 }); + const { port, contentDir } = server; + const docName = `l3-gate-human-${randomUUID()}`; + + const seed = await writeMd(port, '# V1\n\nseed-body\n', { docName, position: 'replace' }); + expect(seed.status).toBe(200); + await pollUntil(() => readTestDoc(contentDir, docName).includes('seed-body')); + + const client = await createTestClient(port, docName); + try { + await pollUntil(() => client.ytext.toString().includes('seed-body'), 5000); + + process.env.OK_TEST_STORE_DIVERGENCE = docName; + + const HUMAN_MARK = 'HUMAN-EDIT-NOT-REVERTED'; + client.doc.transact(() => { + client.ytext.insert(client.ytext.length, `\n${HUMAN_MARK}\n`); + }); + await awaitDocQuiescence(client.doc, { timeoutMs: 3000 }); + + await pollUntil(() => readTestDoc(contentDir, docName).includes(HUMAN_MARK), 8000); + } finally { + delete process.env.OK_TEST_STORE_DIVERGENCE; + await client.cleanup(); + } + }); +}); diff --git a/packages/core/src/handoff/urn-ipc-registry.ts b/packages/core/src/handoff/urn-ipc-registry.ts index 949521b5..4a9ea59f 100644 --- a/packages/core/src/handoff/urn-ipc-registry.ts +++ b/packages/core/src/handoff/urn-ipc-registry.ts @@ -52,6 +52,7 @@ export const URN_HTTP_ONLY: ReadonlySet = new Set([ 'urn:ok:error:frontmatter-malformed', 'urn:ok:error:no-active-session', 'urn:ok:error:too-many-agent-sessions', + 'urn:ok:error:disk-divergence', 'urn:ok:error:doc-in-conflict', 'urn:ok:error:no-conflict-tracked', 'urn:ok:error:doc-not-found', diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 33a35620..3593145e 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -494,6 +494,8 @@ export { DiffLineSchema, type DiffSuccess, DiffSuccessSchema, + type DiskEditReconciledWarning, + DiskEditReconciledWarningSchema, type DocumentListEntry, DocumentListEntrySchema, type DocumentListSuccess, @@ -772,6 +774,8 @@ export { UploadRequestSchema, type WorkspaceSuccess, WorkspaceSuccessSchema, + type WriteWarning, + WriteWarningSchema, } from './schemas/api/index.ts'; export { CC1_CHANNEL_BRANCH_SWITCHED, diff --git a/packages/core/src/schemas/api/_envelope.ts b/packages/core/src/schemas/api/_envelope.ts index 38ebfa40..7d467142 100644 --- a/packages/core/src/schemas/api/_envelope.ts +++ b/packages/core/src/schemas/api/_envelope.ts @@ -63,6 +63,7 @@ export const ProblemTypeSchema = z.enum([ 'urn:ok:error:frontmatter-malformed', 'urn:ok:error:no-active-session', 'urn:ok:error:too-many-agent-sessions', + 'urn:ok:error:disk-divergence', 'urn:ok:error:doc-not-found', 'urn:ok:error:doc-already-exists', 'urn:ok:error:doc-not-open', diff --git a/packages/core/src/schemas/api/agent-write.ts b/packages/core/src/schemas/api/agent-write.ts index c09aba6a..9353c218 100644 --- a/packages/core/src/schemas/api/agent-write.ts +++ b/packages/core/src/schemas/api/agent-write.ts @@ -89,10 +89,28 @@ export const OrphanHintSchema = z .loose() satisfies StandardSchemaV1; export type OrphanHint = z.infer; +export const DiskEditReconciledWarningSchema = z + .object({ + kind: z.literal('disk-edit-reconciled'), + intendedBytes: z.number().int().nonnegative(), + actualBytes: z.number().int().nonnegative(), + byteDelta: z.number().int(), + hint: z.string().optional(), + }) + .loose() satisfies StandardSchemaV1; +export type DiskEditReconciledWarning = z.infer; + +export const WriteWarningSchema = z.discriminatedUnion('kind', [ + ContentDivergenceWarningSchema, + DiskEditReconciledWarningSchema, +]); +export type WriteWarning = z.infer; + export const AgentWriteSuccessSchema = z .object({ timestamp: z.string().min(1), summary: SummaryResponseFieldSchema.optional(), + warning: WriteWarningSchema.optional(), }) .loose() satisfies StandardSchemaV1; export type AgentWriteSuccess = z.infer; @@ -104,7 +122,7 @@ export const AgentWriteMdSuccessSchema = z systemSubscriberCount: z.number().int().nonnegative(), hints: z.array(OrphanHintSchema).optional(), summary: SummaryResponseFieldSchema.optional(), - warning: ContentDivergenceWarningSchema.optional(), + warning: WriteWarningSchema.optional(), }) .loose() satisfies StandardSchemaV1; export type AgentWriteMdSuccess = z.infer; @@ -115,7 +133,7 @@ export const AgentPatchSuccessSchema = z subscriberCount: z.number().int().nonnegative(), systemSubscriberCount: z.number().int().nonnegative(), summary: SummaryResponseFieldSchema.optional(), - warning: ContentDivergenceWarningSchema.optional(), + warning: WriteWarningSchema.optional(), }) .loose() satisfies StandardSchemaV1; export type AgentPatchSuccess = z.infer; @@ -147,6 +165,7 @@ export const FrontmatterPatchSuccessSchema = z systemSubscriberCount: z.number().int().nonnegative(), appliedKeys: z.array(z.string()), summary: SummaryResponseFieldSchema.optional(), + warning: WriteWarningSchema.optional(), }) .loose() satisfies StandardSchemaV1; export type FrontmatterPatchSuccess = z.infer; diff --git a/packages/server/src/api-extension.ts b/packages/server/src/api-extension.ts index 7864b512..c2e33b14 100644 --- a/packages/server/src/api-extension.ts +++ b/packages/server/src/api-extension.ts @@ -61,6 +61,7 @@ import { DeletePathRequestSchema, DeletePathSuccessSchema, DiffSuccessSchema, + type DiskEditReconciledWarning, type DocumentListEntry, DocumentListSuccessSchema, DocumentReadSuccessSchema, @@ -321,6 +322,10 @@ import { SUPPORTED_DOC_EXTENSIONS, stripDocExtension, } from './doc-extensions.ts'; +import { + type ReconcileBeforeWriteResult, + reconcileDiskBeforeAgentWrite, +} from './external-change.ts'; import { extractActorIdentity } from './extract-actor-identity.ts'; import { contentHash, @@ -1522,6 +1527,8 @@ export interface ApiExtensionOptions { flushGitCommit?: () => Promise; flushContributors?: () => Promise; takeStoreFailure?: (docName: string) => StoreFailure | null; + takeStoreDivergence?: (docName: string) => boolean; + markAgentWriteStore?: (docName: string) => void; getCurrentBranch?: () => string | null; getDiskAckSVs?: () => Record; contentRoot?: string; @@ -1601,6 +1608,8 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { flushGitCommit, flushContributors, takeStoreFailure, + takeStoreDivergence, + markAgentWriteStore, getCurrentBranch, getDiskAckSVs, contentRoot, @@ -1834,12 +1843,18 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { }); } - async function flushDiskAndDetectFailure(docName: string): Promise { + type FlushOutcome = { kind: 'failure'; failure: StoreFailure } | { kind: 'divergence' } | null; + + async function flushDiskAndDetectOutcome(docName: string): Promise { const debounceId = `onStoreDocument-${docName}`; if (hocuspocus.debouncer.isDebounced(debounceId)) { + markAgentWriteStore?.(docName); await hocuspocus.debouncer.executeNow(debounceId); } - return takeStoreFailure?.(docName) ?? null; + const failure = takeStoreFailure?.(docName) ?? null; + if (failure) return { kind: 'failure', failure }; + if (takeStoreDivergence?.(docName)) return { kind: 'divergence' }; + return null; } function respondPersistenceFailure( @@ -1857,6 +1872,29 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { ); } + function respondDiskDivergence(res: ServerResponse, handler: string): void { + errorResponse( + res, + 409, + 'urn:ok:error:disk-divergence', + 'The document changed on disk after your edit was prepared; your edit was NOT applied, to avoid overwriting the newer on-disk content. Re-read the document and retry.', + { handler }, + ); + } + + function buildReconcileWarning( + reconcile: ReconcileBeforeWriteResult, + ): DiskEditReconciledWarning | undefined { + if (!reconcile.reconciled) return undefined; + return { + kind: 'disk-edit-reconciled', + intendedBytes: reconcile.baseBytes, + actualBytes: reconcile.diskBytes, + byteDelta: reconcile.diskBytes - reconcile.baseBytes, + hint: 'An out-of-band edit was reconciled into this document before your edit was applied on top; the document now reflects that edit plus yours. Re-read it (e.g. `exec("cat ")`) to see the combined result before continuing.', + }; + } + function collectAdmittedDocNames(): Set { const admitted = new Set(); for (const [docName, entry] of getFileIndex()) { @@ -2542,6 +2580,7 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { } } + reconcileDiskBeforeAgentWrite(hocuspocus, docName, contentDir); const content = readCurrentDocumentContent(docName); if (typeof content === 'string') { snapshotContents.set(docName, content); @@ -2972,6 +3011,14 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { colorSeed, clientName, }); + + const agentWriteReconcile = reconcileDiskBeforeAgentWrite( + hocuspocus, + docName, + contentDir, + options.resolveEmbed, + ); + const timestamp = new Date().toISOString(); const content = typeof body.content === 'string' ? body.content : `Hello from the agent! ${timestamp}`; @@ -3023,9 +3070,19 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { agentPresenceBroadcaster?.touchMode(agentId, 'idle'); } + const flushOutcome = await flushDiskAndDetectOutcome(docName); + if (flushOutcome?.kind === 'failure') { + respondPersistenceFailure(res, flushOutcome.failure, 'agent-write'); + return; + } + if (flushOutcome?.kind === 'divergence') { + respondDiskDivergence(res, 'agent-write'); + return; + } flushDocToGit(docName, 'agent-write'); onAgentWrite?.(); + const agentWriteWarning = buildReconcileWarning(agentWriteReconcile); successResponse( res, 200, @@ -3033,6 +3090,7 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { { timestamp, ...(summaryResponse ? { summary: summaryResponse } : {}), + ...(agentWriteWarning ? { warning: agentWriteWarning } : {}), }, { handler: 'agent-write' }, ); @@ -3103,6 +3161,14 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { colorSeed, clientName, }); + + const writeMdReconcile = reconcileDiskBeforeAgentWrite( + hocuspocus, + resolvedDocName, + contentDir, + options.resolveEmbed, + ); + const timestamp = new Date().toISOString(); let writeDivergence: AgentWriteContentDivergence | undefined; @@ -3167,9 +3233,13 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { agentPresenceBroadcaster?.touchMode(agentId, 'idle'); } - const storeFailure = await flushDiskAndDetectFailure(resolvedDocName); - if (storeFailure) { - respondPersistenceFailure(res, storeFailure, 'agent-write-md'); + const flushOutcome = await flushDiskAndDetectOutcome(resolvedDocName); + if (flushOutcome?.kind === 'failure') { + respondPersistenceFailure(res, flushOutcome.failure, 'agent-write-md'); + return; + } + if (flushOutcome?.kind === 'divergence') { + respondDiskDivergence(res, 'agent-write-md'); return; } @@ -3195,6 +3265,7 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { }); } + const writeMdWarning = buildReconcileWarning(writeMdReconcile); successResponse( res, 200, @@ -3207,7 +3278,9 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { ...(summaryResponse ? { summary: summaryResponse } : {}), ...(writeDivergence !== undefined ? { warning: toContentDivergenceWarning(writeDivergence) } - : {}), + : writeMdWarning + ? { warning: writeMdWarning } + : {}), }, { handler: 'agent-write-md' }, ); @@ -3273,6 +3346,14 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { colorSeed, clientName, }); + + const fmReconcile = reconcileDiskBeforeAgentWrite( + hocuspocus, + resolvedDocName, + contentDir, + options.resolveEmbed, + ); + const timestamp = new Date().toISOString(); let editError: import('@inkeep/open-knowledge-core').FmEditError | undefined; @@ -3391,9 +3472,13 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { incrementAgentWriteCalls(); countNormalizedSummary(normalizedSummary); if (bodyMutated) { - const storeFailure = await flushDiskAndDetectFailure(resolvedDocName); - if (storeFailure) { - respondPersistenceFailure(res, storeFailure, 'frontmatter-patch'); + const flushOutcome = await flushDiskAndDetectOutcome(resolvedDocName); + if (flushOutcome?.kind === 'failure') { + respondPersistenceFailure(res, flushOutcome.failure, 'frontmatter-patch'); + return; + } + if (flushOutcome?.kind === 'divergence') { + respondDiskDivergence(res, 'frontmatter-patch'); return; } } @@ -3418,6 +3503,7 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { }); } + const fmWarning = buildReconcileWarning(fmReconcile); successResponse( res, 200, @@ -3428,6 +3514,7 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { systemSubscriberCount, appliedKeys, ...(summaryResponse ? { summary: summaryResponse } : {}), + ...(fmWarning ? { warning: fmWarning } : {}), }, { handler: 'frontmatter-patch' }, ); @@ -4293,6 +4380,14 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { colorSeed, clientName, }); + + const patchReconcile = reconcileDiskBeforeAgentWrite( + hocuspocus, + docName, + contentDir, + options.resolveEmbed, + ); + const timestamp = new Date().toISOString(); let notFound = false; @@ -4427,9 +4522,13 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { return; } - const storeFailure = await flushDiskAndDetectFailure(docName); - if (storeFailure) { - respondPersistenceFailure(res, storeFailure, 'agent-patch'); + const flushOutcome = await flushDiskAndDetectOutcome(docName); + if (flushOutcome?.kind === 'failure') { + respondPersistenceFailure(res, flushOutcome.failure, 'agent-patch'); + return; + } + if (flushOutcome?.kind === 'divergence') { + respondDiskDivergence(res, 'agent-patch'); return; } @@ -4455,6 +4554,7 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { const { response: summaryResponse } = summaryResponseFields(normalizedSummary); + const patchWarning = buildReconcileWarning(patchReconcile); successResponse( res, 200, @@ -4466,7 +4566,9 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { ...(summaryResponse ? { summary: summaryResponse } : {}), ...(patchDivergence !== undefined ? { warning: toContentDivergenceWarning(patchDivergence) } - : {}), + : patchWarning + ? { warning: patchWarning } + : {}), }, { handler: 'agent-patch' }, ); @@ -4573,6 +4675,15 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { } if (undone) { + const flushOutcome = await flushDiskAndDetectOutcome(docName); + if (flushOutcome?.kind === 'failure') { + respondPersistenceFailure(res, flushOutcome.failure, 'agent-undo'); + return; + } + if (flushOutcome?.kind === 'divergence') { + respondDiskDivergence(res, 'agent-undo'); + return; + } flushDocToGit(docName, 'agent-undo'); } @@ -5480,9 +5591,13 @@ export function createApiExtension(options: ApiExtensionOptions): Extension { } renameAttributionCounter().add(1, { kind: 'rollback', attribution_kind: actor.kind }); - const storeFailure = await flushDiskAndDetectFailure(docName); - if (storeFailure) { - respondPersistenceFailure(res, storeFailure, 'rollback'); + const flushOutcome = await flushDiskAndDetectOutcome(docName); + if (flushOutcome?.kind === 'failure') { + respondPersistenceFailure(res, flushOutcome.failure, 'rollback'); + return; + } + if (flushOutcome?.kind === 'divergence') { + respondDiskDivergence(res, 'rollback'); return; } diff --git a/packages/server/src/external-change.ts b/packages/server/src/external-change.ts index 11db5654..6fedbc0a 100644 --- a/packages/server/src/external-change.ts +++ b/packages/server/src/external-change.ts @@ -1,18 +1,26 @@ +import { existsSync, readFileSync, realpathSync } from 'node:fs'; import type { Hocuspocus } from '@hocuspocus/server'; import { BridgeInvariantViolationError, BridgeMergeContentLossError, + normalizeBridge, stripFrontmatter, } from '@inkeep/open-knowledge-core'; import { formatReconcileSubject } from '@inkeep/open-knowledge-core/shadow-repo-layout'; import type * as Y from 'yjs'; import { composeAndWriteRawBody } from './bridge-intake.ts'; import { isConfigDoc, isSystemDoc } from './cc1-broadcast.ts'; +import { isDocInConflict } from './conflict-errors.ts'; import { recordContributor } from './contributor-tracker.ts'; import { recordFrontmatterEditSurface } from './frontmatter-telemetry.ts'; import { getLogger } from './logger.ts'; import { incrementExternalChangeHandlerErrors } from './metrics.ts'; -import { setReconciledBase } from './persistence.ts'; +import { + getReconciledBase, + isWithinContentDir, + safeContentPath, + setReconciledBase, +} from './persistence.ts'; import type { PairedWriteOrigin } from './server-observers.ts'; import { FILE_SYSTEM_WRITER } from './shadow-repo.ts'; @@ -93,3 +101,63 @@ export function createExternalChangeHandler( } }; } + +export interface ReconcileBeforeWriteResult { + reconciled: boolean; + baseBytes: number; + diskBytes: number; +} + +const NOT_RECONCILED: ReconcileBeforeWriteResult = { + reconciled: false, + baseBytes: 0, + diskBytes: 0, +}; + +export function reconcileDiskBeforeAgentWrite( + hocuspocus: Hocuspocus, + docName: string, + contentDir: string, + resolveEmbed?: (basename: string, sourcePath: string) => string | null, +): ReconcileBeforeWriteResult { + if (isSystemDoc(docName) || isConfigDoc(docName)) return NOT_RECONCILED; + + const document = hocuspocus.documents.get(docName); + if (document && isDocInConflict(document)) return NOT_RECONCILED; + + const base = getReconciledBase(docName); + if (base === undefined) return NOT_RECONCILED; + + let canonical: string; + try { + const requestedPath = safeContentPath(docName, contentDir); + if (!existsSync(requestedPath)) return NOT_RECONCILED; + canonical = realpathSync(requestedPath); + } catch { + return NOT_RECONCILED; + } + + if (!isWithinContentDir(canonical, contentDir)) { + getLogger('reconcile').warn( + { docName, canonical, contentDir }, + `[reconcile] symlink-escape on disk read for ${docName}; skipping reconcile`, + ); + return NOT_RECONCILED; + } + + let diskContent: string; + try { + diskContent = readFileSync(canonical, 'utf-8'); + } catch { + return NOT_RECONCILED; + } + + if (normalizeBridge(diskContent) === normalizeBridge(base)) return NOT_RECONCILED; + + applyExternalChange(hocuspocus, docName, diskContent, resolveEmbed); + return { + reconciled: true, + baseBytes: Buffer.byteLength(base, 'utf8'), + diskBytes: Buffer.byteLength(diskContent, 'utf8'), + }; +} diff --git a/packages/server/src/mcp/tools/edit-document.ts b/packages/server/src/mcp/tools/edit-document.ts index 9ba481fd..4c199dc4 100644 --- a/packages/server/src/mcp/tools/edit-document.ts +++ b/packages/server/src/mcp/tools/edit-document.ts @@ -1,4 +1,4 @@ -import { ContentDivergenceWarningSchema, renderInventoryFooter } from '@inkeep/open-knowledge-core'; +import { renderInventoryFooter, WriteWarningSchema } from '@inkeep/open-knowledge-core'; import { z } from 'zod'; import { resolveLockDir } from '../../config/paths.ts'; import type { AgentIdentity } from '../agent-identity.ts'; @@ -120,17 +120,17 @@ export function register(server: ServerInstance, deps: EditDocumentDeps): void { : undefined; const summaryHint = typeof summaryResult?.hint === 'string' ? summaryResult.hint : undefined; - const contentDivergenceParse = ContentDivergenceWarningSchema.safeParse(result.warning); - const contentDivergence = contentDivergenceParse.success - ? contentDivergenceParse.data - : undefined; + const writeWarningParse = WriteWarningSchema.safeParse(result.warning); + const writeWarning = writeWarningParse.success ? writeWarningParse.data : undefined; const lines: string[] = ['Edit applied successfully.']; if (noPreviewAnywhere && !preview) lines.push(START_UI_TEXT_HINT); if (summaryHint) lines.push(summaryHint); - if (contentDivergence) { + if (writeWarning) { lines.push( - `⚠ Content divergence: ${contentDivergence.actualBytes} actual bytes vs ${contentDivergence.intendedBytes} intended (byteDelta=${contentDivergence.byteDelta}). ${contentDivergence.hint ?? 'currentState carries the converged content (re-read only if it is truncated).'}`, + writeWarning.kind === 'content-divergence' + ? `⚠ Content divergence: ${writeWarning.actualBytes} actual bytes vs ${writeWarning.intendedBytes} intended (byteDelta=${writeWarning.byteDelta}). ${writeWarning.hint ?? 'currentState carries the converged content (re-read only if it is truncated).'}` + : `⚠ ${writeWarning.hint ?? 'An out-of-band edit was reconciled into this document before your edit landed on top — re-read for the combined result.'}`, ); } const text = lines.join('\n'); @@ -140,12 +140,15 @@ export function register(server: ServerInstance, deps: EditDocumentDeps): void { !noPreviewAnywhere && !noPreviewOnThisDoc && !summaryResult && - !contentDivergence + !writeWarning ) { return textResult(text); } const structured: Record = {}; + if (writeWarning) { + structured.contentDivergence = writeWarning; + } if (preview) { structured.previewUrl = preview.url; structured.previewUrlSource = preview.source; @@ -156,9 +159,6 @@ export function register(server: ServerInstance, deps: EditDocumentDeps): void { if (summaryResult) { structured.summary = summaryResult; } - if (contentDivergence) { - structured.contentDivergence = contentDivergence; - } return textPlusStructured(text, structured); }, ); diff --git a/packages/server/src/mcp/tools/edit-frontmatter.ts b/packages/server/src/mcp/tools/edit-frontmatter.ts index 69b0336f..b3918ec0 100644 --- a/packages/server/src/mcp/tools/edit-frontmatter.ts +++ b/packages/server/src/mcp/tools/edit-frontmatter.ts @@ -1,4 +1,8 @@ -import { FRONTMATTER_TYPES, FrontmatterValueSchema } from '@inkeep/open-knowledge-core'; +import { + FRONTMATTER_TYPES, + FrontmatterValueSchema, + WriteWarningSchema, +} from '@inkeep/open-knowledge-core'; import { z } from 'zod'; import { resolveLockDir } from '../../config/paths.ts'; import type { AgentIdentity } from '../agent-identity.ts'; @@ -143,13 +147,25 @@ export function register(server: ServerInstance, deps: EditFrontmatterDeps): voi ]; if (noPreviewAnywhere && !preview) lines.push(START_UI_TEXT_HINT); if (summaryHint) lines.push(summaryHint); + const writeWarningParse = WriteWarningSchema.safeParse(result.warning); + const writeWarning = writeWarningParse.success ? writeWarningParse.data : undefined; + if (writeWarning?.hint) lines.push(`⚠ ${writeWarning.hint}`); const text = lines.join('\n'); - if (!preview && !noPreviewAnywhere && !noPreviewOnThisDoc && !summaryResult) { + if ( + !preview && + !noPreviewAnywhere && + !noPreviewOnThisDoc && + !summaryResult && + !writeWarning + ) { return textResult(text); } const structured: Record = {}; + if (writeWarning) { + structured.contentDivergence = writeWarning; + } if (preview) { structured.previewUrl = preview.url; structured.previewUrlSource = preview.source; diff --git a/packages/server/src/mcp/tools/version.ts b/packages/server/src/mcp/tools/version.ts index d08618e4..45e6099b 100644 --- a/packages/server/src/mcp/tools/version.ts +++ b/packages/server/src/mcp/tools/version.ts @@ -1,4 +1,4 @@ -import { ContentDivergenceWarningSchema } from '@inkeep/open-knowledge-core'; +import { WriteWarningSchema } from '@inkeep/open-knowledge-core'; import { z } from 'zod'; import type { AgentIdentity } from '../agent-identity.ts'; import { resolvePreviewUrlForTool } from './preview-url.ts'; @@ -161,10 +161,8 @@ async function handleRollback(args: VersionArgs, url: string, cwd: string, deps: : undefined; const summaryHint = typeof summaryResult?.hint === 'string' ? summaryResult.hint : undefined; - const contentDivergenceParse = ContentDivergenceWarningSchema.safeParse(result.warning); - const contentDivergence = contentDivergenceParse.success - ? contentDivergenceParse.data - : undefined; + const writeWarningParse = WriteWarningSchema.safeParse(result.warning); + const writeWarning = writeWarningParse.success ? writeWarningParse.data : undefined; const author = typeof versionResult.author === 'string' ? versionResult.author : undefined; const timestamp = @@ -174,9 +172,11 @@ async function handleRollback(args: VersionArgs, url: string, cwd: string, deps: `Restored "${docName}" to version ${args.commitSha.slice(0, 8)}${provenance}. The change has been applied to all connected editors.`, ]; if (summaryHint) textLines.push(summaryHint); - if (contentDivergence) { + if (writeWarning) { textLines.push( - `⚠ Content divergence: ${contentDivergence.actualBytes} actual bytes vs ${contentDivergence.intendedBytes} intended (byteDelta=${contentDivergence.byteDelta}). ${contentDivergence.hint ?? 'currentState carries the converged content (re-read only if it is truncated).'}`, + writeWarning.kind === 'content-divergence' + ? `⚠ Content divergence: ${writeWarning.actualBytes} actual bytes vs ${writeWarning.intendedBytes} intended (byteDelta=${writeWarning.byteDelta}). ${writeWarning.hint ?? 'currentState carries the converged content (re-read only if it is truncated).'}` + : `⚠ ${writeWarning.hint ?? 'An out-of-band edit was reconciled into this document before your change landed on top — re-read for the combined result.'}`, ); } @@ -192,6 +192,6 @@ async function handleRollback(args: VersionArgs, url: string, cwd: string, deps: previewUrl: preview?.url ?? null, ...(preview ? { previewUrlSource: preview.source } : {}), ...(summaryResult ? { summary: summaryResult } : {}), - ...(contentDivergence ? { contentDivergence } : {}), + ...(writeWarning ? { contentDivergence: writeWarning } : {}), }); } diff --git a/packages/server/src/mcp/tools/write-document.ts b/packages/server/src/mcp/tools/write-document.ts index 5ad44208..5b281423 100644 --- a/packages/server/src/mcp/tools/write-document.ts +++ b/packages/server/src/mcp/tools/write-document.ts @@ -1,10 +1,10 @@ import { existsSync, readFileSync } from 'node:fs'; import { resolve as resolvePath } from 'node:path'; import { - ContentDivergenceWarningSchema, normalizeBridge, renderInventoryFooter, stripFrontmatter, + WriteWarningSchema, } from '@inkeep/open-knowledge-core'; import { z } from 'zod'; import { resolveContentDir, resolveLockDir } from '../../config/paths.ts'; @@ -339,7 +339,7 @@ export function register(server: ServerInstance, deps: WriteDocumentDeps): void const documents = results.map((r) => { if (!r.ok) return { docName: r.docName, ok: false as const, error: r.error }; const preview = resolvePreviewUrl(r.docName, { lockDir }); - const divergenceParse = ContentDivergenceWarningSchema.safeParse(r.raw.warning); + const divergenceParse = WriteWarningSchema.safeParse(r.raw.warning); const divergence = divergenceParse.success ? divergenceParse.data : undefined; return { docName: r.docName, @@ -359,9 +359,10 @@ export function register(server: ServerInstance, deps: WriteDocumentDeps): void } const d = documents[i]; const base = `Wrote ${spec.docName} (${r.position}).`; - return d?.ok && d.contentDivergence + if (!(d?.ok && d.contentDivergence)) return base; + return d.contentDivergence.kind === 'content-divergence' ? `${base} ⚠ Content divergence: ${d.contentDivergence.actualBytes} actual vs ${d.contentDivergence.intendedBytes} intended (byteDelta=${d.contentDivergence.byteDelta}).` - : base; + : `${base} ⚠ An out-of-band disk edit was reconciled in before your write — re-read for the combined result.`; }); const perDocNotes = args.docs.flatMap((spec, i) => { const r = results[i]; @@ -420,10 +421,8 @@ export function register(server: ServerInstance, deps: WriteDocumentDeps): void : undefined; const summaryHint = typeof summaryResult?.hint === 'string' ? summaryResult.hint : undefined; - const contentDivergenceParse = ContentDivergenceWarningSchema.safeParse(result.warning); - const contentDivergence = contentDivergenceParse.success - ? contentDivergenceParse.data - : undefined; + const writeWarningParse = WriteWarningSchema.safeParse(result.warning); + const writeWarning = writeWarningParse.success ? writeWarningParse.data : undefined; const noOpNote = emptyAppendNoOpNote(w.position, args.markdown); const lines: string[] = [ @@ -442,9 +441,11 @@ export function register(server: ServerInstance, deps: WriteDocumentDeps): void if (hint.message) lines.push(hint.message); } } - if (contentDivergence) { + if (writeWarning) { lines.push( - `⚠ Content divergence: ${contentDivergence.actualBytes} actual bytes vs ${contentDivergence.intendedBytes} intended (byteDelta=${contentDivergence.byteDelta}). ${contentDivergence.hint ?? 'currentState carries the converged content (re-read only if it is truncated).'}`, + writeWarning.kind === 'content-divergence' + ? `⚠ Content divergence: ${writeWarning.actualBytes} actual bytes vs ${writeWarning.intendedBytes} intended (byteDelta=${writeWarning.byteDelta}). ${writeWarning.hint ?? 'currentState carries the converged content (re-read only if it is truncated).'}` + : `⚠ ${writeWarning.hint ?? 'An out-of-band edit was reconciled into this document before your write landed on top — re-read for the combined result.'}`, ); } const text = lines.join('\n'); @@ -455,12 +456,15 @@ export function register(server: ServerInstance, deps: WriteDocumentDeps): void !noPreviewOnThisDoc && !hints && !summaryResult && - !contentDivergence + !writeWarning ) { return textResult(text); } const structured: Record = {}; + if (writeWarning) { + structured.contentDivergence = writeWarning; + } if (preview) { structured.previewUrl = preview.url; structured.previewUrlSource = preview.source; @@ -474,9 +478,6 @@ export function register(server: ServerInstance, deps: WriteDocumentDeps): void if (summaryResult) { structured.summary = summaryResult; } - if (contentDivergence) { - structured.contentDivergence = contentDivergence; - } return textPlusStructured(text, structured); }, ); diff --git a/packages/server/src/persistence.ts b/packages/server/src/persistence.ts index ca90a712..fd101196 100644 --- a/packages/server/src/persistence.ts +++ b/packages/server/src/persistence.ts @@ -294,6 +294,8 @@ export interface PersistenceHandle { flushDeferredStores: (mode?: 'within-branch' | 'discard-stale') => Promise; flushPendingGitCommit: () => Promise; takeStoreFailure: (documentName: string) => StoreFailure | null; + takeStoreDivergence: (documentName: string) => boolean; + markAgentWriteStore: (documentName: string) => void; flushContributors: () => Promise; waitForPendingCommits: () => Promise; readonly configPersistenceCtx: ConfigPersistenceCtx; @@ -334,6 +336,10 @@ export function createPersistenceExtension(options?: PersistenceOptions): Persis const storeFailures = new Map(); + const storeDivergences = new Set(); + + const agentWriteStores = new Set(); + const gitEnabled = options?.gitEnabled ?? true; const commitDebounceMs = options?.commitDebounceMs ?? 15_000; const wipRef = options?.wipRef ?? 'refs/wip/main'; @@ -681,6 +687,8 @@ export function createPersistenceExtension(options?: PersistenceOptions): Persis 'persistence.onStoreDocument', { attributes: { 'doc.name': documentName } }, async () => { + const agentTriggeredStore = agentWriteStores.delete(documentName); + const lifecycleStatus = document.getMap('lifecycle').get('status'); if ( lifecycleStatus === 'deleted-upstream' || @@ -908,6 +916,52 @@ export function createPersistenceExtension(options?: PersistenceOptions): Persis throw new Error(msg); } + if ( + process.env.NODE_ENV === 'test' && + process.env.OK_TEST_STORE_DIVERGENCE === documentName + ) { + await tracedWriteFile(canonicalPath, '# NATIVE\n\nnative-divergence-injected\n', 'utf-8'); + } + + if (agentTriggeredStore && currentBase !== undefined) { + let diskNow: string | null = null; + try { + if (existsSync(canonicalPath)) diskNow = readFileSync(canonicalPath, 'utf-8'); + } catch (err) { + diskNow = null; + log.warn( + { err, documentName }, + '[persistence] L3 disk-read failed; divergence check skipped for this store', + ); + } + if (diskNow !== null && normalizeBridge(diskNow) !== normalizeBridge(currentBase)) { + const diskContent = diskNow; // const so the closure keeps the non-null narrowing + console.warn( + JSON.stringify({ + event: 'agent-write-content-divergence', + 'doc.name': documentName, + outcome: 'reverted', + diskBytes: diskContent.length, + baseBytes: currentBase.length, + candidateBytes: markdown.length, + }), + ); + try { + document.transact(() => { + applyDiskContent(document, diskContent); + }, FILE_WATCHER_ORIGIN); + } catch (err) { + storeFailures.set(documentName, toStoreFailure(err)); + persistenceDeferCounts.delete(documentName); + throw err; + } + setReconciledBase(documentName, diskContent); + storeDivergences.add(documentName); + persistenceDeferCounts.delete(documentName); + return; + } + } + const tmpPath = `${canonicalPath}.tmp.${crypto.randomUUID()}`; try { if (process.env.NODE_ENV === 'test' && process.env.OK_TEST_STORE_FAULT === documentName) { @@ -1203,6 +1257,16 @@ export function createPersistenceExtension(options?: PersistenceOptions): Persis return failure ?? null; } + function takeStoreDivergence(documentName: string): boolean { + if (!storeDivergences.has(documentName)) return false; + storeDivergences.delete(documentName); + return true; + } + + function markAgentWriteStore(documentName: string): void { + agentWriteStores.add(documentName); + } + return { extension, flushDeferredStores, @@ -1210,6 +1274,8 @@ export function createPersistenceExtension(options?: PersistenceOptions): Persis flushContributors, waitForPendingCommits, takeStoreFailure, + takeStoreDivergence, + markAgentWriteStore, configPersistenceCtx, }; } diff --git a/packages/server/src/server-factory.ts b/packages/server/src/server-factory.ts index 18b569dc..f84645eb 100644 --- a/packages/server/src/server-factory.ts +++ b/packages/server/src/server-factory.ts @@ -599,6 +599,8 @@ export function createServer(options: ServerOptions): ServerInstance { flushGitCommit: () => persistence.flushPendingGitCommit(), flushContributors: () => persistence.flushContributors(), takeStoreFailure: (docName: string) => persistence.takeStoreFailure(docName), + takeStoreDivergence: (docName: string) => persistence.takeStoreDivergence(docName), + markAgentWriteStore: (docName: string) => persistence.markAgentWriteStore(docName), getCurrentBranch: () => headWatcher?.getLastKnownBranch() ?? null, getDiskAckSVs: () => cc1Broadcaster?.getLatestDiskAckSVsAsBase64() ?? {}, contentRoot,