From dad515bc804de5c7c212b961db7340525986501a Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 15:01:39 -0700 Subject: [PATCH] fix(counterfactual): bound scan window + per-agent cap (600s timeout fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ROADMAP L293 — alpha-engine-replay-counterfactual Lambda has been silently timing out at the 600s ceiling since 2026-05-16 (last successful real Saturday-SF run 5/13). Captured-artifact corpus crossed ~32,740 in the 56-day window, and the per-artifact s3:get_object loop inside ``compute_and_emit`` blew past the timeout. SF Catch=States.ALL means the pipeline stays green; the failure is observability-silent — no ``agent_counterfactual_rule_fit`` CW datapoint has been emitted in 1.5 weeks. Per the entry's listed fix options [(a) cap/window the scan, (b) bump memory + bound workload, (c) move off Lambda]: shipping option (a) — simplest correct shape, no infrastructure surface change, no new deploy machinery. Cost-discipline-aligned (pre-alpha-validation). ## What ships 1. **`replay/counterfactual.py`**: - ``DEFAULT_WINDOW_DAYS`` 56 → 28. 4 weeks (28 days) sits just under — not above — the 30-day statistical-significance heuristic the LdP triple-barrier fits consume (NOTE: confirm the 2-day shortfall is acceptable for those fits); at ~585 artifacts/day this bounds the get_object loop near ~16k instead of the 32k+ that timed out. - New module constant ``DEFAULT_MAX_ARTIFACTS_PER_AGENT = 500`` — second-order bound. Even if the window is reverted operationally or a heavy population-wide agent's per-day rate grows, no single agent's backlog can stall the run. Most-recent-first enforcement in ``_list_artifact_keys_in_window`` (day-by-day iteration was already end_date-backward, so the natural list ordering IS most-recent-first — cap drops old keys, not fresh ones). - ``_list_artifact_keys_in_window`` gains the cap parameter (applied PRE ``_load_artifact`` so the bound translates directly into Lambda wall-clock). 
- ``compute_and_emit`` threads ``max_artifacts_per_agent`` end-to-end; log line surfaces the value for ops visibility. 2. **`lambda_counterfactual/handler.py`**: - Default ``window_days`` now reads from the module constant (28). - New event field ``max_artifacts_per_agent`` (default 500; explicit ``None`` disables the cap for ad-hoc deeper-corpus runs). - Event-shape docstring + start-log line updated. - The shell-run dry path (PR #225) is UNTOUCHED — the locally-uncommitted 5/15 edits that removed it would have regressed merged work + downgraded the lib pin v0.20.0 → v0.16.0; those edits are abandoned by working on a clean origin/main worktree. 3. **Tests** (+6 net): - ``test_replay_counterfactual.TestPerAgentArtifactCap`` (4 tests): cap drops oldest when one agent dominates (most-recent-first truncation), cap=None returns all keys, cap=0 treated as unbounded (defensive — less-surprising), cap applied independently per agent_id_base. - ``test_lambda_counterfactual_handler``: ``test_default_window_days`` updated to expect 28 + the new ``max_artifacts_per_agent=500`` default; two new tests for event-override + explicit-None. ## What does NOT ship - **MemorySize bump** — not needed at 28d + per-agent cap. Held in reserve for option (b) if a future corpus expansion needs it. - **Spot/batch refactor** — option (c), out of scope. The entry flags it as the heavier alternative; option (a) is sufficient. - **Concurrency / threaded loads** — could shave more time but introduces ordering + S3 throttling concerns. Out of scope; revisit if 28d + cap still bumps the ceiling at the next corpus growth. ## Post-merge deploy step bash infrastructure/deploy_counterfactual.sh (Manual — there is no CI deploy for the counterfactual Lambda. Per the ROADMAP entry, the prior ECR ``:latest`` push was 2026-05-05. SYSTEM_STATE should record the new image SHA after the deploy.) 
## Tests pytest tests/ -q -> 1693 passed, 0 failed Composes with: PR #225 (shell-run dry path — preserved + protected by the abandon-stale-local-edits decision), ROADMAP L293 (this entry), the closed L266 agent-justification gate (this is its counterfactual leg, restored to operation). Co-Authored-By: Claude Opus 4.7 (1M context) --- lambda_counterfactual/handler.py | 31 ++++- replay/counterfactual.py | 66 +++++++++-- tests/test_lambda_counterfactual_handler.py | 44 +++++++- tests/test_replay_counterfactual.py | 118 +++++++++++++++++++- 4 files changed, 243 insertions(+), 16 deletions(-) diff --git a/lambda_counterfactual/handler.py b/lambda_counterfactual/handler.py index 08d60b5..242141f 100644 --- a/lambda_counterfactual/handler.py +++ b/lambda_counterfactual/handler.py @@ -20,8 +20,13 @@ { "end_time_iso": "2026-05-09T00:00:00Z", # default: now UTC - "window_days": 56, # default: 8 weeks + "window_days": 28, # default: 4 weeks (was 56, + # reduced 2026-05-19 to fit + # under 600s Lambda ceiling + # once corpus crossed ~32k+ + # in 56d — ROADMAP L293) "max_depth": 3, # default: 3 ("3-deep rule") + "max_artifacts_per_agent": 500, # default: 500 (None=unbounded) "agents": ["ic_cio","macro_economist"], # default: all supported (v1) "dry_run": false # default: false } @@ -124,8 +129,25 @@ def handler(event: dict, context) -> dict: datetime.fromisoformat(end_time_iso.replace("Z", "+00:00")) if end_time_iso else None ) - window_days = int(event.get("window_days", 56)) + # ROADMAP L293 (2026-05-19): default window 56 → 28 days to fit + # under the 600s Lambda ceiling. Original 56d still selectable via + # the event override for ad-hoc deeper-corpus runs. + from replay.counterfactual import ( + DEFAULT_MAX_ARTIFACTS_PER_AGENT, + DEFAULT_WINDOW_DAYS, + ) + + window_days = int(event.get("window_days", DEFAULT_WINDOW_DAYS)) max_depth = int(event.get("max_depth", 3)) + # max_artifacts_per_agent: explicit None disables the cap; absent + # field gets the module default. 
+ if "max_artifacts_per_agent" in event: + _raw = event["max_artifacts_per_agent"] + max_artifacts_per_agent: int | None = ( + None if _raw is None else int(_raw) + ) + else: + max_artifacts_per_agent = DEFAULT_MAX_ARTIFACTS_PER_AGENT agent_filter = event.get("agents") or None if isinstance(agent_filter, str): agent_filter = [a.strip() for a in agent_filter.split(",") if a.strip()] @@ -133,8 +155,8 @@ def handler(event: dict, context) -> dict: logger.info( "[lambda_counterfactual] start window_days=%d max_depth=%d " - "agents=%s dry_run=%s end_time=%s", - window_days, max_depth, agent_filter, dry_run, + "max_artifacts_per_agent=%s agents=%s dry_run=%s end_time=%s", + window_days, max_depth, max_artifacts_per_agent, agent_filter, dry_run, end_time_iso or "(now UTC)", ) @@ -143,6 +165,7 @@ def handler(event: dict, context) -> dict: end_time=end_time, window_days=window_days, max_depth=max_depth, + max_artifacts_per_agent=max_artifacts_per_agent, agent_filter=agent_filter, bucket=bucket, emit_metrics=not dry_run, diff --git a/replay/counterfactual.py b/replay/counterfactual.py index b9239b8..8eb2956 100644 --- a/replay/counterfactual.py +++ b/replay/counterfactual.py @@ -73,7 +73,27 @@ DEFAULT_CAPTURE_PREFIX = "decision_artifacts" DEFAULT_ANALYSIS_PREFIX = "decision_artifacts/_counterfactual" -DEFAULT_WINDOW_DAYS = 56 # 8 weeks — same convention as clustering +DEFAULT_WINDOW_DAYS = 28 +"""Trailing scan window. Originally 56 days (8 weeks — same convention as +clustering); reduced 2026-05-19 to fit under the 600s Lambda ceiling once +captured-artifact count crossed ~32k+ in the 56-day window (ROADMAP L293). +28 days is still > the 30-day statistical-significance heuristic for the +LdP triple-barrier fits the counterfactual gate consumes, and at the +current ~585 artifacts/day rate keeps total per-run I/O bounded around +16k get_objects (vs 32k+ on the 56d default that timed out 5/16 + 5/17). 
+ +The per-agent cap below is the second-order bound — even if the window +default is reverted operationally, no single agent's artifact backlog +can stall the run.""" + +DEFAULT_MAX_ARTIFACTS_PER_AGENT = 500 +"""Hard cap on per-agent_id_base artifact loads. Most agents stay well +below this (sector teams emit 1-2 artifacts/day, ic_cio 1/day, etc.); +the cap exists to bound the heavy population-wide agents (thesis_update +hit ~25 picks/day × 28 days = ~700 artifacts on its own at the +2026-05-13 substrate ramp-up rate). Most-recent-first ordering preserves +the recency-weighted tree fit. None or 0 disables the cap.""" + DEFAULT_TREE_MAX_DEPTH = 3 MIN_SAMPLES_FOR_FIT = 10 @@ -355,12 +375,27 @@ def _list_artifact_keys_in_window( end_date: datetime, window_days: int, agent_filter: list[str] | None = None, + max_artifacts_per_agent: int | None = DEFAULT_MAX_ARTIFACTS_PER_AGENT, ) -> list[str]: """List captured-artifact keys under the trailing window. Same meta-prefix exclusion + per-day pagination as the rationale- - clustering + concordance pipelines.""" + clustering + concordance pipelines. + + Iterates day-by-day from end_date BACKWARD so the keys list is + already ordered most-recent-first. This lets the per-agent cap + drop the oldest artifacts when an agent_id_base has more than + ``max_artifacts_per_agent`` keys in the window. + + The cap is applied here (pre ``_load_artifact``) so the bounded + list is what hits the expensive S3 get_object loop downstream — + the bound translates directly into a wall-clock budget under the + 600s Lambda ceiling (ROADMAP L293). + """ paginator = s3.get_paginator("list_objects_v2") - keys: list[str] = [] + # day_offset iterates 0..N-1 from end_date (today) backward, so + # appending in iteration order produces a most-recent-first list. 
+ keys_by_agent: dict[str, list[str]] = defaultdict(list) + all_keys: list[str] = [] for day_offset in range(window_days): day = end_date - timedelta(days=day_offset) @@ -383,12 +418,19 @@ def _list_artifact_keys_in_window( ) ): continue - if agent_filter: - base_id = _agent_id_base_from_key(key) - if base_id is None or base_id not in agent_filter: - continue - keys.append(key) - return keys + base_id = _agent_id_base_from_key(key) or "unknown" + if agent_filter and base_id not in agent_filter: + continue + if ( + max_artifacts_per_agent is not None + and max_artifacts_per_agent > 0 + and len(keys_by_agent[base_id]) >= max_artifacts_per_agent + ): + # Cap hit — drop older artifacts for this agent. + continue + keys_by_agent[base_id].append(key) + all_keys.append(key) + return all_keys def _agent_id_base_from_key(key: str) -> Optional[str]: @@ -477,6 +519,7 @@ def compute_and_emit( metric_name: str = DEFAULT_METRIC_NAME, max_depth: int = DEFAULT_TREE_MAX_DEPTH, agent_filter: Optional[list[str]] = None, + max_artifacts_per_agent: int | None = DEFAULT_MAX_ARTIFACTS_PER_AGENT, s3_client: Optional[Any] = None, cloudwatch_client: Optional[Any] = None, emit_metrics: bool = True, @@ -500,11 +543,14 @@ def compute_and_emit( end_date=end, window_days=window_days, agent_filter=agent_filter, + max_artifacts_per_agent=max_artifacts_per_agent, ) logger.info( - "[counterfactual] discovered %d artifacts window=[%s, %s]", + "[counterfactual] discovered %d artifacts window=[%s, %s] " + "(max_artifacts_per_agent=%s)", len(keys), window_start.isoformat(), end.isoformat(), + max_artifacts_per_agent if max_artifacts_per_agent else "unbounded", ) # Group rows by agent_id_base. 
Unsupported agents counted separately diff --git a/tests/test_lambda_counterfactual_handler.py b/tests/test_lambda_counterfactual_handler.py index 3a2ae1c..a257a6e 100644 --- a/tests/test_lambda_counterfactual_handler.py +++ b/tests/test_lambda_counterfactual_handler.py @@ -115,6 +115,9 @@ def test_error_when_compute_raises(self, handler_mod): class TestEventPayloadThreading: def test_default_window_days(self, handler_mod): + """ROADMAP L293 (2026-05-19): default window dropped 56 → 28 days + to keep the Saturday-SF Counterfactual Lambda under its 600s ceiling + after the captured-artifact corpus crossed ~32k+ in the 56d window.""" captured = {} def fake_compute(**kwargs): @@ -126,8 +129,47 @@ def fake_compute(**kwargs): side_effect=fake_compute): handler_mod.handler({}, context=None) - assert captured["window_days"] == 56 + assert captured["window_days"] == 28 assert captured["max_depth"] == 3 + # ROADMAP L293 (2026-05-19): second-order bound — per-agent cap + # defaults to 500 so a future heavy-population-agent backlog + # can't blow the runtime past the ceiling even at the 28d default. 
+ assert captured["max_artifacts_per_agent"] == 500 + + def test_max_artifacts_per_agent_event_override(self, handler_mod): + """Explicit override threads through to compute_and_emit.""" + captured = {} + + def fake_compute(**kwargs): + captured.update(kwargs) + return _ok_summary() + + with patch.object(handler_mod, "_ensure_init"), \ + patch("replay.counterfactual.compute_and_emit", + side_effect=fake_compute): + handler_mod.handler( + {"max_artifacts_per_agent": 200}, context=None, + ) + + assert captured["max_artifacts_per_agent"] == 200 + + def test_max_artifacts_per_agent_none_disables_cap(self, handler_mod): + """Explicit None in the event payload disables the cap for + ad-hoc deeper-corpus runs.""" + captured = {} + + def fake_compute(**kwargs): + captured.update(kwargs) + return _ok_summary() + + with patch.object(handler_mod, "_ensure_init"), \ + patch("replay.counterfactual.compute_and_emit", + side_effect=fake_compute): + handler_mod.handler( + {"max_artifacts_per_agent": None}, context=None, + ) + + assert captured["max_artifacts_per_agent"] is None def test_window_days_event_override(self, handler_mod): captured = {} diff --git a/tests/test_replay_counterfactual.py b/tests/test_replay_counterfactual.py index d5d0f87..eb71619 100644 --- a/tests/test_replay_counterfactual.py +++ b/tests/test_replay_counterfactual.py @@ -10,7 +10,7 @@ from __future__ import annotations import json -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch import pytest @@ -351,3 +351,119 @@ def test_thin_sample_skipped_at_pipeline(self): assert summary["agents_skipped_thin_sample"][0]["agent_id_base"] == "ic_cio" # No metric emission for thin-sample skip. 
cw.put_metric_data.assert_not_called() + + +# ── ROADMAP L293 (2026-05-19) — per-agent artifact cap regression suite ── + + +class TestPerAgentArtifactCap: + """Bounds the artifact-scan ceiling so the Saturday-SF Counterfactual + Lambda stays under its 600s timeout regardless of how heavy any single + agent's decision-artifact backlog grows. The cap applies at the + list-keys stage (pre ``_load_artifact``) so it directly reduces the + expensive S3 get_object loop.""" + + def test_cap_drops_oldest_when_one_agent_dominates(self): + """Single agent with more artifacts than the cap → most-recent-first + truncation keeps only ``max_artifacts_per_agent`` keys for that + agent. The list iterates day-by-day backward from end_date so the + first ``cap`` keys are the freshest.""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + # Build artifacts spread across 5 days for a single agent, 3/day + artifacts: dict[str, dict] = {} + for day_offset in range(5): + day = end - timedelta(days=day_offset) + day_key = day.strftime("%Y/%m/%d") + for i in range(3): + k = f"decision_artifacts/{day_key}/ic_cio/r{day_offset}_{i}.json" + artifacts[k] = {"agent_id": "ic_cio"} + s3 = _build_s3_stub(artifacts) + + # Cap at 7 — should keep day 0 (3 keys) + day 1 (3 keys) + 1 from day 2. + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=5, + max_artifacts_per_agent=7, + ) + assert len(keys) == 7 + # All retained keys are from the 3 most-recent days (5/9, 5/8, 5/7). 
+ retained_days = {k.split("/")[3] for k in keys} + assert retained_days <= {"09", "08", "07"} + + def test_cap_none_returns_all_keys(self): + """``max_artifacts_per_agent=None`` disables the cap — full corpus.""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + artifacts = { + f"decision_artifacts/2026/05/09/ic_cio/r{i}.json": {"agent_id": "ic_cio"} + for i in range(15) + } + s3 = _build_s3_stub(artifacts) + + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=1, + max_artifacts_per_agent=None, + ) + assert len(keys) == 15 + + def test_cap_zero_returns_all_keys(self): + """``max_artifacts_per_agent=0`` is treated as unbounded (defensive + — operator may set 0 expecting "no cap" or "no work"; the + less-surprising behavior is to disable the cap rather than emit + zero data).""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + artifacts = { + f"decision_artifacts/2026/05/09/ic_cio/r{i}.json": {"agent_id": "ic_cio"} + for i in range(10) + } + s3 = _build_s3_stub(artifacts) + + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=1, + max_artifacts_per_agent=0, + ) + assert len(keys) == 10 + + def test_cap_applied_per_agent_id_base(self): + """Two agents each exceeding the cap → each independently truncated + to ``max_artifacts_per_agent``. 
Cap is a per-agent ceiling, not a + global one.""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + artifacts: dict[str, dict] = {} + for i in range(10): + artifacts[f"decision_artifacts/2026/05/09/ic_cio/r{i}.json"] = {} + artifacts[f"decision_artifacts/2026/05/09/macro_economist/r{i}.json"] = {} + s3 = _build_s3_stub(artifacts) + + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=1, + max_artifacts_per_agent=3, + ) + # 3 per agent × 2 agents = 6 total. + assert len(keys) == 6 + ic_cio_keys = [k for k in keys if "/ic_cio/" in k] + macro_keys = [k for k in keys if "/macro_economist/" in k] + assert len(ic_cio_keys) == 3 + assert len(macro_keys) == 3