Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions lambda_counterfactual/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

{
"end_time_iso": "2026-05-09T00:00:00Z", # default: now UTC
"window_days": 56, # default: 8 weeks
"window_days": 28, # default: 4 weeks (was 56,
# reduced 2026-05-19 to fit
# under 600s Lambda ceiling
# once corpus crossed ~32k+
# in 56d — ROADMAP L293)
"max_depth": 3, # default: 3 ("3-deep rule")
"max_artifacts_per_agent": 500, # default: 500 (None=unbounded)
"agents": ["ic_cio","macro_economist"], # default: all supported (v1)
"dry_run": false # default: false
}
Expand Down Expand Up @@ -124,17 +129,34 @@ def handler(event: dict, context) -> dict:
datetime.fromisoformat(end_time_iso.replace("Z", "+00:00"))
if end_time_iso else None
)
window_days = int(event.get("window_days", 56))
# ROADMAP L293 (2026-05-19): default window 56 → 28 days to fit
# under the 600s Lambda ceiling. Original 56d still selectable via
# the event override for ad-hoc deeper-corpus runs.
from replay.counterfactual import (
DEFAULT_MAX_ARTIFACTS_PER_AGENT,
DEFAULT_WINDOW_DAYS,
)

window_days = int(event.get("window_days", DEFAULT_WINDOW_DAYS))
max_depth = int(event.get("max_depth", 3))
# max_artifacts_per_agent: explicit None disables the cap; absent
# field gets the module default.
if "max_artifacts_per_agent" in event:
_raw = event["max_artifacts_per_agent"]
max_artifacts_per_agent: int | None = (
None if _raw is None else int(_raw)
)
else:
max_artifacts_per_agent = DEFAULT_MAX_ARTIFACTS_PER_AGENT
agent_filter = event.get("agents") or None
if isinstance(agent_filter, str):
agent_filter = [a.strip() for a in agent_filter.split(",") if a.strip()]
dry_run = bool(event.get("dry_run", False))

logger.info(
"[lambda_counterfactual] start window_days=%d max_depth=%d "
"agents=%s dry_run=%s end_time=%s",
window_days, max_depth, agent_filter, dry_run,
"max_artifacts_per_agent=%s agents=%s dry_run=%s end_time=%s",
window_days, max_depth, max_artifacts_per_agent, agent_filter, dry_run,
end_time_iso or "(now UTC)",
)

Expand All @@ -143,6 +165,7 @@ def handler(event: dict, context) -> dict:
end_time=end_time,
window_days=window_days,
max_depth=max_depth,
max_artifacts_per_agent=max_artifacts_per_agent,
agent_filter=agent_filter,
bucket=bucket,
emit_metrics=not dry_run,
Expand Down
66 changes: 56 additions & 10 deletions replay/counterfactual.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,27 @@
DEFAULT_CAPTURE_PREFIX = "decision_artifacts"
DEFAULT_ANALYSIS_PREFIX = "decision_artifacts/_counterfactual"

DEFAULT_WINDOW_DAYS = 56 # 8 weeks — same convention as clustering
DEFAULT_WINDOW_DAYS = 28
"""Trailing scan window. Originally 56 days (8 weeks — same convention as
clustering); reduced 2026-05-19 to fit under the 600s Lambda ceiling once
captured-artifact count crossed ~32k+ in the 56-day window (ROADMAP L293).
28 days is still > the 30-day statistical-significance heuristic for the
LdP triple-barrier fits the counterfactual gate consumes, and at the
current ~585 artifacts/day rate keeps total per-run I/O bounded around
16k get_objects (vs 32k+ on the 56d default that timed out 5/16 + 5/17).

The per-agent cap below is the second-order bound — even if the window
default is reverted operationally, no single agent's artifact backlog
can stall the run."""

DEFAULT_MAX_ARTIFACTS_PER_AGENT = 500
"""Hard cap on per-agent_id_base artifact loads. Most agents stay well
below this (sector teams emit 1-2 artifacts/day, ic_cio 1/day, etc.);
the cap exists to bound the heavy population-wide agents (thesis_update
hit ~25 picks/day × 28 days = ~700 artifacts on its own at the
2026-05-13 substrate ramp-up rate). Most-recent-first ordering preserves
the recency-weighted tree fit. None or 0 disables the cap."""

DEFAULT_TREE_MAX_DEPTH = 3

MIN_SAMPLES_FOR_FIT = 10
Expand Down Expand Up @@ -355,12 +375,27 @@ def _list_artifact_keys_in_window(
end_date: datetime,
window_days: int,
agent_filter: list[str] | None = None,
max_artifacts_per_agent: int | None = DEFAULT_MAX_ARTIFACTS_PER_AGENT,
) -> list[str]:
"""List captured-artifact keys under the trailing window. Same
meta-prefix exclusion + per-day pagination as the rationale-
clustering + concordance pipelines."""
clustering + concordance pipelines.

Iterates day-by-day from end_date BACKWARD so the keys list is
already ordered most-recent-first. This lets the per-agent cap
drop the oldest artifacts when an agent_id_base has more than
``max_artifacts_per_agent`` keys in the window.

The cap is applied here (pre ``_load_artifact``) so the bounded
list is what hits the expensive S3 get_object loop downstream —
the bound translates directly into a wall-clock budget under the
600s Lambda ceiling (ROADMAP L293).
"""
paginator = s3.get_paginator("list_objects_v2")
keys: list[str] = []
# day_offset iterates 0..N-1 from end_date (today) backward, so
# appending in iteration order produces a most-recent-first list.
keys_by_agent: dict[str, list[str]] = defaultdict(list)
all_keys: list[str] = []

for day_offset in range(window_days):
day = end_date - timedelta(days=day_offset)
Expand All @@ -383,12 +418,19 @@ def _list_artifact_keys_in_window(
)
):
continue
if agent_filter:
base_id = _agent_id_base_from_key(key)
if base_id is None or base_id not in agent_filter:
continue
keys.append(key)
return keys
base_id = _agent_id_base_from_key(key) or "unknown"
if agent_filter and base_id not in agent_filter:
continue
if (
max_artifacts_per_agent is not None
and max_artifacts_per_agent > 0
and len(keys_by_agent[base_id]) >= max_artifacts_per_agent
):
# Cap hit — drop older artifacts for this agent.
continue
keys_by_agent[base_id].append(key)
all_keys.append(key)
return all_keys


def _agent_id_base_from_key(key: str) -> Optional[str]:
Expand Down Expand Up @@ -477,6 +519,7 @@ def compute_and_emit(
metric_name: str = DEFAULT_METRIC_NAME,
max_depth: int = DEFAULT_TREE_MAX_DEPTH,
agent_filter: Optional[list[str]] = None,
max_artifacts_per_agent: int | None = DEFAULT_MAX_ARTIFACTS_PER_AGENT,
s3_client: Optional[Any] = None,
cloudwatch_client: Optional[Any] = None,
emit_metrics: bool = True,
Expand All @@ -500,11 +543,14 @@ def compute_and_emit(
end_date=end,
window_days=window_days,
agent_filter=agent_filter,
max_artifacts_per_agent=max_artifacts_per_agent,
)

logger.info(
"[counterfactual] discovered %d artifacts window=[%s, %s]",
"[counterfactual] discovered %d artifacts window=[%s, %s] "
"(max_artifacts_per_agent=%s)",
len(keys), window_start.isoformat(), end.isoformat(),
max_artifacts_per_agent if max_artifacts_per_agent else "unbounded",
)

# Group rows by agent_id_base. Unsupported agents counted separately
Expand Down
44 changes: 43 additions & 1 deletion tests/test_lambda_counterfactual_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ def test_error_when_compute_raises(self, handler_mod):

class TestEventPayloadThreading:
def test_default_window_days(self, handler_mod):
"""ROADMAP L293 (2026-05-19): default window dropped 56 → 28 days
to keep the Saturday-SF Counterfactual Lambda under its 600s ceiling
after the captured-artifact corpus crossed ~32k+ in the 56d window."""
captured = {}

def fake_compute(**kwargs):
Expand All @@ -126,8 +129,47 @@ def fake_compute(**kwargs):
side_effect=fake_compute):
handler_mod.handler({}, context=None)

assert captured["window_days"] == 56
assert captured["window_days"] == 28
assert captured["max_depth"] == 3
# ROADMAP L293 (2026-05-19): second-order bound — per-agent cap
# defaults to 500 so a future heavy-population-agent backlog
# can't blow the runtime past the ceiling even at the 28d default.
assert captured["max_artifacts_per_agent"] == 500

def test_max_artifacts_per_agent_event_override(self, handler_mod):
"""Explicit override threads through to compute_and_emit."""
captured = {}

def fake_compute(**kwargs):
captured.update(kwargs)
return _ok_summary()

with patch.object(handler_mod, "_ensure_init"), \
patch("replay.counterfactual.compute_and_emit",
side_effect=fake_compute):
handler_mod.handler(
{"max_artifacts_per_agent": 200}, context=None,
)

assert captured["max_artifacts_per_agent"] == 200

def test_max_artifacts_per_agent_none_disables_cap(self, handler_mod):
"""Explicit None in the event payload disables the cap for
ad-hoc deeper-corpus runs."""
captured = {}

def fake_compute(**kwargs):
captured.update(kwargs)
return _ok_summary()

with patch.object(handler_mod, "_ensure_init"), \
patch("replay.counterfactual.compute_and_emit",
side_effect=fake_compute):
handler_mod.handler(
{"max_artifacts_per_agent": None}, context=None,
)

assert captured["max_artifacts_per_agent"] is None

def test_window_days_event_override(self, handler_mod):
captured = {}
Expand Down
118 changes: 117 additions & 1 deletion tests/test_replay_counterfactual.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from __future__ import annotations

import json
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -351,3 +351,119 @@ def test_thin_sample_skipped_at_pipeline(self):
assert summary["agents_skipped_thin_sample"][0]["agent_id_base"] == "ic_cio"
# No metric emission for thin-sample skip.
cw.put_metric_data.assert_not_called()


# ── ROADMAP L293 (2026-05-19) — per-agent artifact cap regression suite ──


class TestPerAgentArtifactCap:
"""Bounds the artifact-scan ceiling so the Saturday-SF Counterfactual
Lambda stays under its 600s timeout regardless of how heavy any single
agent's decision-artifact backlog grows. The cap applies at the
list-keys stage (pre ``_load_artifact``) so it directly reduces the
expensive S3 get_object loop."""

def test_cap_drops_oldest_when_one_agent_dominates(self):
"""Single agent with more artifacts than the cap → most-recent-first
truncation keeps only ``max_artifacts_per_agent`` keys for that
agent. The list iterates day-by-day backward from end_date so the
first ``cap`` keys are the freshest."""
from replay.counterfactual import _list_artifact_keys_in_window

end = datetime(2026, 5, 9, tzinfo=timezone.utc)
# Build artifacts spread across 5 days for a single agent, 3/day
artifacts: dict[str, dict] = {}
for day_offset in range(5):
day = end - timedelta(days=day_offset)
day_key = day.strftime("%Y/%m/%d")
for i in range(3):
k = f"decision_artifacts/{day_key}/ic_cio/r{day_offset}_{i}.json"
artifacts[k] = {"agent_id": "ic_cio"}
s3 = _build_s3_stub(artifacts)

# Cap at 7 — should keep day 0 (3 keys) + day 1 (3 keys) + 1 from day 2.
keys = _list_artifact_keys_in_window(
s3,
bucket="b",
capture_prefix="decision_artifacts",
end_date=end,
window_days=5,
max_artifacts_per_agent=7,
)
assert len(keys) == 7
# All retained keys are from the 3 most-recent days (5/9, 5/8, 5/7).
retained_days = {k.split("/")[3] for k in keys}
assert retained_days <= {"09", "08", "07"}

def test_cap_none_returns_all_keys(self):
"""``max_artifacts_per_agent=None`` disables the cap — full corpus."""
from replay.counterfactual import _list_artifact_keys_in_window

end = datetime(2026, 5, 9, tzinfo=timezone.utc)
artifacts = {
f"decision_artifacts/2026/05/09/ic_cio/r{i}.json": {"agent_id": "ic_cio"}
for i in range(15)
}
s3 = _build_s3_stub(artifacts)

keys = _list_artifact_keys_in_window(
s3,
bucket="b",
capture_prefix="decision_artifacts",
end_date=end,
window_days=1,
max_artifacts_per_agent=None,
)
assert len(keys) == 15

def test_cap_zero_returns_all_keys(self):
"""``max_artifacts_per_agent=0`` is treated as unbounded (defensive
— operator may set 0 expecting "no cap" or "no work"; the
less-surprising behavior is to disable the cap rather than emit
zero data)."""
from replay.counterfactual import _list_artifact_keys_in_window

end = datetime(2026, 5, 9, tzinfo=timezone.utc)
artifacts = {
f"decision_artifacts/2026/05/09/ic_cio/r{i}.json": {"agent_id": "ic_cio"}
for i in range(10)
}
s3 = _build_s3_stub(artifacts)

keys = _list_artifact_keys_in_window(
s3,
bucket="b",
capture_prefix="decision_artifacts",
end_date=end,
window_days=1,
max_artifacts_per_agent=0,
)
assert len(keys) == 10

def test_cap_applied_per_agent_id_base(self):
"""Two agents each exceeding the cap → each independently truncated
to ``max_artifacts_per_agent``. Cap is a per-agent ceiling, not a
global one."""
from replay.counterfactual import _list_artifact_keys_in_window

end = datetime(2026, 5, 9, tzinfo=timezone.utc)
artifacts: dict[str, dict] = {}
for i in range(10):
artifacts[f"decision_artifacts/2026/05/09/ic_cio/r{i}.json"] = {}
artifacts[f"decision_artifacts/2026/05/09/macro_economist/r{i}.json"] = {}
s3 = _build_s3_stub(artifacts)

keys = _list_artifact_keys_in_window(
s3,
bucket="b",
capture_prefix="decision_artifacts",
end_date=end,
window_days=1,
max_artifacts_per_agent=3,
)
# 3 per agent × 2 agents = 6 total.
assert len(keys) == 6
ic_cio_keys = [k for k in keys if "/ic_cio/" in k]
macro_keys = [k for k in keys if "/macro_economist/" in k]
assert len(ic_cio_keys) == 3
assert len(macro_keys) == 3
Loading