Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 98 additions & 22 deletions builders/daily_append.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,54 @@
PRICE_CACHE_PREFIX = "predictor/price_cache/"


def _write_row_backfill_safe(
lib,
symbol: str,
new_row: pd.DataFrame,
existing_series: pd.DataFrame | None = None,
) -> str:
"""Write a single-date row to ArcticDB, handling both append and backfill cases.

Returns the mode used: ``"append"`` (target_date > all existing dates,
used update() — fast) or ``"backfill"`` (target_date is in the middle
of an existing series, used read+splice+write() — necessary because
update() requires monotonic insertion at the head).

The backfill path is ~10-100x slower per ticker (full series read +
full rewrite vs. single-row update) but fires only for rare backfill
operations like the 2026-04-24 historical VWAP repair after the
polygon outage.
"""
target_ts = new_row.index[0]

# If caller already has the existing series (the per-ticker loop in
# daily_append already reads `hist` for feature warmup), reuse it
# instead of double-reading.
if existing_series is None:
try:
existing_series = lib.read(symbol).data
except Exception:
# Symbol doesn't exist yet — first write is always an append.
lib.write(symbol, new_row, prune_previous_versions=True)
return "append"

if existing_series.empty or target_ts > existing_series.index.max():
# Append at head — fast path. update() is idempotent for same-date
# rows (replaces in place rather than appending duplicates).
lib.update(symbol, new_row)
return "append"

# Backfill — splice new_row into existing series, write back full
# series. Required because ArcticDB's update() refuses non-monotonic
# insertion ("index must be monotonic increasing or decreasing").
# Same-date rows are deduped with keep="last" so the new row wins
# over any existing row at target_ts (matches update() semantics).
combined = pd.concat([existing_series, new_row])
combined = combined[~combined.index.duplicated(keep="last")].sort_index()
lib.write(symbol, combined, prune_previous_versions=True)
return "backfill"


def _load_parquet_warmup(s3, bucket: str, ticker: str) -> pd.DataFrame | None:
"""Load a ticker's 10y price-cache parquet for feature warmup.

Expand Down Expand Up @@ -395,13 +443,18 @@ def daily_append(

today_row.index.name = "date"

# Use update() rather than append() so a re-run on the same
# date overwrites instead of accumulating duplicate rows.
# 2026-04-15: diagnosed as root cause of the predictor training
# failure where 904/909 tickers had duplicate date rows when
# read back from ArcticDB. update() is idempotent — same-date
# rows replace instead of append, regardless of race conditions.
universe_lib.update(ticker, today_row)
# Backfill-safe write — picks update() when target_date is at
# the head of the series (the steady-state daily pass), and
# falls back to read+splice+write() when target_date is in
# the middle (historical backfill). The 2026-04-24 polygon
# VWAP repair surfaced the need: ArcticDB's update() raises
# "index must be monotonic increasing or decreasing" when
# asked to insert behind the latest stored date.
# `hist` is already in scope from the per-ticker warmup read
# above, so the helper reuses it instead of double-reading.
_write_row_backfill_safe(
universe_lib, ticker, today_row, existing_series=hist
)

# Increment coverage counter + emit the partial-features log
# only after the write landed.
Expand Down Expand Up @@ -438,6 +491,12 @@ def daily_append(
macro_updated: list[str] = []
sector_updated: list[str] = []

# Track per-symbol write mode (append vs backfill) so the verification
# check below can apply the right correctness assertion. Append: last
# readback row should equal target_ts. Backfill: target_ts should be
# in the readback index (could be anywhere in the middle).
macro_write_modes: dict[str, str] = {}

if not dry_run:
for key in macro_keys:
bar = closes.get(key)
Expand All @@ -450,8 +509,9 @@ def daily_append(
index=pd.DatetimeIndex([today_ts]),
)
new_row.index.name = "date"
macro_lib.update(key, new_row)
mode = _write_row_backfill_safe(macro_lib, key, new_row)
macro_updated.append(key)
macro_write_modes[key] = mode
except Exception as exc:
raise RuntimeError(
f"Failed to update macro {key} bar for {date_str}: {exc}"
Expand All @@ -473,8 +533,9 @@ def daily_append(
index=pd.DatetimeIndex([today_ts]),
)
new_row.index.name = "date"
macro_lib.update(sym, new_row)
mode = _write_row_backfill_safe(macro_lib, sym, new_row)
sector_updated.append(sym)
macro_write_modes[sym] = mode
except Exception as exc:
raise RuntimeError(
f"Failed to update sector ETF {sym} bar for {date_str}: {exc}"
Expand All @@ -495,12 +556,17 @@ def daily_append(
f"before claiming pipeline success."
)

# Verify writes landed. The update() call above is fire-and-forget
# (no return value surfaces a success flag), so we read back the
# latest row for each key we claimed to update. If the readback
# doesn't show today's date, something between update() and commit
# silently dropped the write — the 2026-04-15 failure mode where
# Step Function reported SUCCEEDED and macro stayed 5 days stale.
# Verify writes landed. The update() / write() calls above are
# fire-and-forget (no return value surfaces a success flag), so
# we read back each key and assert target_ts is present. The
# check is mode-aware:
# - append mode: last readback row should equal target_ts
# (catches the 2026-04-15 silent-stale failure where SSM
# reported SUCCEEDED but macro/SPY stayed 5 days behind)
# - backfill mode: target_ts should be IN the readback index,
# anywhere (last date is naturally future relative to
# target_ts when we backfill an old date)
target_ts_norm = today_ts.normalize()
verification_failures: list[tuple[str, str]] = []
for key in macro_updated + sector_updated:
try:
Expand All @@ -511,16 +577,26 @@ def daily_append(
if readback.empty:
verification_failures.append((key, "readback empty"))
continue
last_ts = pd.Timestamp(readback.index[-1]).normalize()
if last_ts != today_ts.normalize():
verification_failures.append(
(key, f"last date {last_ts.date()} != expected {today_ts.date()}")
)
mode = macro_write_modes.get(key, "append")
if mode == "backfill":
# Target date should be present somewhere in the series.
index_norm = pd.DatetimeIndex(readback.index).normalize()
if target_ts_norm not in index_norm:
verification_failures.append(
(key, f"backfill target {target_ts_norm.date()} not in readback index "
f"(last={pd.Timestamp(readback.index[-1]).date()})")
)
else:
last_ts = pd.Timestamp(readback.index[-1]).normalize()
if last_ts != target_ts_norm:
verification_failures.append(
(key, f"last date {last_ts.date()} != expected {target_ts_norm.date()}")
)
if verification_failures:
raise RuntimeError(
f"Macro update verification failed for {date_str}: "
f"{verification_failures}. update() calls completed without "
f"exception but readback shows stale data. Investigate "
f"{verification_failures}. update()/write() calls completed without "
f"exception but readback shows the row is missing. Investigate "
f"ArcticDB commit / consistency semantics."
)

Expand Down
192 changes: 192 additions & 0 deletions tests/test_daily_append_backfill_safe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
"""Tests for _write_row_backfill_safe — the helper that handles both
append (target_date > all existing dates, fast lib.update() path) and
backfill (target_date in middle of series, lib.write() full rewrite).

Background: the 2026-04-24 historical VWAP repair (after the polygon
4/17→4/23 outage) surfaced that ArcticDB's update() raises
"index must be monotonic increasing or decreasing" when asked to insert
behind the latest stored date. daily_append was originally designed for
"append today's row at the head" only; this helper makes it usable for
arbitrary historical backfills too.
"""

from __future__ import annotations

from unittest.mock import MagicMock

import pandas as pd
import pytest

from builders.daily_append import _write_row_backfill_safe


def _series(dates: list[str], close_values: list[float] | None = None) -> pd.DataFrame:
"""Build a minimal stored-series DataFrame for the mock lib.read()."""
closes = close_values if close_values is not None else [100.0 + i for i in range(len(dates))]
return pd.DataFrame(
{"Close": closes, "Open": closes, "High": closes, "Low": closes,
"Volume": [1_000_000] * len(dates), "VWAP": [None] * len(dates)},
index=pd.DatetimeIndex(pd.to_datetime(dates)),
)


def _new_row(date: str, close: float = 999.0) -> pd.DataFrame:
return pd.DataFrame(
{"Close": [close], "Open": [close], "High": [close], "Low": [close],
"Volume": [1_000_000], "VWAP": [close * 0.99]},
index=pd.DatetimeIndex(pd.to_datetime([date])),
)


# ── append path (target > latest, fast) ────────────────────────────────────


def test_append_uses_lib_update_when_target_after_latest():
lib = MagicMock()
existing = _series(["2026-04-20", "2026-04-21", "2026-04-22"])
new_row = _new_row("2026-04-23")

mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

assert mode == "append"
lib.update.assert_called_once_with("AAPL", new_row)
lib.write.assert_not_called()


def test_append_when_existing_series_is_empty():
"""First write to a previously-empty symbol takes the append path."""
lib = MagicMock()
empty = pd.DataFrame(
columns=["Close", "Open", "High", "Low", "Volume", "VWAP"],
index=pd.DatetimeIndex([]),
)
new_row = _new_row("2026-04-23")

mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=empty)

assert mode == "append"
lib.update.assert_called_once()
lib.write.assert_not_called()


def test_first_write_to_nonexistent_symbol():
"""If lib.read raises, the symbol doesn't exist — first write must use write()."""
lib = MagicMock()
lib.read.side_effect = Exception("symbol not found")

new_row = _new_row("2026-04-23")
mode = _write_row_backfill_safe(lib, "NEW_TICKER", new_row)

assert mode == "append"
lib.write.assert_called_once()
lib.update.assert_not_called()


# ── backfill path (target ≤ latest, full rewrite) ──────────────────────────


def test_backfill_uses_lib_write_when_target_before_latest():
"""Inserting a row behind the latest must NOT call update() (would raise
'index must be monotonic increasing or decreasing'). Must use write()
with the spliced full series."""
lib = MagicMock()
existing = _series(["2026-04-20", "2026-04-21", "2026-04-22", "2026-04-23"])
# Target = 2026-04-17, BEFORE all existing dates
new_row = _new_row("2026-04-17", close=200.0)

mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

assert mode == "backfill"
lib.update.assert_not_called()
lib.write.assert_called_once()
# Verify the written frame includes both the new row + all existing rows,
# in monotonic-sorted order, with no duplicates.
written = lib.write.call_args.args[1]
assert pd.Timestamp("2026-04-17") in written.index
assert pd.Timestamp("2026-04-23") in written.index
assert written.index.is_monotonic_increasing
assert not written.index.has_duplicates


def test_backfill_replaces_existing_same_date_row():
"""If the target date already has a row in the existing series, the new
row must REPLACE it (matches update()'s same-date semantics)."""
lib = MagicMock()
existing = _series(
["2026-04-20", "2026-04-21", "2026-04-22", "2026-04-23"],
close_values=[100.0, 101.0, 102.0, 103.0],
)
new_row = _new_row("2026-04-21", close=999.0)

mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

assert mode == "backfill"
written = lib.write.call_args.args[1]
# 2026-04-21 should now be 999.0 (the new row), not 101.0 (the old row).
assert written.loc[pd.Timestamp("2026-04-21"), "Close"] == 999.0
# Other rows unchanged.
assert written.loc[pd.Timestamp("2026-04-20"), "Close"] == 100.0
assert written.loc[pd.Timestamp("2026-04-23"), "Close"] == 103.0


def test_backfill_target_in_middle_of_series():
"""Target = date in middle of existing series. Backfill mode + write."""
lib = MagicMock()
existing = _series(["2026-04-15", "2026-04-16", "2026-04-22", "2026-04-23"])
new_row = _new_row("2026-04-17", close=500.0)

mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

assert mode == "backfill"
written = lib.write.call_args.args[1]
assert pd.Timestamp("2026-04-17") in written.index
assert written.loc[pd.Timestamp("2026-04-17"), "Close"] == 500.0
assert len(written) == 5 # 4 existing + 1 new
assert written.index.is_monotonic_increasing


# ── boundary cases ────────────────────────────────────────────────────────


def test_target_equal_to_latest_takes_append_path():
"""target_ts == latest_ts is the steady-state daily case (re-running
today). Should use update() (which is idempotent for same-date)."""
lib = MagicMock()
existing = _series(["2026-04-21", "2026-04-22", "2026-04-23"])
new_row = _new_row("2026-04-23", close=200.0) # same date as latest

mode = _write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

# target_ts > latest is False (they're equal), so backfill path runs
# with read+splice+write. The condition is `target_ts > existing.index.max()`.
# Same-date should NOT take the append path because update() at the
# same-date IS valid, but the >-check is conservative for safety.
assert mode == "backfill"
lib.write.assert_called_once()


def test_lib_write_called_with_prune_previous_versions():
"""write() with prune_previous_versions=True keeps storage small —
backfill is rare enough that we don't need version history bloat."""
lib = MagicMock()
existing = _series(["2026-04-20", "2026-04-21"])
new_row = _new_row("2026-04-15")

_write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

call_kwargs = lib.write.call_args.kwargs
assert call_kwargs.get("prune_previous_versions") is True


def test_passing_existing_series_avoids_extra_lib_read():
"""When the caller passes existing_series (the per-ticker loop in
daily_append already reads `hist` for warmup), the helper must NOT
re-read — saves a round-trip per ticker on the steady-state daily run.
"""
lib = MagicMock()
existing = _series(["2026-04-21", "2026-04-22"])
new_row = _new_row("2026-04-23")

_write_row_backfill_safe(lib, "AAPL", new_row, existing_series=existing)

lib.read.assert_not_called()
Loading
Loading