Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions builders/_price_cache_writeboth.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

from __future__ import annotations

from typing import Any

# Wave 3 PR1 — legacy primary, new mirror. The cutover PR (Wave 3 PR4) flips
# ``PRICE_CACHE_NEW_PREFIX`` to first position and removes the legacy entry
# from ``price_cache_write_prefixes`` (one-line edit at that time).
Expand Down Expand Up @@ -93,9 +95,60 @@ def price_cache_read_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[
return [primary]


def list_price_cache_keys(
    s3: Any, bucket: str, primary: str = PRICE_CACHE_LEGACY_PREFIX,
) -> list[str]:
    """Return the ``.parquet`` keys visible through the active read prefixes.

    Aggregate-read companion to :func:`price_cache_read_prefixes`, used by
    call sites that load the whole cache at once
    (``builders/backfill._load_full_cache``,
    ``collectors/slim_cache.collect``).

    Prefix selection:
        * ``primary`` equal to ``PRICE_CACHE_LEGACY_PREFIX`` (the default)
          walks every prefix returned by
          :func:`price_cache_read_prefixes`, in that order. During the
          Wave-3 soak the new prefix is expected to come first and legacy
          second, so legacy only fills gaps the new prefix does not cover.
        * Any other ``primary`` lists that single prefix only. This is the
          opt-out used by tests and config overrides, matching the
          write/read helper conventions.

    De-duplication is by basename (the text after the last ``/``, i.e.
    ``{ticker}.parquet``): the first key seen for a basename is kept and
    every later key with the same basename is dropped. Objects whose key
    does not end in ``.parquet`` are ignored.

    Args:
        s3: S3 client exposing ``get_paginator("list_objects_v2")``.
        bucket: Name of the bucket to list.
        primary: Anchor prefix; see "Prefix selection" above.

    Returns:
        Full object keys (prefix included, ready for ``s3.get_object`` /
        ``download_file``) in discovery order: every unique key under the
        first prefix, followed by keys from later prefixes whose basename
        had not been seen yet.

    Note:
        The single-ticker fallback (one ticker, two candidate prefixes) is
        deliberately not offered here. It is inlined at the few call sites
        that need it — the predictor's
        ``regime/features._read_parquet_close`` chokepoint and the data
        repo's ``_load_parquet_warmup`` — following the
        :func:`price_cache_read_prefixes` semantics without a second
        helper signature.
    """
    chain: list[str] = (
        price_cache_read_prefixes(primary)
        if primary == PRICE_CACHE_LEGACY_PREFIX
        else [primary]  # custom prefix: no fallback chain
    )

    # Insertion-ordered map of basename -> first full key seen for it.
    first_key_for: dict[str, str] = {}
    pager = s3.get_paginator("list_objects_v2")
    for root in chain:
        for page in pager.paginate(Bucket=bucket, Prefix=root):
            for entry in page.get("Contents", []):
                full_key = entry["Key"]
                if full_key.endswith(".parquet"):
                    # setdefault leaves an existing entry untouched, so the
                    # earliest prefix in the chain wins for each basename.
                    first_key_for.setdefault(full_key.rpartition("/")[2], full_key)
    return list(first_key_for.values())


# Public names of this module; this list is what ``from ... import *`` exports.
__all__ = [
"PRICE_CACHE_LEGACY_PREFIX",
"PRICE_CACHE_NEW_PREFIX",
"price_cache_write_prefixes",
"price_cache_read_prefixes",
"list_price_cache_keys",
]
31 changes: 21 additions & 10 deletions builders/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
make_source_series,
)
from store.arctic_store import get_universe_lib, get_macro_lib, to_arctic_safe
from builders._price_cache_writeboth import (
PRICE_CACHE_LEGACY_PREFIX,
list_price_cache_keys,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -100,20 +104,27 @@ def _load_current_constituents(s3, bucket: str) -> set[str]:
return set(tickers)


def _load_full_cache(s3, bucket: str, prefix: str = "predictor/price_cache/") -> dict[str, pd.DataFrame]:
"""Load all 10-year price cache parquets from S3 (concurrent)."""
keys = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if obj["Key"].endswith(".parquet"):
keys.append(obj["Key"])
def _load_full_cache(s3, bucket: str, prefix: str = PRICE_CACHE_LEGACY_PREFIX) -> dict[str, pd.DataFrame]:
"""Load all 10-year price cache parquets from S3 (concurrent).

Wave-3 reader migration (ROADMAP L1401): when ``prefix`` is the
production default the listing iterates both
``reference/price_cache/`` (new) and ``predictor/price_cache/``
(legacy) via :func:`list_price_cache_keys`, deduping by
``{ticker}.parquet`` basename so each ticker is fetched once.
Custom prefixes (tests, ad-hoc invocations) opt out of the
fallback chain.
"""
keys = list_price_cache_keys(s3, bucket, prefix)

if not keys:
log.error("No parquets found in s3://%s/%s", bucket, prefix)
log.error("No parquets found in s3://%s/%s (read-prefix chain)", bucket, prefix)
return {}

log.info("Downloading %d full cache parquets from s3://%s/%s ...", len(keys), bucket, prefix)
log.info(
"Downloading %d full cache parquets from s3://%s/ (read-prefix chain anchored on %s) ...",
len(keys), bucket, prefix,
)

price_data: dict[str, pd.DataFrame] = {}
errors = 0
Expand Down
51 changes: 38 additions & 13 deletions builders/daily_append.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@

from store.arctic_store import get_universe_lib, get_macro_lib, to_arctic_safe
from store.parquet_loader import load_parquet_from_s3
from builders._price_cache_writeboth import (
PRICE_CACHE_LEGACY_PREFIX,
price_cache_read_prefixes,
)
from validators.price_validator import (
ALL_ANOMALY_TYPES,
DEFAULT_BLOCK_ANOMALY_TYPES,
Expand All @@ -68,7 +72,11 @@
# not a numeric one — keeping OHLCV_COLS pure simplifies the rolling-stat
# and feature-compute call sites that iterate it expecting numerics.
PROVENANCE_COL = "source"
PRICE_CACHE_PREFIX = "predictor/price_cache/"
# Legacy single-prefix constant retained for backward-compat with any caller
# that imports it (audited 2026-05-19: only the local ``_load_parquet_warmup``,
# which now goes through ``price_cache_read_prefixes``). Wave-3 PR4 cutover
# deletes this constant in the same edit that flips the read helper.
PRICE_CACHE_PREFIX = PRICE_CACHE_LEGACY_PREFIX

# Process the universe in chunks of this size through Phase 1+2 (read,
# compute, write). The full-universe pass holds ~900 ticker histories in
Expand Down Expand Up @@ -445,23 +453,40 @@ def _load_parquet_warmup(s3, bucket: str, ticker: str) -> pd.DataFrame | None:
Returns None when the parquet doesn't exist (new constituent that hasn't
been picked up by the weekly backfill yet). Hard-fails on any other
error shape — NoSilentFails.

Wave-3 reader migration (ROADMAP L1401): iterates the
``price_cache_read_prefixes`` fallback chain — new prefix
(``reference/price_cache/``) consulted first, legacy
(``predictor/price_cache/``) is the soak-window safety net.
"Not found" means absent in BOTH prefixes; non-404 errors hard-fail
on the first prefix that raises (preserving NoSilentFails).
"""
key = f"{PRICE_CACHE_PREFIX}{ticker}.parquet"
try:
df = load_parquet_from_s3(s3, bucket, key)
except ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "")
if code in ("NoSuchKey", "404"):
return None
raise RuntimeError(
f"parquet-warmup read failed for {ticker} (bucket={bucket}, "
f"key={key}): {exc}"
) from exc
df: pd.DataFrame | None = None
last_key: str | None = None
not_found = 0
for prefix in price_cache_read_prefixes(PRICE_CACHE_PREFIX):
last_key = f"{prefix}{ticker}.parquet"
try:
df = load_parquet_from_s3(s3, bucket, last_key)
except ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "")
if code in ("NoSuchKey", "404"):
not_found += 1
continue
raise RuntimeError(
f"parquet-warmup read failed for {ticker} (bucket={bucket}, "
f"key={last_key}): {exc}"
) from exc
break

if df is None:
# Absent in every active prefix → genuine "not in price cache".
return None

if df.empty or "Close" not in df.columns:
raise RuntimeError(
f"parquet-warmup for {ticker}: parquet exists but invalid shape "
f"(empty={df.empty}, cols={list(df.columns)[:6]})"
f"(empty={df.empty}, cols={list(df.columns)[:6]}, key={last_key})"
)
return df

Expand Down
32 changes: 16 additions & 16 deletions collectors/slim_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
import boto3
import pandas as pd

from builders._price_cache_writeboth import (
PRICE_CACHE_LEGACY_PREFIX,
list_price_cache_keys,
)

logger = logging.getLogger(__name__)


def collect(
bucket: str,
full_cache_prefix: str = "predictor/price_cache/",
full_cache_prefix: str = PRICE_CACHE_LEGACY_PREFIX,
slim_prefix: str = "predictor/price_cache_slim/",
lookback_days: int = 730,
dry_run: bool = False,
Expand All @@ -32,8 +37,13 @@ def collect(

Args:
bucket: S3 bucket name
full_cache_prefix: S3 prefix for full 10y parquets
slim_prefix: S3 prefix for 2y slim parquets
full_cache_prefix: S3 prefix anchor for full 10y parquets. The
production default (``predictor/price_cache/``) iterates the
Wave-3 read fallback chain — ``reference/price_cache/`` first,
legacy second — via :func:`list_price_cache_keys`. Custom
prefixes opt out of the chain (single-prefix listing).
slim_prefix: S3 prefix for 2y slim parquets (Wave-3 unaffected;
``predictor/price_cache_slim/`` retirement is the Wave-4 arc).
lookback_days: calendar days of history to keep (default 730 = 2 years)
dry_run: if True, count files but don't write

Expand All @@ -48,8 +58,9 @@ def collect(
with tempfile.TemporaryDirectory() as tmpdir:
local_dir = Path(tmpdir)

# Download full cache parquets
parquet_keys = _list_parquets(s3, bucket, full_cache_prefix)
# Download full cache parquets (Wave-3 read-prefix chain: new first,
# legacy fallback during soak; deduped by ticker basename).
parquet_keys = list_price_cache_keys(s3, bucket, full_cache_prefix)

if dry_run:
logger.info("[dry-run] slim_cache: %d parquets would be sliced", len(parquet_keys))
Expand Down Expand Up @@ -127,14 +138,3 @@ def collect(
if validation_results:
result["validation"] = validation
return result


def _list_parquets(s3, bucket: str, prefix: str) -> list[str]:
    """Collect every ``.parquet`` object key found under an S3 prefix.

    Args:
        s3: S3 client exposing ``get_paginator("list_objects_v2")``.
        bucket: Name of the bucket to list.
        prefix: Key prefix passed to the listing call.

    Returns:
        Matching keys in the order the paginator yields them. Pages without
        a ``Contents`` entry contribute nothing, and keys that do not end
        in ``.parquet`` are skipped.
    """
    listing = s3.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=prefix
    )
    return [
        entry["Key"]
        for page in listing
        for entry in page.get("Contents", [])
        if entry["Key"].endswith(".parquet")
    ]
80 changes: 79 additions & 1 deletion tests/test_daily_append_parquet_warmup.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,82 @@ def test_load_parquet_warmup_returns_valid_frame(monkeypatch):
result = daily_append._load_parquet_warmup(mock_s3, "b", "AAPL")
assert result is not None
assert len(result) == 2
assert "Close" in result.columns


# ── Wave-3 reader migration (ROADMAP L1401) ────────────────────────────────


def test_load_parquet_warmup_prefers_new_prefix(monkeypatch):
    """With the default prefix, ``reference/price_cache/`` is read FIRST.

    Both prefixes are expected to hold byte-equal copies during the Wave-3
    soak, but the new prefix is the only one left after the PR4 cutover, so
    the soak must already exercise it end-to-end. The stub succeeds on every
    key, which means exactly one fetch should happen — against the new
    prefix — before the helper stops looking.
    """
    frame = pd.DataFrame(
        {"Open": [1.0], "High": [1.0], "Low": [1.0], "Close": [1.0], "Volume": [1]},
        index=pd.DatetimeIndex(["2026-04-20"]),
    )
    requested: list[str] = []

    def _always_found(_s3, _bucket, key):
        # Record the key that was asked for, then report a successful read.
        requested.append(key)
        return frame

    monkeypatch.setattr(daily_append, "load_parquet_from_s3", _always_found)

    loaded = daily_append._load_parquet_warmup(MagicMock(), "b", "AAPL")

    assert loaded is not None
    # Success on the first prefix must short-circuit the fallback chain.
    expected_requests = ["reference/price_cache/AAPL.parquet"]
    assert requested == expected_requests, (
        "Wave-3 read-prefix chain: ``reference/price_cache/`` must be "
        "tried before the legacy fallback."
    )


def test_load_parquet_warmup_falls_back_to_legacy_on_new_prefix_miss(monkeypatch):
    """A miss on the new prefix falls through to the legacy prefix.

    Models the soak-window case where the backfill has not yet seeded a
    brand-new ticker under ``reference/``: the new-prefix read raises
    ``NoSuchKey``, the legacy read succeeds, and the helper returns the
    legacy frame.
    """
    frame = pd.DataFrame(
        {"Open": [1.0], "High": [1.0], "Low": [1.0], "Close": [1.0], "Volume": [1]},
        index=pd.DatetimeIndex(["2026-04-20"]),
    )
    requested: list[str] = []

    def _legacy_only(_s3, _bucket, key):
        # Only keys outside the ``reference/`` tree exist in this scenario.
        requested.append(key)
        if not key.startswith("reference/"):
            return frame
        raise _fake_client_error("NoSuchKey")

    monkeypatch.setattr(daily_append, "load_parquet_from_s3", _legacy_only)

    loaded = daily_append._load_parquet_warmup(MagicMock(), "b", "AAPL")

    assert loaded is not None
    expected_requests = [
        "reference/price_cache/AAPL.parquet",
        "predictor/price_cache/AAPL.parquet",
    ]
    assert requested == expected_requests, "Fallback order must be new → legacy (read = write reversed)."


def test_load_parquet_warmup_returns_none_when_absent_in_both_prefixes(monkeypatch):
    """A ticker absent from BOTH prefixes is a genuine "not in price
    cache yet" state (brand-new constituent the weekly backfill hasn't
    picked up). Both prefix lookups must be attempted before the helper
    degrades to None — a single-prefix NoSuchKey is no longer sufficient.

    The exact keys and their order are asserted, not just the call count:
    a bare ``len(calls) == 2`` would also pass if the same prefix were
    probed twice or if legacy were consulted before the new prefix.
    """
    calls: list[str] = []

    def _stub(_s3, _bucket, key):
        # Every lookup misses, whichever prefix is being probed.
        calls.append(key)
        raise _fake_client_error("NoSuchKey")

    monkeypatch.setattr(daily_append, "load_parquet_from_s3", _stub)

    result = daily_append._load_parquet_warmup(MagicMock(), "b", "NEWIPO")
    assert result is None
    # Same new → legacy order that the fallback test pins for a found ticker.
    assert calls == [
        "reference/price_cache/NEWIPO.parquet",
        "predictor/price_cache/NEWIPO.parquet",
    ], (
        "When the new prefix is missing the helper must still try legacy "
        "before declaring the ticker absent."
    )
Loading
Loading