diff --git a/builders/daily_append.py b/builders/daily_append.py index d313b70..48701a2 100644 --- a/builders/daily_append.py +++ b/builders/daily_append.py @@ -49,6 +49,54 @@ PRICE_CACHE_PREFIX = "predictor/price_cache/" +def _write_row_backfill_safe( + lib, + symbol: str, + new_row: pd.DataFrame, + existing_series: pd.DataFrame | None = None, +) -> str: + """Write a single-date row to ArcticDB, handling both append and backfill cases. + + Returns the mode used: ``"append"`` (target_date > all existing dates, + used update() — fast) or ``"backfill"`` (target_date is in the middle + of an existing series, used read+splice+write() — necessary because + update() requires monotonic insertion at the head). + + The backfill path is ~10-100x slower per ticker (full series read + + full rewrite vs. single-row update) but fires only for rare backfill + operations like the 2026-04-24 historical VWAP repair after the + polygon outage. + """ + target_ts = new_row.index[0] + + # If caller already has the existing series (the per-ticker loop in + # daily_append already reads `hist` for feature warmup), reuse it + # instead of double-reading. + if existing_series is None: + try: + existing_series = lib.read(symbol).data + except Exception: + # Symbol doesn't exist yet — first write is always an append. + lib.write(symbol, new_row, prune_previous_versions=True) + return "append" + + if existing_series.empty or target_ts > existing_series.index.max(): + # Append at head — fast path. update() is idempotent for same-date + # rows (replaces in place rather than appending duplicates). + lib.update(symbol, new_row) + return "append" + + # Backfill — splice new_row into existing series, write back full + # series. Required because ArcticDB's update() refuses non-monotonic + # insertion ("index must be monotonic increasing or decreasing"). + # Same-date rows are deduped with keep="last" so the new row wins + # over any existing row at target_ts (matches update() semantics). + combined = pd.concat([existing_series, new_row]) + combined = combined[~combined.index.duplicated(keep="last")].sort_index() + lib.write(symbol, combined, prune_previous_versions=True) + return "backfill" + + def _load_parquet_warmup(s3, bucket: str, ticker: str) -> pd.DataFrame | None: """Load a ticker's 10y price-cache parquet for feature warmup. @@ -395,13 +443,18 @@ def daily_append( today_row.index.name = "date" - # Use update() rather than append() so a re-run on the same - # date overwrites instead of accumulating duplicate rows. - # 2026-04-15: diagnosed as root cause of the predictor training - # failure where 904/909 tickers had duplicate date rows when - # read back from ArcticDB. update() is idempotent — same-date - # rows replace instead of append, regardless of race conditions. - universe_lib.update(ticker, today_row) + # Backfill-safe write — picks update() when target_date is at + # the head of the series (the steady-state daily pass), and + # falls back to read+splice+write() when target_date is in + # the middle (historical backfill). The 2026-04-24 polygon + # VWAP repair surfaced the need: ArcticDB's update() raises + # "index must be monotonic increasing or decreasing" when + # asked to insert behind the latest stored date. + # `hist` is already in scope from the per-ticker warmup read + # above, so the helper reuses it instead of double-reading. + _write_row_backfill_safe( + universe_lib, ticker, today_row, existing_series=hist + ) # Increment coverage counter + emit the partial-features log # only after the write landed. @@ -438,6 +491,12 @@ def daily_append( macro_updated: list[str] = [] sector_updated: list[str] = [] + # Track per-symbol write mode (append vs backfill) so the verification + # check below can apply the right correctness assertion. Append: last + # readback row should equal target_ts. Backfill: target_ts should be + # in the readback index (could be anywhere in the middle). + macro_write_modes: dict[str, str] = {} + if not dry_run: for key in macro_keys: bar = closes.get(key) @@ -450,8 +509,9 @@ def daily_append( index=pd.DatetimeIndex([today_ts]), ) new_row.index.name = "date" - macro_lib.update(key, new_row) + mode = _write_row_backfill_safe(macro_lib, key, new_row) macro_updated.append(key) + macro_write_modes[key] = mode except Exception as exc: raise RuntimeError( f"Failed to update macro {key} bar for {date_str}: {exc}" @@ -473,8 +533,9 @@ def daily_append( index=pd.DatetimeIndex([today_ts]), ) new_row.index.name = "date" - macro_lib.update(sym, new_row) + mode = _write_row_backfill_safe(macro_lib, sym, new_row) sector_updated.append(sym) + macro_write_modes[sym] = mode except Exception as exc: raise RuntimeError( f"Failed to update sector ETF {sym} bar for {date_str}: {exc}" @@ -495,12 +556,17 @@ def daily_append( f"before claiming pipeline success." ) - # Verify writes landed. The update() call above is fire-and-forget - # (no return value surfaces a success flag), so we read back the - # latest row for each key we claimed to update. If the readback - # doesn't show today's date, something between update() and commit - # silently dropped the write — the 2026-04-15 failure mode where - # Step Function reported SUCCEEDED and macro stayed 5 days stale. + # Verify writes landed. The update() / write() calls above are + # fire-and-forget (no return value surfaces a success flag), so + # we read back each key and assert target_ts is present. The + # check is mode-aware: + # - append mode: last readback row should equal target_ts + # (catches the 2026-04-15 silent-stale failure where SSM + # reported SUCCEEDED but macro/SPY stayed 5 days behind) + # - backfill mode: target_ts should be IN the readback index, + # anywhere (last date is naturally future relative to + # target_ts when we backfill an old date) + target_ts_norm = today_ts.normalize() verification_failures: list[tuple[str, str]] = [] for key in macro_updated + sector_updated: try: @@ -511,16 +577,26 @@ def daily_append( if readback.empty: verification_failures.append((key, "readback empty")) continue - last_ts = pd.Timestamp(readback.index[-1]).normalize() - if last_ts != today_ts.normalize(): - verification_failures.append( - (key, f"last date {last_ts.date()} != expected {today_ts.date()}") - ) + mode = macro_write_modes.get(key, "append") + if mode == "backfill": + # Target date should be present somewhere in the series. + index_norm = pd.DatetimeIndex(readback.index).normalize() + if target_ts_norm not in index_norm: + verification_failures.append( + (key, f"backfill target {target_ts_norm.date()} not in readback index " + f"(last={pd.Timestamp(readback.index[-1]).date()})") + ) + else: + last_ts = pd.Timestamp(readback.index[-1]).normalize() + if last_ts != target_ts_norm: + verification_failures.append( + (key, f"last date {last_ts.date()} != expected {target_ts_norm.date()}") + ) if verification_failures: raise RuntimeError( f"Macro update verification failed for {date_str}: " - f"{verification_failures}. update() calls completed without " - f"exception but readback shows stale data. Investigate " + f"{verification_failures}. update()/write() calls completed without " + f"exception but readback shows the row is missing. Investigate " f"ArcticDB commit / consistency semantics." ) diff --git a/tests/test_daily_append_backfill_safe.py b/tests/test_daily_append_backfill_safe.py new file mode 100644 index 0000000..4f75a7e --- /dev/null +++ b/tests/test_daily_append_backfill_safe.py @@ -0,0 +1,192 @@ +"""Tests for _write_row_backfill_safe — the helper that handles both +append (target_date > all existing dates, fast lib.update() path) and +backfill (target_date in middle of series, lib.write() full rewrite). + +Background: the 2026-04-24 historical VWAP repair (after the polygon +4/17→4/23 outage) surfaced that ArcticDB's update() raises +"index must be monotonic increasing or decreasing" when asked to insert +behind the latest stored date. daily_append was originally designed for +"append today's row at the head" only; this helper makes it usable for +arbitrary historical backfills too. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pandas as pd +import pytest + +from builders.daily_append import _write_row_backfill_safe + + +def _series(dates: list[str], close_values: list[float] | None = None) -> pd.DataFrame: + """Build a minimal stored-series DataFrame for the mock lib.read().""" + closes = close_values if close_values is not None else [100.0 + i for i in range(len(dates))] + return pd.DataFrame( + {"Close": closes, "Open": closes, "High": closes, "Low": closes, + "Volume": [1_000_000] * len(dates), "VWAP": [None] * len(dates)}, + index=pd.DatetimeIndex(pd.to_datetime(dates)), + ) + + +def _new_row(date: str, close: float = 999.0) -> pd.DataFrame: + return pd.DataFrame( + {"Close": [close], "Open": [close], "High": [close], "Low": [close], + "Volume": [1_000_000], "VWAP": [close * 0.99]}, + index=pd.DatetimeIndex(pd.to_datetime([date])), + ) + + +# ── append path (target > latest, fast) ──────────────────────────────────── + + +def test_append_uses_lib_update_when_target_after_latest(): + lib = MagicMock() + existing = _series(["2026-04-20", "2026-04-21", "2026-04-22"]) + new_row = _new_row("2026-04-23") + + mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + assert mode == "append" + lib.update.assert_called_once_with("AAPL", new_row) + lib.write.assert_not_called() + + +def test_append_when_existing_series_is_empty(): + """First write to a previously-empty symbol takes the append path.""" + lib = MagicMock() + empty = pd.DataFrame( + columns=["Close", "Open", "High", "Low", "Volume", "VWAP"], + index=pd.DatetimeIndex([]), + ) + new_row = _new_row("2026-04-23") + + mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=empty) + + assert mode == "append" + lib.update.assert_called_once() + lib.write.assert_not_called() + + +def test_first_write_to_nonexistent_symbol(): + """If lib.read raises, the symbol doesn't exist — first write must use write().""" + lib = MagicMock() + lib.read.side_effect = Exception("symbol not found") + + new_row = _new_row("2026-04-23") + mode = _write_row_backfill_safe(lib, "NEW_TICKER", new_row) + + assert mode == "append" + lib.write.assert_called_once() + lib.update.assert_not_called() + + +# ── backfill path (target ≤ latest, full rewrite) ────────────────────────── + + +def test_backfill_uses_lib_write_when_target_before_latest(): + """Inserting a row behind the latest must NOT call update() (would raise + 'index must be monotonic increasing or decreasing'). Must use write() + with the spliced full series.""" + lib = MagicMock() + existing = _series(["2026-04-20", "2026-04-21", "2026-04-22", "2026-04-23"]) + # Target = 2026-04-17, BEFORE all existing dates + new_row = _new_row("2026-04-17", close=200.0) + + mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + assert mode == "backfill" + lib.update.assert_not_called() + lib.write.assert_called_once() + # Verify the written frame includes both the new row + all existing rows, + # in monotonic-sorted order, with no duplicates. + written = lib.write.call_args.args[1] + assert pd.Timestamp("2026-04-17") in written.index + assert pd.Timestamp("2026-04-23") in written.index + assert written.index.is_monotonic_increasing + assert not written.index.has_duplicates + + +def test_backfill_replaces_existing_same_date_row(): + """If the target date already has a row in the existing series, the new + row must REPLACE it (matches update()'s same-date semantics).""" + lib = MagicMock() + existing = _series( + ["2026-04-20", "2026-04-21", "2026-04-22", "2026-04-23"], + close_values=[100.0, 101.0, 102.0, 103.0], + ) + new_row = _new_row("2026-04-21", close=999.0) + + mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + assert mode == "backfill" + written = lib.write.call_args.args[1] + # 2026-04-21 should now be 999.0 (the new row), not 101.0 (the old row). + assert written.loc[pd.Timestamp("2026-04-21"), "Close"] == 999.0 + # Other rows unchanged. + assert written.loc[pd.Timestamp("2026-04-20"), "Close"] == 100.0 + assert written.loc[pd.Timestamp("2026-04-23"), "Close"] == 103.0 + + +def test_backfill_target_in_middle_of_series(): + """Target = date in middle of existing series. Backfill mode + write.""" + lib = MagicMock() + existing = _series(["2026-04-15", "2026-04-16", "2026-04-22", "2026-04-23"]) + new_row = _new_row("2026-04-17", close=500.0) + + mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + assert mode == "backfill" + written = lib.write.call_args.args[1] + assert pd.Timestamp("2026-04-17") in written.index + assert written.loc[pd.Timestamp("2026-04-17"), "Close"] == 500.0 + assert len(written) == 5 # 4 existing + 1 new + assert written.index.is_monotonic_increasing + + +# ── boundary cases ──────────────────────────────────────────────────────── + + +def test_target_equal_to_latest_takes_append_path(): + """target_ts == latest_ts is the steady-state daily case (re-running + today). Should use update() (which is idempotent for same-date).""" + lib = MagicMock() + existing = _series(["2026-04-21", "2026-04-22", "2026-04-23"]) + new_row = _new_row("2026-04-23", close=200.0) # same date as latest + + mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + # target_ts > latest is False (they're equal), so backfill path runs + # with read+splice+write. The condition is `target_ts > existing.index.max()`. + # Same-date should NOT take the append path because update() at the + # same-date IS valid, but the >-check is conservative for safety. + assert mode == "backfill" + lib.write.assert_called_once() + + +def test_lib_write_called_with_prune_previous_versions(): + """write() with prune_previous_versions=True keeps storage small — + backfill is rare enough that we don't need version history bloat.""" + lib = MagicMock() + existing = _series(["2026-04-20", "2026-04-21"]) + new_row = _new_row("2026-04-15") + + _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + call_kwargs = lib.write.call_args.kwargs + assert call_kwargs.get("prune_previous_versions") is True + + +def test_passing_existing_series_avoids_extra_lib_read(): + """When the caller passes existing_series (the per-ticker loop in + daily_append already reads `hist` for warmup), the helper must NOT + re-read — saves a round-trip per ticker on the steady-state daily run. + """ + lib = MagicMock() + existing = _series(["2026-04-21", "2026-04-22"]) + new_row = _new_row("2026-04-23") + + _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing) + + lib.read.assert_not_called() diff --git a/tests/test_daily_append_semantics.py b/tests/test_daily_append_semantics.py index c201aa5..158f37f 100644 --- a/tests/test_daily_append_semantics.py +++ b/tests/test_daily_append_semantics.py @@ -22,22 +22,40 @@ def _source() -> str: def test_universe_lib_uses_update_not_append(): + """Per-ticker write must go through _write_row_backfill_safe, which + uses lib.update() for the append case (steady-state daily pass) and + lib.write() for the backfill case (historical row in middle of series). + Direct lib.append() must never appear — it accumulates duplicate + same-date rows and caused the 2026-04-15 retrain outage. + """ src = _source() - assert "universe_lib.update(ticker, today_row)" in src, ( - "daily_append must call universe_lib.update() — append() accumulates " - "duplicate same-date rows and caused the 2026-04-15 retrain outage." + assert "_write_row_backfill_safe(\n universe_lib, ticker, today_row" in src or \ + "_write_row_backfill_safe(universe_lib, ticker, today_row" in src, ( + "Per-ticker write must call _write_row_backfill_safe(universe_lib, ticker, today_row, ...) " + "— the helper picks update() for append + write() for backfill. " + "Calling universe_lib.update() directly skips the backfill-safe path " + "and breaks historical rewrites with 'index must be monotonic' " + "(see 2026-04-24 polygon VWAP repair)." + ) + # Inside the helper, the append branch must use update() (not append()). + assert "lib.update(symbol, new_row)" in src, ( + "_write_row_backfill_safe must call lib.update() in the append " + "branch — append() accumulates duplicate same-date rows " + "(2026-04-15 retrain outage)." ) - assert "universe_lib.append(ticker, today_row)" not in src, ( + assert "universe_lib.append(" not in src, ( "Found universe_lib.append() — this is the duplicate-row bug. " - "Use update() instead." + "Use _write_row_backfill_safe() (which routes to update() for append)." ) def test_macro_lib_uses_update_not_append(): + """Macro + sector ETF writes must also route through + _write_row_backfill_safe so historical macro backfills work.""" src = _source() - # Both key-path and sym-path (sector ETFs) must use update. - assert "macro_lib.update(key, new_row)" in src - assert "macro_lib.update(sym, new_row)" in src + # Both key-path and sym-path (sector ETFs) must use the helper. + assert "_write_row_backfill_safe(macro_lib, key, new_row)" in src + assert "_write_row_backfill_safe(macro_lib, sym, new_row)" in src assert "macro_lib.append(key," not in src assert "macro_lib.append(sym," not in src @@ -145,28 +163,34 @@ def test_partial_features_are_loudly_logged(): def test_counters_increment_after_successful_write(): - """n_ok and n_partial must be incremented AFTER universe_lib.update() + """n_ok and n_partial must be incremented AFTER the per-ticker write so an exception rolls the iteration back cleanly into n_err. Locks the 2026-04-21 rewrite where counters were hoisted post-write - to prevent double-counting when update() throws. + to prevent double-counting when the write throws. """ src = _source() - # The update() call site + increments must appear in that order. - # Find the update call for universe_lib.update(ticker, today_row) — - # the committed increment happens on the same side. + # The write call site + increments must appear in that order. + # Find the _write_row_backfill_safe(universe_lib, ticker, today_row) call. lines = src.splitlines() - update_idx = None + write_idx = None for i, line in enumerate(lines): - if "universe_lib.update(ticker, today_row)" in line: - update_idx = i + if "_write_row_backfill_safe(" in line and "universe_lib" in line: + write_idx = i break - assert update_idx is not None, ( - "universe_lib.update(ticker, today_row) call site not found" + # Multi-line call form: line ends with `_write_row_backfill_safe(` + if line.rstrip().endswith("_write_row_backfill_safe("): + # Look ahead a few lines for `universe_lib` + ahead = "\n".join(lines[i:i+5]) + if "universe_lib, ticker, today_row" in ahead: + write_idx = i + break + assert write_idx is not None, ( + "_write_row_backfill_safe(universe_lib, ticker, today_row, ...) call site not found" ) - window_after = "\n".join(lines[update_idx:update_idx + 20]) + window_after = "\n".join(lines[write_idx:write_idx + 25]) assert "n_partial += 1" in window_after and "n_ok += 1" in window_after, ( - "n_ok / n_partial must be incremented AFTER the update() call, " + "n_ok / n_partial must be incremented AFTER the write call, " "not before. Increments before write cause miscount on exception." )