Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions lambda_concordance/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,27 @@ def handler(event: dict, context) -> dict:

# Imports deferred until after _ensure_init so SSM-loaded secrets
# are available for any module-level init that consults them.
from replay import is_shell_run_dry, shell_run_dry_response
from replay.batch import (
DEFAULT_MAX_ARTIFACTS,
compute_and_emit_concordance,
)

t0 = time.time()

# Shell-run dry path (Saturday-SF keystone). Boot + module imports
# above have already run for real (the keystone's whole point —
# exercise bootstrap/import/lib-pin/transport). Return a benign
# success BEFORE the replay.batch scan (decision_artifacts S3
# discovery), BEFORE any langchain_anthropic / target-model call,
# and BEFORE any CloudWatch metric emit or S3 summary persist.
if is_shell_run_dry(event):
logger.info(
"[lambda_concordance] shell-run dry path: boot+imports OK, "
"skipping replay scan + Anthropic + S3/CW writes"
)
return shell_run_dry_response("lambda_concordance", t0)

bucket = os.environ.get("S3_BUCKET", "alpha-engine-research")

target_models = event.get("target_models") or ["claude-haiku-4-5"]
Expand Down
20 changes: 20 additions & 0 deletions lambda_counterfactual/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,29 @@ def handler(event: dict, context) -> dict:
"""
_ensure_init()

from replay import is_shell_run_dry, shell_run_dry_response
from replay.counterfactual import compute_and_emit

t0 = time.time()

# Shell-run dry path (Saturday-SF keystone). Boot + module imports
# above have already run for real. Return a benign success BEFORE
# the replay.counterfactual scan (decision_artifacts S3 discovery +
# sklearn fit), and BEFORE any CloudWatch metric emit or S3
# per-agent analysis persist. No LLM calls exist on this path.
#
# Side benefit (NOT the contract): because the corpus scan is
# skipped, this also sidesteps the known separate production
# Counterfactual 600s-timeout-on-corpus-growth bug under shell_run
# — that real-Saturday timeout remains a distinct out-of-scope
# issue tracked separately; the scan logic is untouched here.
if is_shell_run_dry(event):
logger.info(
"[lambda_counterfactual] shell-run dry path: boot+imports "
"OK, skipping replay scan + sklearn fit + S3/CW writes"
)
return shell_run_dry_response("lambda_counterfactual", t0)

bucket = os.environ.get("S3_BUCKET", "alpha-engine-research")

end_time_iso = event.get("end_time_iso")
Expand Down
62 changes: 62 additions & 0 deletions replay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,65 @@
model would emit the same*. Together they cover the agent-
justification triple alongside the counterfactual-rule-fit signal.
"""

from __future__ import annotations

import time
from typing import Any

# Canonical Saturday-SF shell-run dry-path event key. Established
# verbatim by the shell-run keystone (alpha-engine-data
# step_function.json) for the Research Lambda
# (``"dry_run_llm.$": "$.research_dry"``); reused here so the
# ReplayConcordance + Counterfactual states can be routed dry (boot +
# imports for real, return a benign success before any scan / external
# call / S3 / CloudWatch write) instead of pure-skipped. Distinct from
# the handlers' pre-existing ``dry_run`` event key, which has a
# different (compute-but-do-not-emit-metrics) semantic and is left
# untouched for backward compatibility.
SHELL_RUN_DRY_EVENT_KEY = "dry_run_llm"


def is_shell_run_dry(event: dict | None) -> bool:
"""True when the SF shell-run keystone routed this Lambda dry.

Reads the canonical ``dry_run_llm`` boolean off the invocation
event. Tolerates a missing/None event and string ``"true"``/``"1"``
forms (Step Functions string-parameter convenience), mirroring the
coercion the handlers already apply to ``agents``/``target_models``.
"""
if not event:
return False
raw = event.get(SHELL_RUN_DRY_EVENT_KEY, False)
if isinstance(raw, str):
return raw.strip().lower() in {"true", "1", "yes"}
return bool(raw)


def shell_run_dry_response(handler_name: str, t0: float) -> dict:
"""Benign success envelope returned BEFORE the replay scan.

Returned by both replay Lambdas when ``is_shell_run_dry`` is true.
Hard invariant at the call site: zero external/LLM calls, zero
S3/CloudWatch writes, no decision_artifacts discovery — boot +
module imports have already run for real by the time this is
called. ``status`` is a recognised value the SF (Catch-wrapped,
non-blocking) treats as success.
"""
return {
"status": "DRY_RUN",
"dry_run": True,
"handler": handler_name,
"note": (
"shell-run dry path: boot + imports executed; replay scan, "
"external/LLM calls, and all S3/CloudWatch writes skipped"
),
"duration_seconds": round(time.time() - t0, 1),
}


__all__ = [
"SHELL_RUN_DRY_EVENT_KEY",
"is_shell_run_dry",
"shell_run_dry_response",
]
61 changes: 61 additions & 0 deletions tests/test_lambda_concordance_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,64 @@ def fake_compute(**kwargs):
context=None,
)
assert captured["agent_filter"] == ["sector_quant", "ic_cio"]


# ── Shell-run dry path (Saturday-SF keystone) ────────────────────────────


class TestShellRunDryPath:
"""`dry_run_llm: true` (the canonical keystone shell-run key) must
short-circuit BEFORE the replay scan: no compute_and_emit_concordance
call (so no decision_artifacts S3 discovery, no langchain_anthropic /
target-model call, no CloudWatch metric emit, no S3 summary
persist), boot + module imports still run, and a benign success
envelope is returned."""

def test_dry_run_llm_short_circuits_before_scan(self, handler_mod):
with patch.object(handler_mod, "_ensure_init") as m_init, \
patch("replay.batch.compute_and_emit_concordance") as m_compute:
result = handler_mod.handler({"dry_run_llm": True}, context=None)

# Boot/init still ran for real (the keystone's whole point).
m_init.assert_called_once()
# The replay scan / Anthropic / S3+CW path was never entered.
m_compute.assert_not_called()
# SF (Catch-wrapped, non-blocking) treats this as success.
assert result["status"] == "DRY_RUN"
assert result["dry_run"] is True
assert result["handler"] == "lambda_concordance"
assert "duration_seconds" in result

def test_dry_run_llm_string_true_coerced(self, handler_mod):
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.batch.compute_and_emit_concordance") as m_compute:
result = handler_mod.handler({"dry_run_llm": "true"}, context=None)
m_compute.assert_not_called()
assert result["status"] == "DRY_RUN"

def test_dry_run_llm_false_takes_real_path(self, handler_mod):
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.batch.compute_and_emit_concordance",
return_value=_ok_summary()) as m_compute:
result = handler_mod.handler({"dry_run_llm": False}, context=None)
m_compute.assert_called_once()
assert result["status"] == "OK"

def test_absent_dry_run_llm_takes_real_path(self, handler_mod):
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.batch.compute_and_emit_concordance",
return_value=_ok_summary()) as m_compute:
result = handler_mod.handler({}, context=None)
m_compute.assert_called_once()
assert result["status"] == "OK"

def test_legacy_dry_run_key_still_takes_real_path(self, handler_mod):
"""The pre-existing `dry_run` (compute-but-don't-emit-metrics)
semantic is preserved — it must NOT short-circuit the scan."""
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.batch.compute_and_emit_concordance",
return_value=_ok_summary()) as m_compute:
result = handler_mod.handler({"dry_run": True}, context=None)
m_compute.assert_called_once()
assert m_compute.call_args.kwargs["emit_metrics"] is False
assert result["status"] == "OK"
58 changes: 58 additions & 0 deletions tests/test_lambda_counterfactual_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,61 @@ def fake_compute(**kwargs):
context=None,
)
assert captured["agent_filter"] == ["ic_cio"]


# ── Shell-run dry path (Saturday-SF keystone) ────────────────────────────


class TestShellRunDryPath:
"""`dry_run_llm: true` (the canonical keystone shell-run key) must
short-circuit BEFORE the replay scan: no compute_and_emit call (so
no decision_artifacts S3 discovery, no sklearn fit, no CloudWatch
metric emit, no S3 per-agent persist), boot + module imports still
run, and a benign success envelope is returned. No LLM calls exist
on this handler's path regardless."""

def test_dry_run_llm_short_circuits_before_scan(self, handler_mod):
with patch.object(handler_mod, "_ensure_init") as m_init, \
patch("replay.counterfactual.compute_and_emit") as m_compute:
result = handler_mod.handler({"dry_run_llm": True}, context=None)

m_init.assert_called_once()
m_compute.assert_not_called()
assert result["status"] == "DRY_RUN"
assert result["dry_run"] is True
assert result["handler"] == "lambda_counterfactual"
assert "duration_seconds" in result

def test_dry_run_llm_string_true_coerced(self, handler_mod):
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.counterfactual.compute_and_emit") as m_compute:
result = handler_mod.handler({"dry_run_llm": "1"}, context=None)
m_compute.assert_not_called()
assert result["status"] == "DRY_RUN"

def test_dry_run_llm_false_takes_real_path(self, handler_mod):
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.counterfactual.compute_and_emit",
return_value=_ok_summary()) as m_compute:
result = handler_mod.handler({"dry_run_llm": False}, context=None)
m_compute.assert_called_once()
assert result["status"] == "OK"

def test_absent_dry_run_llm_takes_real_path(self, handler_mod):
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.counterfactual.compute_and_emit",
return_value=_ok_summary()) as m_compute:
result = handler_mod.handler({}, context=None)
m_compute.assert_called_once()
assert result["status"] == "OK"

def test_legacy_dry_run_key_still_takes_real_path(self, handler_mod):
"""The pre-existing `dry_run` (compute-but-don't-emit-metrics)
semantic is preserved — it must NOT short-circuit the scan."""
with patch.object(handler_mod, "_ensure_init"), \
patch("replay.counterfactual.compute_and_emit",
return_value=_ok_summary()) as m_compute:
result = handler_mod.handler({"dry_run": True}, context=None)
m_compute.assert_called_once()
assert m_compute.call_args.kwargs["emit_metrics"] is False
assert result["status"] == "OK"
59 changes: 59 additions & 0 deletions tests/test_replay_shell_run_dry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Unit tests for the shared shell-run dry helper in replay/__init__.py.

The helper is the single canonical (no-copy-paste) implementation used
by BOTH lambda_concordance/handler.py and lambda_counterfactual/
handler.py to short-circuit the Saturday-SF shell-run dry path before
any replay scan / external call / S3 / CloudWatch write.
"""

from __future__ import annotations

from replay import (
SHELL_RUN_DRY_EVENT_KEY,
is_shell_run_dry,
shell_run_dry_response,
)


class TestEventKey:
def test_canonical_key_is_dry_run_llm(self):
# Verbatim match with the keystone's Research-Lambda key
# (`"dry_run_llm.$": "$.research_dry"` in step_function.json).
assert SHELL_RUN_DRY_EVENT_KEY == "dry_run_llm"


class TestIsShellRunDry:
def test_true_bool(self):
assert is_shell_run_dry({"dry_run_llm": True}) is True

def test_false_bool(self):
assert is_shell_run_dry({"dry_run_llm": False}) is False

def test_absent_key(self):
assert is_shell_run_dry({}) is False

def test_none_event(self):
assert is_shell_run_dry(None) is False

def test_string_true_forms(self):
for v in ("true", "True", "TRUE", "1", "yes", " true "):
assert is_shell_run_dry({"dry_run_llm": v}) is True

def test_string_false_forms(self):
for v in ("false", "0", "no", ""):
assert is_shell_run_dry({"dry_run_llm": v}) is False

def test_legacy_dry_run_key_does_not_trigger(self):
# The pre-existing `dry_run` (compute-but-don't-emit) key must
# NOT be interpreted as the shell-run short-circuit signal.
assert is_shell_run_dry({"dry_run": True}) is False


class TestShellRunDryResponse:
def test_envelope_shape(self):
resp = shell_run_dry_response("lambda_concordance", 0.0)
assert resp["status"] == "DRY_RUN"
assert resp["dry_run"] is True
assert resp["handler"] == "lambda_concordance"
assert "note" in resp
assert isinstance(resp["duration_seconds"], float)
Loading