diff --git a/context-guard-kit/benchmark_runner.py b/context-guard-kit/benchmark_runner.py index d8f0e6d..3f1dd69 100755 --- a/context-guard-kit/benchmark_runner.py +++ b/context-guard-kit/benchmark_runner.py @@ -1429,10 +1429,84 @@ def run_fixture(task: TaskFixture, variant: Variant, claude_bin: str, ) -def append_csv(csv_path: Path, claude_ver: str, result: RunResult, *, skip_existing: bool = False) -> bool: +def csv_file_stamp_unlocked(csv_path: Path) -> tuple[int, int, int, int] | None: + try: + fd = _open_regular_no_symlink(csv_path) + except FileNotFoundError: + return None + try: + st = os.fstat(fd) + return (int(st.st_dev), int(st.st_ino), int(st.st_size), int(st.st_mtime_ns)) + finally: + os.close(fd) + + +def refresh_existing_key_cache_unlocked( + csv_path: Path, + existing_key_cache: set[tuple[str, str]], + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None, +) -> None: + current_stamp = csv_file_stamp_unlocked(csv_path) + if existing_key_cache_stamp is not None and existing_key_cache_stamp.get("stamp") == current_stamp: + return + refreshed = _read_existing_keys_unlocked(csv_path) + existing_key_cache.clear() + existing_key_cache.update(refreshed) + if existing_key_cache_stamp is not None: + existing_key_cache_stamp["stamp"] = current_stamp + + +def resume_key_present( + csv_path: Path, + key: tuple[str, str], + existing_key_cache: set[tuple[str, str]], + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None, +) -> bool: + if not _csv_exists_no_follow(csv_path): + existing_key_cache.clear() + if existing_key_cache_stamp is not None: + existing_key_cache_stamp["stamp"] = None + return False + with csv_file_lock(csv_path, create_parent=False): + refresh_existing_key_cache_unlocked(csv_path, existing_key_cache, existing_key_cache_stamp) + return key in existing_key_cache + + +def resume_runnable_targets( + csv_path: Path, + targets: list[tuple[TaskFixture, Variant]], + *, + resume: bool, + existing_key_cache: set[tuple[str, str]], + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None, +) -> list[tuple[TaskFixture, Variant]]: + if not resume: + return list(targets) + return [ + (task, variant) + for task, variant in targets + if not resume_key_present(csv_path, (task.id, variant.name), existing_key_cache, existing_key_cache_stamp) + ] + + +def append_csv( + csv_path: Path, + claude_ver: str, + result: RunResult, + *, + skip_existing: bool = False, + existing_key_cache: set[tuple[str, str]] | None = None, + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None = None, +) -> bool: with csv_file_lock(csv_path, create_parent=True): - if skip_existing and (result.task_id, result.variant) in _read_existing_keys_unlocked(csv_path): - return False + key = (result.task_id, result.variant) + if skip_existing: + if existing_key_cache is not None: + refresh_existing_key_cache_unlocked(csv_path, existing_key_cache, existing_key_cache_stamp) + if key in existing_key_cache: + return False + elif key in _read_existing_keys_unlocked(csv_path): + return False flags = os.O_CREAT | os.O_APPEND | os.O_WRONLY fd = _open_regular_no_symlink(csv_path, flags, 0o600, create_parent=True) try: @@ -1486,6 +1560,10 @@ def append_csv(csv_path: Path, claude_ver: str, result: RunResult, *, skip_exist finally: if fd != -1: os.close(fd) + if existing_key_cache is not None: + existing_key_cache.add(key) + if existing_key_cache_stamp is not None: + existing_key_cache_stamp["stamp"] = csv_file_stamp_unlocked(csv_path) return True @@ -1644,10 +1722,16 @@ def _csv_exists_no_follow(csv_path: Path) -> bool: def existing_keys(csv_path: Path) -> set[tuple[str, str]]: """이미 적재된 (task_id, variant) 조합. resume 시 skip 판정에 사용.""" + keys, _stamp = existing_keys_snapshot(csv_path) + return keys + + +def existing_keys_snapshot(csv_path: Path) -> tuple[set[tuple[str, str]], tuple[int, int, int, int] | None]: + """Loaded resume keys plus the CSV stamp observed under the same lock.""" if not _csv_exists_no_follow(csv_path): - return set() + return set(), None with csv_file_lock(csv_path, create_parent=False): - return _read_existing_keys_unlocked(csv_path) + return _read_existing_keys_unlocked(csv_path), csv_file_stamp_unlocked(csv_path) def read_csv_rows(csv_path: Path) -> list[dict[str, str]]: @@ -3785,16 +3869,23 @@ def main() -> int: print("no (task, variant) targets matched the filters", file=sys.stderr) return 1 - skip_keys = existing_keys(args.csv) if args.resume else set() - runnable_targets = [ - (task, variant) - for task, variant in targets - if (task.id, variant.name) not in skip_keys - ] + if args.resume: + skip_keys, skip_keys_loaded_stamp = existing_keys_snapshot(args.csv) + skip_keys_stamp = {"stamp": skip_keys_loaded_stamp} + else: + skip_keys = set() + skip_keys_stamp = None + runnable_targets = resume_runnable_targets( + args.csv, + targets, + resume=args.resume, + existing_key_cache=skip_keys, + existing_key_cache_stamp=skip_keys_stamp, + ) if args.evidence_jsonl is not None: if args.dry_run: for task, variant in targets: - if (task.id, variant.name) in skip_keys: + if args.resume and resume_key_present(args.csv, (task.id, variant.name), skip_keys, skip_keys_stamp): print(f"skip {task.id}/{variant.name} (already in {args.csv})") continue print(f"evidence replay dry-run: {task.id}/{variant.name} <- {args.evidence_jsonl}") @@ -3802,18 +3893,33 @@ def main() -> int: return 0 csv_had_preexisting_content = file_has_content_no_follow(args.csv) evidence_rows = read_evidence_jsonl(args.evidence_jsonl) + runnable_targets = resume_runnable_targets( + args.csv, + targets, + resume=args.resume, + existing_key_cache=skip_keys, + existing_key_cache_stamp=skip_keys_stamp, + ) evidence_by_key = validate_evidence_coverage(evidence_rows, runnable_targets) + runnable_keys = {(task.id, variant.name) for task, variant in runnable_targets} claude_ver = "evidence-replay" completed = 0 replay_rows_written: list[EvidenceReplayRow] = [] for task, variant in targets: - if (task.id, variant.name) in skip_keys: + if args.resume and (task.id, variant.name) not in runnable_keys: print(f"skip {task.id}/{variant.name} (already in {args.csv})") continue evidence = evidence_by_key[(task.id, variant.name)] print(f"replay {task.id}/{variant.name} ...", flush=True) result = run_evidence_fixture(task, variant, evidence) - wrote = append_csv(args.csv, claude_ver, result, skip_existing=args.resume) + wrote = append_csv( + args.csv, + claude_ver, + result, + skip_existing=args.resume, + existing_key_cache=skip_keys if args.resume else None, + existing_key_cache_stamp=skip_keys_stamp, + ) if wrote: replay_rows_written.append(evidence) if args.ledger_jsonl is not None: @@ -3846,6 +3952,13 @@ def main() -> int: print(f"completed {completed} run(s); results in {args.csv}") return 0 + runnable_targets = resume_runnable_targets( + args.csv, + targets, + resume=args.resume, + existing_key_cache=skip_keys, + existing_key_cache_stamp=skip_keys_stamp, + ) placeholder_targets = [ f"{task.id}/{variant.name}" for task, variant in runnable_targets @@ -3873,7 +3986,7 @@ def main() -> int: completed = 0 for task, variant in targets: - if (task.id, variant.name) in skip_keys: + if args.resume and resume_key_present(args.csv, (task.id, variant.name), skip_keys, skip_keys_stamp): print(f"skip {task.id}/{variant.name} (already in {args.csv})") continue print(f"run {task.id}/{variant.name} ...", flush=True) @@ -3882,7 +3995,14 @@ def main() -> int: # 깎고, (b) --resume 이 그 (task, variant) 를 skip 해 실제 측정값이 영구 누락된다. wrote = True if not args.dry_run: - wrote = append_csv(args.csv, claude_ver, result, skip_existing=args.resume) + wrote = append_csv( + args.csv, + claude_ver, + result, + skip_existing=args.resume, + existing_key_cache=skip_keys if args.resume else None, + existing_key_cache_stamp=skip_keys_stamp, + ) if wrote and args.ledger_jsonl is not None: append_cost_shift_ledger(args.ledger_jsonl, claude_ver, result) completed += 1 diff --git a/context-guard-kit/cache_score.py b/context-guard-kit/cache_score.py index 2040571..8003fd0 100755 --- a/context-guard-kit/cache_score.py +++ b/context-guard-kit/cache_score.py @@ -63,6 +63,7 @@ MAX_JSON_WALK_NODES = 10_000 MAX_JSON_WALK_DEPTH = 64 MAX_JSON_SHAPE_WARNINGS = 200 +MAX_JSON_CANONICAL_COMPARE_BYTES = 200_000 SAFE_JSON_PATH_SEGMENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$") DYNAMIC_JSON_KEY_RE = re.compile(r"(?i)(request|trace|nonce|random|timestamp|created[_-]?at|updated[_-]?at|date)") SENSITIVE_JSON_KEY_RE = re.compile( @@ -93,6 +94,22 @@ def json_bytes(data: Any, *, indent: int | None = None) -> str: return json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":") if indent is None else None, indent=indent) +def bounded_canonical_json(data: Any, *, max_bytes: int) -> str | None: + encoder = json.JSONEncoder(ensure_ascii=False, sort_keys=True, indent=2) + chunks: list[str] = [] + size = 0 + for chunk in encoder.iterencode(data): + size += byte_len_text(chunk) + if size > max_bytes: + return None + chunks.append(chunk) + size += 1 + if size > max_bytes: + return None + chunks.append("\n") + return "".join(chunks) + + def json_path_child(path: str, key: object) -> str: """Return a JSON warning path segment without echoing sensitive/dynamic keys.""" text = str(key) @@ -335,8 +352,18 @@ def json_shape_warnings(text: str) -> tuple[str, list[dict[str, Any]]]: if not isinstance(data, (dict, list)): return "json-scalar", [] warnings = _walk_json(data) - canonical = json_bytes(data, indent=2) + "\n" - if canonical != text: + input_bytes = byte_len_text(text) + canonical = bounded_canonical_json(data, max_bytes=MAX_JSON_CANONICAL_COMPARE_BYTES) + if canonical is None: + warnings.append({ + "code": "json_canonical_check_skipped", + "path": "$", + "severity": "info", + "message": "JSON input is parseable but canonical formatting would exceed the comparison byte cap.", + "input_bytes": input_bytes, + "max_bytes": MAX_JSON_CANONICAL_COMPARE_BYTES, + }) + elif canonical != text: warnings.append({ "code": "json_not_canonical", "path": "$", diff --git a/context-guard-kit/claude_transcript_cost_audit.py b/context-guard-kit/claude_transcript_cost_audit.py index aeecaa5..4689056 100755 --- a/context-guard-kit/claude_transcript_cost_audit.py +++ b/context-guard-kit/claude_transcript_cost_audit.py @@ -145,14 +145,14 @@ class PromptCacheAudit: def observe(self, root: Any) -> None: self.sampled_records += 1 + if len(self.samples) >= PROMPT_AUDIT_MAX_RECORDS: + self.capped_records += 1 + return segments, bytes_sampled, redactions, collection_capped = prompt_segments_for_record(root) if collection_capped: self.prompt_collection_capped_records += 1 if not segments: return - if len(self.samples) >= PROMPT_AUDIT_MAX_RECORDS: - self.capped_records += 1 - return self.analyzed_prompt_records += 1 self.total_segments += len(segments) self.total_bytes_sampled += bytes_sampled diff --git a/context-guard-kit/context_pack.py b/context-guard-kit/context_pack.py index 77e9bf1..e7a4068 100755 --- a/context-guard-kit/context_pack.py +++ b/context-guard-kit/context_pack.py @@ -957,6 +957,29 @@ def metadata_size(data: dict[str, Any]) -> int: return len(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True).encode("utf-8", errors="replace")) + 1 +def receipt_working_copy(data: dict[str, Any]) -> tuple[dict[str, Any], bool]: + """Copy receipt metadata without deep-copying or serializing an oversized pack body. + + The pack body is already an immutable string in normal builds and stdout remains + authoritative for it. When it cannot possibly fit under the receipt cap by + itself, omit it before the first receipt-size probe so capping work only touches + metadata previews. + """ + receipt: dict[str, Any] = {} + pack_omitted = False + for key, value in data.items(): + if key == "pack" and isinstance(value, str): + if len(value.encode("utf-8", errors="replace")) > MAX_RECEIPT_BYTES: + pack_omitted = True + continue + receipt[key] = value + continue + receipt[key] = copy.deepcopy(value) + if pack_omitted: + receipt["pack_omitted_from_receipt"] = True + return receipt, pack_omitted + + def artifact_failure(error: str, *, bytes_count: int = 0, capped: bool = False) -> dict[str, Any]: return { "stored": False, @@ -1113,8 +1136,11 @@ def finalize_receipt_size(receipt: dict[str, Any]) -> int: def shrink_receipt_for_write(data: dict[str, Any]) -> tuple[dict[str, Any], bool]: - receipt = copy.deepcopy(data) - capped = False + receipt, pack_omitted = receipt_working_copy(data) + capped = pack_omitted + if pack_omitted: + receipt.setdefault("artifact", {})["capped"] = True + receipt.setdefault("artifact", {})["cap_bytes"] = MAX_RECEIPT_BYTES if metadata_size(receipt) <= MAX_RECEIPT_BYTES: return receipt, capped capped = True diff --git a/context-guard-kit/tool_schema_pruner.py b/context-guard-kit/tool_schema_pruner.py index 6da5cea..16355a3 100755 --- a/context-guard-kit/tool_schema_pruner.py +++ b/context-guard-kit/tool_schema_pruner.py @@ -87,6 +87,8 @@ class Candidate: index: int score: float = 0.0 rank: int = 0 + schema_bytes: int = 0 + parameter_terms: frozenset[str] | None = None def fail(message: str) -> NoReturn: @@ -276,7 +278,15 @@ def tool_schema_from_dict(raw: dict[str, Any], *, fallback_name: str | None = No schema["description"] = description if server and "server" not in schema: schema["server"] = server - return Candidate(name=name, server=cap_text(server, MAX_LABEL_CHARS) if server else None, description=description, schema=schema, index=index) + return Candidate( + name=name, + server=cap_text(server, MAX_LABEL_CHARS) if server else None, + description=description, + schema=schema, + index=index, + schema_bytes=byte_len_json(schema), + parameter_terms=frozenset(terms(" ".join(collect_parameter_text(schema)))), + ) def normalize_catalog(raw: Any) -> list[Candidate]: @@ -362,7 +372,11 @@ def score_candidate(candidate: Candidate, query_terms: set[str]) -> float: return 0.0 name_terms = terms(candidate.name) desc_terms = terms(candidate.description) - parameter_terms = terms(" ".join(collect_parameter_text(candidate.schema))) + parameter_terms = ( + set(candidate.parameter_terms) + if candidate.parameter_terms is not None + else terms(" ".join(collect_parameter_text(candidate.schema))) + ) score = 0.0 score += 4.0 * len(query_terms & name_terms) score += 1.5 * len(query_terms & desc_terms) @@ -379,14 +393,38 @@ def rank_candidates(candidates: list[Candidate], query: str) -> list[Candidate]: query_terms = terms(query) scored: list[Candidate] = [] for cand in candidates: - scored.append(Candidate(cand.name, cand.server, cand.description, cand.schema, cand.index, score_candidate(cand, query_terms), 0)) + scored.append(Candidate( + cand.name, + cand.server, + cand.description, + cand.schema, + cand.index, + score_candidate(cand, query_terms), + 0, + schema_bytes=cand.schema_bytes, + parameter_terms=cand.parameter_terms, + )) scored.sort(key=lambda item: (-item.score, item.index)) ranked: list[Candidate] = [] for rank, cand in enumerate(scored, start=1): - ranked.append(Candidate(cand.name, cand.server, cand.description, cand.schema, cand.index, cand.score, rank)) + ranked.append(Candidate( + cand.name, + cand.server, + cand.description, + cand.schema, + cand.index, + cand.score, + rank, + schema_bytes=cand.schema_bytes, + parameter_terms=cand.parameter_terms, + )) return ranked +def candidate_schema_bytes(cand: Candidate) -> int: + return cand.schema_bytes if cand.schema_bytes > 0 else byte_len_json(cand.schema) + + def normalized_link_target(parent: Path, raw_target: str) -> Path: target = Path(raw_target) if not target.is_absolute(): @@ -707,7 +745,7 @@ def build_payload(receipt_id: str, ranked: list[Candidate], query: str, redactio "description": cand.description, "score": cand.score, "rank": cand.rank, - "schema_bytes": byte_len_json(cand.schema), + "schema_bytes": candidate_schema_bytes(cand), "schema": cand.schema, } for cand in ranked @@ -739,7 +777,7 @@ def retrieval_command(receipt_id: str, *, store_dir: str, tool_name: str | None def selected_tool_record(cand: Candidate, receipt_id: str, budget_left: int, *, store_dir: str) -> tuple[dict[str, Any], int]: - schema_size = byte_len_json(cand.schema) + schema_size = candidate_schema_bytes(cand) record: dict[str, Any] = { "name": cand.name, "server": cand.server, @@ -765,7 +803,7 @@ def deferred_tool_record(cand: Candidate, receipt_id: str, *, store_dir: str) -> "score": cand.score, "rank": cand.rank, "description": cand.description, - "schema_bytes": byte_len_json(cand.schema), + "schema_bytes": candidate_schema_bytes(cand), "reason": "deferred_after_core_top", "retrieval": retrieval_command(receipt_id, store_dir=store_dir, tool_name=cand.name), } @@ -1008,9 +1046,9 @@ def defer_report(args: argparse.Namespace) -> str: store_dir=args.store_dir, namespace_top=namespace_top, ) - all_schema_bytes = sum(byte_len_json(cand.schema) for cand in ranked) - listed_deferred_schema_bytes = sum(byte_len_json(cand.schema) for cand in deferred_candidates) - total_deferred_schema_bytes = sum(byte_len_json(cand.schema) for cand in ranked[core_top:]) + all_schema_bytes = sum(candidate_schema_bytes(cand) for cand in ranked) + listed_deferred_schema_bytes = sum(candidate_schema_bytes(cand) for cand in deferred_candidates) + total_deferred_schema_bytes = sum(candidate_schema_bytes(cand) for cand in ranked[core_top:]) tool_stub_report_bytes = byte_len_json(core_tools) + byte_len_json(deferred_tools) all_schema_tokens = proxy_tokens(all_schema_bytes) inline_core_schema_tokens = proxy_tokens(core_schema_bytes) diff --git a/plugins/context-guard/bin/context-guard-audit b/plugins/context-guard/bin/context-guard-audit index aeecaa5..4689056 100755 --- a/plugins/context-guard/bin/context-guard-audit +++ b/plugins/context-guard/bin/context-guard-audit @@ -145,14 +145,14 @@ class PromptCacheAudit: def observe(self, root: Any) -> None: self.sampled_records += 1 + if len(self.samples) >= PROMPT_AUDIT_MAX_RECORDS: + self.capped_records += 1 + return segments, bytes_sampled, redactions, collection_capped = prompt_segments_for_record(root) if collection_capped: self.prompt_collection_capped_records += 1 if not segments: return - if len(self.samples) >= PROMPT_AUDIT_MAX_RECORDS: - self.capped_records += 1 - return self.analyzed_prompt_records += 1 self.total_segments += len(segments) self.total_bytes_sampled += bytes_sampled diff --git a/plugins/context-guard/bin/context-guard-bench b/plugins/context-guard/bin/context-guard-bench index d8f0e6d..3f1dd69 100755 --- a/plugins/context-guard/bin/context-guard-bench +++ b/plugins/context-guard/bin/context-guard-bench @@ -1429,10 +1429,84 @@ def run_fixture(task: TaskFixture, variant: Variant, claude_bin: str, ) -def append_csv(csv_path: Path, claude_ver: str, result: RunResult, *, skip_existing: bool = False) -> bool: +def csv_file_stamp_unlocked(csv_path: Path) -> tuple[int, int, int, int] | None: + try: + fd = _open_regular_no_symlink(csv_path) + except FileNotFoundError: + return None + try: + st = os.fstat(fd) + return (int(st.st_dev), int(st.st_ino), int(st.st_size), int(st.st_mtime_ns)) + finally: + os.close(fd) + + +def refresh_existing_key_cache_unlocked( + csv_path: Path, + existing_key_cache: set[tuple[str, str]], + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None, +) -> None: + current_stamp = csv_file_stamp_unlocked(csv_path) + if existing_key_cache_stamp is not None and existing_key_cache_stamp.get("stamp") == current_stamp: + return + refreshed = _read_existing_keys_unlocked(csv_path) + existing_key_cache.clear() + existing_key_cache.update(refreshed) + if existing_key_cache_stamp is not None: + existing_key_cache_stamp["stamp"] = current_stamp + + +def resume_key_present( + csv_path: Path, + key: tuple[str, str], + existing_key_cache: set[tuple[str, str]], + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None, +) -> bool: + if not _csv_exists_no_follow(csv_path): + existing_key_cache.clear() + if existing_key_cache_stamp is not None: + existing_key_cache_stamp["stamp"] = None + return False + with csv_file_lock(csv_path, create_parent=False): + refresh_existing_key_cache_unlocked(csv_path, existing_key_cache, existing_key_cache_stamp) + return key in existing_key_cache + + +def resume_runnable_targets( + csv_path: Path, + targets: list[tuple[TaskFixture, Variant]], + *, + resume: bool, + existing_key_cache: set[tuple[str, str]], + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None, +) -> list[tuple[TaskFixture, Variant]]: + if not resume: + return list(targets) + return [ + (task, variant) + for task, variant in targets + if not resume_key_present(csv_path, (task.id, variant.name), existing_key_cache, existing_key_cache_stamp) + ] + + +def append_csv( + csv_path: Path, + claude_ver: str, + result: RunResult, + *, + skip_existing: bool = False, + existing_key_cache: set[tuple[str, str]] | None = None, + existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None = None, +) -> bool: with csv_file_lock(csv_path, create_parent=True): - if skip_existing and (result.task_id, result.variant) in _read_existing_keys_unlocked(csv_path): - return False + key = (result.task_id, result.variant) + if skip_existing: + if existing_key_cache is not None: + refresh_existing_key_cache_unlocked(csv_path, existing_key_cache, existing_key_cache_stamp) + if key in existing_key_cache: + return False + elif key in _read_existing_keys_unlocked(csv_path): + return False flags = os.O_CREAT | os.O_APPEND | os.O_WRONLY fd = _open_regular_no_symlink(csv_path, flags, 0o600, create_parent=True) try: @@ -1486,6 +1560,10 @@ def append_csv(csv_path: Path, claude_ver: str, result: RunResult, *, skip_exist finally: if fd != -1: os.close(fd) + if existing_key_cache is not None: + existing_key_cache.add(key) + if existing_key_cache_stamp is not None: + existing_key_cache_stamp["stamp"] = csv_file_stamp_unlocked(csv_path) return True @@ -1644,10 +1722,16 @@ def _csv_exists_no_follow(csv_path: Path) -> bool: def existing_keys(csv_path: Path) -> set[tuple[str, str]]: """이미 적재된 (task_id, variant) 조합. resume 시 skip 판정에 사용.""" + keys, _stamp = existing_keys_snapshot(csv_path) + return keys + + +def existing_keys_snapshot(csv_path: Path) -> tuple[set[tuple[str, str]], tuple[int, int, int, int] | None]: + """Loaded resume keys plus the CSV stamp observed under the same lock.""" if not _csv_exists_no_follow(csv_path): - return set() + return set(), None with csv_file_lock(csv_path, create_parent=False): - return _read_existing_keys_unlocked(csv_path) + return _read_existing_keys_unlocked(csv_path), csv_file_stamp_unlocked(csv_path) def read_csv_rows(csv_path: Path) -> list[dict[str, str]]: @@ -3785,16 +3869,23 @@ def main() -> int: print("no (task, variant) targets matched the filters", file=sys.stderr) return 1 - skip_keys = existing_keys(args.csv) if args.resume else set() - runnable_targets = [ - (task, variant) - for task, variant in targets - if (task.id, variant.name) not in skip_keys - ] + if args.resume: + skip_keys, skip_keys_loaded_stamp = existing_keys_snapshot(args.csv) + skip_keys_stamp = {"stamp": skip_keys_loaded_stamp} + else: + skip_keys = set() + skip_keys_stamp = None + runnable_targets = resume_runnable_targets( + args.csv, + targets, + resume=args.resume, + existing_key_cache=skip_keys, + existing_key_cache_stamp=skip_keys_stamp, + ) if args.evidence_jsonl is not None: if args.dry_run: for task, variant in targets: - if (task.id, variant.name) in skip_keys: + if args.resume and resume_key_present(args.csv, (task.id, variant.name), skip_keys, skip_keys_stamp): print(f"skip {task.id}/{variant.name} (already in {args.csv})") continue print(f"evidence replay dry-run: {task.id}/{variant.name} <- {args.evidence_jsonl}") @@ -3802,18 +3893,33 @@ def main() -> int: return 0 csv_had_preexisting_content = file_has_content_no_follow(args.csv) evidence_rows = read_evidence_jsonl(args.evidence_jsonl) + runnable_targets = resume_runnable_targets( + args.csv, + targets, + resume=args.resume, + existing_key_cache=skip_keys, + existing_key_cache_stamp=skip_keys_stamp, + ) evidence_by_key = validate_evidence_coverage(evidence_rows, runnable_targets) + runnable_keys = {(task.id, variant.name) for task, variant in runnable_targets} claude_ver = "evidence-replay" completed = 0 replay_rows_written: list[EvidenceReplayRow] = [] for task, variant in targets: - if (task.id, variant.name) in skip_keys: + if args.resume and (task.id, variant.name) not in runnable_keys: print(f"skip {task.id}/{variant.name} (already in {args.csv})") continue evidence = evidence_by_key[(task.id, variant.name)] print(f"replay {task.id}/{variant.name} ...", flush=True) result = run_evidence_fixture(task, variant, evidence) - wrote = append_csv(args.csv, claude_ver, result, skip_existing=args.resume) + wrote = append_csv( + args.csv, + claude_ver, + result, + skip_existing=args.resume, + existing_key_cache=skip_keys if args.resume else None, + existing_key_cache_stamp=skip_keys_stamp, + ) if wrote: replay_rows_written.append(evidence) if args.ledger_jsonl is not None: @@ -3846,6 +3952,13 @@ def main() -> int: print(f"completed {completed} run(s); results in {args.csv}") return 0 + runnable_targets = resume_runnable_targets( + args.csv, + targets, + resume=args.resume, + existing_key_cache=skip_keys, + existing_key_cache_stamp=skip_keys_stamp, + ) placeholder_targets = [ f"{task.id}/{variant.name}" for task, variant in runnable_targets @@ -3873,7 +3986,7 @@ def main() -> int: completed = 0 for task, variant in targets: - if (task.id, variant.name) in skip_keys: + if args.resume and resume_key_present(args.csv, (task.id, variant.name), skip_keys, skip_keys_stamp): print(f"skip {task.id}/{variant.name} (already in {args.csv})") continue print(f"run {task.id}/{variant.name} ...", flush=True) @@ -3882,7 +3995,14 @@ def main() -> int: # 깎고, (b) --resume 이 그 (task, variant) 를 skip 해 실제 측정값이 영구 누락된다. wrote = True if not args.dry_run: - wrote = append_csv(args.csv, claude_ver, result, skip_existing=args.resume) + wrote = append_csv( + args.csv, + claude_ver, + result, + skip_existing=args.resume, + existing_key_cache=skip_keys if args.resume else None, + existing_key_cache_stamp=skip_keys_stamp, + ) if wrote and args.ledger_jsonl is not None: append_cost_shift_ledger(args.ledger_jsonl, claude_ver, result) completed += 1 diff --git a/plugins/context-guard/bin/context-guard-cache-score b/plugins/context-guard/bin/context-guard-cache-score index 2040571..8003fd0 100755 --- a/plugins/context-guard/bin/context-guard-cache-score +++ b/plugins/context-guard/bin/context-guard-cache-score @@ -63,6 +63,7 @@ MAX_JSON_PATH_SEGMENT_CHARS = 64 MAX_JSON_WALK_NODES = 10_000 MAX_JSON_WALK_DEPTH = 64 MAX_JSON_SHAPE_WARNINGS = 200 +MAX_JSON_CANONICAL_COMPARE_BYTES = 200_000 SAFE_JSON_PATH_SEGMENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$") DYNAMIC_JSON_KEY_RE = re.compile(r"(?i)(request|trace|nonce|random|timestamp|created[_-]?at|updated[_-]?at|date)") SENSITIVE_JSON_KEY_RE = re.compile( @@ -93,6 +94,22 @@ def json_bytes(data: Any, *, indent: int | None = None) -> str: return json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":") if indent is None else None, indent=indent) +def bounded_canonical_json(data: Any, *, max_bytes: int) -> str | None: + encoder = json.JSONEncoder(ensure_ascii=False, sort_keys=True, indent=2) + chunks: list[str] = [] + size = 0 + for chunk in encoder.iterencode(data): + size += byte_len_text(chunk) + if size > max_bytes: + return None + chunks.append(chunk) + size += 1 + if size > max_bytes: + return None + chunks.append("\n") + return "".join(chunks) + + def json_path_child(path: str, key: object) -> str: """Return a JSON warning path segment without echoing sensitive/dynamic keys.""" text = str(key) @@ -335,8 +352,18 @@ def json_shape_warnings(text: str) -> tuple[str, list[dict[str, Any]]]: if not isinstance(data, (dict, list)): return "json-scalar", [] warnings = _walk_json(data) - canonical = json_bytes(data, indent=2) + "\n" - if canonical != text: + input_bytes = byte_len_text(text) + canonical = bounded_canonical_json(data, max_bytes=MAX_JSON_CANONICAL_COMPARE_BYTES) + if canonical is None: + warnings.append({ + "code": "json_canonical_check_skipped", + "path": "$", + "severity": "info", + "message": "JSON input is parseable but canonical formatting would exceed the comparison byte cap.", + "input_bytes": input_bytes, + "max_bytes": MAX_JSON_CANONICAL_COMPARE_BYTES, + }) + elif canonical != text: warnings.append({ "code": "json_not_canonical", "path": "$", diff --git a/plugins/context-guard/bin/context-guard-pack b/plugins/context-guard/bin/context-guard-pack index 77e9bf1..e7a4068 100755 --- a/plugins/context-guard/bin/context-guard-pack +++ b/plugins/context-guard/bin/context-guard-pack @@ -957,6 +957,29 @@ def metadata_size(data: dict[str, Any]) -> int: return len(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True).encode("utf-8", errors="replace")) + 1 +def receipt_working_copy(data: dict[str, Any]) -> tuple[dict[str, Any], bool]: + """Copy receipt metadata without deep-copying or serializing an oversized pack body. + + The pack body is already an immutable string in normal builds and stdout remains + authoritative for it. When it cannot possibly fit under the receipt cap by + itself, omit it before the first receipt-size probe so capping work only touches + metadata previews. + """ + receipt: dict[str, Any] = {} + pack_omitted = False + for key, value in data.items(): + if key == "pack" and isinstance(value, str): + if len(value.encode("utf-8", errors="replace")) > MAX_RECEIPT_BYTES: + pack_omitted = True + continue + receipt[key] = value + continue + receipt[key] = copy.deepcopy(value) + if pack_omitted: + receipt["pack_omitted_from_receipt"] = True + return receipt, pack_omitted + + def artifact_failure(error: str, *, bytes_count: int = 0, capped: bool = False) -> dict[str, Any]: return { "stored": False, @@ -1113,8 +1136,11 @@ def finalize_receipt_size(receipt: dict[str, Any]) -> int: def shrink_receipt_for_write(data: dict[str, Any]) -> tuple[dict[str, Any], bool]: - receipt = copy.deepcopy(data) - capped = False + receipt, pack_omitted = receipt_working_copy(data) + capped = pack_omitted + if pack_omitted: + receipt.setdefault("artifact", {})["capped"] = True + receipt.setdefault("artifact", {})["cap_bytes"] = MAX_RECEIPT_BYTES if metadata_size(receipt) <= MAX_RECEIPT_BYTES: return receipt, capped capped = True diff --git a/plugins/context-guard/bin/context-guard-tool-prune b/plugins/context-guard/bin/context-guard-tool-prune index 6da5cea..16355a3 100755 --- a/plugins/context-guard/bin/context-guard-tool-prune +++ b/plugins/context-guard/bin/context-guard-tool-prune @@ -87,6 +87,8 @@ class Candidate: index: int score: float = 0.0 rank: int = 0 + schema_bytes: int = 0 + parameter_terms: frozenset[str] | None = None def fail(message: str) -> NoReturn: @@ -276,7 +278,15 @@ def tool_schema_from_dict(raw: dict[str, Any], *, fallback_name: str | None = No schema["description"] = description if server and "server" not in schema: schema["server"] = server - return Candidate(name=name, server=cap_text(server, MAX_LABEL_CHARS) if server else None, description=description, schema=schema, index=index) + return Candidate( + name=name, + server=cap_text(server, MAX_LABEL_CHARS) if server else None, + description=description, + schema=schema, + index=index, + schema_bytes=byte_len_json(schema), + parameter_terms=frozenset(terms(" ".join(collect_parameter_text(schema)))), + ) def normalize_catalog(raw: Any) -> list[Candidate]: @@ -362,7 +372,11 @@ def score_candidate(candidate: Candidate, query_terms: set[str]) -> float: return 0.0 name_terms = terms(candidate.name) desc_terms = terms(candidate.description) - parameter_terms = terms(" ".join(collect_parameter_text(candidate.schema))) + parameter_terms = ( + set(candidate.parameter_terms) + if candidate.parameter_terms is not None + else terms(" ".join(collect_parameter_text(candidate.schema))) + ) score = 0.0 score += 4.0 * len(query_terms & name_terms) score += 1.5 * len(query_terms & desc_terms) @@ -379,14 +393,38 @@ def rank_candidates(candidates: list[Candidate], query: str) -> list[Candidate]: query_terms = terms(query) scored: list[Candidate] = [] for cand in candidates: - scored.append(Candidate(cand.name, cand.server, cand.description, cand.schema, cand.index, score_candidate(cand, query_terms), 0)) + scored.append(Candidate( + cand.name, + cand.server, + cand.description, + cand.schema, + cand.index, + score_candidate(cand, query_terms), + 0, + schema_bytes=cand.schema_bytes, + parameter_terms=cand.parameter_terms, + )) scored.sort(key=lambda item: (-item.score, item.index)) ranked: list[Candidate] = [] for rank, cand in enumerate(scored, start=1): - ranked.append(Candidate(cand.name, cand.server, cand.description, cand.schema, cand.index, cand.score, rank)) + ranked.append(Candidate( + cand.name, + cand.server, + cand.description, + cand.schema, + cand.index, + cand.score, + rank, + schema_bytes=cand.schema_bytes, + parameter_terms=cand.parameter_terms, + )) return ranked +def candidate_schema_bytes(cand: Candidate) -> int: + return cand.schema_bytes if cand.schema_bytes > 0 else byte_len_json(cand.schema) + + def normalized_link_target(parent: Path, raw_target: str) -> Path: target = Path(raw_target) if not target.is_absolute(): @@ -707,7 +745,7 @@ def build_payload(receipt_id: str, ranked: list[Candidate], query: str, redactio "description": cand.description, "score": cand.score, "rank": cand.rank, - "schema_bytes": byte_len_json(cand.schema), + "schema_bytes": candidate_schema_bytes(cand), "schema": cand.schema, } for cand in ranked @@ -739,7 +777,7 @@ def retrieval_command(receipt_id: str, *, store_dir: str, tool_name: str | None def selected_tool_record(cand: Candidate, receipt_id: str, budget_left: int, *, store_dir: str) -> tuple[dict[str, Any], int]: - schema_size = byte_len_json(cand.schema) + schema_size = candidate_schema_bytes(cand) record: dict[str, Any] = { "name": cand.name, "server": cand.server, @@ -765,7 +803,7 @@ def deferred_tool_record(cand: Candidate, receipt_id: str, *, store_dir: str) -> "score": cand.score, "rank": cand.rank, "description": cand.description, - "schema_bytes": byte_len_json(cand.schema), + "schema_bytes": candidate_schema_bytes(cand), "reason": "deferred_after_core_top", "retrieval": retrieval_command(receipt_id, store_dir=store_dir, tool_name=cand.name), } @@ -1008,9 +1046,9 @@ def defer_report(args: argparse.Namespace) -> str: store_dir=args.store_dir, namespace_top=namespace_top, ) - all_schema_bytes = sum(byte_len_json(cand.schema) for cand in ranked) - listed_deferred_schema_bytes = sum(byte_len_json(cand.schema) for cand in deferred_candidates) - total_deferred_schema_bytes = sum(byte_len_json(cand.schema) for cand in ranked[core_top:]) + all_schema_bytes = sum(candidate_schema_bytes(cand) for cand in ranked) + listed_deferred_schema_bytes = sum(candidate_schema_bytes(cand) for cand in deferred_candidates) + total_deferred_schema_bytes = sum(candidate_schema_bytes(cand) for cand in ranked[core_top:]) tool_stub_report_bytes = byte_len_json(core_tools) + byte_len_json(deferred_tools) all_schema_tokens = proxy_tokens(all_schema_bytes) inline_core_schema_tokens = proxy_tokens(core_schema_bytes) diff --git a/tests/test_context_guard_kit.py b/tests/test_context_guard_kit.py index 83b3b80..e844e0d 100644 --- a/tests/test_context_guard_kit.py +++ b/tests/test_context_guard_kit.py @@ -11279,6 +11279,60 @@ def test_cache_score_json_walk_caps_nodes_depth_and_warnings(self): self.assertIn("json_walk_truncated", {item["code"] for item in capped_warnings}) self.assertLessEqual(len(capped_warnings), 4) + def test_cache_score_large_json_skips_canonical_compare(self): + for index, script in enumerate(CACHE_SCORE_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_cache_score_large_canonical_{index}") + payload = json.dumps({"items": [{"value": "x" * 20} for _ in range(4)]}) + self.assertGreater(len(payload.encode("utf-8")), 16) + original_limit = module.MAX_JSON_CANONICAL_COMPARE_BYTES + original_json_bytes = module.json_bytes + + def fail_json_bytes(*_args, **_kwargs): + raise AssertionError("large JSON canonicalization should be skipped") + + module.MAX_JSON_CANONICAL_COMPARE_BYTES = 16 + module.json_bytes = fail_json_bytes + try: + kind, warnings = module.json_shape_warnings(payload) + finally: + module.MAX_JSON_CANONICAL_COMPARE_BYTES = original_limit + module.json_bytes = original_json_bytes + self.assertEqual(kind, "json") + self.assertIn("json_canonical_check_skipped", {item["code"] for item in warnings}) + + def test_cache_score_skips_canonical_compare_when_pretty_output_exceeds_cap(self): + for index, script in enumerate(CACHE_SCORE_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_cache_score_expanded_canonical_{index}") + payload = json.dumps({"items": list(range(40))}, separators=(",", ":")) + original_limit = module.MAX_JSON_CANONICAL_COMPARE_BYTES + module.MAX_JSON_CANONICAL_COMPARE_BYTES = len(payload.encode("utf-8")) + 1 + try: + kind, warnings = module.json_shape_warnings(payload) + finally: + module.MAX_JSON_CANONICAL_COMPARE_BYTES = original_limit + codes = {item["code"] for item in warnings} + self.assertEqual(kind, "json") + self.assertIn("json_canonical_check_skipped", codes) + self.assertNotIn("json_not_canonical", codes) + + def test_cache_score_whitespace_heavy_json_uses_bounded_output_cap(self): + for index, script in enumerate(CACHE_SCORE_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_cache_score_whitespace_canonical_{index}") + payload = '{ "a" : 1 }' + original_limit = module.MAX_JSON_CANONICAL_COMPARE_BYTES + module.MAX_JSON_CANONICAL_COMPARE_BYTES = 15 + try: + kind, warnings = module.json_shape_warnings(payload) + finally: + module.MAX_JSON_CANONICAL_COMPARE_BYTES = original_limit + codes = {item["code"] for item in warnings} + self.assertEqual(kind, "json") + self.assertIn("json_not_canonical", codes) + self.assertNotIn("json_canonical_check_skipped", codes) + def test_cache_score_json_order_provider_thresholds_and_help(self): request = { "tools": [ @@ -11392,6 +11446,61 @@ def _tool_catalog(self, secret: str | None = None) -> dict: ] } + def test_tool_prune_caches_schema_bytes_and_parameter_terms(self): + catalog = { + "tools": [ + { + "name": "needle_reader", + "description": "Read files", + "inputSchema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Needle parameter for fixture lookup", + } + }, + }, + } + ] + } + for index, script in enumerate(TOOL_PRUNE_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_tool_prune_cached_schema_{index}") + candidates = module.normalize_catalog(catalog) + self.assertEqual(len(candidates), 1) + candidate = candidates[0] + self.assertEqual(candidate.schema_bytes, module.byte_len_json(candidate.schema)) + self.assertIn("needle", candidate.parameter_terms) + + original_collect = module.collect_parameter_text + original_byte_len_json = module.byte_len_json + + def fail_collect(*_args, **_kwargs): + raise AssertionError("ranking should use cached parameter terms") + + def fail_byte_len_json(*_args, **_kwargs): + raise AssertionError("records should use cached schema byte lengths") + + module.collect_parameter_text = fail_collect + module.byte_len_json = fail_byte_len_json + try: + ranked = module.rank_candidates(candidates, "needle parameter") + self.assertGreater(ranked[0].score, 0) + record, used = module.selected_tool_record( + ranked[0], + "a" * 20, + 100_000, + store_dir=module.DEFAULT_STORE_DIR, + ) + payload = module.build_payload("a" * 20, ranked, "needle parameter", 0) + finally: + module.collect_parameter_text = original_collect + module.byte_len_json = original_byte_len_json + self.assertEqual(record["schema_bytes"], candidate.schema_bytes) + self.assertEqual(used, candidate.schema_bytes) + self.assertEqual(payload["tools"][0]["schema_bytes"], candidate.schema_bytes) + def test_tool_prune_select_ranks_relevant_tools_and_writes_receipts(self): for script in TOOL_PRUNE_SCRIPTS: with self.subTest(script=script): @@ -12573,6 +12682,79 @@ def test_context_pack_receipt_cap_does_not_write_oversized_metadata(self): self.assertEqual(artifact["error"], "receipt_metadata_too_large") self.assertFalse((root / ".context-guard" / "packs" / f"{data['pack_id']}.json").exists()) + def test_context_pack_omits_oversized_pack_before_receipt_size_probe(self): + for index, script in enumerate(PACK_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_context_pack_receipt_pack_omit_{index}") + result = { + "pack_id": "pack-1", + "pack": "x" * (module.MAX_RECEIPT_BYTES + 1), + "artifact": { + "stored": False, + "path": None, + "bytes": 0, + "capped": False, + "cap_bytes": module.MAX_RECEIPT_BYTES, + }, + "included_sources": [], + "omitted_sources": [], + } + original_metadata_size = module.metadata_size + + def guarded_metadata_size(data): + if "pack" in data: + raise AssertionError("oversized pack should be omitted before receipt sizing") + return original_metadata_size(data) + + module.metadata_size = guarded_metadata_size + try: + receipt, capped = module.shrink_receipt_for_write(result) + finally: + module.metadata_size = original_metadata_size + self.assertTrue(capped) + self.assertTrue(receipt["pack_omitted_from_receipt"]) + self.assertNotIn("pack", receipt) + self.assertTrue(receipt["artifact"]["capped"]) + + def test_context_pack_stores_oversized_pack_receipt_with_omission_contract(self): + for script in PACK_SCRIPTS: + with self.subTest(script=script): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "large.txt").write_text(("large pack body\n" * 6000), encoding="utf-8") + manifest = root / "pack.json" + manifest.write_text( + json.dumps({"version": 1, "sources": [{"path": "large.txt", "priority": 1}]}), + encoding="utf-8", + ) + data = json.loads( + self._run_pack( + script, + root, + "build", + "--root", + ".", + "--manifest", + str(manifest), + "--budget-bytes", + "120000", + "--json", + ).stdout + ) + self.assertIn("large pack body", data["pack"]) + artifact = data["artifact"] + self.assertTrue(artifact["stored"], artifact) + self.assertTrue(artifact["capped"], artifact) + self.assertLessEqual(artifact["bytes"], 64_000) + receipt_path = root / ".context-guard" / "packs" / f"{data['pack_id']}.json" + receipt = json.loads(receipt_path.read_text(encoding="utf-8")) + self.assertEqual(receipt["pack_id"], data["pack_id"]) + self.assertNotIn("pack", receipt) + self.assertTrue(receipt["pack_omitted_from_receipt"]) + self.assertTrue(receipt["artifact"]["stored"]) + self.assertTrue(receipt["artifact"]["capped"]) + self.assertEqual(receipt["artifact"]["path"], f".context-guard/packs/{data['pack_id']}.json") + def _init_pack_git_repo(self, root: Path) -> None: if shutil.which("git") is None: self.skipTest("git is required for context-pack suggest diff tests") @@ -22339,6 +22521,36 @@ def test_transcript_audit_cache_friendliness_bounds_broad_prompt_content(self): self.assertEqual(data["cache_friendliness"]["analyzed_prompt_records"], 0) self.assertGreaterEqual(data["cache_friendliness"]["prompt_collection_capped_records"], 1) + def test_transcript_audit_prompt_cap_skips_extraction_after_sample_limit(self): + for index, script in enumerate([KIT_DIR / "claude_transcript_cost_audit.py", PLUGIN_BIN / "context-guard-audit"]): + with self.subTest(script=script): + module = load_python_script_module(script, f"_transcript_audit_prompt_cap_{index}") + audit = module.PromptCacheAudit() + audit.samples = [ + module.PromptSegmentSample( + prefix_hashes=(), + tail_hashes=(), + segment_count=0, + bytes_sampled=0, + redactions=0, + ) + for _ in range(module.PROMPT_AUDIT_MAX_RECORDS) + ] + original = module.prompt_segments_for_record + + def fail_prompt_segments(_root): + raise AssertionError("prompt extraction should not run after the sample cap") + + module.prompt_segments_for_record = fail_prompt_segments + try: + audit.observe({"message": {"content": [{"type": "text", "text": "expensive prompt"}]}}) + finally: + module.prompt_segments_for_record = original + self.assertEqual(audit.sampled_records, 1) + self.assertEqual(audit.capped_records, 1) + self.assertEqual(audit.analyzed_prompt_records, 0) + self.assertEqual(audit.prompt_collection_capped_records, 0) + def test_transcript_audit_cache_friendliness_marks_skipped_prompt_evidence_partial(self): with tempfile.TemporaryDirectory() as tmp: sample = Path(tmp) / "session.jsonl" @@ -25401,6 +25613,241 @@ def test_append_csv_skip_existing_suppresses_duplicate_rows(self): self.assertEqual(len(rows), 1) self.assertEqual(rows[0]["notes"], "first") + def test_append_csv_resume_key_cache_avoids_per_append_reread(self): + for index, script in enumerate(BENCH_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_bench_runner_csv_dedupe_cache_{index}") + with tempfile.TemporaryDirectory() as tmp: + csv_path = Path(tmp) / "results.csv" + existing = {("t01", "baseline")} + duplicate = module.RunResult( + task_id="t01", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 1, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="duplicate", + ) + fresh = module.RunResult( + task_id="t02", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 2, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="fresh", + ) + original = module._read_existing_keys_unlocked + stamp = {"stamp": module.csv_file_stamp_unlocked(csv_path)} + + def fail_read(_path): + raise AssertionError("resume writes should use the already-loaded key cache") + + module._read_existing_keys_unlocked = fail_read + try: + self.assertFalse( + module.append_csv( + csv_path, + "test", + duplicate, + skip_existing=True, + existing_key_cache=existing, + existing_key_cache_stamp=stamp, + ) + ) + self.assertTrue( + module.append_csv( + csv_path, + "test", + fresh, + skip_existing=True, + existing_key_cache=existing, + existing_key_cache_stamp=stamp, + ) + ) + finally: + module._read_existing_keys_unlocked = original + self.assertIn(("t02", "baseline"), existing) + with csv_path.open(encoding="utf-8") as f: + rows = list(csv.DictReader(f)) + self.assertEqual([row["task_id"] for row in rows], ["t02"]) + + def test_append_csv_resume_key_cache_refreshes_when_csv_changes(self): + for index, script in enumerate(BENCH_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_bench_runner_csv_dedupe_cache_refresh_{index}") + with tempfile.TemporaryDirectory() as tmp: + csv_path = Path(tmp) / "results.csv" + stale_cache: set[tuple[str, str]] = set() + stamp = {"stamp": module.csv_file_stamp_unlocked(csv_path)} + first = module.RunResult( + task_id="t01", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 1, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="first", + ) + duplicate = module.RunResult( + task_id="t01", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 999, "output_tokens": 999, "cache_read": 0, "cache_creation": 0}, + cost_usd=999.0, + success=False, + notes="duplicate", + ) + self.assertTrue(module.append_csv(csv_path, "test", first)) + real_read = module._read_existing_keys_unlocked + read_count = 0 + + def counting_read(path): + nonlocal read_count + read_count += 1 + return real_read(path) + + module._read_existing_keys_unlocked = counting_read + try: + self.assertFalse( + module.append_csv( + csv_path, + "test", + duplicate, + skip_existing=True, + existing_key_cache=stale_cache, + existing_key_cache_stamp=stamp, + ) + ) + finally: + module._read_existing_keys_unlocked = real_read + self.assertEqual(read_count, 1) + self.assertIn(("t01", "baseline"), stale_cache) + with csv_path.open(encoding="utf-8") as f: + rows = list(csv.DictReader(f)) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["notes"], "first") + + def test_append_csv_resume_key_cache_drops_keys_removed_by_rewrite(self): + for index, script in enumerate(BENCH_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_bench_runner_csv_dedupe_cache_rewrite_{index}") + with tempfile.TemporaryDirectory() as tmp: + csv_path = Path(tmp) / "results.csv" + first = module.RunResult( + task_id="t01", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 1, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="first", + ) + rerun = module.RunResult( + task_id="t01", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 2, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="rerun", + ) + self.assertTrue(module.append_csv(csv_path, "test", first)) + stale_cache, stale_stamp = module.existing_keys_snapshot(csv_path) + self.assertIn(("t01", "baseline"), stale_cache) + stamp = {"stamp": stale_stamp} + csv_path.write_text(",".join(module.CSV_COLUMNS) + "\n", encoding="utf-8") + self.assertFalse(module.resume_key_present(csv_path, ("t01", "baseline"), stale_cache, stamp)) + self.assertNotIn(("t01", "baseline"), stale_cache) + self.assertTrue( + module.append_csv( + csv_path, + "test", + rerun, + skip_existing=True, + existing_key_cache=stale_cache, + existing_key_cache_stamp=stamp, + ) + ) + with csv_path.open(encoding="utf-8") as f: + rows = list(csv.DictReader(f)) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["notes"], "rerun") + + def test_resume_runnable_targets_refreshes_before_preflight(self): + for index, script in enumerate(BENCH_SCRIPTS): + with self.subTest(script=script): + module = load_python_script_module(script, f"_bench_runner_resume_preflight_refresh_{index}") + with tempfile.TemporaryDirectory() as tmp: + csv_path = Path(tmp) / "results.csv" + first = module.RunResult( + task_id="t01", + variant="baseline", + model="sonnet", + effort=None, + tokens={"input_tokens": 1, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="first", + ) + self.assertTrue(module.append_csv(csv_path, "test", first)) + cache, loaded_stamp = module.existing_keys_snapshot(csv_path) + stamp = {"stamp": loaded_stamp} + self.assertIn(("t01", "baseline"), cache) + + task_id_index = module.CSV_COLUMNS.index("task_id") + variant_index = module.CSV_COLUMNS.index("variant") + csv_row = [""] * len(module.CSV_COLUMNS) + csv_row[task_id_index] = "t02" + csv_row[variant_index] = "baseline" + csv_path.write_text( + ",".join(module.CSV_COLUMNS) + "\n" + + ",".join(csv_row) + "\n", + encoding="utf-8", + ) + targets = [ + (module.TaskFixture(id="t01", prompt="p1"), module.Variant(name="baseline")), + (module.TaskFixture(id="t02", prompt="p2"), module.Variant(name="baseline")), + ] + runnable = module.resume_runnable_targets( + csv_path, + targets, + resume=True, + existing_key_cache=cache, + existing_key_cache_stamp=stamp, + ) + self.assertEqual([task.id for task, _variant in runnable], ["t01"]) + self.assertEqual(cache, {("t02", "baseline")}) + evidence = module.EvidenceReplayRow( + result=module.RunResult( + task_id="t01", + variant="baseline", + model="evidence", + effort="", + tokens={"input_tokens": 1, "output_tokens": 0, "cache_read": 0, "cache_creation": 0}, + cost_usd=0.0, + success=True, + notes="ok", + ), + source_type="synthetic_fixture", + provider_name=None, + capture_command_or_export_id=None, + claim_scope="fixture_only", + provider_export_provenance_complete=False, + public_claim_eligible=False, + explicit_notes=True, + line_number=1, + ) + coverage = module.validate_evidence_coverage([evidence], runnable) + self.assertEqual(set(coverage), {("t01", "baseline")}) + def test_benchmark_runner_rejects_incompatible_existing_csv_schema(self): shift_columns = { "turns",