diff --git a/lambda_counterfactual/handler.py b/lambda_counterfactual/handler.py index 08d60b5..242141f 100644 --- a/lambda_counterfactual/handler.py +++ b/lambda_counterfactual/handler.py @@ -20,8 +20,13 @@ { "end_time_iso": "2026-05-09T00:00:00Z", # default: now UTC - "window_days": 56, # default: 8 weeks + "window_days": 28, # default: 4 weeks (was 56, + # reduced 2026-05-19 to fit + # under 600s Lambda ceiling + # once corpus crossed ~32k+ + # in 56d — ROADMAP L293) "max_depth": 3, # default: 3 ("3-deep rule") + "max_artifacts_per_agent": 500, # default: 500 (None=unbounded) "agents": ["ic_cio","macro_economist"], # default: all supported (v1) "dry_run": false # default: false } @@ -124,8 +129,25 @@ def handler(event: dict, context) -> dict: datetime.fromisoformat(end_time_iso.replace("Z", "+00:00")) if end_time_iso else None ) - window_days = int(event.get("window_days", 56)) + # ROADMAP L293 (2026-05-19): default window 56 → 28 days to fit + # under the 600s Lambda ceiling. Original 56d still selectable via + # the event override for ad-hoc deeper-corpus runs. + from replay.counterfactual import ( + DEFAULT_MAX_ARTIFACTS_PER_AGENT, + DEFAULT_WINDOW_DAYS, + ) + + window_days = int(event.get("window_days", DEFAULT_WINDOW_DAYS)) max_depth = int(event.get("max_depth", 3)) + # max_artifacts_per_agent: explicit None disables the cap; absent + # field gets the module default. 
+ if "max_artifacts_per_agent" in event: + _raw = event["max_artifacts_per_agent"] + max_artifacts_per_agent: int | None = ( + None if _raw is None else int(_raw) + ) + else: + max_artifacts_per_agent = DEFAULT_MAX_ARTIFACTS_PER_AGENT agent_filter = event.get("agents") or None if isinstance(agent_filter, str): agent_filter = [a.strip() for a in agent_filter.split(",") if a.strip()] @@ -133,8 +155,8 @@ def handler(event: dict, context) -> dict: logger.info( "[lambda_counterfactual] start window_days=%d max_depth=%d " - "agents=%s dry_run=%s end_time=%s", - window_days, max_depth, agent_filter, dry_run, + "max_artifacts_per_agent=%s agents=%s dry_run=%s end_time=%s", + window_days, max_depth, max_artifacts_per_agent, agent_filter, dry_run, end_time_iso or "(now UTC)", ) @@ -143,6 +165,7 @@ def handler(event: dict, context) -> dict: end_time=end_time, window_days=window_days, max_depth=max_depth, + max_artifacts_per_agent=max_artifacts_per_agent, agent_filter=agent_filter, bucket=bucket, emit_metrics=not dry_run, diff --git a/replay/counterfactual.py b/replay/counterfactual.py index b9239b8..8eb2956 100644 --- a/replay/counterfactual.py +++ b/replay/counterfactual.py @@ -73,7 +73,27 @@ DEFAULT_CAPTURE_PREFIX = "decision_artifacts" DEFAULT_ANALYSIS_PREFIX = "decision_artifacts/_counterfactual" -DEFAULT_WINDOW_DAYS = 56 # 8 weeks — same convention as clustering +DEFAULT_WINDOW_DAYS = 28 +"""Trailing scan window. Originally 56 days (8 weeks — same convention as +clustering); reduced 2026-05-19 to fit under the 600s Lambda ceiling once +captured-artifact count crossed ~32k+ in the 56-day window (ROADMAP L293). +28 days is slightly under the 30-day statistical-significance heuristic for +the LdP triple-barrier fits the counterfactual gate consumes, but at the +current ~585 artifacts/day rate it keeps total per-run I/O bounded around +16k get_objects (vs 32k+ on the 56d default that timed out 5/16 + 5/17). 
+ +The per-agent cap below is the second-order bound — even if the window +default is reverted operationally, no single agent's artifact backlog +can stall the run.""" + +DEFAULT_MAX_ARTIFACTS_PER_AGENT = 500 +"""Hard cap on per-agent_id_base artifact loads. Most agents stay well +below this (sector teams emit 1-2 artifacts/day, ic_cio 1/day, etc.); +the cap exists to bound the heavy population-wide agents (thesis_update +hit ~25 picks/day × 28 days = ~700 artifacts on its own at the +2026-05-13 substrate ramp-up rate). Most-recent-first ordering preserves +the recency-weighted tree fit. None or any value <= 0 disables the cap.""" + DEFAULT_TREE_MAX_DEPTH = 3 MIN_SAMPLES_FOR_FIT = 10 @@ -355,12 +375,27 @@ def _list_artifact_keys_in_window( end_date: datetime, window_days: int, agent_filter: list[str] | None = None, + max_artifacts_per_agent: int | None = DEFAULT_MAX_ARTIFACTS_PER_AGENT, ) -> list[str]: """List captured-artifact keys under the trailing window. Same meta-prefix exclusion + per-day pagination as the rationale- - clustering + concordance pipelines.""" + clustering + concordance pipelines. + + Iterates day-by-day from end_date BACKWARD so the keys list is + already ordered most-recent-first by day. This lets the per-agent cap + drop the oldest artifacts when an agent_id_base has more than + ``max_artifacts_per_agent`` keys in the window. + + The cap is applied here (pre ``_load_artifact``) so the bounded + list is what hits the expensive S3 get_object loop downstream — + the bound translates directly into a wall-clock budget under the + 600s Lambda ceiling (ROADMAP L293). + """ paginator = s3.get_paginator("list_objects_v2") - keys: list[str] = [] + # day_offset iterates 0..N-1 from end_date (today) backward, so + # appending in iteration order produces a most-recent-first list. 
+ keys_by_agent: dict[str, list[str]] = defaultdict(list) + all_keys: list[str] = [] for day_offset in range(window_days): day = end_date - timedelta(days=day_offset) @@ -383,12 +418,19 @@ def _list_artifact_keys_in_window( ) ): continue - if agent_filter: - base_id = _agent_id_base_from_key(key) - if base_id is None or base_id not in agent_filter: - continue - keys.append(key) - return keys + base_id = _agent_id_base_from_key(key) or "unknown" + if agent_filter and base_id not in agent_filter: + continue + if ( + max_artifacts_per_agent is not None + and max_artifacts_per_agent > 0 + and len(keys_by_agent[base_id]) >= max_artifacts_per_agent + ): + # Cap hit — drop older artifacts for this agent. + continue + keys_by_agent[base_id].append(key) + all_keys.append(key) + return all_keys def _agent_id_base_from_key(key: str) -> Optional[str]: @@ -477,6 +519,7 @@ def compute_and_emit( metric_name: str = DEFAULT_METRIC_NAME, max_depth: int = DEFAULT_TREE_MAX_DEPTH, agent_filter: Optional[list[str]] = None, + max_artifacts_per_agent: int | None = DEFAULT_MAX_ARTIFACTS_PER_AGENT, s3_client: Optional[Any] = None, cloudwatch_client: Optional[Any] = None, emit_metrics: bool = True, @@ -500,11 +543,14 @@ def compute_and_emit( end_date=end, window_days=window_days, agent_filter=agent_filter, + max_artifacts_per_agent=max_artifacts_per_agent, ) logger.info( - "[counterfactual] discovered %d artifacts window=[%s, %s]", + "[counterfactual] discovered %d artifacts window=[%s, %s] " + "(max_artifacts_per_agent=%s)", len(keys), window_start.isoformat(), end.isoformat(), + max_artifacts_per_agent if max_artifacts_per_agent else "unbounded", ) # Group rows by agent_id_base. 
Unsupported agents counted separately diff --git a/tests/test_lambda_counterfactual_handler.py b/tests/test_lambda_counterfactual_handler.py index 3a2ae1c..a257a6e 100644 --- a/tests/test_lambda_counterfactual_handler.py +++ b/tests/test_lambda_counterfactual_handler.py @@ -115,6 +115,9 @@ def test_error_when_compute_raises(self, handler_mod): class TestEventPayloadThreading: def test_default_window_days(self, handler_mod): + """ROADMAP L293 (2026-05-19): default window dropped 56 → 28 days + to keep the Saturday-SF Counterfactual Lambda under its 600s ceiling + after the captured-artifact corpus crossed ~32k+ in the 56d window.""" captured = {} def fake_compute(**kwargs): @@ -126,8 +129,47 @@ def fake_compute(**kwargs): side_effect=fake_compute): handler_mod.handler({}, context=None) - assert captured["window_days"] == 56 + assert captured["window_days"] == 28 assert captured["max_depth"] == 3 + # ROADMAP L293 (2026-05-19): second-order bound — per-agent cap + # defaults to 500 so a future heavy-population-agent backlog + # can't blow the runtime past the ceiling even at the 28d default. 
+ assert captured["max_artifacts_per_agent"] == 500 + + def test_max_artifacts_per_agent_event_override(self, handler_mod): + """Explicit override threads through to compute_and_emit.""" + captured = {} + + def fake_compute(**kwargs): + captured.update(kwargs) + return _ok_summary() + + with patch.object(handler_mod, "_ensure_init"), \ + patch("replay.counterfactual.compute_and_emit", + side_effect=fake_compute): + handler_mod.handler( + {"max_artifacts_per_agent": 200}, context=None, + ) + + assert captured["max_artifacts_per_agent"] == 200 + + def test_max_artifacts_per_agent_none_disables_cap(self, handler_mod): + """Explicit None in the event payload disables the cap for + ad-hoc deeper-corpus runs.""" + captured = {} + + def fake_compute(**kwargs): + captured.update(kwargs) + return _ok_summary() + + with patch.object(handler_mod, "_ensure_init"), \ + patch("replay.counterfactual.compute_and_emit", + side_effect=fake_compute): + handler_mod.handler( + {"max_artifacts_per_agent": None}, context=None, + ) + + assert captured["max_artifacts_per_agent"] is None def test_window_days_event_override(self, handler_mod): captured = {} diff --git a/tests/test_replay_counterfactual.py b/tests/test_replay_counterfactual.py index d5d0f87..eb71619 100644 --- a/tests/test_replay_counterfactual.py +++ b/tests/test_replay_counterfactual.py @@ -10,7 +10,7 @@ from __future__ import annotations import json -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch import pytest @@ -351,3 +351,119 @@ def test_thin_sample_skipped_at_pipeline(self): assert summary["agents_skipped_thin_sample"][0]["agent_id_base"] == "ic_cio" # No metric emission for thin-sample skip. 
cw.put_metric_data.assert_not_called() + + +# ── ROADMAP L293 (2026-05-19) — per-agent artifact cap regression suite ── + + +class TestPerAgentArtifactCap: + """Bounds the artifact-scan ceiling so the Saturday-SF Counterfactual + Lambda stays under its 600s timeout regardless of how heavy any single + agent's decision-artifact backlog grows. The cap applies at the + list-keys stage (pre ``_load_artifact``) so it directly reduces the + expensive S3 get_object loop.""" + + def test_cap_drops_oldest_when_one_agent_dominates(self): + """Single agent with more artifacts than the cap → most-recent-first + truncation keeps only ``max_artifacts_per_agent`` keys for that + agent. The list iterates day-by-day backward from end_date so the + first ``cap`` keys are the freshest.""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + # Build artifacts spread across 5 days for a single agent, 3/day + artifacts: dict[str, dict] = {} + for day_offset in range(5): + day = end - timedelta(days=day_offset) + day_key = day.strftime("%Y/%m/%d") + for i in range(3): + k = f"decision_artifacts/{day_key}/ic_cio/r{day_offset}_{i}.json" + artifacts[k] = {"agent_id": "ic_cio"} + s3 = _build_s3_stub(artifacts) + + # Cap at 7 — should keep day 0 (3 keys) + day 1 (3 keys) + 1 from day 2. + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=5, + max_artifacts_per_agent=7, + ) + assert len(keys) == 7 + # All retained keys are from the 3 most-recent days (5/9, 5/8, 5/7). 
+ retained_days = {k.split("/")[3] for k in keys} + assert retained_days <= {"09", "08", "07"} + + def test_cap_none_returns_all_keys(self): + """``max_artifacts_per_agent=None`` disables the cap — full corpus.""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + artifacts = { + f"decision_artifacts/2026/05/09/ic_cio/r{i}.json": {"agent_id": "ic_cio"} + for i in range(15) + } + s3 = _build_s3_stub(artifacts) + + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=1, + max_artifacts_per_agent=None, + ) + assert len(keys) == 15 + + def test_cap_zero_returns_all_keys(self): + """``max_artifacts_per_agent=0`` is treated as unbounded (defensive + — operator may set 0 expecting "no cap" or "no work"; the + less-surprising behavior is to disable the cap rather than emit + zero data).""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + artifacts = { + f"decision_artifacts/2026/05/09/ic_cio/r{i}.json": {"agent_id": "ic_cio"} + for i in range(10) + } + s3 = _build_s3_stub(artifacts) + + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=1, + max_artifacts_per_agent=0, + ) + assert len(keys) == 10 + + def test_cap_applied_per_agent_id_base(self): + """Two agents each exceeding the cap → each independently truncated + to ``max_artifacts_per_agent``. 
Cap is a per-agent ceiling, not a + global one.""" + from replay.counterfactual import _list_artifact_keys_in_window + + end = datetime(2026, 5, 9, tzinfo=timezone.utc) + artifacts: dict[str, dict] = {} + for i in range(10): + artifacts[f"decision_artifacts/2026/05/09/ic_cio/r{i}.json"] = {} + artifacts[f"decision_artifacts/2026/05/09/macro_economist/r{i}.json"] = {} + s3 = _build_s3_stub(artifacts) + + keys = _list_artifact_keys_in_window( + s3, + bucket="b", + capture_prefix="decision_artifacts", + end_date=end, + window_days=1, + max_artifacts_per_agent=3, + ) + # 3 per agent × 2 agents = 6 total. + assert len(keys) == 6 + ic_cio_keys = [k for k in keys if "/ic_cio/" in k] + macro_keys = [k for k in keys if "/macro_economist/" in k] + assert len(ic_cio_keys) == 3 + assert len(macro_keys) == 3