Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions builders/_price_cache_writeboth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Wave 3 PR1 — additive write-both helper for the price_cache prefix migration.

ROADMAP P1 "`predictor/` S3 namespace rationalization Wave 3": migrate the 10y
``predictor/price_cache/*.parquet`` tree to ``reference/price_cache/`` so the
``predictor/`` namespace ends up owning ONLY predictor-module outputs
(``weights/``, ``predictions/``, ``metrics/``) and ``reference/`` collects
long-lived data-module references. Mirrors the Wave 1 ``predictor/daily_closes/``
→ ``staging/daily_closes/`` arc but uses write-both + soak (instead of Wave 1's
hard cutover) because this writer rewrites only STALE tickers — a hard cut
would leave fresh tickers in legacy and the new prefix incomplete for an
entire yfinance refresh cycle. CLAUDE.md S3 Contract Safety mandates the
write-both for any path change.

Soak contract:
Wave 3 PR1 (this PR) → both prefixes written, every reader stays on legacy.
≥1 week soak after PR1's first Saturday SF lands writes to both prefixes.
Wave 3 PR3+ → reader migrations with legacy fallback in 4 repos.
Wave 3 PR4 (cutover) → swap primary to ``reference/``, drop legacy from this
helper's return list, retire fallbacks, ``aws s3 rm`` legacy prefix.

The helper is the single chokepoint for write-both — three writers call into
it (``collectors/prices.py`` yfinance refresh, ``collectors/fred_history.py``
FRED backfill, ``weekly_collector.py`` chronic-gap self-heal patch). Adding
a future writer requires no per-call-site discipline beyond wrapping the
upload in ``for prefix in price_cache_write_prefixes(s3_prefix): ...``.
"""

from __future__ import annotations

# Wave 3 PR1 — legacy primary, new mirror. The cutover PR (Wave 3 PR4) flips
# ``PRICE_CACHE_NEW_PREFIX`` to first position and removes the legacy entry
# from ``price_cache_write_prefixes`` (one-line edit at that time).
PRICE_CACHE_LEGACY_PREFIX = "predictor/price_cache/"
PRICE_CACHE_NEW_PREFIX = "reference/price_cache/"


def price_cache_write_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[str]:
    """Return the active write prefixes for ticker parquet uploads.

    During the Wave 3 write-both soak (this PR through Wave 3 PR4 cutover)
    the production default fans out to both prefixes, while any explicitly
    overridden prefix is written on its own.

    Args:
        primary: The S3 key prefix the caller was configured with. The
            comparison against ``PRICE_CACHE_LEGACY_PREFIX`` is an exact
            string match (trailing slash included).

    Returns:
        A freshly built list, safe for the caller to mutate:

        * ``primary == "predictor/price_cache/"`` (the production default) →
          ``["predictor/price_cache/", "reference/price_cache/"]`` — every
          ticker-parquet write is meant to hit both prefixes.
        * ``primary`` is any other string (custom prefix from a test or a
          config override) → ``[primary]`` — single-write fallback. Tests
          that need to assert a single-prefix write pass an explicit custom
          prefix; production paths use the default and get write-both.

    Callers wrap their upload in::

        for prefix in price_cache_write_prefixes(s3_prefix):
            s3.upload_file(local_path, bucket, f"{prefix}{ticker}.parquet")

    The order is deterministic (legacy first, new second). Provided the
    caller uploads in list order and stops at the first exception, a failed
    legacy upload keeps the pre-Wave-3 failure semantics — the new prefix
    never silently masks a legacy write failure.
    """
    if primary == PRICE_CACHE_LEGACY_PREFIX:
        # Legacy stays first on purpose; see the ordering note above.
        return [PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX]
    # Anything else is treated as an explicit override and is not mirrored.
    return [primary]


__all__ = [
    "PRICE_CACHE_LEGACY_PREFIX",
    "PRICE_CACHE_NEW_PREFIX",
    "price_cache_write_prefixes",
]
26 changes: 16 additions & 10 deletions collectors/fred_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import pandas as pd
import requests

from builders._price_cache_writeboth import price_cache_write_prefixes

logger = logging.getLogger(__name__)

_FRED_BASE = "https://api.stlouisfed.org/fred/series/observations"
Expand Down Expand Up @@ -247,16 +249,20 @@ def backfill_to_s3(
"last_date": ohlcv.index.max().date().isoformat(),
}
if not dry_run:
s3.upload_file(
str(parquet_path),
bucket,
f"{s3_prefix}{ticker}.parquet",
)
logger.info(
"Uploaded s3://%s/%s%s.parquet (%d rows, %s → %s)",
bucket, s3_prefix, ticker, len(ohlcv),
results[ticker]["first_date"], results[ticker]["last_date"],
)
# Wave 3 PR1: write-both to legacy ``predictor/price_cache/``
# + new ``reference/price_cache/`` (see
# builders/_price_cache_writeboth.py for soak contract)
for prefix in price_cache_write_prefixes(s3_prefix):
s3.upload_file(
str(parquet_path),
bucket,
f"{prefix}{ticker}.parquet",
)
logger.info(
"Uploaded s3://%s/%s%s.parquet (%d rows, %s → %s)",
bucket, prefix, ticker, len(ohlcv),
results[ticker]["first_date"], results[ticker]["last_date"],
)
except Exception as e:
logger.error("Backfill failed for %s (%s): %s", ticker, series_id, e)
results[ticker] = {"status": "error", "error": str(e)}
Expand Down
9 changes: 7 additions & 2 deletions collectors/prices.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import pandas as pd
import yfinance as yf

from builders._price_cache_writeboth import price_cache_write_prefixes

logger = logging.getLogger(__name__)

# Tickers that require a leading caret in yfinance (not available on polygon)
Expand Down Expand Up @@ -218,10 +220,13 @@ def _refresh_stale(
new_df.index = idx
new_df = new_df.sort_index()

# Write locally and upload
# Write locally and upload (Wave 3 PR1: write-both to legacy
# ``predictor/price_cache/`` + new ``reference/price_cache/``;
# see builders/_price_cache_writeboth.py for soak contract)
parquet_path = local_dir / f"{ticker}.parquet"
new_df.to_parquet(parquet_path, engine="pyarrow", compression="snappy")
s3.upload_file(str(parquet_path), bucket, f"{s3_prefix}{ticker}.parquet")
for prefix in price_cache_write_prefixes(s3_prefix):
s3.upload_file(str(parquet_path), bucket, f"{prefix}{ticker}.parquet")
refreshed += 1

except Exception as e:
Expand Down
68 changes: 68 additions & 0 deletions infrastructure/backfill_reference_price_cache.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env bash
# infrastructure/backfill_reference_price_cache.sh — Wave 3 PR1 one-shot seed.
#
# Copies every object under ``s3://alpha-engine-research/predictor/price_cache/``
# to ``s3://alpha-engine-research/reference/price_cache/`` byte-for-byte. Run
# ONCE as part of the PR1 deploy — the producer write-both in
# ``collectors/prices.py`` + ``collectors/fred_history.py`` +
# ``weekly_collector.py`` only mirrors writes for STALE tickers, so the new
# prefix needs this initial seed before the write-both soak clock starts.
#
# Idempotent. ``aws s3 sync`` skips objects that already match by size +
# last-modified, so re-running is safe and incremental.
#
# Usage:
#   bash infrastructure/backfill_reference_price_cache.sh            # real copy
#   bash infrastructure/backfill_reference_price_cache.sh --dry-run  # plan only
#
# Environment:
#   ALPHA_ENGINE_BUCKET  bucket to operate on (default: alpha-engine-research)
#
# Exit codes:
#   0   seed complete (or dry-run plan finished)
#   1   pre-flight failed: legacy prefix unreadable or too sparse
#   2   post-flight parity check failed: new prefix has fewer objects
#   64  bad command line

set -euo pipefail

BUCKET="${ALPHA_ENGINE_BUCKET:-alpha-engine-research}"
LEGACY_PREFIX="predictor/price_cache/"
NEW_PREFIX="reference/price_cache/"
# Minimum object count the legacy prefix must hold before we agree to seed.
MIN_LEGACY_OBJECTS=100

usage() {
  echo "Usage: bash infrastructure/backfill_reference_price_cache.sh [--dry-run]" >&2
}

# Print the "Total Objects" figure for a prefix, or nothing if it could not be
# listed. ``aws s3 ls`` can exit non-zero (nothing listed, or an API error);
# under ``set -e`` + ``pipefail`` that would abort the script on the
# assignment line, before the caller's own diagnostics run. The ``|| true``
# keeps control here so the caller can report a clear error instead.
count_objects() {
  local prefix="$1"
  { aws s3 ls "s3://${BUCKET}/${prefix}" --recursive --summarize || true; } \
    | awk '/Total Objects:/ {print $3}'
}

# Argument parsing. Anything unrecognised is rejected rather than ignored:
# a mistyped flag (e.g. ``--dryrun``) must never fall through to a real copy.
if [[ $# -gt 1 ]]; then
  usage
  exit 64
fi

DRY_RUN=""
SYNC_ARGS=(--only-show-errors)
case "${1:-}" in
  "")
    ;;
  --dry-run)
    DRY_RUN="--dryrun"
    SYNC_ARGS+=("${DRY_RUN}")
    echo "[backfill] DRY-RUN: planning copy without writing"
    ;;
  -h|--help)
    usage
    exit 0
    ;;
  *)
    echo "[backfill] ERROR: unrecognised argument '${1}'." >&2
    usage
    exit 64
    ;;
esac

echo "[backfill] Wave 3 PR1 seed: ${LEGACY_PREFIX} -> ${NEW_PREFIX} on s3://${BUCKET}"

# Pre-flight: legacy must exist + have objects
LEGACY_COUNT=$(count_objects "${LEGACY_PREFIX}")
if [[ ! "${LEGACY_COUNT}" =~ ^[0-9]+$ ]] || (( LEGACY_COUNT < MIN_LEGACY_OBJECTS )); then
  echo "[backfill] ERROR: legacy prefix has ${LEGACY_COUNT:-0} objects (expected >= ${MIN_LEGACY_OBJECTS})." >&2
  echo "[backfill]        refusing to seed an empty/sparse mirror." >&2
  exit 1
fi
echo "[backfill] legacy: ${LEGACY_COUNT} objects"

# aws s3 sync: copy with delta-only semantics (skips objects matching by
# size + mtime). The first run copies everything; subsequent runs are
# fast no-ops unless the legacy prefix has been refreshed.
aws s3 sync \
  "s3://${BUCKET}/${LEGACY_PREFIX}" \
  "s3://${BUCKET}/${NEW_PREFIX}" \
  "${SYNC_ARGS[@]}"

if [[ -n "${DRY_RUN}" ]]; then
  echo "[backfill] DRY-RUN complete — no objects written."
  exit 0
fi

# Post-flight: parity check on object counts
NEW_COUNT=$(count_objects "${NEW_PREFIX}")
# An unlistable new prefix counts as zero so the parity check below fails
# loudly instead of tripping an arithmetic error.
[[ "${NEW_COUNT}" =~ ^[0-9]+$ ]] || NEW_COUNT=0
echo "[backfill] new prefix: ${NEW_COUNT} objects (legacy: ${LEGACY_COUNT})"

if (( NEW_COUNT < LEGACY_COUNT )); then
  echo "[backfill] WARN: new prefix has fewer objects than legacy." >&2
  echo "[backfill]       delta=$(( LEGACY_COUNT - NEW_COUNT )). Re-run to converge." >&2
  exit 2
fi

echo "[backfill] OK — reference/price_cache/ seeded. Soak clock starts on the"
echo "[backfill]      next Saturday SF firing's first write to both prefixes."
22 changes: 17 additions & 5 deletions tests/test_fred_history_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ def maybe_fail(series_id, period_years=10, api_key=None):
assert result["per_ticker"]["HYOAS"]["status"] == "ok"

def test_uploads_to_s3_when_not_dry_run(self):
"""Wave 3 PR1: each FRED-sourced ticker writes to BOTH legacy
(``predictor/price_cache/``) and new (``reference/price_cache/``)
prefixes. PR4 cutover will flip this back to a single key."""
with self._patch_fetch_returning(), \
patch("collectors.fred_history.boto3.client") as mock_boto:
mock_s3 = MagicMock()
Expand All @@ -283,8 +286,17 @@ def test_uploads_to_s3_when_not_dry_run(self):
dry_run=False,
)
assert result["dry_run"] is False
# upload_file called with (local_path, bucket, key)
assert mock_s3.upload_file.called
call = mock_s3.upload_file.call_args
assert call[0][1] == "test-bucket"
assert call[0][2] == "predictor/price_cache/TWO.parquet"
# upload_file called twice — once per write-both prefix
assert mock_s3.upload_file.call_count == 2
keys = sorted(
call.args[2] for call in mock_s3.upload_file.call_args_list
)
assert keys == [
"predictor/price_cache/TWO.parquet",
"reference/price_cache/TWO.parquet",
]
# Both writes hit the same bucket + same local file
buckets = {call.args[1] for call in mock_s3.upload_file.call_args_list}
assert buckets == {"test-bucket"}
local_paths = {call.args[0] for call in mock_s3.upload_file.call_args_list}
assert len(local_paths) == 1 # same local parquet body for both uploads
Loading
Loading