Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 136 additions & 16 deletions context-guard-kit/benchmark_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1429,10 +1429,84 @@ def run_fixture(task: TaskFixture, variant: Variant, claude_bin: str,
)


def append_csv(csv_path: Path, claude_ver: str, result: RunResult, *, skip_existing: bool = False) -> bool:
def csv_file_stamp_unlocked(csv_path: Path) -> tuple[int, int, int, int] | None:
try:
fd = _open_regular_no_symlink(csv_path)
except FileNotFoundError:
return None
try:
st = os.fstat(fd)
return (int(st.st_dev), int(st.st_ino), int(st.st_size), int(st.st_mtime_ns))
finally:
os.close(fd)


def refresh_existing_key_cache_unlocked(
csv_path: Path,
existing_key_cache: set[tuple[str, str]],
existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None,
) -> None:
current_stamp = csv_file_stamp_unlocked(csv_path)
if existing_key_cache_stamp is not None and existing_key_cache_stamp.get("stamp") == current_stamp:
return
refreshed = _read_existing_keys_unlocked(csv_path)
existing_key_cache.clear()
existing_key_cache.update(refreshed)
if existing_key_cache_stamp is not None:
existing_key_cache_stamp["stamp"] = current_stamp


def resume_key_present(
csv_path: Path,
key: tuple[str, str],
existing_key_cache: set[tuple[str, str]],
existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None,
) -> bool:
if not _csv_exists_no_follow(csv_path):
existing_key_cache.clear()
if existing_key_cache_stamp is not None:
existing_key_cache_stamp["stamp"] = None
return False
with csv_file_lock(csv_path, create_parent=False):
refresh_existing_key_cache_unlocked(csv_path, existing_key_cache, existing_key_cache_stamp)
return key in existing_key_cache


def resume_runnable_targets(
csv_path: Path,
targets: list[tuple[TaskFixture, Variant]],
*,
resume: bool,
existing_key_cache: set[tuple[str, str]],
existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None,
) -> list[tuple[TaskFixture, Variant]]:
if not resume:
return list(targets)
return [
(task, variant)
for task, variant in targets
if not resume_key_present(csv_path, (task.id, variant.name), existing_key_cache, existing_key_cache_stamp)
]


def append_csv(
csv_path: Path,
claude_ver: str,
result: RunResult,
*,
skip_existing: bool = False,
existing_key_cache: set[tuple[str, str]] | None = None,
existing_key_cache_stamp: dict[str, tuple[int, int, int, int] | None] | None = None,
) -> bool:
with csv_file_lock(csv_path, create_parent=True):
if skip_existing and (result.task_id, result.variant) in _read_existing_keys_unlocked(csv_path):
return False
key = (result.task_id, result.variant)
if skip_existing:
if existing_key_cache is not None:
refresh_existing_key_cache_unlocked(csv_path, existing_key_cache, existing_key_cache_stamp)
if key in existing_key_cache:
return False
elif key in _read_existing_keys_unlocked(csv_path):
return False
flags = os.O_CREAT | os.O_APPEND | os.O_WRONLY
fd = _open_regular_no_symlink(csv_path, flags, 0o600, create_parent=True)
try:
Expand Down Expand Up @@ -1486,6 +1560,10 @@ def append_csv(csv_path: Path, claude_ver: str, result: RunResult, *, skip_exist
finally:
if fd != -1:
os.close(fd)
if existing_key_cache is not None:
existing_key_cache.add(key)
if existing_key_cache_stamp is not None:
existing_key_cache_stamp["stamp"] = csv_file_stamp_unlocked(csv_path)
return True


Expand Down Expand Up @@ -1644,10 +1722,16 @@ def _csv_exists_no_follow(csv_path: Path) -> bool:

def existing_keys(csv_path: Path) -> set[tuple[str, str]]:
"""이미 적재된 (task_id, variant) 조합. resume 시 skip 판정에 사용."""
keys, _stamp = existing_keys_snapshot(csv_path)
return keys


def existing_keys_snapshot(csv_path: Path) -> tuple[set[tuple[str, str]], tuple[int, int, int, int] | None]:
"""Loaded resume keys plus the CSV stamp observed under the same lock."""
if not _csv_exists_no_follow(csv_path):
return set()
return set(), None
with csv_file_lock(csv_path, create_parent=False):
return _read_existing_keys_unlocked(csv_path)
return _read_existing_keys_unlocked(csv_path), csv_file_stamp_unlocked(csv_path)


def read_csv_rows(csv_path: Path) -> list[dict[str, str]]:
Expand Down Expand Up @@ -3785,35 +3869,57 @@ def main() -> int:
print("no (task, variant) targets matched the filters", file=sys.stderr)
return 1

skip_keys = existing_keys(args.csv) if args.resume else set()
runnable_targets = [
(task, variant)
for task, variant in targets
if (task.id, variant.name) not in skip_keys
]
if args.resume:
skip_keys, skip_keys_loaded_stamp = existing_keys_snapshot(args.csv)
skip_keys_stamp = {"stamp": skip_keys_loaded_stamp}
else:
skip_keys = set()
skip_keys_stamp = None
runnable_targets = resume_runnable_targets(
args.csv,
targets,
resume=args.resume,
existing_key_cache=skip_keys,
existing_key_cache_stamp=skip_keys_stamp,
)
if args.evidence_jsonl is not None:
if args.dry_run:
for task, variant in targets:
if (task.id, variant.name) in skip_keys:
if args.resume and resume_key_present(args.csv, (task.id, variant.name), skip_keys, skip_keys_stamp):
print(f"skip {task.id}/{variant.name} (already in {args.csv})")
continue
print(f"evidence replay dry-run: {task.id}/{variant.name} <- {args.evidence_jsonl}")
print("completed 0 run(s); results in (dry-run; no CSV writes)")
return 0
csv_had_preexisting_content = file_has_content_no_follow(args.csv)
evidence_rows = read_evidence_jsonl(args.evidence_jsonl)
runnable_targets = resume_runnable_targets(
args.csv,
targets,
resume=args.resume,
existing_key_cache=skip_keys,
existing_key_cache_stamp=skip_keys_stamp,
)
evidence_by_key = validate_evidence_coverage(evidence_rows, runnable_targets)
runnable_keys = {(task.id, variant.name) for task, variant in runnable_targets}
claude_ver = "evidence-replay"
completed = 0
replay_rows_written: list[EvidenceReplayRow] = []
for task, variant in targets:
if (task.id, variant.name) in skip_keys:
if args.resume and (task.id, variant.name) not in runnable_keys:
print(f"skip {task.id}/{variant.name} (already in {args.csv})")
continue
evidence = evidence_by_key[(task.id, variant.name)]
print(f"replay {task.id}/{variant.name} ...", flush=True)
result = run_evidence_fixture(task, variant, evidence)
wrote = append_csv(args.csv, claude_ver, result, skip_existing=args.resume)
wrote = append_csv(
args.csv,
claude_ver,
result,
skip_existing=args.resume,
existing_key_cache=skip_keys if args.resume else None,
existing_key_cache_stamp=skip_keys_stamp,
)
if wrote:
replay_rows_written.append(evidence)
if args.ledger_jsonl is not None:
Expand Down Expand Up @@ -3846,6 +3952,13 @@ def main() -> int:
print(f"completed {completed} run(s); results in {args.csv}")
return 0

runnable_targets = resume_runnable_targets(
args.csv,
targets,
resume=args.resume,
existing_key_cache=skip_keys,
existing_key_cache_stamp=skip_keys_stamp,
)
placeholder_targets = [
f"{task.id}/{variant.name}"
for task, variant in runnable_targets
Expand Down Expand Up @@ -3873,7 +3986,7 @@ def main() -> int:

completed = 0
for task, variant in targets:
if (task.id, variant.name) in skip_keys:
if args.resume and resume_key_present(args.csv, (task.id, variant.name), skip_keys, skip_keys_stamp):
print(f"skip {task.id}/{variant.name} (already in {args.csv})")
continue
print(f"run {task.id}/{variant.name} ...", flush=True)
Expand All @@ -3882,7 +3995,14 @@ def main() -> int:
# 깎고, (b) --resume 이 그 (task, variant) 를 skip 해 실제 측정값이 영구 누락된다.
wrote = True
if not args.dry_run:
wrote = append_csv(args.csv, claude_ver, result, skip_existing=args.resume)
wrote = append_csv(
args.csv,
claude_ver,
result,
skip_existing=args.resume,
existing_key_cache=skip_keys if args.resume else None,
existing_key_cache_stamp=skip_keys_stamp,
)
if wrote and args.ledger_jsonl is not None:
append_cost_shift_ledger(args.ledger_jsonl, claude_ver, result)
completed += 1
Expand Down
31 changes: 29 additions & 2 deletions context-guard-kit/cache_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
MAX_JSON_WALK_NODES = 10_000
MAX_JSON_WALK_DEPTH = 64
MAX_JSON_SHAPE_WARNINGS = 200
MAX_JSON_CANONICAL_COMPARE_BYTES = 200_000
SAFE_JSON_PATH_SEGMENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_-]{0,63}$")
DYNAMIC_JSON_KEY_RE = re.compile(r"(?i)(request|trace|nonce|random|timestamp|created[_-]?at|updated[_-]?at|date)")
SENSITIVE_JSON_KEY_RE = re.compile(
Expand Down Expand Up @@ -93,6 +94,22 @@ def json_bytes(data: Any, *, indent: int | None = None) -> str:
return json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":") if indent is None else None, indent=indent)


def bounded_canonical_json(data: Any, *, max_bytes: int) -> str | None:
encoder = json.JSONEncoder(ensure_ascii=False, sort_keys=True, indent=2)
chunks: list[str] = []
size = 0
for chunk in encoder.iterencode(data):
size += byte_len_text(chunk)
if size > max_bytes:
return None
chunks.append(chunk)
size += 1
if size > max_bytes:
return None
chunks.append("\n")
return "".join(chunks)


def json_path_child(path: str, key: object) -> str:
"""Return a JSON warning path segment without echoing sensitive/dynamic keys."""
text = str(key)
Expand Down Expand Up @@ -335,8 +352,18 @@ def json_shape_warnings(text: str) -> tuple[str, list[dict[str, Any]]]:
if not isinstance(data, (dict, list)):
return "json-scalar", []
warnings = _walk_json(data)
canonical = json_bytes(data, indent=2) + "\n"
if canonical != text:
input_bytes = byte_len_text(text)
canonical = bounded_canonical_json(data, max_bytes=MAX_JSON_CANONICAL_COMPARE_BYTES)
if canonical is None:
warnings.append({
"code": "json_canonical_check_skipped",
"path": "$",
"severity": "info",
"message": "JSON input is parseable but canonical formatting would exceed the comparison byte cap.",
"input_bytes": input_bytes,
"max_bytes": MAX_JSON_CANONICAL_COMPARE_BYTES,
})
elif canonical != text:
warnings.append({
"code": "json_not_canonical",
"path": "$",
Expand Down
6 changes: 3 additions & 3 deletions context-guard-kit/claude_transcript_cost_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ class PromptCacheAudit:

def observe(self, root: Any) -> None:
self.sampled_records += 1
if len(self.samples) >= PROMPT_AUDIT_MAX_RECORDS:
self.capped_records += 1
return
segments, bytes_sampled, redactions, collection_capped = prompt_segments_for_record(root)
if collection_capped:
self.prompt_collection_capped_records += 1
if not segments:
return
if len(self.samples) >= PROMPT_AUDIT_MAX_RECORDS:
self.capped_records += 1
return
self.analyzed_prompt_records += 1
self.total_segments += len(segments)
self.total_bytes_sampled += bytes_sampled
Expand Down
30 changes: 28 additions & 2 deletions context-guard-kit/context_pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,29 @@ def metadata_size(data: dict[str, Any]) -> int:
return len(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True).encode("utf-8", errors="replace")) + 1


def receipt_working_copy(data: dict[str, Any]) -> tuple[dict[str, Any], bool]:
"""Copy receipt metadata without deep-copying or serializing an oversized pack body.

The pack body is already an immutable string in normal builds and stdout remains
authoritative for it. When it cannot possibly fit under the receipt cap by
itself, omit it before the first receipt-size probe so capping work only touches
metadata previews.
"""
receipt: dict[str, Any] = {}
pack_omitted = False
for key, value in data.items():
if key == "pack" and isinstance(value, str):
if len(value.encode("utf-8", errors="replace")) > MAX_RECEIPT_BYTES:
pack_omitted = True
continue
receipt[key] = value
continue
receipt[key] = copy.deepcopy(value)
if pack_omitted:
receipt["pack_omitted_from_receipt"] = True
return receipt, pack_omitted


def artifact_failure(error: str, *, bytes_count: int = 0, capped: bool = False) -> dict[str, Any]:
return {
"stored": False,
Expand Down Expand Up @@ -1113,8 +1136,11 @@ def finalize_receipt_size(receipt: dict[str, Any]) -> int:


def shrink_receipt_for_write(data: dict[str, Any]) -> tuple[dict[str, Any], bool]:
receipt = copy.deepcopy(data)
capped = False
receipt, pack_omitted = receipt_working_copy(data)
capped = pack_omitted
if pack_omitted:
receipt.setdefault("artifact", {})["capped"] = True
receipt.setdefault("artifact", {})["cap_bytes"] = MAX_RECEIPT_BYTES
if metadata_size(receipt) <= MAX_RECEIPT_BYTES:
return receipt, capped
capped = True
Expand Down
Loading