def _run_tech_weight_ablation(config: dict, freeze: bool) -> dict:
    """Compute the tech-weight ablation and attach the apply outcome.

    The recommendation is always computed. What happens next depends on
    ``freeze``:

    - ``freeze=True``: ``tech_weight_ablation.apply()`` is never called;
      a fixed "frozen" marker is recorded instead, so a ``--freeze``
      evaluator run goes nowhere near the apply path.
    - ``freeze=False``: the recommendation is handed to
      ``tech_weight_ablation.apply()`` together with the signals bucket.

    Args:
        config: Evaluator config. Reads ``signals_bucket`` (defaults to
            ``"alpha-engine-research"``), ``research_db`` and
            ``_run_date``.
        freeze: When true, skip the apply step entirely.

    Returns:
        The dict returned by ``compute_tech_weight_ablation`` with an
        added ``apply_result`` entry (the same dict object, mutated in
        place).
    """
    target_bucket = config.get("signals_bucket", "alpha-engine-research")
    outcome = tech_weight_ablation.compute_tech_weight_ablation(
        db_path=config.get("research_db"),
        run_date=config.get("_run_date"),
    )

    if freeze:
        apply_outcome = {
            "applied": False, "reason": "frozen (--freeze flag)",
        }
    else:
        apply_outcome = tech_weight_ablation.apply(outcome, target_bucket)

    outcome["apply_result"] = apply_outcome
    return outcome
import json
import logging
import sqlite3
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)


# ── S3 contract (auto-apply, ROADMAP L2553) ─────────────────────────────────

S3_LIVE_KEY = "config/scoring_weights_per_sector.json"
S3_SHADOW_PREFIX = "config/scoring_weights_per_sector_shadow_history"

# Reproduction gate: live write only fires when the same per-sector
# recommendation dict reproduces across this many consecutive shadow
# archives. Mirrors the ROADMAP L2553 "4+ consecutive Saturdays"
# acceptance.
_MIN_CONSECUTIVE_WEEKS = 4

# Module-level config ref — set by init_config() from evaluate.py
_cfg: dict = {}


def init_config(config: dict) -> None:
    """Load the ``tech_weight_ablation`` section from backtester config.

    Recognized keys (all default false):
      - ``use_tech_ablation_target``: enable shadow-archive writes
      - ``enforce_tech_ablation``: enable live writes (also requires
        the reproduction gate to pass)

    A missing or falsy section resets the module config to ``{}``.
    """
    global _cfg
    _cfg = config.get("tech_weight_ablation", {}) or {}


# ── Auto-apply path (ROADMAP L2553) ─────────────────────────────────────────


def _config_name_to_weights(config_name: str) -> dict[str, float] | None:
    """Map a WeightConfig name (e.g. ``"rsi_only"``) to its weight dict.

    Returns None when the name is not in ``DEFAULT_GRID``, so a
    recommendation naming a config that no longer exists is dropped
    rather than applied.
    """
    for cfg in DEFAULT_GRID:
        if cfg.name == config_name:
            return {
                "rsi": cfg.rsi, "macd": cfg.macd,
                "ma50": cfg.ma50, "ma200": cfg.ma200,
                "momentum": cfg.momentum,
            }
    return None


def _build_per_sector_payload(result: dict) -> dict[str, dict[str, float]]:
    """Translate ``recommendations: {team_id -> config_name}`` into a
    per-sector weights payload.

    Returns ``{team_id: {weight_name: weight_value, ...}, ...}``. Teams
    whose recommended config name cannot be resolved against
    ``DEFAULT_GRID`` are omitted; a missing or empty ``recommendations``
    entry yields ``{}``.
    """
    out: dict[str, dict[str, float]] = {}
    for team_id, config_name in (result.get("recommendations") or {}).items():
        weights = _config_name_to_weights(config_name)
        if weights is not None:
            out[team_id] = weights
    return out


def _list_shadow_archive_keys(s3, bucket: str) -> list[str]:
    """Return every shadow-archive key under ``S3_SHADOW_PREFIX``.

    Follows ``NextContinuationToken`` until the listing is exhausted.
    A single ``list_objects_v2`` call returns a bounded page in
    ascending key order, so stopping at the first page would hide the
    newest archives once the prefix outgrows one page. The
    ``latest.json`` sidecar and non-JSON keys are excluded.
    """
    keys: list[str] = []
    request: dict[str, Any] = {
        "Bucket": bucket, "Prefix": f"{S3_SHADOW_PREFIX}/",
    }
    while True:
        resp = s3.list_objects_v2(**request)
        keys.extend(
            obj["Key"] for obj in (resp.get("Contents") or [])
            if obj["Key"].endswith(".json")
            and not obj["Key"].endswith("/latest.json")
        )
        token = resp.get("NextContinuationToken")
        if not resp.get("IsTruncated") or not token:
            return keys
        request["ContinuationToken"] = token


def _read_recent_shadow_archives(
    s3, bucket: str, n: int,
) -> list[dict]:
    """Read up to ``n`` most-recent shadow archives, newest first.

    Keys are sorted lexicographically in descending order; per the
    archive layout the run-id prefix is time-encoded, so that ordering
    is treated as newest-first. Returns the parsed JSON bodies.

    Failure handling is fail-closed for the reproduction gate:
      - if the listing fails for any reason, ``[]`` is returned;
      - an archive that cannot be fetched or parsed is skipped, so the
        result can be shorter than ``n``.
    Both cases are logged as warnings.
    """
    try:
        keys = _list_shadow_archive_keys(s3, bucket)
    except Exception as e:
        # Deliberately broad: credential/endpoint failures are not
        # ClientError, and letting them escape would abort apply()
        # after the shadow archive has already been written.
        logger.warning(
            "[tech_weight_ablation] shadow archive list failed (%s) — "
            "treating as no history available",
            type(e).__name__,
        )
        return []

    out: list[dict] = []
    for key in sorted(keys, reverse=True)[:n]:
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            out.append(json.loads(obj["Body"].read()))
        except Exception as e:
            logger.warning(
                "[tech_weight_ablation] shadow archive %s unreadable (%s) — "
                "skipping",
                key, type(e).__name__,
            )
    return out


def _check_reproduction_gate(
    s3, bucket: str, current_payload: dict[str, dict[str, float]],
    *, min_consecutive: int = _MIN_CONSECUTIVE_WEEKS,
) -> dict:
    """Pass when the same per-sector recommendation reproduces across
    the last ``min_consecutive`` shadow archives.

    Returns ``{"passed": bool, "reason": str, "n_consecutive": int}``.

    An archive matches when its ``per_sector`` value compares equal to
    ``current_payload`` as parsed JSON (same key set, same weight
    values). A single disagreeing archive fails the gate. An archive
    whose body is not a JSON object counts as a mismatch.

    When fewer than ``min_consecutive`` archives are readable the gate
    fails and ``n_consecutive`` reports how many archives were
    available (they are not compared in that case).

    NOTE(review): the gate counts archives, not calendar weeks — every
    apply() call that reaches the shadow write adds one archive.
    Confirm apply() only runs once per weekly evaluation.
    """
    archives = _read_recent_shadow_archives(s3, bucket, min_consecutive)
    if len(archives) < min_consecutive:
        return {
            "passed": False,
            "reason": (
                f"reproduction gate: only {len(archives)} prior shadow "
                f"archive(s) available; need {min_consecutive}"
            ),
            "n_consecutive": len(archives),
        }
    for i, archive in enumerate(archives):
        matches = (
            isinstance(archive, dict)
            and (archive.get("per_sector") or {}) == current_payload
        )
        if not matches:
            return {
                "passed": False,
                "reason": (
                    f"reproduction gate broken at archive[-{i + 1}]: "
                    f"per_sector payload differs from this week"
                ),
                "n_consecutive": i,
            }
    return {
        "passed": True,
        "reason": (
            f"per_sector payload reproduced across last "
            f"{min_consecutive} shadow archives"
        ),
        "n_consecutive": min_consecutive,
    }


def apply(result: dict, bucket: str) -> dict:
    """Write the per-sector tech-weight recommendation to S3 under a
    two-flag activation contract.

    Behaviour, in order:

    1. ``use_tech_ablation_target`` off → nothing is written.
    2. ``result["status"] != "ok"`` or no resolvable per-sector
       recommendation → nothing is written.
    3. Otherwise a shadow archive is written under
       ``S3_SHADOW_PREFIX`` together with its ``latest`` sidecar.
    4. With ``enforce_tech_ablation`` on, the reproduction gate is
       evaluated (it reads the archive just written as the newest
       entry); only if it passes is ``S3_LIVE_KEY`` written.

    Args:
        result: Output of ``compute_tech_weight_ablation``.
        bucket: Target S3 bucket name.

    Returns:
        A dict with at least ``applied`` (bool) and ``reason`` (str).
        Once the shadow archive has been written it also carries
        ``shadow_key`` and ``per_sector``; ``reproduction_gate`` is
        present when the gate was evaluated and reported, and
        ``live_key`` when the live write succeeded. S3 write failures
        are reported through ``reason`` rather than raised.
    """
    use_shadow = bool(_cfg.get("use_tech_ablation_target", False))
    enforce = bool(_cfg.get("enforce_tech_ablation", False))

    if not use_shadow:
        # Flag off — nothing to do. Recommendation stays advisory.
        return {
            "applied": False,
            "reason": "use_tech_ablation_target=False",
        }
    if result.get("status") != "ok":
        return {
            "applied": False,
            "reason": f"compute status={result.get('status')}",
        }

    per_sector = _build_per_sector_payload(result)
    if not per_sector:
        return {
            "applied": False,
            "reason": "no per-sector recommendation to apply",
        }

    # Imported only once a write is actually going to happen, so the
    # default (flags off) path has no dependency on the S3 stack.
    import boto3
    from alpha_engine_lib.eval_artifacts import (
        eval_artifact_key, eval_latest_key, new_eval_run_id,
    )

    payload = {
        "per_sector": per_sector,
        "updated_at": str(date.today()),
        "n_teams_with_recommendation": len(per_sector),
        "source": "tech_weight_ablation",
        "run_date": result.get("run_date"),
        "min_improvement": result.get("min_improvement"),
    }
    body = json.dumps(payload, indent=2)

    s3 = boto3.client("s3")

    # Always write shadow when the flag is on — that's the whole
    # observability point. Reproduction gate reads these.
    run_id = new_eval_run_id()
    shadow_key = eval_artifact_key(S3_SHADOW_PREFIX, run_id)
    shadow_latest_key = eval_latest_key(S3_SHADOW_PREFIX)
    try:
        s3.put_object(
            Bucket=bucket, Key=shadow_key, Body=body,
            ContentType="application/json",
        )
        s3.put_object(
            Bucket=bucket, Key=shadow_latest_key, Body=body,
            ContentType="application/json",
        )
        logger.info(
            "[tech_weight_ablation] shadow archive written: s3://%s/%s",
            bucket, shadow_key,
        )
    except Exception as e:
        logger.error(
            "[tech_weight_ablation] shadow archive write failed: %s", e,
        )
        return {
            "applied": False,
            "reason": f"shadow S3 write failed: {e}",
        }

    if not enforce:
        return {
            "applied": False,
            "reason": "shadow mode (enforce_tech_ablation=False)",
            "shadow_key": shadow_key,
            "per_sector": per_sector,
        }

    # Live-write gate: reproduction across last N shadow archives.
    # NB the current week's archive has just been written, so the
    # gate reads N archives starting with this one.
    gate = _check_reproduction_gate(s3, bucket, per_sector)
    if not gate["passed"]:
        return {
            "applied": False,
            "reason": gate["reason"],
            "shadow_key": shadow_key,
            "per_sector": per_sector,
            "reproduction_gate": gate,
        }

    try:
        s3.put_object(
            Bucket=bucket, Key=S3_LIVE_KEY, Body=body,
            ContentType="application/json",
        )
        logger.info(
            "[tech_weight_ablation] live config updated: s3://%s/%s — "
            "per_sector=%s",
            bucket, S3_LIVE_KEY, per_sector,
        )
    except Exception as e:
        logger.error(
            "[tech_weight_ablation] CRITICAL: live S3 write failed: %s", e,
        )
        return {
            "applied": False,
            "reason": f"live S3 write failed: {e}",
            "shadow_key": shadow_key,
            "per_sector": per_sector,
        }

    return {
        "applied": True,
        "reason": "live config written + shadow archive recorded",
        "live_key": S3_LIVE_KEY,
        "shadow_key": shadow_key,
        "per_sector": per_sector,
        "reproduction_gate": gate,
    }
from optimizer.tech_weight_ablation import (
    DEFAULT_GRID,
    S3_LIVE_KEY,
    S3_SHADOW_PREFIX,
    WeightConfig,
    _MIN_CONSECUTIVE_WEEKS,
    _MIN_IMPROVEMENT,
    _MIN_ROWS_PER_TEAM,
    _build_per_sector_payload,
    _check_reproduction_gate,
    _evaluate_team_under_config,
    _load_live_composite_weights_per_sector,
    apply,
    compute_tech_weight_ablation,
    init_config,
)


# ── Auto-apply path (ROADMAP L2553) ─────────────────────────────────────────

# Bucket name used by every auto-apply test below.
_BUCKET = "alpha-engine-research"


class _StubS3:
    """In-memory S3 stub for apply()/_check_reproduction_gate() tests.

    Implements only the three client calls the code under test uses:
    put_object stores the body as bytes; get_object raises a
    ClientError-shaped exception on a missing key; list_objects_v2
    returns every stored key under the prefix in a single page.
    """

    class _ClientError(Exception):
        def __init__(self, code: str = "NoSuchKey") -> None:
            super().__init__(code)
            self.response = {"Error": {"Code": code}}

    def __init__(self) -> None:
        self.store: dict[tuple[str, str], bytes] = {}

    def put_object(self, *, Bucket, Key, Body, ContentType=None, **kw):
        body = Body if isinstance(Body, (bytes, bytearray)) else str(Body).encode()
        self.store[(Bucket, Key)] = bytes(body)
        return {"ETag": "stub"}

    def get_object(self, *, Bucket, Key):
        from io import BytesIO
        if (Bucket, Key) not in self.store:
            raise self._ClientError("NoSuchKey")
        return {"Body": BytesIO(self.store[(Bucket, Key)])}

    def list_objects_v2(self, *, Bucket, Prefix, **kw):
        return {
            "Contents": [
                {"Key": k} for (b, k) in self.store
                if b == Bucket and k.startswith(Prefix)
            ],
        }


def _seed_archive(s3: _StubS3, key_stem: str, per_sector: dict) -> None:
    """Plant one shadow archive ``{prefix}/{key_stem}_result.json``
    whose body carries only the ``per_sector`` field the gate reads."""
    import json

    key = f"{S3_SHADOW_PREFIX}/{key_stem}_result.json"
    s3.store[(_BUCKET, key)] = json.dumps({"per_sector": per_sector}).encode()


@pytest.fixture(autouse=True)
def _reset_cfg():
    """Each test runs against a fresh tech_weight_ablation config block."""
    init_config({})
    yield
    init_config({})


def _ok_result(recommendations: dict[str, str]) -> dict:
    """Synthesize a compute_tech_weight_ablation 'ok' result with
    the given recommendations dict."""
    return {
        "status": "ok",
        "run_date": "2026-05-19",
        "min_improvement": _MIN_IMPROVEMENT,
        "recommendations": recommendations,
        "per_team": [
            {"team_id": tid, "status": "ok", "best_config": cfg}
            for tid, cfg in recommendations.items()
        ],
    }


class TestBuildPerSectorPayload:
    def test_maps_recommendation_names_to_weight_dicts(self):
        result = _ok_result({"healthcare": "rsi_only", "technology": "trend_only"})
        payload = _build_per_sector_payload(result)
        assert set(payload.keys()) == {"healthcare", "technology"}
        # rsi_only: 100% RSI
        assert payload["healthcare"] == {
            "rsi": 1.0, "macd": 0.0, "ma50": 0.0, "ma200": 0.0, "momentum": 0.0,
        }
        # trend_only: 25/25/25/25 (no RSI)
        assert payload["technology"] == {
            "rsi": 0.0, "macd": 0.25, "ma50": 0.25, "ma200": 0.25, "momentum": 0.25,
        }

    def test_unknown_config_name_is_dropped(self):
        result = _ok_result({"healthcare": "definitely_not_in_grid"})
        assert _build_per_sector_payload(result) == {}

    def test_empty_recommendations_yields_empty_payload(self):
        assert _build_per_sector_payload(_ok_result({})) == {}


class TestApplyShadowGating:
    def test_flag_off_skips_all_writes(self, monkeypatch):
        s3 = _StubS3()
        # Route any client apply() might create to the stub; without
        # this the "no side effects" assertion below cannot fail.
        monkeypatch.setattr("boto3.client", lambda svc, *a, **kw: s3)
        # default _cfg is empty → use_tech_ablation_target=False
        out = apply(_ok_result({"healthcare": "rsi_only"}), _BUCKET)
        assert out["applied"] is False
        assert out["reason"] == "use_tech_ablation_target=False"
        # No S3 side effects at all.
        assert len(s3.store) == 0

    def test_status_not_ok_skips(self):
        init_config({"tech_weight_ablation": {"use_tech_ablation_target": True}})
        out = apply({"status": "insufficient_data"}, _BUCKET)
        assert out["applied"] is False
        assert "status=insufficient_data" in out["reason"]

    def test_empty_recommendations_skips(self):
        init_config({"tech_weight_ablation": {"use_tech_ablation_target": True}})
        out = apply(_ok_result({}), _BUCKET)
        assert out["applied"] is False
        assert "no per-sector recommendation" in out["reason"]

    def test_shadow_only_writes_archive_not_live(self, monkeypatch):
        """flag on, enforce off → shadow archive written, live key untouched."""
        s3 = _StubS3()
        monkeypatch.setattr("boto3.client", lambda svc, *a, **kw: s3)
        init_config({"tech_weight_ablation": {
            "use_tech_ablation_target": True,
            "enforce_tech_ablation": False,
        }})
        out = apply(_ok_result({"healthcare": "rsi_only"}), _BUCKET)
        assert out["applied"] is False
        assert "shadow mode" in out["reason"]
        assert "shadow_key" in out
        # Shadow archive present
        assert any(
            k.startswith(S3_SHADOW_PREFIX + "/") and k.endswith(".json")
            for (_, k) in s3.store
        )
        # Live key NOT written
        assert (_BUCKET, S3_LIVE_KEY) not in s3.store


class TestReproductionGate:
    def test_insufficient_history_fails_gate(self):
        s3 = _StubS3()
        # Empty bucket → no prior archives
        current = {"healthcare": {"rsi": 1.0, "macd": 0, "ma50": 0, "ma200": 0, "momentum": 0}}
        out = _check_reproduction_gate(s3, _BUCKET, current)
        assert out["passed"] is False
        assert "only 0 prior shadow archive" in out["reason"]
        assert out["n_consecutive"] == 0

    def test_exact_match_across_min_consecutive_passes(self):
        s3 = _StubS3()
        current = {"healthcare": {"rsi": 1.0, "macd": 0, "ma50": 0, "ma200": 0, "momentum": 0}}
        # Seed _MIN_CONSECUTIVE_WEEKS prior archives, all matching,
        # under lexicographically-sortable YYMMDDHHMM-style keys.
        for i in range(_MIN_CONSECUTIVE_WEEKS):
            _seed_archive(s3, f"26051{i}1234", current)
        out = _check_reproduction_gate(s3, _BUCKET, current)
        assert out["passed"] is True
        assert out["n_consecutive"] == _MIN_CONSECUTIVE_WEEKS

    def test_one_drift_breaks_streak(self):
        s3 = _StubS3()
        current = {"healthcare": {"rsi": 1.0, "macd": 0, "ma50": 0, "ma200": 0, "momentum": 0}}
        drift = {"healthcare": {"rsi": 0.5, "macd": 0.5, "ma50": 0, "ma200": 0, "momentum": 0}}
        # 3 matches then 1 drift. Keys sort descending, so the largest
        # prefix is read first and the drift lands in position [3]
        # (the oldest of the 4 read) — the gate fails at the last one.
        for i, payload in enumerate([current, current, current, drift]):
            _seed_archive(s3, f"2605{9 - i}01234", payload)
        out = _check_reproduction_gate(s3, _BUCKET, current)
        assert out["passed"] is False
        assert "reproduction gate broken at archive[-4]" in out["reason"]


class TestApplyLiveGating:
    def _seed_matching_history(self, s3, current_payload, n: int):
        """Plant n shadow archives that all match current_payload."""
        for i in range(n):
            _seed_archive(s3, f"26050{i}1234", current_payload)

    def test_enforce_with_insufficient_history_writes_shadow_not_live(
        self, monkeypatch,
    ):
        """enforce=True but reproduction gate fails → shadow yes, live no."""
        s3 = _StubS3()
        monkeypatch.setattr("boto3.client", lambda svc, *a, **kw: s3)
        init_config({"tech_weight_ablation": {
            "use_tech_ablation_target": True,
            "enforce_tech_ablation": True,
        }})
        out = apply(_ok_result({"healthcare": "rsi_only"}), _BUCKET)
        assert out["applied"] is False
        assert "reproduction gate" in out["reason"]
        # Shadow archive written
        assert any(
            k.startswith(S3_SHADOW_PREFIX + "/")
            for (_, k) in s3.store
        )
        # Live NOT written
        assert (_BUCKET, S3_LIVE_KEY) not in s3.store

    def test_enforce_with_full_reproduction_writes_live(self, monkeypatch):
        s3 = _StubS3()
        monkeypatch.setattr("boto3.client", lambda svc, *a, **kw: s3)
        # Pre-seed: _MIN_CONSECUTIVE_WEEKS - 1 prior shadow archives all
        # matching this week's payload. apply() writes this week's shadow
        # first (completing the streak), then checks the gate.
        rsi_only = {
            "rsi": 1.0, "macd": 0.0, "ma50": 0.0,
            "ma200": 0.0, "momentum": 0.0,
        }
        current = {"healthcare": rsi_only}
        self._seed_matching_history(
            s3, current, n=_MIN_CONSECUTIVE_WEEKS - 1,
        )
        init_config({"tech_weight_ablation": {
            "use_tech_ablation_target": True,
            "enforce_tech_ablation": True,
        }})
        out = apply(_ok_result({"healthcare": "rsi_only"}), _BUCKET)
        assert out["applied"] is True, out
        assert out["live_key"] == S3_LIVE_KEY
        assert out["per_sector"] == current
        # Live key written
        assert (_BUCKET, S3_LIVE_KEY) in s3.store
        # Shadow archive also present
        assert any(
            k.startswith(S3_SHADOW_PREFIX + "/")
            and k.endswith(".json")
            and not k.endswith("/latest.json")
            for (_, k) in s3.store
        )

    def test_drift_in_history_blocks_live_write(self, monkeypatch):
        s3 = _StubS3()
        monkeypatch.setattr("boto3.client", lambda svc, *a, **kw: s3)
        rsi_only = {
            "rsi": 1.0, "macd": 0.0, "ma50": 0.0,
            "ma200": 0.0, "momentum": 0.0,
        }
        drift_payload = {"healthcare": {
            "rsi": 0.5, "macd": 0.5, "ma50": 0.0, "ma200": 0.0, "momentum": 0.0,
        }}
        # Pre-seed 1 drift + 2 matching. Once apply() adds this week's
        # archive the gate reads 4 archives, one of which disagrees, so
        # the live write must be blocked.
        _seed_archive(s3, "2605011234", drift_payload)
        _seed_archive(s3, "2605051234", {"healthcare": rsi_only})
        _seed_archive(s3, "2605091234", {"healthcare": rsi_only})
        init_config({"tech_weight_ablation": {
            "use_tech_ablation_target": True,
            "enforce_tech_ablation": True,
        }})
        out = apply(_ok_result({"healthcare": "rsi_only"}), _BUCKET)
        assert out["applied"] is False
        assert "reproduction gate broken" in out["reason"]
        assert (_BUCKET, S3_LIVE_KEY) not in s3.store