Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions collectors/slim_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ def collect(
dry_run: if True, count files but don't write

Returns:
dict with status, written count, failed count
dict with status, written count, failed count, validation summary
"""
from validators.price_validator import validate_parquet

s3 = boto3.client("s3")
cutoff = pd.Timestamp.now().normalize() - pd.Timedelta(days=lookback_days)

Expand All @@ -60,12 +62,14 @@ def collect(

written = 0
failed = 0
validation_results: list[dict] = []

for s3_key in parquet_keys:
filename = s3_key.split("/")[-1]
if filename == "sector_map.json":
continue

ticker = filename.replace(".parquet", "")
local_path = local_dir / filename
try:
s3.download_file(bucket, s3_key, str(local_path))
Expand All @@ -80,6 +84,9 @@ def collect(
local_path.unlink(missing_ok=True)
continue

# Validate the 2-year slice (what inference actually reads)
validation_results.append(validate_parquet(slim_df, ticker))

slim_path = local_dir / f"_slim_{filename}"
slim_df.to_parquet(slim_path, engine="pyarrow", compression="snappy")

Expand All @@ -100,8 +107,26 @@ def collect(
fail_pct = failed / max(len(parquet_keys), 1) * 100
logger.warning("Slim cache: %d/%d failed (%.1f%%)", failed, len(parquet_keys), fail_pct)

# Build validation summary
anomaly_tickers = [r for r in validation_results if r["status"] != "clean"]
validation = {
"total_validated": len(validation_results),
"clean": len(validation_results) - len(anomaly_tickers),
"anomalies": len(anomaly_tickers),
"anomaly_details": anomaly_tickers[:20],
}
if anomaly_tickers:
logger.warning("Slim cache validation: %d/%d tickers have anomalies", len(anomaly_tickers), len(validation_results))
for r in anomaly_tickers[:10]:
logger.warning(" %s: %s", r["ticker"], "; ".join(r["anomalies"]))
else:
logger.info("Slim cache validation: all %d tickers clean", len(validation_results))

logger.info("Slim cache: %d / %d uploaded to s3://%s/%s", written, len(parquet_keys), bucket, slim_prefix)
return {"status": "ok" if failed == 0 else "partial", "written": written, "failed": failed}
result = {"status": "ok" if failed == 0 else "partial", "written": written, "failed": failed}
if validation_results:
result["validation"] = validation
return result


def _list_parquets(s3, bucket: str, prefix: str) -> list[str]:
Expand Down
12 changes: 12 additions & 0 deletions emailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,28 @@ def _extract_details(name: str, info: dict) -> str:

if "tickers_refreshed" in info:
parts.append(f"{info['tickers_refreshed']} refreshed")
elif "refreshed" in info:
parts.append(f"{info['refreshed']} refreshed")
if "stale" in info:
parts.append(f"{info['stale']} stale")
if "tickers_skipped" in info:
parts.append(f"{info['tickers_skipped']} skipped")
if "tickers_failed" in info:
n = info["tickers_failed"]
if n > 0:
parts.append(f"{n} failed")
elif "failed" in info:
n = info["failed"]
if n > 0:
parts.append(f"{n} failed")
if "total_tickers" in info:
parts.append(f"{info['total_tickers']} total")
elif "total" in info:
parts.append(f"{info['total']} total")
if "n_tickers" in info:
parts.append(f"{info['n_tickers']} tickers")
if "written" in info:
parts.append(f"{info['written']} written")
if "sp500_count" in info:
parts.append(f"S&P500: {info['sp500_count']}")
if "sp400_count" in info:
Expand Down
1 change: 1 addition & 0 deletions infrastructure/step_function.json
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@
"commands": [
"export HOME=/home/ec2-user",
"set -a && source /home/ec2-user/.alpha-engine.env && set +a",
"set -o pipefail",
"export PYTHONPATH=/home/ec2-user/alpha-engine-predictor",
"/home/ec2-user/alpha-engine-data/.venv/bin/python -m monitoring.drift_detector --alert 2>&1 | tee /var/log/drift-detection.log"
],
Expand Down
66 changes: 66 additions & 0 deletions tests/test_emailer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Tests for emailer._extract_details."""

from emailer import _extract_details


class TestExtractDetails:
def test_prices_collector(self):
info = {
"status": "ok",
"refreshed": 150,
"stale": 160,
"failed": 10,
"total": 900,
"validation": {
"total_validated": 150,
"anomalies": 3,
"clean": 147,
},
}
details = _extract_details("prices", info)
assert "150 refreshed" in details
assert "160 stale" in details
assert "10 failed" in details
assert "900 total" in details
assert "3/150 anomalies" in details

def test_slim_cache_with_validation(self):
info = {
"status": "ok",
"written": 450,
"failed": 0,
"validation": {
"total_validated": 450,
"anomalies": 0,
"clean": 450,
},
}
details = _extract_details("slim_cache", info)
assert "450 written" in details
assert "450 validated" in details
assert "failed" not in details # 0 failures omitted

def test_slim_cache_with_anomalies(self):
info = {
"status": "partial",
"written": 448,
"failed": 2,
"validation": {
"total_validated": 448,
"anomalies": 5,
"clean": 443,
},
}
details = _extract_details("slim_cache", info)
assert "448 written" in details
assert "2 failed" in details
assert "5/448 anomalies" in details

def test_empty_info(self):
assert _extract_details("unknown", {}) == ""

def test_no_validation(self):
info = {"status": "ok", "written": 10, "failed": 0}
details = _extract_details("slim_cache", info)
assert "10 written" in details
assert "validated" not in details
90 changes: 90 additions & 0 deletions tests/test_price_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Tests for validators/price_validator.py."""

import pandas as pd
import pytest

from validators.price_validator import validate_parquet


def _make_ohlcv(n=30, base_close=100.0):
"""Build a clean OHLCV DataFrame with n trading days."""
dates = pd.bdate_range("2025-01-02", periods=n)
close = [base_close + i * 0.5 for i in range(n)]
return pd.DataFrame(
{
"Open": [c - 0.1 for c in close],
"High": [c + 1.0 for c in close],
"Low": [c - 1.0 for c in close],
"Close": close,
"Volume": [1_000_000 + i * 1000 for i in range(n)],
},
index=dates,
)


class TestValidateParquet:
def test_clean_data(self):
df = _make_ohlcv()
result = validate_parquet(df, "AAPL")
assert result["status"] == "clean"
assert result["anomalies"] == []

def test_empty_dataframe(self):
df = pd.DataFrame()
result = validate_parquet(df, "EMPTY")
assert result["status"] == "empty"

def test_high_less_than_low(self):
df = _make_ohlcv()
df.iloc[5, df.columns.get_loc("High")] = df.iloc[5]["Low"] - 1
result = validate_parquet(df, "BAD_HL")
assert result["status"] == "anomaly"
assert any("High<Low" in a for a in result["anomalies"])

def test_zero_close(self):
df = _make_ohlcv()
df.iloc[10, df.columns.get_loc("Close")] = 0
result = validate_parquet(df, "ZERO")
assert result["status"] == "anomaly"
assert any("Close<=0" in a for a in result["anomalies"])

def test_extreme_daily_return(self):
df = _make_ohlcv()
# 60% jump
df.iloc[15, df.columns.get_loc("Close")] = df.iloc[14]["Close"] * 1.61
result = validate_parquet(df, "SPIKE")
assert result["status"] == "anomaly"
assert any("50%" in a for a in result["anomalies"])

def test_zero_volume(self):
df = _make_ohlcv()
df.iloc[5, df.columns.get_loc("Volume")] = 0
result = validate_parquet(df, "NOVOL")
assert result["status"] == "anomaly"
assert any("zero volume" in a for a in result["anomalies"])

def test_volume_spike(self):
df = _make_ohlcv(n=40)
# 15x median volume
df.iloc[35, df.columns.get_loc("Volume")] = df.iloc[34]["Volume"] * 15
result = validate_parquet(df, "VOLSPIKE")
assert result["status"] == "anomaly"
assert any("volume" in a and "median" in a for a in result["anomalies"])

def test_trading_day_gap(self):
df = _make_ohlcv(n=40)
# Remove 8 calendar days worth of rows (creates a >5 day gap)
gap_start = df.index[15]
gap_end = gap_start + pd.Timedelta(days=8)
df = df[(df.index < gap_start) | (df.index >= gap_end)]
result = validate_parquet(df, "GAP")
assert result["status"] == "anomaly"
assert any("gap" in a.lower() for a in result["anomalies"])

def test_multiple_anomalies(self):
df = _make_ohlcv()
df.iloc[5, df.columns.get_loc("Close")] = 0
df.iloc[10, df.columns.get_loc("Volume")] = 0
result = validate_parquet(df, "MULTI")
assert result["status"] == "anomaly"
assert len(result["anomalies"]) >= 2
42 changes: 42 additions & 0 deletions weekly_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ def _finalize(

if not dry_run and only is None:
_write_manifest(bucket, market_prefix, run_date, results)
_write_validation_json(bucket, market_prefix, run_date, results)

# Write health marker for Step Functions
phase = results.get("phase")
Expand Down Expand Up @@ -465,6 +466,47 @@ def _write_manifest(bucket: str, s3_prefix: str, run_date: str, results: dict) -
logger.info("Wrote manifest + latest pointer for %s", run_date)


def _write_validation_json(
bucket: str, s3_prefix: str, run_date: str, results: dict,
) -> None:
"""Aggregate validation results from all collectors and write to S3."""
collectors = results.get("collectors", {})
validations: dict[str, dict] = {}

for name, info in collectors.items():
val = info.get("validation")
if val:
validations[name] = val

if not validations:
return

total_validated = sum(v.get("total_validated", 0) for v in validations.values())
total_anomalies = sum(v.get("anomalies", 0) for v in validations.values())
total_clean = sum(v.get("clean", 0) for v in validations.values())

payload = {
"date": run_date,
"total_validated": total_validated,
"total_clean": total_clean,
"total_anomalies": total_anomalies,
"collectors": validations,
}

s3 = boto3.client("s3")
key = f"{s3_prefix}weekly/{run_date}/validation.json"
s3.put_object(
Bucket=bucket,
Key=key,
Body=json.dumps(payload, indent=2, default=str),
ContentType="application/json",
)
logger.info(
"Wrote validation.json: %d validated, %d anomalies → s3://%s/%s",
total_validated, total_anomalies, bucket, key,
)


def _write_health_marker(bucket: str, phase: int, run_date: str, status: str) -> None:
"""Write health marker for Step Functions dependency checking."""
s3 = boto3.client("s3")
Expand Down
Loading