Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions builders/_price_cache_writeboth.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,40 @@ def price_cache_write_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list
return [primary]


def price_cache_read_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[str]:
    """List the S3 key prefixes a reader should try, most-preferred first.

    This is the read-side counterpart of :func:`price_cache_write_prefixes`.
    During the Wave-3 reader migration, consumers walk the returned list in
    order: the new prefix is tried first and the legacy prefix acts as the
    fallback for the duration of the soak window.

    Args:
        primary: The prefix the caller is configured with. Defaults to the
            legacy production prefix (``"predictor/price_cache/"``).

    Returns:
        * ``[PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX]`` (i.e.
          ``["reference/price_cache/", "predictor/price_cache/"]``) when
          ``primary`` is the legacy prefix.
        * ``[primary]`` for any other value, so a test or config override
          reads from exactly one location, matching the write-side
          single-prefix behaviour.

    Typical caller loop::

        for prefix in price_cache_read_prefixes(s3_prefix):
            try:
                return s3.get_object(Bucket=bucket, Key=f"{prefix}{name}")
            except ClientError:
                continue

    At the Wave-3 PR4 cutover the legacy entry is dropped from the list
    returned here, in the same edit that flips the write helper.
    """
    # Guard clause: a non-default prefix opts out of the fallback chain.
    if primary != PRICE_CACHE_LEGACY_PREFIX:
        return [primary]
    # Production default: new home first, legacy as the safety net.
    return [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX]


# Public API of this module: the two prefix constants plus the write-side
# and read-side prefix helpers.
__all__ = [
"PRICE_CACHE_LEGACY_PREFIX",
"PRICE_CACHE_NEW_PREFIX",
"price_cache_write_prefixes",
"price_cache_read_prefixes",
]
19 changes: 16 additions & 3 deletions collectors/constituents.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,27 @@ def collect(
)
logger.info("Wrote constituents.json to s3://%s/%s (%d tickers)", bucket, key, len(tickers))

# Write sector_map.json to canonical data path + legacy predictor path
# Write sector_map.json to canonical data path + legacy predictor
# path + Wave-3 new reference/ path. The new ``reference/`` write
# was missing from PR1 #270 because that PR scoped only the
# ticker-parquet write paths (yfinance / FRED / chronic-gap);
# sector_map.json travels through this separate constituents
# collector and needs its own write-both update so the Wave-3
# readers don't see a stale ``reference/`` copy after PR4 retires
# legacy.
sector_map_body = json.dumps(sector_etf_map, indent=2, sort_keys=True)
for sector_map_key in ["data/sector_map.json", "predictor/price_cache/sector_map.json"]:
for sector_map_key in (
"data/sector_map.json",
"predictor/price_cache/sector_map.json",
"reference/price_cache/sector_map.json",
):
s3.put_object(
Bucket=bucket, Key=sector_map_key,
Body=sector_map_body, ContentType="application/json",
)
logger.info("Wrote sector_map.json to data/ and predictor/ paths")
logger.info(
"Wrote sector_map.json to data/, predictor/, and reference/ paths",
)

# tickers is included in the return so callers don't need an S3 round-trip
# to re-read what they just wrote. Pre-MorningEnrich preflight (PR #134)
Expand Down
25 changes: 19 additions & 6 deletions sf_preflight.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,30 @@ def check_backfill_source_freshness(ctx: PreflightContext) -> CheckResult:

# Backfill source = price_cache + daily_closes delta. Effective last
# is max(price_cache_last, daily_closes_last). Read SPY parquet.
try:
obj = s3.get_object(Bucket=ctx.bucket, Key="predictor/price_cache/SPY.parquet")
df = pd.read_parquet(io.BytesIO(obj["Body"].read()))
cache_last = pd.Timestamp(df.index[-1]).normalize()
except Exception as exc:
#
# Wave-3 reader migration (ROADMAP L1401): try the new
# ``reference/price_cache/`` prefix first, fall back to legacy
# ``predictor/price_cache/`` during the producer write-both soak
# (PR1 #270 shipped 2026-05-19; soak ≥1 week to ~2026-05-26).
from builders._price_cache_writeboth import price_cache_read_prefixes

df = None
last_exc: Exception | None = None
for prefix in price_cache_read_prefixes():
try:
obj = s3.get_object(Bucket=ctx.bucket, Key=f"{prefix}SPY.parquet")
df = pd.read_parquet(io.BytesIO(obj["Body"].read()))
break
except Exception as exc:
last_exc = exc
if df is None:
return CheckResult(
name="backfill_source_freshness",
status="fail",
message=f"price_cache SPY read raised: {exc}",
message=f"price_cache SPY read raised (both prefixes): {last_exc}",
elapsed_seconds=time.time() - t0,
)
cache_last = pd.Timestamp(df.index[-1]).normalize()

# Daily delta — staging/daily_closes/{prior_trading_day}.parquet.
try:
Expand Down
47 changes: 47 additions & 0 deletions tests/test_constituents_sector_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,50 @@ def fake_get(url, **kwargs):
"the count is only useful as observability, not as a tickers source"
)
assert set(result["tickers"]) == {"AAPL", "MSFT", "JHG", "WSO"}


def test_sector_map_writes_to_all_three_paths() -> None:
    """sector_map.json is uploaded to all three expected S3 keys.

    Wave-3 PR3 (ROADMAP L1401). ``collect`` must write the same payload to:

    1. ``data/sector_map.json`` — the canonical data path.
    2. ``predictor/price_cache/sector_map.json`` — the legacy path
       (retired in PR4).
    3. ``reference/price_cache/sector_map.json`` — the Wave-3 home for the
       ``predictor/price_cache/`` migration. PR1 #270 only covered the
       ticker-parquet writes and missed this one; without it, readers that
       consult ``reference/`` first would see a stale snapshot once PR4
       deletes legacy.
    """
    from unittest.mock import MagicMock

    # One-ticker fixtures for each index page keep the test minimal.
    sp500_html = _fake_html(["AAPL"], ["Information Technology"])
    sp400_html = _fake_html(["JHG"], ["Financials"])

    def fake_get(url, **kwargs):
        # Route by URL: anything mentioning "500" gets the S&P 500 page.
        page = sp500_html if "500" in url else sp400_html
        return _FakeResp(page)

    # Capture the keyword arguments of every put_object call.
    recorded_puts: list[dict] = []

    def record_put(**kwargs):
        recorded_puts.append(kwargs)

    s3_stub = MagicMock()
    s3_stub.put_object.side_effect = record_put

    boto3_stub = MagicMock()
    boto3_stub.client.return_value = s3_stub

    with patch("collectors.constituents.requests.get", side_effect=fake_get), \
            patch("collectors.constituents.boto3", boto3_stub):
        constituents.collect(bucket="any", dry_run=False)

    sector_map_puts = [
        call for call in recorded_puts
        if call["Key"].endswith("sector_map.json")
    ]
    written_keys = {call["Key"] for call in sector_map_puts}
    expected_keys = {
        "data/sector_map.json",
        "predictor/price_cache/sector_map.json",
        "reference/price_cache/sector_map.json",
    }
    assert written_keys == expected_keys, written_keys

    # Bodies must be byte-equal — readers can pick any path safely.
    distinct_bodies = {call["Body"] for call in sector_map_puts}
    assert len(distinct_bodies) == 1, "sector_map.json bodies diverge across paths"
28 changes: 28 additions & 0 deletions tests/test_price_cache_writeboth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from builders._price_cache_writeboth import (
PRICE_CACHE_LEGACY_PREFIX,
PRICE_CACHE_NEW_PREFIX,
price_cache_read_prefixes,
price_cache_write_prefixes,
)

Expand Down Expand Up @@ -71,6 +72,33 @@ def test_new_prefix_is_not_legacy():
assert PRICE_CACHE_LEGACY_PREFIX.startswith("predictor/")


# ---------------------------------------------------------------------------
# Read-side helper (Wave-3 PR3 reader migration)
# ---------------------------------------------------------------------------


def test_read_helper_default_returns_new_first_legacy_second():
    """With no argument, the read helper yields new prefix then legacy.

    The new prefix comes first so consumers hit the post-PR4 home as soon
    as the soak begins; the legacy prefix is only the fallback during the
    soak window.
    """
    expected_order = [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX]
    assert price_cache_read_prefixes() == expected_order


def test_read_helper_explicit_legacy_returns_new_first_legacy_second():
    """Passing the legacy prefix explicitly yields new prefix then legacy."""
    expected_order = [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX]
    actual_order = price_cache_read_prefixes(PRICE_CACHE_LEGACY_PREFIX)
    assert actual_order == expected_order


def test_read_helper_custom_prefix_returns_single():
    """A non-default prefix is returned alone, with no fallback chain.

    This mirrors the write-side single-prefix semantics used for test and
    config overrides.
    """
    override_prefix = "some/other/prefix/"
    assert price_cache_read_prefixes(override_prefix) == [override_prefix]


# ---------------------------------------------------------------------------
# collectors/prices.py — yfinance refresh upload
# ---------------------------------------------------------------------------
Expand Down
Loading