Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,13 @@ def _load_ticker_parquet(path: Path) -> pd.DataFrame:
df.index = idx
if not df.index.is_monotonic_increasing:
df = df.sort_index()
# Deduplicate on date index. Mirrors inference's defensive handling
# at inference/stages/load_prices.py:403. 2026-04-15 smoke tests
# showed 904/909 tickers failing feature computation with
# "cannot reindex on an axis with duplicate labels" — ArcticDB reads
# are emitting same-date rows for essentially every ticker. Upstream
# fix belongs in alpha-engine-data's ArcticDB write path; this is
# the consistency fix that brings training into alignment with the
# inference path that already expects and handles the duplicates.
if df.index.has_duplicates:
n_before = len(df)
df = df[~df.index.duplicated(keep="last")]
log.warning(
"Deduplicated %d duplicate date rows in %s (kept last: %d → %d). "
"Upstream ArcticDB write is emitting duplicates — file against alpha-engine-data.",
n_before - len(df), path.name, n_before, len(df),
)
# ArcticDB writes have been duplicate-free since the 2026-04-15
# write-path fix (alpha-engine-data builders/daily_append.py now calls
# update() instead of append()). If duplicates ever re-appear here, the
# downstream ``compute_features`` reindex raises and surfaces the
# upstream regression. Per feedback_no_silent_fails, that is preferred
# over silently picking ``keep="last"``, which would disguise a
# write-path bug (same-date rows whose values differ) as a clean read.
return df
except Exception as exc:
log.warning("Failed to load %s: %s", path.name, exc)
Expand Down
19 changes: 11 additions & 8 deletions inference/stages/load_prices.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,15 @@ def load_price_data_from_arctic(
* Per-ticker read error rate > 5% → ``PipelineAbort``.
* Individual ticker missing/empty → logged WARNING and dropped from output.

Defensive dedup
---------------
The 2026-04-15 write-path fix (builders/daily_append.py → ``update()`` instead
of ``append()``) prevents new duplicate-date rows, but historical rows may
still carry duplicates. Each frame is deduped on read with ``keep="last"``.
Remove after 1-2 clean Saturday cycles confirm upstream is clean.
Upstream invariant
------------------
ArcticDB writes are duplicate-free post the 2026-04-15 write-path fix
(builders/daily_append.py → ``update()`` instead of ``append()``).
Reads here do NOT defensively dedup — if duplicates ever re-appear,
pandas raises downstream at the first ``reindex`` callsite (in
``compute_features``), surfacing the upstream regression instead of
silently picking ``keep="last"`` (which would mask a values-differ
write-path regression as a clean read).
"""
end_ts = pd.Timestamp(date_str).normalize()
start_ts = end_ts - pd.Timedelta(days=lookback_days)
Expand All @@ -186,14 +189,14 @@ def _read_ohlcv(lib, sym: str) -> pd.DataFrame:
df = res.data
if df.empty:
return df
return df[~df.index.duplicated(keep="last")].sort_index()
return df.sort_index()

def _read_close(lib, sym: str) -> pd.DataFrame:
res = lib.read(sym, date_range=(start_ts, end_ts), columns=["Close"])
df = res.data
if df.empty:
return df
return df[~df.index.duplicated(keep="last")].sort_index()
return df.sort_index()

# ── Stocks: universe library only ────────────────────────────────────────
price_data: dict[str, pd.DataFrame] = {}
Expand Down
40 changes: 40 additions & 0 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for data/dataset.py — array building and normalization."""

import inspect
import json
import tempfile
from pathlib import Path
Expand Down Expand Up @@ -192,3 +193,42 @@ def test_returns_arrays(self):
mean, std = load_norm_stats(f.name)
assert isinstance(mean, np.ndarray)
assert isinstance(std, np.ndarray)


class TestDefensiveDedupRetiredL2188:
    """Regression pin for L2188: the `keep="last"` read-path dedup stays retired.

    Background: the 2026-04-15 ArcticDB write-path fix (alpha-engine-data
    builders/daily_append.py switched from `append()` to `update()`) made
    duplicate date rows structurally impossible. The `keep="last"` dedup
    on the read side only existed to bridge that transition. Per
    feedback_no_silent_fails it must not come back: quietly collapsing
    duplicates whose values differ would present an upstream regression
    as a clean read.

    Both tests are textual: they fetch the function's source with
    ``inspect.getsource`` and fail if the substring ``.duplicated(``
    occurs anywhere in it.

    Guarded call sites:
      - data/dataset.py::_load_ticker_parquet (training read path)
      - inference/stages/load_prices.py::_read_ohlcv + _read_close
        (inference read path), checked through the source of
        ``load_price_data_from_arctic``
    """

    def test_dataset_loader_does_not_dedup(self):
        """The training loader's source must contain no `.duplicated(` call."""
        from data import dataset

        loader_source = inspect.getsource(dataset._load_ticker_parquet)
        failure_message = (
            "data/dataset.py::_load_ticker_parquet must not silently "
            "dedup on read — duplicates are structurally impossible "
            "post the 2026-04-15 ArcticDB write-path fix. If duplicates "
            "appear, surface them via pandas reindex raising downstream "
            "rather than masking values-differ regressions per "
            "feedback_no_silent_fails."
        )
        has_dedup_call = ".duplicated(" in loader_source
        assert not has_dedup_call, failure_message

    def test_inference_readers_do_not_dedup(self):
        """The inference loader's source must contain no `.duplicated(` call."""
        from inference.stages import load_prices

        # The scan covers the whole of load_price_data_from_arctic, which is
        # where this pin expects the _read_ohlcv / _read_close readers to be.
        reader_source = inspect.getsource(load_prices.load_price_data_from_arctic)
        failure_message = (
            "inference/stages/load_prices.py::_read_ohlcv + _read_close "
            "must not silently dedup on read — same rationale as the "
            "dataset loader pin above."
        )
        has_dedup_call = ".duplicated(" in reader_source
        assert not has_dedup_call, failure_message
Loading