From 1eeb13f963a0c40222e50683691473740307695f Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Sat, 4 Apr 2026 17:05:49 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20add=20data=20quality=20gates=20?= =?UTF-8?q?=E2=80=94=20parquet=20validation,=20summary=20JSON,=20email=20f?= =?UTF-8?q?ixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add validation of slim cache parquet slices after slicing, aggregate validation results into a per-run validation.json on S3, fix emailer detail extraction to match actual collector field names, and add set -o pipefail to the DriftDetection step. Includes 14 new tests covering validation checks and email detail extraction. Co-Authored-By: Claude Opus 4.6 (1M context) --- collectors/slim_cache.py | 29 +++++++++- emailer.py | 12 +++++ infrastructure/step_function.json | 1 + tests/test_emailer.py | 66 +++++++++++++++++++++++ tests/test_price_validator.py | 90 +++++++++++++++++++++++++++++++ weekly_collector.py | 42 +++++++++++++++ 6 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 tests/test_emailer.py create mode 100644 tests/test_price_validator.py diff --git a/collectors/slim_cache.py b/collectors/slim_cache.py index 66fbb7d..b0fa0e4 100644 --- a/collectors/slim_cache.py +++ b/collectors/slim_cache.py @@ -38,8 +38,10 @@ def collect( dry_run: if True, count files but don't write Returns: - dict with status, written count, failed count + dict with status, written count, failed count, validation summary """ + from validators.price_validator import validate_parquet + s3 = boto3.client("s3") cutoff = pd.Timestamp.now().normalize() - pd.Timedelta(days=lookback_days) @@ -60,12 +62,14 @@ def collect( written = 0 failed = 0 + validation_results: list[dict] = [] for s3_key in parquet_keys: filename = s3_key.split("/")[-1] if filename == "sector_map.json": continue + ticker = filename.replace(".parquet", "") local_path = local_dir / filename try: s3.download_file(bucket, s3_key, str(local_path)) @@ -80,6 +84,9 @@ def collect( local_path.unlink(missing_ok=True) continue + # Validate the 2-year slice (what inference actually reads) + validation_results.append(validate_parquet(slim_df, ticker)) + slim_path = local_dir / f"_slim_{filename}" slim_df.to_parquet(slim_path, engine="pyarrow", compression="snappy") @@ -100,8 +107,26 @@ def collect( fail_pct = failed / max(len(parquet_keys), 1) * 100 logger.warning("Slim cache: %d/%d failed (%.1f%%)", failed, len(parquet_keys), fail_pct) + # Build validation summary + anomaly_tickers = [r for r in validation_results if r["status"] != "clean"] + validation = { + "total_validated": len(validation_results), + "clean": len(validation_results) - len(anomaly_tickers), + "anomalies": len(anomaly_tickers), + "anomaly_details": anomaly_tickers[:20], + } + if anomaly_tickers: + logger.warning("Slim cache validation: %d/%d tickers have anomalies", len(anomaly_tickers), len(validation_results)) + for r in anomaly_tickers[:10]: + logger.warning(" %s: %s", r["ticker"], "; ".join(r["anomalies"])) + else: + logger.info("Slim cache validation: all %d tickers clean", len(validation_results)) + logger.info("Slim cache: %d / %d uploaded to s3://%s/%s", written, len(parquet_keys), bucket, slim_prefix) - return {"status": "ok" if failed == 0 else "partial", "written": written, "failed": failed} + result = {"status": "ok" if failed == 0 else "partial", "written": written, "failed": failed} + if validation_results: + result["validation"] = validation + return result def _list_parquets(s3, bucket: str, prefix: str) -> list[str]: diff --git a/emailer.py b/emailer.py index 3d1be42..11e45ec 100644 --- a/emailer.py +++ b/emailer.py @@ -161,16 +161,28 @@ def _extract_details(name: str, info: dict) -> str: if "tickers_refreshed" in info: parts.append(f"{info['tickers_refreshed']} refreshed") + elif "refreshed" in info: + parts.append(f"{info['refreshed']} refreshed") + if "stale" in info: + parts.append(f"{info['stale']} stale") if "tickers_skipped" in info: parts.append(f"{info['tickers_skipped']} skipped") if "tickers_failed" in info: n = info["tickers_failed"] if n > 0: parts.append(f"{n} failed") + elif "failed" in info: + n = info["failed"] + if n > 0: + parts.append(f"{n} failed") if "total_tickers" in info: parts.append(f"{info['total_tickers']} total") + elif "total" in info: + parts.append(f"{info['total']} total") if "n_tickers" in info: parts.append(f"{info['n_tickers']} tickers") + if "written" in info: + parts.append(f"{info['written']} written") if "sp500_count" in info: parts.append(f"S&P500: {info['sp500_count']}") if "sp400_count" in info: diff --git a/infrastructure/step_function.json b/infrastructure/step_function.json index 24c5345..dc98edd 100644 --- a/infrastructure/step_function.json +++ b/infrastructure/step_function.json @@ -376,6 +376,7 @@ "commands": [ "export HOME=/home/ec2-user", "set -a && source /home/ec2-user/.alpha-engine.env && set +a", + "set -o pipefail", "export PYTHONPATH=/home/ec2-user/alpha-engine-predictor", "/home/ec2-user/alpha-engine-data/.venv/bin/python -m monitoring.drift_detector --alert 2>&1 | tee /var/log/drift-detection.log" ], diff --git a/tests/test_emailer.py b/tests/test_emailer.py new file mode 100644 index 0000000..43f2d99 --- /dev/null +++ b/tests/test_emailer.py @@ -0,0 +1,66 @@ +"""Tests for emailer._extract_details.""" + +from emailer import _extract_details + + +class TestExtractDetails: + def test_prices_collector(self): + info = { + "status": "ok", + "refreshed": 150, + "stale": 160, + "failed": 10, + "total": 900, + "validation": { + "total_validated": 150, + "anomalies": 3, + "clean": 147, + }, + } + details = _extract_details("prices", info) + assert "150 refreshed" in details + assert "160 stale" in details + assert "10 failed" in details + assert "900 total" in details + assert "3/150 anomalies" in details + + def test_slim_cache_with_validation(self): + info = { + "status": "ok", + "written": 450, + "failed": 0, + "validation": { + "total_validated": 450, + "anomalies": 0, + "clean": 450, + }, + } + details = _extract_details("slim_cache", info) + assert "450 written" in details + assert "450 validated" in details + assert "failed" not in details # 0 failures omitted + + def test_slim_cache_with_anomalies(self): + info = { + "status": "partial", + "written": 448, + "failed": 2, + "validation": { + "total_validated": 448, + "anomalies": 5, + "clean": 443, + }, + } + details = _extract_details("slim_cache", info) + assert "448 written" in details + assert "2 failed" in details + assert "5/448 anomalies" in details + + def test_empty_info(self): + assert _extract_details("unknown", {}) == "" + + def test_no_validation(self): + info = {"status": "ok", "written": 10, "failed": 0} + details = _extract_details("slim_cache", info) + assert "10 written" in details + assert "validated" not in details diff --git a/tests/test_price_validator.py b/tests/test_price_validator.py new file mode 100644 index 0000000..96b92ca --- /dev/null +++ b/tests/test_price_validator.py @@ -0,0 +1,90 @@ +"""Tests for validators/price_validator.py.""" + +import pandas as pd +import pytest + +from validators.price_validator import validate_parquet + + +def _make_ohlcv(n=30, base_close=100.0): + """Build a clean OHLCV DataFrame with n trading days.""" + dates = pd.bdate_range("2025-01-02", periods=n) + close = [base_close + i * 0.5 for i in range(n)] + return pd.DataFrame( + { + "Open": [c - 0.1 for c in close], + "High": [c + 1.0 for c in close], + "Low": [c - 1.0 for c in close], + "Close": close, + "Volume": [1_000_000 + i * 1000 for i in range(n)], + }, + index=dates, + ) + + +class TestValidateParquet: + def test_clean_data(self): + df = _make_ohlcv() + result = validate_parquet(df, "AAPL") + assert result["status"] == "clean" + assert result["anomalies"] == [] + + def test_empty_dataframe(self): + df = pd.DataFrame() + result = validate_parquet(df, "EMPTY") + assert result["status"] == "empty" + + def test_high_less_than_low(self): + df = _make_ohlcv() + df.iloc[5, df.columns.get_loc("High")] = df.iloc[5]["Low"] - 1 + result = validate_parquet(df, "BAD_HL") + assert result["status"] == "anomaly" + assert any("High5 day gap) + gap_start = df.index[15] + gap_end = gap_start + pd.Timedelta(days=8) + df = df[(df.index < gap_start) | (df.index >= gap_end)] + result = validate_parquet(df, "GAP") + assert result["status"] == "anomaly" + assert any("gap" in a.lower() for a in result["anomalies"]) + + def test_multiple_anomalies(self): + df = _make_ohlcv() + df.iloc[5, df.columns.get_loc("Close")] = 0 + df.iloc[10, df.columns.get_loc("Volume")] = 0 + result = validate_parquet(df, "MULTI") + assert result["status"] == "anomaly" + assert len(result["anomalies"]) >= 2 diff --git a/weekly_collector.py b/weekly_collector.py index 6a123d9..306e51b 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -408,6 +408,7 @@ def _finalize( if not dry_run and only is None: _write_manifest(bucket, market_prefix, run_date, results) + _write_validation_json(bucket, market_prefix, run_date, results) # Write health marker for Step Functions phase = results.get("phase") @@ -465,6 +466,47 @@ def _write_manifest(bucket: str, s3_prefix: str, run_date: str, results: dict) - logger.info("Wrote manifest + latest pointer for %s", run_date) +def _write_validation_json( + bucket: str, s3_prefix: str, run_date: str, results: dict, +) -> None: + """Aggregate validation results from all collectors and write to S3.""" + collectors = results.get("collectors", {}) + validations: dict[str, dict] = {} + + for name, info in collectors.items(): + val = info.get("validation") + if val: + validations[name] = val + + if not validations: + return + + total_validated = sum(v.get("total_validated", 0) for v in validations.values()) + total_anomalies = sum(v.get("anomalies", 0) for v in validations.values()) + total_clean = sum(v.get("clean", 0) for v in validations.values()) + + payload = { + "date": run_date, + "total_validated": total_validated, + "total_clean": total_clean, + "total_anomalies": total_anomalies, + "collectors": validations, + } + + s3 = boto3.client("s3") + key = f"{s3_prefix}weekly/{run_date}/validation.json" + s3.put_object( + Bucket=bucket, + Key=key, + Body=json.dumps(payload, indent=2, default=str), + ContentType="application/json", + ) + logger.info( + "Wrote validation.json: %d validated, %d anomalies → s3://%s/%s", + total_validated, total_anomalies, bucket, key, + ) + + def _write_health_marker(bucket: str, phase: int, run_date: str, status: str) -> None: """Write health marker for Step Functions dependency checking.""" s3 = boto3.client("s3")