Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions data/derived/analyst_revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ def load_snapshot_time_series(
Canonical shape: list ``{prefix}/{ticker}/`` once + parse each
artifact's body to extract ``snapshot_date``; index by that date.
Cheaper than per-date GETs since one LIST + small body reads
cover the entire window. Tolerates the legacy
``{ticker}/{date}.json`` shape during transition.
cover the entire window.
"""
import json as _json
out: dict[Date, dict] = {}
Expand Down
32 changes: 6 additions & 26 deletions data/derived/news_aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,29 +390,23 @@ def write_news_aggregates_parquet(


def read_news_aggregates_parquet(
aggregate_date: Date | None = None,
*,
s3_client: Any,
bucket: str = DEFAULT_S3_BUCKET,
prefix: str = DEFAULT_S3_PREFIX,
) -> pd.DataFrame:
"""Consumer-side read. Resolves the canonical artifact via
``latest.json`` sidecar; falls back to the legacy
``{aggregate_date}.parquet`` key shape during the transition
window.
"""Consumer-side read. Resolves the canonical artifact via the
``latest.json`` sidecar.

``aggregate_date`` is only used for the legacy-shape fallback.
Under the canonical shape, ``latest.json`` always points at the
most recent run regardless of date; the parquet itself carries
``aggregate_date`` per row so filtering happens at the DataFrame
layer.
``latest.json`` always points at the most recent run regardless of
date; the parquet itself carries ``aggregate_date`` per row so any
date filtering happens at the DataFrame layer.

Returns an empty DataFrame with the canonical schema when no
artifact exists.
"""
from alpha_engine_lib.eval_artifacts import eval_latest_key

# Canonical path: read latest.json → resolve key
latest_key = eval_latest_key(prefix)
try:
obj = s3_client.get_object(Bucket=bucket, Key=latest_key)
Expand All @@ -423,24 +417,10 @@ def read_news_aggregates_parquet(
return pd.read_parquet(BytesIO(body["Body"].read()), engine="pyarrow")
except Exception as e:
logger.info(
"[news_aggregates] canonical sidecar read failed for %s (%s) — "
"trying legacy date-key fallback",
"[news_aggregates] canonical sidecar read failed for %s (%s)",
latest_key, type(e).__name__,
)

# Legacy fallback during transition
if aggregate_date is not None:
legacy_key = f"{prefix}/{aggregate_date.isoformat()}.parquet"
try:
obj = s3_client.get_object(Bucket=bucket, Key=legacy_key)
logger.info(
"[news_aggregates] read via legacy key %s — "
"will be removed after canonical-shape soak", legacy_key,
)
return pd.read_parquet(BytesIO(obj["Body"].read()), engine="pyarrow")
except Exception:
pass

return _empty_df()


Expand Down
22 changes: 2 additions & 20 deletions data/snapshotter/analyst_daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,6 @@ def snapshot_universe(
# ── S3 key + serialization helpers ────────────────────────────────────


def s3_key_for(
    ticker: str,
    snapshot_date: Date,
    *,
    prefix: str = DEFAULT_S3_PREFIX,
) -> str:
    """Build the date-keyed S3 object key for one (ticker, date) snapshot.

    The key has the shape ``{prefix}/{TICKER}/{date}.json``: the ticker is
    upper-cased and the date is rendered via its ``isoformat()`` method.

    Args:
        ticker: Ticker symbol; case-insensitive (normalised to upper case).
        snapshot_date: Date the snapshot belongs to.
        prefix: Key prefix under which snapshot documents live.

    Returns:
        The object key as a string.
    """
    symbol = ticker.upper()
    day = snapshot_date.isoformat()
    return f"{prefix}/{symbol}/{day}.json"


def _serialize_snapshot(snap: AnalystSnapshot) -> dict:
"""Pydantic AnalystSnapshot → JSON-safe dict.

Expand All @@ -236,9 +229,6 @@ def read_snapshot_document(
whose YYMMDDHHMM run_id starts with the date's YYMMDD prefix.
Picks the most recent intra-day run when multiple exist.

Legacy fallback: tries the old ``{prefix}/{ticker}/{date}.json``
key shape during the transition.

Returns the raw JSON dict or None if no document exists.
"""
# Canonical: list prefix + find by run_id date prefix.
Expand All @@ -261,15 +251,7 @@ def read_snapshot_document(
return json.loads(obj["Body"].read())
except Exception as e:
logger.debug(
"[analyst_snapshotter] canonical list failed for %s/%s (%s) — "
"trying legacy date-key fallback",
"[analyst_snapshotter] canonical list failed for %s/%s (%s)",
ticker, snapshot_date, type(e).__name__,
)

# Legacy fallback: {prefix}/{ticker}/{date}.json
legacy_key = f"{prefix}/{ticker.upper()}/{snapshot_date.isoformat()}.json"
try:
obj = s3_client.get_object(Bucket=bucket, Key=legacy_key)
return json.loads(obj["Body"].read())
except Exception:
return None
return None
26 changes: 26 additions & 0 deletions tests/test_analyst_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,32 @@ def test_read_missing_document_returns_none(self):
s3 = _InMemoryS3()
assert read_snapshot_document("X", date(2026, 1, 1), s3_client=s3) is None

def test_legacy_date_keyed_json_is_ignored(self):
    """A pre-migration date-keyed JSON object must not be returned.

    Regression guard for the post-#234 canonical-key migration: only a
    bare ``{prefix}/{ticker}/{date}.json`` object (the old key shape) is
    planted in the store. The canonical reader lists by a
    ``{ticker}/{YYMMDD}`` prefix, which a ``YYYY-MM-DD.json`` key does
    not match, so the read must come back empty.
    """
    store = _InMemoryS3()

    # Document body in the shape the old writer would have produced.
    legacy_doc = {
        "ticker": "AAPL",
        "snapshot_date": "2026-05-13",
        "schema_version": SNAPSHOT_SCHEMA_VERSION,
        "snapshots_by_source": {},
    }
    store.put_object(
        Bucket="alpha-engine-research",
        Key="data/analyst_snapshots/AAPL/2026-05-13.json",
        Body=json.dumps(legacy_doc).encode("utf-8"),
    )

    # The only object present uses the legacy key, so nothing is found.
    result = read_snapshot_document(
        "AAPL", date(2026, 5, 13), s3_client=store,
    )
    assert result is None

def test_universe_orchestrator(self):
s3 = _InMemoryS3()
sources = [_StaticSource("yfinance", _make_snapshot())]
Expand Down
37 changes: 28 additions & 9 deletions tests/test_news_aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,22 +362,43 @@ def test_write_then_read_preserves_rows(self):
# latest.json sidecar points at it
assert ("alpha-engine-research", "data/news_aggregates/latest.json") in s3._store

df_out = read_news_aggregates_parquet(
aggregate_date=date(2026, 5, 13), s3_client=s3,
)
df_out = read_news_aggregates_parquet(s3_client=s3)
assert len(df_out) == 1
assert df_out.iloc[0]["ticker"] == "AAPL"
assert df_out.iloc[0]["lm_sentiment_mean"] == pytest.approx(0.42)

def test_missing_parquet_returns_empty_schema_df(self):
    """An empty store yields a zero-row frame that still has every column.

    The reader is documented to return an empty DataFrame with the
    canonical schema when no artifact exists; this checks both the row
    count and that each ``NewsTickerDailyAggregate`` field is present as
    a column.
    """
    s3 = _InMemoryS3()
    # One read only, without ``aggregate_date``: the reader resolves the
    # artifact through the ``latest.json`` sidecar, so a second,
    # date-parameterised read whose result is immediately overwritten
    # adds nothing to the test.
    df = read_news_aggregates_parquet(s3_client=s3)
    assert len(df) == 0
    # Schema check: every dataclass field must appear as a column even
    # though there are no rows.
    for col in NewsTickerDailyAggregate.__dataclass_fields__:
        assert col in df.columns

def test_legacy_date_keyed_parquet_is_ignored(self):
    """A date-keyed parquet without a ``latest.json`` sidecar is not read.

    Regression guard for the post-#234 canonical-key migration: the
    store holds only a bare ``{prefix}/{date}.parquet`` object, which the
    old fallback path would have picked up. Under the canonical-only
    contract the reader must return no rows.
    """
    store = _InMemoryS3()

    # Build a one-row aggregate frame to serialise under the legacy key.
    articles = [_make_aggregated(fingerprint="a", tickers=("AAPL",))]
    nlp_output = NewsNLPOutput(sentiment_scores=[
        _lm_score("a", composite=0.42, pos=2, neg=0, total=10),
    ])
    legacy_df = build_news_aggregates_df(
        articles=articles,
        nlp_output=nlp_output,
        aggregate_date=date(2026, 5, 13),
    )

    # Plant the legacy-shape parquet directly, bypassing the writer so
    # that no sidecar is created.
    parquet_buf = BytesIO()
    legacy_df.to_parquet(parquet_buf, engine="pyarrow", index=False)
    store.put_object(
        Bucket="alpha-engine-research",
        Key="data/news_aggregates/2026-05-13.parquet",
        Body=parquet_buf.getvalue(),
    )

    # No latest.json sidecar present, so the canonical read finds nothing.
    result = read_news_aggregates_parquet(s3_client=store)
    assert len(result) == 0

def test_overwrite_existing_parquet(self):
s3 = _InMemoryS3()
# v1: 1 row
Expand All @@ -401,9 +422,7 @@ def test_overwrite_existing_parquet(self):
write_news_aggregates_parquet(
df_v2, aggregate_date=date(2026, 5, 13), s3_client=s3,
)
df_read = read_news_aggregates_parquet(
aggregate_date=date(2026, 5, 13), s3_client=s3,
)
df_read = read_news_aggregates_parquet(s3_client=s3)
assert len(df_read) == 2

def test_canonical_artifact_and_latest_keys(self):
Expand Down
Loading