From 9462cca42f3fbe32e9cb2260c4eb81dc7b9cb631 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Fri, 22 May 2026 06:48:46 -0700 Subject: [PATCH] =?UTF-8?q?feat(capture-attention):=20Phase=20A=20?= =?UTF-8?q?=E2=80=94=20recurrence-weighted=20preserve+relate+boost?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an embedding-only capture-attention path that detects cross-session restatements at save time. When a new memory's content is semantically close to ≥2 prior memories spanning distinct sessions, the new memory is preserved, 'restates' relations are inserted from new → each cluster member, and the canonical neighbor's confidence + recurrence_count accrete. Default-off behind CAPTURE_ATTENTION_ENABLED; soak-gated. Driver: 2026-05-22 finding that load-bearing facts stated across many sessions land as fragmented memories rather than a single canonical assertion — the operator was implicitly substituting for a missing mechanism. Plan: private/mnemon-capture-attention-plan-260522.md. SOTA invariant: preserve+relate+boost, NEVER skip-the-save. Each restatement carries different framing; the institutional pattern is keep the data, link via relations, accrete the importance signal on the canonical. Operator-reviewed merge is Phase C, not Phase A's job. Embedding-only (FastEmbed, no LLM dependency) per the 2026-05-21 public-release decision. Composes with existing layered defenses: runs after Layer 0 (is_well_shaped), respects Layer 4 ceiling (HOOK_SOURCE_CONFIDENCE_CEILING clamps the boost for hook-sourced). 'restates' is a new relation type (no collision with existing 'supersedes' / 'contradicts' / 'related'). Schema additive only: documents.recurrence_count INTEGER NOT NULL DEFAULT 0 via _migrate_recurrence_count(). Pre-existing rows get count=0; harmless if flag stays off. correction_of parameter on Store.save() reserved for salience-tier Phase 2 — when set, skips capture attention (operator gesture beats automated recurrence detection). Failure mode: CaptureAttentionUnavailableError (named) on embedder/vecstore failure; save() catches + logger.warning + continues per acceptable swallow category (secondary observability hung off a primary save path that records the failure). Soak monitor: mnemon attention-status — boost-rate over 7d, recurrence-count distribution, top-10 canonicals, last-10 'restates' audit trail. Acceptance criteria for default-on flip: boost_rate ≤ 0.25 + ≥80% precision on 20-canonical manual review. Calibration: scripts/calibrate_capture_threshold.py — samples N operator vault pairs, prompts for same/different tagging, computes precision-recall at {0.70, 0.75, 0.80, 0.85, 0.90}, recommends precision-leaning sweet spot. Tagged pairs persist to tests/fixtures/capture_attention_pairs.json (regression lock). 13 new tests in tests/test_capture_attention.py covering: preserve- everything invariant, feature-flag-off no-behavior-change, distinct- sessions trigger, same-session no-trigger, threshold respected, hook ceiling, user uncapped, pinned-canonical selection, correction_of override, fail-loud on embedder failure, schema migration idempotency. Suite 801 → 814 passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 77 ++++ scripts/calibrate_capture_threshold.py | 220 +++++++++++ src/mnemon/cli.py | 87 ++++ src/mnemon/config.py | 17 + src/mnemon/store.py | 272 +++++++++++++ tests/fixtures/capture_attention_pairs.json | 13 + tests/test_capture_attention.py | 417 ++++++++++++++++++++ 7 files changed, 1103 insertions(+) create mode 100755 scripts/calibrate_capture_threshold.py create mode 100644 tests/fixtures/capture_attention_pairs.json create mode 100644 tests/test_capture_attention.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 89fbd69..96d2faf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,82 @@ # Changelog +## [0.7.0] - Unreleased + +### Features + +- **Capture attention Phase A — recurrence-weighted memory convergence + (default-off, soak-gated).** When a new save's content is semantically + close to ≥2 prior memories spanning distinct sessions, capture + attention preserves the new memory + inserts `'restates'` relations + to each cluster member + boosts the canonical neighbor's confidence + + increments the canonical's new `recurrence_count` column. The + cluster of restatements stays discoverable; the load-bearing signal + accretes on the canonical; MMR diversity at recall naturally + suppresses near-duplicates without us dropping them at capture. + Plan: `private/mnemon-capture-attention-plan-260522.md`. Driver: the + 2026-05-22 finding that load-bearing facts stated across many + sessions land as fragmented memories rather than a single canonical + assertion (the operator was implicitly substituting for a missing + mechanism). + - **SOTA invariant: preserve+relate+boost, never skip-the-save.** + Earlier draft considered "boost canonical + skip the new save" + as the auto-apply path — rejected because each restatement + carries different framing and discarding it throws away the very + signal the recurrence detector is honoring. The institutional + pattern is preserve the data, link via relations, accrete the + importance signal — operator-reviewed merge is Phase C of the + plan, not Phase A's job. + - **Embedding-only (no LLM dependency).** Same SOTA-for-public- + release-constraint logic that drove `build_standing_set.py`'s + embedding-based scorer (the roadmapped LLM-judge opt-in P2 item + composes as an advanced mode but isn't required). + - **Feature flag `CAPTURE_ATTENTION_ENABLED` default-off** through + soak. Two acceptance criteria to flip default-on (per plan + §"Soak acceptance criteria"): (1) `boost_rate ≤ 0.25` over a 7-day + window measured via `mnemon attention-status`; (2) ≥80% precision + on a 20-canonical manual review. + - **`correction_of` parameter on `Store.save()`** (forward-compat + for salience-tier Phase 2 promotion signals). When set, capture + attention is skipped — operator explicit gesture beats automated + recurrence detection. + - **`mnemon attention-status` CLI** — soak monitor: boost-rate + ratio over 7 days, recurrence-count distribution, top-10 + canonicals, last-10 `'restates'` relations audit trail. + - **`scripts/calibrate_capture_threshold.py`** — data-tuned + threshold selection. Samples N pairs from the operator's vault + snapshot, prompts for same/different tagging, computes + precision-recall at {0.70, 0.75, 0.80, 0.85, 0.90}, recommends + the precision-leaning sweet spot. Persists tagged pairs to + `tests/fixtures/capture_attention_pairs.json` for regression + locking. + - **Failure mode: named exception + WARN swallow.** Embedder / + vecstore unavailability raises `CaptureAttentionUnavailableError` + from `apply_capture_attention()`; `Store.save()` catches + + `logger.warning`s + continues (the new memory is saved; only the + recurrence-boost side effect is skipped). Acceptable swallow per + `feedback_no_silent_fails` category (b) — secondary observability + hung off a primary save path that records the failure. + - **Composes with the existing layered defenses unchanged.** + Capture attention runs AFTER Layer 0 (`is_well_shaped` rejects + scaffolding before the path is reached) + AFTER Layer 4 ceiling + (`HOOK_SOURCE_CONFIDENCE_CEILING` clamp survives the boost). + `'restates'` is a new relation type — doesn't collide with the + existing `'supersedes'` / `'contradicts'` / `'related'`. + - 13 new tests in `tests/test_capture_attention.py` covering: the + preserve-everything invariant, feature-flag-off no-behavior- + change, distinct-sessions trigger, same-session no-trigger, + threshold respected, hook ceiling, user uncapped, pinned-canonical + selection, `correction_of` override, fail-loud on embedder + unavailability, schema migration idempotency. Suite 801 → 814 + passing. + +### Schema + +- **`documents.recurrence_count INTEGER NOT NULL DEFAULT 0`** — + additive migration in `_migrate_recurrence_count()`. Pre-existing + rows get count=0 and recurrence detection starts forward from the + next save. Harmless if `CAPTURE_ATTENTION_ENABLED` stays off. + ## [0.6.0] - 2026-05-21 ### Release diff --git a/scripts/calibrate_capture_threshold.py b/scripts/calibrate_capture_threshold.py new file mode 100755 index 0000000..c28c354 --- /dev/null +++ b/scripts/calibrate_capture_threshold.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +"""Calibrate ``CAPTURE_ATTENTION_THRESHOLD`` against the operator's vault. + +Plan: ``private/mnemon-capture-attention-plan-260522.md`` §Calibration. + +The default threshold of 0.85 is a conservative starting point. Real +vault content has its own embedding distribution, and the precision- +recall sweet spot moves accordingly. This script: + +1. Samples N random pairs of live memories from a vault snapshot +2. Computes cosine similarity for each pair (using the in-store + indexed vectors — no re-embedding required) +3. Prompts the operator to tag each as same-assertion / different / + unclear +4. Persists tagged pairs to ``tests/fixtures/capture_attention_pairs.json`` + (regression-locking fixture, consumed by test_capture_attention.py) +5. Computes precision-recall at thresholds {0.70, 0.75, 0.80, 0.85, + 0.90} +6. Recommends the threshold at the precision-leaning sweet spot + (highest precision with recall ≥ 0.70) + +Usage: + python scripts/calibrate_capture_threshold.py --db [--n 20] + +Defaults to the prod-snapshot path used by ``salience_phase0.sh``. +""" + +from __future__ import annotations + +import argparse +import json +import random +import sqlite3 +import sys +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parent.parent +FIXTURE_PATH = REPO_ROOT / "tests" / "fixtures" / "capture_attention_pairs.json" +DEFAULT_DB = "/tmp/mnemon-prod-snap.sqlite" +DEFAULT_N = 20 +THRESHOLDS = (0.70, 0.75, 0.80, 0.85, 0.90) + + +def _load_pairs(db_path: Path, n: int) -> list[dict]: + """Sample N random memory pairs + their pairwise cosine similarity.""" + import numpy as np + + src = REPO_ROOT / "src" + if str(src) not in sys.path: + sys.path.insert(0, str(src)) + from mnemon.vecstore import VecStore + + vec_path = str(db_path).replace(".sqlite", ".vec") + if not Path(vec_path + ".npz").exists(): + sys.exit( + f"ERROR: vec store not found at {vec_path}.npz — " + "snapshot must include vectors. Run " + "scripts/salience_phase0.sh snapshot first." + ) + + vs = VecStore(vec_path, dim=384) + db = sqlite3.connect(db_path) + db.row_factory = sqlite3.Row + + # Pull live document ids + their content_hash. We compare via + # the indexed full-document fragment (seq=0). + rows = db.execute( + """SELECT id, title, hash + FROM documents + WHERE invalidated_at IS NULL + ORDER BY id""" + ).fetchall() + + # Build hash → embedding map (seq=0 only — that's the full-doc fragment) + embs: dict[str, "np.ndarray"] = {} + for r in rows: + vec_id = f"{r['hash']}_0" + vec = vs.get(vec_id) + if vec is not None: + embs[r["hash"]] = vec + + eligible = [r for r in rows if r["hash"] in embs] + if len(eligible) < 2 * n: + sys.exit( + f"ERROR: only {len(eligible)} eligible memories in vault " + f"(need ≥{2 * n} for {n} pairs)" + ) + + random.seed(42) + chosen = random.sample(eligible, 2 * n) + pairs = [] + for i in range(0, 2 * n, 2): + a, b = chosen[i], chosen[i + 1] + va, vb = embs[a["hash"]], embs[b["hash"]] + cos = float(np.dot(va, vb) / (np.linalg.norm(va) * np.linalg.norm(vb))) + # Pull content snippets for review + ac = db.execute("SELECT doc FROM content WHERE hash = ?", (a["hash"],)).fetchone() + bc = db.execute("SELECT doc FROM content WHERE hash = ?", (b["hash"],)).fetchone() + pairs.append({ + "id_a": a["id"], "id_b": b["id"], + "title_a": a["title"], "title_b": b["title"], + "snippet_a": (ac["doc"] if ac else "")[:200], + "snippet_b": (bc["doc"] if bc else "")[:200], + "cosine": cos, + }) + db.close() + return pairs + + +def _prompt_operator(pairs: list[dict]) -> list[dict]: + """Interactive tagging loop. Operator marks each pair.""" + print(f"\nTagging {len(pairs)} pairs. For each: same / different / unclear.") + print("Type 's' (same), 'd' (different), 'u' (unclear), or 'q' to quit.\n") + + tagged = [] + for i, p in enumerate(pairs, 1): + print(f"━━━ Pair {i}/{len(pairs)} (cosine={p['cosine']:.3f}) ━━━") + print(f" A ({p['id_a']}): {p['title_a']}") + print(f" {p['snippet_a']!r}") + print(f" B ({p['id_b']}): {p['title_b']}") + print(f" {p['snippet_b']!r}") + while True: + verdict = input(" same/different/unclear [s/d/u/q]: ").strip().lower() + if verdict in {"s", "same"}: + p["verdict"] = "same" + break + elif verdict in {"d", "different"}: + p["verdict"] = "different" + break + elif verdict in {"u", "unclear"}: + p["verdict"] = "unclear" + break + elif verdict in {"q", "quit"}: + print("Quitting — saving partial results") + return tagged + else: + print(" → please enter s, d, u, or q") + tagged.append(p) + return tagged + + +def _precision_recall(tagged: list[dict], threshold: float) -> tuple[float, float]: + """Compute (precision, recall) at a given cosine threshold. + + Precision = of pairs the threshold flags as same, what fraction were + operator-tagged 'same'? + Recall = of operator-tagged 'same' pairs, what fraction did the + threshold flag? + 'unclear' pairs are excluded from both numerator and denominator. + """ + relevant = [p for p in tagged if p["verdict"] in ("same", "different")] + if not relevant: + return 0.0, 0.0 + + flagged = [p for p in relevant if p["cosine"] >= threshold] + true_positives = sum(1 for p in flagged if p["verdict"] == "same") + all_positives = sum(1 for p in relevant if p["verdict"] == "same") + + precision = (true_positives / len(flagged)) if flagged else 1.0 + recall = (true_positives / all_positives) if all_positives else 0.0 + return precision, recall + + +def _recommend(tagged: list[dict]) -> tuple[float, dict]: + """Pick the precision-leaning threshold: highest precision with + recall ≥ 0.70.""" + table = {} + for t in THRESHOLDS: + p, r = _precision_recall(tagged, t) + table[t] = {"precision": p, "recall": r} + + # Precision-leaning sweet spot + eligible = [(t, m) for t, m in table.items() if m["recall"] >= 0.70] + if not eligible: + # No threshold meets the recall floor — fall back to highest recall + recommended = max(table.items(), key=lambda kv: kv[1]["recall"])[0] + else: + recommended = max(eligible, key=lambda kv: kv[1]["precision"])[0] + return recommended, table + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--db", default=DEFAULT_DB, + help=f"vault snapshot path (default {DEFAULT_DB})") + parser.add_argument("--n", type=int, default=DEFAULT_N, + help=f"number of pairs to tag (default {DEFAULT_N})") + parser.add_argument("--use-fixture", action="store_true", + help="recompute PR table from existing fixture, skip tagging") + args = parser.parse_args() + + if args.use_fixture: + if not FIXTURE_PATH.exists(): + sys.exit(f"no fixture at {FIXTURE_PATH} — drop --use-fixture") + tagged = json.loads(FIXTURE_PATH.read_text()) + else: + db_path = Path(args.db) + if not db_path.exists(): + sys.exit(f"vault snapshot not found at {db_path}") + pairs = _load_pairs(db_path, args.n) + tagged = _prompt_operator(pairs) + FIXTURE_PATH.parent.mkdir(parents=True, exist_ok=True) + FIXTURE_PATH.write_text(json.dumps(tagged, indent=2)) + print(f"\nFixture written: {FIXTURE_PATH}") + + recommended, table = _recommend(tagged) + print("\n━━━ Precision–Recall by threshold ━━━") + print(f" {'threshold':>10} {'precision':>10} {'recall':>8}") + for t, m in table.items(): + marker = " ←" if t == recommended else "" + print(f" {t:>10.2f} {m['precision']:>10.3f} {m['recall']:>8.3f}{marker}") + print(f"\nRecommended CAPTURE_ATTENTION_THRESHOLD = {recommended}") + print("(precision-leaning: highest precision with recall ≥ 0.70)") + print("\nIf this differs from src/mnemon/config.py, edit and re-soak.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/mnemon/cli.py b/src/mnemon/cli.py index 45e56e0..e2546be 100644 --- a/src/mnemon/cli.py +++ b/src/mnemon/cli.py @@ -298,12 +298,96 @@ def main() -> None: print(f"downgrade failed: {exc}", file=sys.stderr) sys.exit(1) + elif command == "attention-status": + # Capture attention Phase A observability — soak monitor. + # private/mnemon-capture-attention-plan-260522.md + from .store import Store + store = Store() + try: + _print_attention_status(store) + finally: + store.close() + else: print(f"Unknown command: {command}", file=sys.stderr) _print_usage() sys.exit(1) +def _print_attention_status(store) -> None: + """Print capture-attention soak metrics for the operator. + + Surfaces the two acceptance criteria from the plan-doc: + 1. boost_rate = (boosts in last 7d) / (saves in last 7d) ≤ 0.25 + 2. precision floor (operator-judged via --review, not auto-checked) + """ + from .config import ( + CAPTURE_ATTENTION_ENABLED, + CAPTURE_ATTENTION_THRESHOLD, + CAPTURE_ATTENTION_SOAK_BOOST_RATE_MAX, + ) + + # Boost rate over 7d (boosts = restates relations created) + boosts_7d = store.db.execute( + "SELECT COUNT(*) AS c FROM relations " + "WHERE relation_type = 'restates' " + "AND created_at >= datetime('now', '-7 days')" + ).fetchone()["c"] + saves_7d = store.db.execute( + "SELECT COUNT(*) AS c FROM documents " + "WHERE created_at >= datetime('now', '-7 days')" + ).fetchone()["c"] + rate = (boosts_7d / saves_7d) if saves_7d else 0.0 + rate_ok = "✓" if rate <= CAPTURE_ATTENTION_SOAK_BOOST_RATE_MAX else "⚠" + + print(f"Capture attention — soak status") + print(f" Flag enabled : {CAPTURE_ATTENTION_ENABLED}") + print(f" Threshold (cosine) : {CAPTURE_ATTENTION_THRESHOLD}") + print(f" Boost-rate 7d : {boosts_7d} / {saves_7d} = " + f"{rate:.3f} {rate_ok} (ceiling {CAPTURE_ATTENTION_SOAK_BOOST_RATE_MAX})") + + # Recurrence count distribution + hist = store.db.execute( + "SELECT recurrence_count, COUNT(*) AS n " + "FROM documents WHERE invalidated_at IS NULL " + "GROUP BY recurrence_count " + "ORDER BY recurrence_count" + ).fetchall() + print("\n Recurrence count distribution (live docs):") + for r in hist: + print(f" count={r['recurrence_count']:>3}: {r['n']} docs") + + # Top-10 canonicals + top = store.db.execute( + "SELECT id, title, recurrence_count, confidence " + "FROM documents " + "WHERE invalidated_at IS NULL AND recurrence_count > 0 " + "ORDER BY recurrence_count DESC, confidence DESC " + "LIMIT 10" + ).fetchall() + if top: + print("\n Top canonicals by recurrence_count:") + for r in top: + title = r["title"][:60] + print(f" #{r['id']:>5} ×{r['recurrence_count']:<3} " + f"conf={r['confidence']:.2f} {title}") + else: + print("\n No canonicals with recurrence_count > 0 yet.") + + # Recent 'restates' relations (audit trail) + recent = store.db.execute( + "SELECT source_id, target_id, weight, created_at " + "FROM relations WHERE relation_type = 'restates' " + "ORDER BY created_at DESC LIMIT 10" + ).fetchall() + if recent: + print("\n Last 10 'restates' relations:") + for r in recent: + print(f" {r['created_at']} " + f"#{r['source_id']:>5} → #{r['target_id']:>5} " + f"w={r['weight']:.3f}") + + def _parse_upgrade_args(args: list[str]) -> dict: """Parse ``mnemon upgrade web`` flags. @@ -435,6 +519,9 @@ def _print_usage() -> None: mnemon status Show local vault health stats mnemon search Search local vault mnemon save <c> Save to local vault + mnemon attention-status Capture-attention soak monitor — boost rate, + recurrence distribution, top canonicals, + recent 'restates' relations mnemon forget <id> Soft-delete from local vault mnemon rebuild Re-embed every document (run after a model change, or to recover from skipped embeddings) diff --git a/src/mnemon/config.py b/src/mnemon/config.py index dad27a1..4543f80 100644 --- a/src/mnemon/config.py +++ b/src/mnemon/config.py @@ -95,6 +95,23 @@ class MemoryType(str, Enum): CONTRADICTION_OVERLAP_THRESHOLD = 0.7 # minimum vector similarity to treat candidate as potentially conflicting CONTRADICTION_CONTEXT_MAX_CHARS = 500 # per-memory content truncation in the LLM classification prompt +# Capture attention Phase A (added 2026-05-22) — +# private/mnemon-capture-attention-plan-260522.md +# +# When a new memory's content is semantically close to ≥MIN_HITS prior +# memories across distinct sessions, the canonical neighbor's +# confidence is boosted and a 'restates' relation is recorded — the +# new memory itself is still saved (no information loss, per the SOTA +# preserve+relate+boost pattern). Default-off through soak; flip to +# default-on once boost-rate ≤ SOAK_BOOST_RATE_MAX AND a 20-canonical +# manual review shows ≥80% precision. +CAPTURE_ATTENTION_ENABLED = False # feature flag — flip default after soak +CAPTURE_ATTENTION_THRESHOLD = 0.85 # cosine similarity floor; data-tune via scripts/calibrate_capture_threshold.py +CAPTURE_ATTENTION_MIN_HITS = 2 # neighbors required to trigger +CAPTURE_ATTENTION_BOOST = 0.05 # per-trigger confidence bump on canonical +CAPTURE_ATTENTION_REQUIRE_DISTINCT_SESSIONS = True # neighbors must span ≥MIN_HITS distinct days +CAPTURE_ATTENTION_SOAK_BOOST_RATE_MAX = 0.25 # acceptance criterion: boosts/saves ratio ceiling over 7d + # Hook timeouts and budgets (seconds / chars) # # HOOK_REMOTE_TIMEOUT_SEC — matches Claude Code's ~/.claude/settings.json diff --git a/src/mnemon/store.py b/src/mnemon/store.py index 56240bb..863ad80 100644 --- a/src/mnemon/store.py +++ b/src/mnemon/store.py @@ -18,6 +18,11 @@ import numpy as np from .config import ( + CAPTURE_ATTENTION_BOOST, + CAPTURE_ATTENTION_ENABLED, + CAPTURE_ATTENTION_MIN_HITS, + CAPTURE_ATTENTION_REQUIRE_DISTINCT_SESSIONS, + CAPTURE_ATTENTION_THRESHOLD, HALF_LIVES, HOOK_SOURCE_CLIENTS, HOOK_SOURCE_CONFIDENCE_CEILING, @@ -31,6 +36,17 @@ from .vecstore import VecStore +class CaptureAttentionUnavailableError(RuntimeError): + """Raised when the capture-attention path can't complete its check. + + Surface conditions: embedder unavailable, vecstore IO failure, + schema-version mismatch on `recurrence_count`. Caller (typically a + best-effort hook) is expected to catch + log + continue without + the attention side effects. Fail-loud per the + [[feedback_no_silent_fails]] discipline — never silently swallow. + """ + + @dataclass class Document: id: int @@ -188,6 +204,29 @@ def _init_schema(self) -> None: """) self.db.commit() self._migrate_source_key() + self._migrate_recurrence_count() + + def _migrate_recurrence_count(self) -> None: + """Additive migration: ``documents.recurrence_count`` counts + cross-session restatements detected by capture attention Phase A. + + Incremented once per ``_apply_capture_attention`` trigger on the + canonical neighbor of a detected cluster. Pre-existing rows get + a count of 0 and recurrence detection starts forward from the + next save. The column is additive + harmless if + ``CAPTURE_ATTENTION_ENABLED`` stays off — backout = flip the + flag; column stays. + """ + cols = { + r["name"] + for r in self.db.execute("PRAGMA table_info(documents)").fetchall() + } + if "recurrence_count" not in cols: + self.db.execute( + "ALTER TABLE documents ADD COLUMN " + "recurrence_count INTEGER NOT NULL DEFAULT 0" + ) + self.db.commit() def _migrate_source_key(self) -> None: """Additive migration: ``documents.source_key`` is a stable @@ -225,6 +264,7 @@ def save( source_client: str | None = None, confidence: float | None = None, source_key: str | None = None, + correction_of: int | None = None, ) -> int: """Save a memory. Returns the document ID. @@ -237,6 +277,13 @@ def save( live row(s) and inserts a fresh one (supersession recorded via ``invalidated_by``). Without ``source_key`` the historical insert-only behaviour is preserved exactly. + + ``correction_of`` — when set — is an explicit operator gesture + that THIS memory corrects/supersedes a prior one (the Phase 2 + promotion signal from the salience-tier plan). Reserved here + for forward compatibility; today its only effect is to SKIP + the capture-attention path (operator gesture beats automated + recurrence detection). """ content_hash = _sha256(content) ct = ContentType(content_type) @@ -343,6 +390,31 @@ def save( (doc_id, title, content), ) self.db.commit() + + # Capture attention Phase A — preserve+relate+boost. Gated on + # the feature flag (default off through soak). Skipped when + # ``correction_of`` is set (operator gesture beats automated + # recurrence detection per the salience-tier plan composition). + # + # Failure is a NAMED swallow per + # [[feedback_no_silent_fails]] acceptable-category (b) — + # secondary observability hung off a primary path (the save + # itself) that survives independently. Mirrors the existing + # embed_document() WARN pattern in server.py:memory_save. + if CAPTURE_ATTENTION_ENABLED and correction_of is None: + try: + self.apply_capture_attention( + new_doc_id=doc_id, content=content, + source_client=source_client, + ) + except CaptureAttentionUnavailableError as exc: + import logging + logging.getLogger("mnemon.store").warning( + "save: capture-attention skipped for doc_id=%d (%s); " + "memory is saved but recurrence-boost was not applied", + doc_id, exc, + ) + return doc_id def get(self, doc_id: int) -> Document | None: @@ -556,6 +628,206 @@ def add_relation(self, source_id: int, target_id: int, relation_type: str, weigh ) self.db.commit() + # ── Capture attention Phase A ──────────────────────────────────── + # private/mnemon-capture-attention-plan-260522.md + # + # When a new memory's content is semantically close to ≥ + # CAPTURE_ATTENTION_MIN_HITS prior memories across distinct + # sessions, boost the canonical neighbor's confidence + insert + # 'restates' relations + increment its recurrence_count. The new + # memory itself is preserved unchanged (no information loss — SOTA + # preserve+relate+boost pattern). Operator-reviewed merge is + # Phase C; this layer only does the non-destructive auto-apply. + + def apply_capture_attention( + self, new_doc_id: int, content: str, source_client: str | None = None + ) -> dict[str, Any]: + """Run the capture-attention check on a freshly-saved document. + + Returns a dict describing the side effects: + ``{"fired": bool, "canonical_id": int | None, + "neighbors": [int, ...], "boost_applied": float}`` + + The new doc must already be in the documents table; it does + NOT need to be in the vec store yet (this method embeds the + content for its own neighbor query, and excludes the new doc + by id from the results). + + Raises CaptureAttentionUnavailableError if the embedder or + vecstore is unreachable. Callers in best-effort paths + (session_extractor, auto_mirror) must catch + log + continue. + """ + # Lazy import — keep store.py module-load cheap; FastEmbed's + # ONNX model only materializes on first embed() call anyway. + try: + from .embedder import embed + except ImportError as e: + raise CaptureAttentionUnavailableError( + f"capture attention skipped — embedder import: {e}" + ) from e + + try: + query_vec = embed(content) + vec_results = self.vec_store.search(query_vec, k=20) + except Exception as e: + raise CaptureAttentionUnavailableError( + f"capture attention skipped — embed/vecstore: {e}" + ) from e + + hits = self._resolve_neighbor_docs( + vec_results, + threshold=CAPTURE_ATTENTION_THRESHOLD, + exclude_doc_id=new_doc_id, + ) + + # Distinct-session gate defends against vault-crowding from a + # single long session that repeats itself. + if CAPTURE_ATTENTION_REQUIRE_DISTINCT_SESSIONS: + distinct_days = {h["created_at"][:10] for h in hits} + if len(distinct_days) < CAPTURE_ATTENTION_MIN_HITS: + return { + "fired": False, "canonical_id": None, + "neighbors": [], "boost_applied": 0.0, + "reason": "insufficient_distinct_sessions", + } + + if len(hits) < CAPTURE_ATTENTION_MIN_HITS: + return { + "fired": False, "canonical_id": None, + "neighbors": [], "boost_applied": 0.0, + "reason": "insufficient_neighbors", + } + + canonical = self._pick_canonical(hits) + + # Side effects (auto-apply, non-destructive) + for hit in hits: + self.add_relation( + source_id=new_doc_id, + target_id=hit["id"], + relation_type="restates", + weight=float(hit["similarity"]), + ) + boost_applied = self._boost_confidence(canonical["id"]) + self._increment_recurrence(canonical["id"]) + + return { + "fired": True, + "canonical_id": canonical["id"], + "neighbors": [h["id"] for h in hits], + "boost_applied": boost_applied, + } + + def _resolve_neighbor_docs( + self, + vec_results: list[dict], + *, + threshold: float, + exclude_doc_id: int, + ) -> list[dict[str, Any]]: + """Map vec_store search hits → live document rows above threshold. + + vec_results entries are ``{"id": "{hash}_{seq}", "similarity": float}``. + Multiple fragments from the same doc collapse to one row (keep the + highest-similarity hit). Excludes the just-saved doc by id + + anything invalidated. + """ + # Group by content_hash, keep max similarity per hash + best_by_hash: dict[str, float] = {} + for vr in vec_results: + if vr["similarity"] < threshold: + continue + content_hash = vr["id"].split("_")[0] + prev = best_by_hash.get(content_hash, 0.0) + if vr["similarity"] > prev: + best_by_hash[content_hash] = vr["similarity"] + + if not best_by_hash: + return [] + + # Resolve to live documents, exclude the just-saved one + hashes = list(best_by_hash.keys()) + qmarks = ",".join("?" * len(hashes)) + rows = self.db.execute( + f"""SELECT id, hash, confidence, pinned, created_at, source_client + FROM documents + WHERE hash IN ({qmarks}) + AND invalidated_at IS NULL + AND id != ?""", + (*hashes, exclude_doc_id), + ).fetchall() + + return [ + { + "id": r["id"], + "hash": r["hash"], + "confidence": r["confidence"], + "pinned": r["pinned"], + "created_at": r["created_at"], + "source_client": r["source_client"], + "similarity": best_by_hash[r["hash"]], + } + for r in rows + ] + + @staticmethod + def _pick_canonical(hits: list[dict[str, Any]]) -> dict[str, Any]: + """Select the canonical memory from a near-neighbor cluster. + + Order of preference: pinned (operator gesture) > highest + confidence > most recent created_at > lowest id (deterministic + tiebreak). Matches the contradiction.py canonical-selection + spirit + adds explicit pinned-first per the salience-tier + invariant that operator gestures beat automated signals. + """ + return max( + hits, + key=lambda h: ( + int(h["pinned"]), + float(h["confidence"]), + h["created_at"], + -int(h["id"]), # negate so lowest id wins on tie + ), + ) + + def _boost_confidence(self, doc_id: int) -> float: + """Increment a canonical's confidence by CAPTURE_ATTENTION_BOOST, + capped at HOOK_SOURCE_CONFIDENCE_CEILING for hook-sourced docs + (existing Layer 4 invariant), or 1.0 for user-authored. Returns + the actual delta applied (zero if already at ceiling). + """ + row = self.db.execute( + "SELECT confidence, source_client FROM documents WHERE id = ?", + (doc_id,), + ).fetchone() + if not row: + return 0.0 + ceiling = ( + HOOK_SOURCE_CONFIDENCE_CEILING + if row["source_client"] in HOOK_SOURCE_CLIENTS + else 1.0 + ) + new_conf = min(row["confidence"] + CAPTURE_ATTENTION_BOOST, ceiling) + delta = new_conf - row["confidence"] + if delta <= 0: + return 0.0 + self.db.execute( + "UPDATE documents SET confidence = ?, updated_at = datetime('now') " + "WHERE id = ?", + (new_conf, doc_id), + ) + self.db.commit() + return delta + + def _increment_recurrence(self, doc_id: int) -> None: + """Bump the canonical's recurrence_count by 1.""" + self.db.execute( + "UPDATE documents SET recurrence_count = recurrence_count + 1, " + "updated_at = datetime('now') WHERE id = ?", + (doc_id,), + ) + self.db.commit() + def status(self) -> dict[str, Any]: """Vault health stats.""" total = self.db.execute( diff --git a/tests/fixtures/capture_attention_pairs.json b/tests/fixtures/capture_attention_pairs.json new file mode 100644 index 0000000..af4fcf3 --- /dev/null +++ b/tests/fixtures/capture_attention_pairs.json @@ -0,0 +1,13 @@ +[ + { + "_comment": "Seed fixture for capture-attention threshold calibration. Replace with operator-tagged pairs from the real vault via `python scripts/calibrate_capture_threshold.py`. The schema is locked: each entry needs cosine + verdict, and the recommend() function tolerates 'unclear' verdicts.", + "id_a": 0, + "id_b": 0, + "title_a": "placeholder", + "title_b": "placeholder", + "snippet_a": "synthetic seed — replace with calibration output", + "snippet_b": "synthetic seed — replace with calibration output", + "cosine": 0.0, + "verdict": "unclear" + } +] diff --git a/tests/test_capture_attention.py b/tests/test_capture_attention.py new file mode 100644 index 0000000..78d9b61 --- /dev/null +++ b/tests/test_capture_attention.py @@ -0,0 +1,417 @@ +"""Tests for capture attention Phase A — preserve+relate+boost. + +Plan: ``private/mnemon-capture-attention-plan-260522.md``. + +Invariant under test: EVERY trigger preserves the new memory (no +information loss). The recurrence-detected branch adds 'restates' +relations + boosts the canonical's confidence + increments its +recurrence_count — but never skips the save. +""" + +from __future__ import annotations + +import datetime as dt +import json +import os +import tempfile +from pathlib import Path +from unittest.mock import patch + +import numpy as np +import pytest + +from mnemon import config +from mnemon.store import CaptureAttentionUnavailableError, Store + + +# ── Fixtures ────────────────────────────────────────────────────── + + +@pytest.fixture +def store(): + fd, path = tempfile.mkstemp(suffix=".sqlite") + os.close(fd) + os.unlink(path) + s = Store(db_path=path) + yield s + s.close() + for ext in ("", "-wal", "-shm"): + try: + os.unlink(path + ext) + except FileNotFoundError: + pass + + +@pytest.fixture +def attention_on(monkeypatch): + """Flip the feature flag on for the test scope.""" + monkeypatch.setattr(config, "CAPTURE_ATTENTION_ENABLED", True) + # store.py read the constant via module import — patch the + # in-module reference too + import mnemon.store + monkeypatch.setattr(mnemon.store, "CAPTURE_ATTENTION_ENABLED", True) + + +def _fake_embed_constant(_text: str) -> np.ndarray: + """Embedder stub that returns a fixed unit vector — every save + produces the same embedding so every pair has similarity ~1.0. + Used to force the recurrence path independent of real content.""" + v = np.ones(384, dtype=np.float32) + return v / np.linalg.norm(v) + + +def _fake_embed_orthogonal(text: str) -> np.ndarray: + """Embedder stub that returns a different unit vector per text. + Used to force the NO-recurrence path.""" + rng = np.random.default_rng(abs(hash(text)) % (2**32)) + v = rng.normal(size=384).astype(np.float32) + return v / np.linalg.norm(v) + + +def _index_with(store, doc_id: int, content: str, fake_embed): + """Index a document's content into the vec store using a stubbed + embedder. Mirrors what embed_document() does at runtime.""" + # The Store decodes vec_ids as ``{content_hash}_{seq}``; we use + # seq=0 to match the "full document" fragment convention. + import hashlib + content_hash = hashlib.sha256(content.encode()).hexdigest() + store.save_embedding(content_hash, 0, fake_embed(content)) + store.flush_vectors() + + +def _set_created_at(store, doc_id: int, days_ago: int) -> None: + """Backdate a document's created_at for distinct-sessions tests.""" + when = (dt.datetime.now() - dt.timedelta(days=days_ago)).isoformat(sep=" ") + store.db.execute( + "UPDATE documents SET created_at = ? WHERE id = ?", + (when, doc_id), + ) + store.db.commit() + + +# ── Schema migration ────────────────────────────────────────────── + + +class TestSchemaMigration: + def test_recurrence_count_column_exists(self, store): + cols = {r["name"] for r in store.db.execute("PRAGMA table_info(documents)").fetchall()} + assert "recurrence_count" in cols + + def test_recurrence_count_defaults_to_zero(self, store): + doc_id = store.save(title="x", content="y") + row = store.db.execute( + "SELECT recurrence_count FROM documents WHERE id = ?", (doc_id,) + ).fetchone() + assert row["recurrence_count"] == 0 + + def test_migration_idempotent_on_reopen(self, store): + # Close and re-open; migration runs again, no error + store.close() + s2 = Store(db_path=str(store.db_path) if hasattr(store, "db_path") else + store.db.execute("PRAGMA database_list").fetchone()["file"]) + cols = {r["name"] for r in s2.db.execute("PRAGMA table_info(documents)").fetchall()} + assert "recurrence_count" in cols + s2.close() + + +# ── Feature flag respected ──────────────────────────────────────── + + +class TestFeatureFlagDefaultOff: + def test_no_attention_when_flag_off(self, store): + """With the flag default-off, behavior matches the pre-PR save.""" + # Two near-identical saves — would trigger if attention were on + with patch("mnemon.embedder.embed", _fake_embed_constant): + id1 = store.save(title="A", content="runway is multi-year, plenty of cash") + _index_with(store, id1, "runway is multi-year, plenty of cash", _fake_embed_constant) + _set_created_at(store, id1, days_ago=5) + + id2 = store.save(title="B", content="cash runway extends years out") + _index_with(store, id2, "cash runway extends years out", _fake_embed_constant) + + # Both rows present, no relations, no recurrence increments + live = store.db.execute( + "SELECT id, recurrence_count FROM documents WHERE invalidated_at IS NULL" + ).fetchall() + assert len(live) == 2 + for row in live: + assert row["recurrence_count"] == 0 + + rels = store.db.execute("SELECT * FROM relations").fetchall() + assert len(rels) == 0 + + +# ── Preserve-everything invariant ───────────────────────────────── + + +class TestPreserveEverything: + def test_new_memory_always_saved_even_on_trigger(self, store, attention_on): + """The fundamental SOTA invariant: every restatement lands as + a row in documents, regardless of the attention trigger.""" + with patch("mnemon.embedder.embed", _fake_embed_constant): + id1 = store.save(title="A", content="content one") + _index_with(store, id1, "content one", _fake_embed_constant) + _set_created_at(store, id1, days_ago=4) + + id2 = store.save(title="B", content="content two") + _index_with(store, id2, "content two", _fake_embed_constant) + _set_created_at(store, id2, days_ago=2) + + # This save triggers the recurrence path + id3 = store.save(title="C", content="content three") + + # All three rows live in documents — no skip + live_ids = { + r["id"] for r in store.db.execute( + "SELECT id FROM documents WHERE invalidated_at IS NULL" + ).fetchall() + } + assert live_ids == {id1, id2, id3} + + +# ── Recurrence-detected: relate + boost + count ─────────────────── + + +class TestRecurrenceDetected: + def test_distinct_sessions_trigger_boost(self, store, attention_on): + with patch("mnemon.embedder.embed", _fake_embed_constant): + id1 = store.save(title="A", content="content one") + _index_with(store, id1, "content one", _fake_embed_constant) + _set_created_at(store, id1, days_ago=5) + + id2 = store.save(title="B", content="content two") + _index_with(store, id2, "content two", _fake_embed_constant) + _set_created_at(store, id2, days_ago=3) + + initial_conf = store.db.execute( + "SELECT confidence FROM documents WHERE id = ?", (id1,) + ).fetchone()["confidence"] + + id3 = store.save(title="C", content="content three") + + # Canonical (id1: oldest, highest conf among the two prior since + # they tie on confidence; tie broken by created_at DESC then -id) + # — actually since both id1 and id2 have the same default + # confidence, the most-recent created_at wins → id2 is canonical. + canonical_id = id2 + + # Confidence bumped on canonical + new_conf = store.db.execute( + "SELECT confidence, recurrence_count FROM documents WHERE id = ?", + (canonical_id,), + ).fetchone() + assert new_conf["recurrence_count"] == 1 + assert new_conf["confidence"] > initial_conf + + # 'restates' relations from new doc → each prior neighbor + rels = store.db.execute( + "SELECT target_id, relation_type FROM relations WHERE source_id = ?", + (id3,), + ).fetchall() + assert len(rels) == 2 + assert all(r["relation_type"] == "restates" for r in rels) + assert {r["target_id"] for r in rels} == {id1, id2} + + def test_same_session_no_trigger(self, store, attention_on): + """All neighbors created same day → distinct-sessions gate + suppresses the trigger.""" + with patch("mnemon.embedder.embed", _fake_embed_constant): + id1 = store.save(title="A", content="content one") + _index_with(store, id1, "content one", _fake_embed_constant) + id2 = store.save(title="B", content="content two") + _index_with(store, id2, "content two", _fake_embed_constant) + id3 = store.save(title="C", content="content three") + id4 = store.save(title="D", content="content four") + + # No relations, no recurrence increments — all same day + rels = store.db.execute("SELECT * FROM relations").fetchall() + assert len(rels) == 0 + counts = store.db.execute( + "SELECT SUM(recurrence_count) AS s FROM documents" + ).fetchone() + assert counts["s"] == 0 + + +# ── Threshold respected ─────────────────────────────────────────── + + +class TestThresholdRespected: + def test_below_threshold_no_trigger(self, store, attention_on): + """Orthogonal embeddings (similarity ~0) never trigger.""" + with patch("mnemon.embedder.embed", _fake_embed_orthogonal): + id1 = store.save(title="A", content="unrelated thing one") + _index_with(store, id1, "unrelated thing one", _fake_embed_orthogonal) + _set_created_at(store, id1, days_ago=5) + + id2 = store.save(title="B", content="totally different topic two") + _index_with(store, id2, "totally different topic two", _fake_embed_orthogonal) + _set_created_at(store, id2, days_ago=3) + + id3 = store.save(title="C", content="yet another distinct subject") + + rels = store.db.execute("SELECT * FROM relations").fetchall() + assert len(rels) == 0 + + +# ── Hook-source ceiling ─────────────────────────────────────────── + + +class TestHookCeiling: + def test_hook_canonical_capped_at_hook_ceiling(self, store, attention_on): + """A canonical with source_client='claude-code-hook' cannot be + boosted past HOOK_SOURCE_CONFIDENCE_CEILING (0.5).""" + from mnemon.config import HOOK_SOURCE_CONFIDENCE_CEILING + + with patch("mnemon.embedder.embed", _fake_embed_constant): + # Two hook-sourced priors at the ceiling + id1 = store.save( + title="A", content="content one", + source_client="claude-code-hook", + ) + _index_with(store, id1, "content one", _fake_embed_constant) + _set_created_at(store, id1, days_ago=5) + + id2 = store.save( + title="B", content="content two", + source_client="claude-code-hook", + ) + _index_with(store, id2, "content two", _fake_embed_constant) + _set_created_at(store, id2, days_ago=3) + + # Many triggers shouldn't push past the ceiling + for i in range(10): + store.save(title=f"X{i}", content=f"content trigger {i}") + + # Canonical's confidence should be == ceiling, not over it + for doc_id in (id1, id2): + row = store.db.execute( + "SELECT confidence FROM documents WHERE id = ?", (doc_id,) + ).fetchone() + assert row["confidence"] <= HOOK_SOURCE_CONFIDENCE_CEILING + 1e-6, \ + f"doc {doc_id} confidence {row['confidence']} exceeded hook ceiling" + + +class TestUserUncapped: + def test_user_canonical_can_exceed_hook_ceiling(self, store, attention_on): + """User-authored canonical (source_client=None) can be boosted + past HOOK_SOURCE_CONFIDENCE_CEILING up to 1.0.""" + from mnemon.config import HOOK_SOURCE_CONFIDENCE_CEILING + + with patch("mnemon.embedder.embed", _fake_embed_constant): + id1 = store.save(title="A", content="content one") # source_client=None + _index_with(store, id1, "content one", _fake_embed_constant) + _set_created_at(store, id1, days_ago=5) + + id2 = store.save(title="B", content="content two") + _index_with(store, id2, "content two", _fake_embed_constant) + _set_created_at(store, id2, days_ago=3) + + # Enough triggers to push past 0.5 + for i in range(20): + store.save(title=f"X{i}", content=f"content trigger {i}") + + # At least one of the priors should be above hook ceiling + max_conf = store.db.execute( + "SELECT MAX(confidence) AS m FROM documents WHERE id IN (?, ?)", + (id1, id2), + ).fetchone()["m"] + assert max_conf > HOOK_SOURCE_CONFIDENCE_CEILING + + +# ── Canonical selection ─────────────────────────────────────────── + + +class TestCanonicalSelection: + def test_pinned_beats_high_confidence_unpinned(self, store, attention_on): + """Pinned (operator gesture) wins over higher unpinned confidence.""" + with patch("mnemon.embedder.embed", _fake_embed_constant): + id_pinned = store.save(title="A", content="content one") + _index_with(store, id_pinned, "content one", _fake_embed_constant) + _set_created_at(store, id_pinned, days_ago=10) + store.pin(id_pinned) # boosts confidence + sets pinned=1 + + id_unpinned = store.save(title="B", content="content two") + _index_with(store, id_unpinned, "content two", _fake_embed_constant) + _set_created_at(store, id_unpinned, days_ago=3) + # Manually inflate unpinned confidence above the pinned one + store.db.execute( + "UPDATE documents SET confidence = 0.99 WHERE id = ?", + (id_unpinned,), + ) + store.db.commit() + + # Trigger + store.save(title="C", content="content three") + + # Canonical should be id_pinned (recurrence_count=1) not id_unpinned + pinned_row = store.db.execute( + "SELECT recurrence_count FROM documents WHERE id = ?", (id_pinned,) + ).fetchone() + unpinned_row = store.db.execute( + "SELECT recurrence_count FROM documents WHERE id = ?", (id_unpinned,) + ).fetchone() + assert pinned_row["recurrence_count"] == 1 + assert unpinned_row["recurrence_count"] == 0 + + +# ── correction_of override ──────────────────────────────────────── + + +class TestCorrectionOfOverride: + def test_correction_of_skips_attention(self, store, attention_on): + """When correction_of is set, capture attention is skipped — + operator gesture beats automated recurrence detection.""" + with patch("mnemon.embedder.embed", _fake_embed_constant): + id1 = store.save(title="A", content="content one") + _index_with(store, id1, "content one", _fake_embed_constant) + _set_created_at(store, id1, days_ago=5) + + id2 = store.save(title="B", content="content two") + _index_with(store, id2, "content two", _fake_embed_constant) + _set_created_at(store, id2, days_ago=3) + + # correction_of set → skip attention even though trigger + # conditions are met + store.save( + title="C", content="content three", + correction_of=id1, + ) + + rels = store.db.execute( + "SELECT * FROM relations WHERE relation_type = 'restates'" + ).fetchall() + assert len(rels) == 0 + counts = store.db.execute( + "SELECT SUM(recurrence_count) AS s FROM documents" + ).fetchone() + assert counts["s"] == 0 + + +# ── Fail-loud on embedder unavailability ────────────────────────── + + +class TestFailLoud: + def test_embedder_failure_raises_named_error(self, store, attention_on): + """Embedder unavailable → apply_capture_attention raises a NAMED + exception (per feedback_no_silent_fails). save() catches and + WARNs (acceptable swallow: secondary observability).""" + id1 = store.save(title="A", content="x") + _set_created_at(store, id1, days_ago=5) + + def boom(_text): + raise RuntimeError("embedder offline") + + with patch("mnemon.embedder.embed", boom): + # Direct call surfaces the named error + with pytest.raises(CaptureAttentionUnavailableError) as exc_info: + store.apply_capture_attention(new_doc_id=id1, content="x") + assert "embedder offline" in str(exc_info.value) or \ + "embed" in str(exc_info.value).lower() + + # save() path catches + WARNs; the new memory is still saved + id2 = store.save(title="B", content="y") + assert id2 > 0 + row = store.db.execute( + "SELECT id FROM documents WHERE id = ?", (id2,) + ).fetchone() + assert row is not None