diff --git a/benchmarks/engine_throughput.py b/benchmarks/engine_throughput.py index f499bbf..7860e4b 100644 --- a/benchmarks/engine_throughput.py +++ b/benchmarks/engine_throughput.py @@ -48,6 +48,15 @@ def run_batch(engine: CppMatchingEngine, columns: CompiledEventColumns) -> None: engine.apply_events_batch(columns.slice(0, len(columns.ts_ns))) +def run_batch_with_marks( + engine: CppMatchingEngine, + columns: CompiledEventColumns, +) -> None: + """Apply one compiled event slice and return compact valuation marks.""" + + engine.apply_events_batch_with_marks(columns.slice(0, len(columns.ts_ns))) + + def measure( path_name: str, runner: Callable[[], None], @@ -150,7 +159,17 @@ def main() -> None: repeats=args.repeats, warmups=args.warmups, ) - results.extend((cpp_scalar, cpp_batch)) + cpp_batch_marks = measure( + "CppMatchingEngine batch+marks", + lambda: run_batch_with_marks( + CppMatchingEngine(tick_size=TICK_SIZE), + columns, + ), + event_count=len(events), + repeats=args.repeats, + warmups=args.warmups, + ) + results.extend((cpp_scalar, cpp_batch, cpp_batch_marks)) for result in results[1:]: print(format_result(result)) @@ -158,8 +177,10 @@ def main() -> None: python_eps = results[0].events_per_second scalar_speedup = cpp_scalar.events_per_second / python_eps batch_speedup = cpp_batch.events_per_second / python_eps + batch_marks_speedup = cpp_batch_marks.events_per_second / python_eps print(f"per-event C++ speedup vs Python {scalar_speedup:>7.2f}x") print(f"batch C++ speedup vs Python {batch_speedup:>7.2f}x") + print(f"batch+marks speedup vs Python {batch_marks_speedup:>7.2f}x") if __name__ == "__main__": diff --git a/cpp/matching_engine_cpp.cpp b/cpp/matching_engine_cpp.cpp index d5b3a79..703d72e 100644 --- a/cpp/matching_engine_cpp.cpp +++ b/cpp/matching_engine_cpp.cpp @@ -17,12 +17,6 @@ struct BatchBuffers { const int64_t* order_id; }; -struct MarkRow { - int64_t ts_ns; - int64_t bid_ticks; - int64_t ask_ticks; -}; - template const T* checked_buffer( const py::buffer& buffer, @@ -104,6 +98,16 @@ BatchBuffers read_batch_buffers( }; } +py::bytes int64_bytes(const std::vector& values) { + if (values.empty()) { + return py::bytes(); + } + return py::bytes( + reinterpret_cast(values.data()), + values.size() * sizeof(int64_t) + ); +} + std::vector apply_events_batch( MatchingEngineCpp& engine, const py::buffer& ts_ns, @@ -158,7 +162,8 @@ py::tuple apply_events_batch_with_marks( ); std::vector fills; - std::vector marks; + std::vector mark_ts_ns; + std::vector mark_mid_ticks_x2; for (py::ssize_t i = 0; i < batch.n; ++i) { const auto event_fills = engine.apply_event_code( @@ -173,11 +178,16 @@ py::tuple apply_events_batch_with_marks( const auto [bid_ticks, ask_ticks] = engine.book_top(); if (bid_ticks >= 0 && ask_ticks >= 0) { - marks.push_back(MarkRow{batch.ts_ns[i], bid_ticks, ask_ticks}); + mark_ts_ns.push_back(batch.ts_ns[i]); + mark_mid_ticks_x2.push_back(bid_ticks + ask_ticks); } } - return py::make_tuple(fills, marks); + return py::make_tuple( + fills, + int64_bytes(mark_ts_ns), + int64_bytes(mark_mid_ticks_x2) + ); } py::tuple apply_events_until_fill( @@ -228,11 +238,6 @@ PYBIND11_MODULE(_matching_engine_cpp, module) { .def_readonly("size", &FillRow::size) .def_readonly("ts_ns", &FillRow::ts_ns); - py::class_(module, "MarkRow") - .def_readonly("ts_ns", &MarkRow::ts_ns) - .def_readonly("bid_ticks", &MarkRow::bid_ticks) - .def_readonly("ask_ticks", &MarkRow::ask_ticks); - py::class_(module, "MatchingEngineCpp") .def(py::init<>()) .def("apply_event", &MatchingEngineCpp::apply_event) diff --git a/docs/benchmarks.md b/docs/benchmarks.md index a891139..0560e4d 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -34,10 +34,13 @@ This benchmark measures event ingestion through the execution engine itself: | `MatchingEngine` scalar | `apply_event(MBOEvent)` on the Python reference engine | | `CppMatchingEngine` per-event | `apply_event(MBOEvent)` one event at a time for compatibility checks | | `CppMatchingEngine` batch | `apply_events_batch(...)` over precompiled primitive columns | +| `CppMatchingEngine` batch+marks | `apply_events_batch_with_marks(...)`, returning fills plus compact valuation-mark columns | The per-event C++ number is a diagnostic baseline, not the intended fast path. The useful compiled path is batched: Python hands C++ a contiguous slice and C++ -advances internally. +advances internally. The `batch+marks` path is the closest engine-level +measurement to ordinary audited replay because replay needs valuation marks for +the equity curve. The batch result excludes `CompiledEventColumns.from_events(...)` construction. That conversion is meant to happen once before repeated compiled-engine runs; @@ -67,8 +70,10 @@ also performs the work that makes `ordersim` inspectable: When the default C++ engine is available, ordinary replay advances each requested time slice through compiled columns and returns both fills and valuation marks. -The Python reference engine remains event-by-event because it is the readable -behavioral model. +Those valuation marks are carried back as compact integer columns +(`ts_ns[]`, `mid_ticks_x2[]`) and converted to public `Decimal` prices only when +the equity curve is built. The Python reference engine remains event-by-event +because it is the readable behavioral model. ## Interpreting Results diff --git a/docs/economics.md b/docs/economics.md index 9547219..5a635d5 100644 --- a/docs/economics.md +++ b/docs/economics.md @@ -75,6 +75,12 @@ Replay builds `equity_curve` from observed fills and midpoint valuation marks. A midpoint mark is recorded when both bid and ask are available after replay applies a book event or order action. +The C++ replay path transports those marks as compact integer columns and keeps +midpoints as `bid_ticks + ask_ticks` until equity construction. Public +`EquityPoint` values still expose `Decimal` prices; the compact representation +only avoids creating an intermediate Python `ValuationMark` object for every +market-data event. + Replay only marks times it actually advances through. Full-session intraday drawdown therefore requires the strategy or harness to advance through the session window being studied, or to call `build_equity_curve(...)` directly with diff --git a/docs/execution-engines.md b/docs/execution-engines.md index 3727b0e..0f6c1e0 100644 --- a/docs/execution-engines.md +++ b/docs/execution-engines.md @@ -90,6 +90,8 @@ exposes a compiled batch-ingest path. It accepts primitive columns derived from the same `MBOEvent` schema and returns passive fills without changing the public matching semantics. Ordinary `Replay(...)` uses the compiled batch path when it can also receive the valuation marks needed to build the default equity curve. +Those marks are transported as compact timestamp and midpoint-tick columns, not +one Python object per market-data event. `Replay(...)` precompiles its immutable event stream once and shares that column view with each strategy run, so future compiled replay paths do not need to rebuild primitive columns inside `run_many(...)`. diff --git a/docs/schema.md b/docs/schema.md index d11d119..960c91a 100644 --- a/docs/schema.md +++ b/docs/schema.md @@ -109,6 +109,16 @@ See `docs/economics.md` for the assumptions and explicit non-goals. | `ts_ns` | `int` | Mark timestamp as UTC Unix-epoch nanoseconds. | | `price` | `Decimal` | Price used for open-lot valuation. | +`CompiledValuationMarks` is the compact internal form used by the C++ replay +path. It stores mark timestamps and midpoint prices as primitive integer +columns: + +| Field | Type | Meaning | +|---|---|---| +| `ts_ns` | `memoryview[int64]` | Mark timestamps as UTC Unix-epoch nanoseconds. | +| `mid_ticks_x2` | `memoryview[int64]` | `bid_ticks + ask_ticks`, preserving half-tick midpoints exactly. | +| `tick_size` | `Decimal` | Price multiplier used when public `Decimal` prices are built. | + `EquityPoint` is one output row from a mark-to-market equity curve. | Field | Type | Meaning | diff --git a/src/ordersim/__init__.py b/src/ordersim/__init__.py index d709a50..a397de6 100644 --- a/src/ordersim/__init__.py +++ b/src/ordersim/__init__.py @@ -11,6 +11,7 @@ write_parquet, ) from ordersim.economics import ( + CompiledValuationMarks, EquityPoint, ExecutionSummary, PositionLot, @@ -68,6 +69,7 @@ "BookSide", "BoundaryAdvance", "CompiledEventColumns", + "CompiledValuationMarks", "ConstantLatency", "CppMatchingEngine", "CsvSource", diff --git a/src/ordersim/economics.py b/src/ordersim/economics.py index e1a5675..7b34569 100644 --- a/src/ordersim/economics.py +++ b/src/ordersim/economics.py @@ -1,7 +1,9 @@ """Execution economics computed directly from fills.""" +from collections.abc import Iterable from dataclasses import dataclass from decimal import Decimal +from typing import TypeAlias from ordersim.specs import InstrumentSpec from ordersim.types import Fill, Price, Side @@ -43,6 +45,38 @@ class ValuationMark: price: Price +@dataclass(frozen=True, slots=True) +class CompiledValuationMarks: + """Compact valuation marks stored as timestamp and midpoint tick columns. + + `mid_ticks_x2` stores `bid_ticks + ask_ticks`, so half-tick midpoints stay + exact until the public `Decimal` equity curve is built. + """ + + ts_ns: memoryview + mid_ticks_x2: memoryview + tick_size: Decimal + + @classmethod + def from_bytes( + cls, + *, + ts_ns: bytes, + mid_ticks_x2: bytes, + tick_size: Decimal, + ) -> "CompiledValuationMarks": + """Build compact marks from native int64 byte columns.""" + + timestamps = memoryview(ts_ns).cast("q") + mids = memoryview(mid_ticks_x2).cast("q") + if len(timestamps) != len(mids): + raise ValueError("valuation mark columns must have equal length") + return cls(ts_ns=timestamps, mid_ticks_x2=mids, tick_size=tick_size) + + def __len__(self) -> int: + return len(self.ts_ns) + + @dataclass(frozen=True, slots=True) class EquityPoint: """One point on a mark-to-market equity curve.""" @@ -56,6 +90,13 @@ class EquityPoint: drawdown: Decimal +ValuationMarkInput: TypeAlias = ( + tuple[ValuationMark | CompiledValuationMarks, ...] + | list[ValuationMark | CompiledValuationMarks] + | CompiledValuationMarks +) + + def summarize_fills( fills: tuple[Fill, ...] | list[Fill], instrument: InstrumentSpec, @@ -102,7 +143,7 @@ def summarize_fills( def build_equity_curve( fills: tuple[Fill, ...] | list[Fill], - marks: tuple[ValuationMark, ...] | list[ValuationMark], + marks: ValuationMarkInput, instrument: InstrumentSpec, ) -> tuple[EquityPoint, ...]: """Build a mark-to-market equity curve from fills and valuation marks. @@ -112,7 +153,71 @@ def build_equity_curve( """ sorted_fills = tuple(sorted(fills, key=lambda fill: fill.ts_ns)) - sorted_marks = tuple(sorted(marks, key=lambda mark: mark.ts_ns)) + if isinstance(marks, CompiledValuationMarks): + return _build_equity_curve_from_compiled_marks( + sorted_fills, + marks, + instrument, + ) + + sorted_marks = tuple(sorted(_iter_mark_pairs(marks), key=lambda mark: mark[0])) + open_lots: list[PositionLot] = [] + realized_pnl = Decimal("0") + commission = Decimal("0") + high_water_mark = Decimal("0") + points: list[EquityPoint] = [] + fill_index = 0 + + for mark_ts_ns, mark_price in sorted_marks: + while ( + fill_index < len(sorted_fills) + and sorted_fills[fill_index].ts_ns <= mark_ts_ns + ): + fill = sorted_fills[fill_index] + realized_pnl += _apply_fill_to_lots(open_lots, fill, instrument) + commission += instrument.commission_per_contract * fill.size + fill_index += 1 + + unrealized_pnl = _unrealized_pnl(open_lots, mark_price, instrument) + equity = realized_pnl + unrealized_pnl - commission + high_water_mark = max(high_water_mark, equity) + points.append( + EquityPoint( + ts_ns=mark_ts_ns, + mark_price=mark_price, + realized_pnl=realized_pnl, + unrealized_pnl=unrealized_pnl, + commission=commission, + equity=equity, + drawdown=high_water_mark - equity, + ) + ) + + return tuple(points) + + +def _iter_mark_pairs( + marks: ValuationMarkInput, +) -> Iterable[tuple[int, Price]]: + for mark in marks: + if isinstance(mark, CompiledValuationMarks): + yield from _iter_compiled_mark_pairs(mark) + else: + yield mark.ts_ns, mark.price + + +def _iter_compiled_mark_pairs( + marks: CompiledValuationMarks, +) -> Iterable[tuple[int, Price]]: + for ts_ns, mid_ticks_x2 in zip(marks.ts_ns, marks.mid_ticks_x2, strict=True): + yield ts_ns, marks.tick_size * Decimal(mid_ticks_x2) / 2 + + +def _build_equity_curve_from_compiled_marks( + sorted_fills: tuple[Fill, ...], + marks: CompiledValuationMarks, + instrument: InstrumentSpec, +) -> tuple[EquityPoint, ...]: open_lots: list[PositionLot] = [] realized_pnl = Decimal("0") commission = Decimal("0") @@ -120,23 +225,24 @@ def build_equity_curve( points: list[EquityPoint] = [] fill_index = 0 - for mark in sorted_marks: + for ts_ns, mid_ticks_x2 in zip(marks.ts_ns, marks.mid_ticks_x2, strict=True): while ( fill_index < len(sorted_fills) - and sorted_fills[fill_index].ts_ns <= mark.ts_ns + and sorted_fills[fill_index].ts_ns <= ts_ns ): fill = sorted_fills[fill_index] realized_pnl += _apply_fill_to_lots(open_lots, fill, instrument) commission += instrument.commission_per_contract * fill.size fill_index += 1 - unrealized_pnl = _unrealized_pnl(open_lots, mark.price, instrument) + mark_price = marks.tick_size * Decimal(mid_ticks_x2) / 2 + unrealized_pnl = _unrealized_pnl(open_lots, mark_price, instrument) equity = realized_pnl + unrealized_pnl - commission high_water_mark = max(high_water_mark, equity) points.append( EquityPoint( - ts_ns=mark.ts_ns, - mark_price=mark.price, + ts_ns=ts_ns, + mark_price=mark_price, realized_pnl=realized_pnl, unrealized_pnl=unrealized_pnl, commission=commission, diff --git a/src/ordersim/replay/simulator.py b/src/ordersim/replay/simulator.py index 08e2184..0351d3f 100644 --- a/src/ordersim/replay/simulator.py +++ b/src/ordersim/replay/simulator.py @@ -7,9 +7,11 @@ from ordersim.connectors import EventInput, normalize_events from ordersim.economics import ( + CompiledValuationMarks, EquityPoint, ExecutionSummary, ValuationMark, + ValuationMarkInput, build_equity_curve, summarize_fills, ) @@ -107,7 +109,7 @@ def _init( self._cursor = 0 self._now_ns = 0 self._fills: list[Fill] = [] - self._valuation_marks: list[ValuationMark] = [] + self._valuation_marks: list[ValuationMark | CompiledValuationMarks] = [] @property def fills(self) -> tuple[Fill, ...]: @@ -116,7 +118,7 @@ def fills(self) -> tuple[Fill, ...]: return tuple(self._fills) @property - def valuation_marks(self) -> tuple[ValuationMark, ...]: + def valuation_marks(self) -> ValuationMarkInput: """Midpoint valuation marks observed during replay.""" return tuple(self._valuation_marks) @@ -206,7 +208,8 @@ def _advance_events(self, start: int, stop: int) -> list[Fill]: apply_compiled = getattr(self._engine, "apply_events_batch_with_marks", None) if self._compiled_events is not None and apply_compiled is not None: fills, marks = apply_compiled(self._compiled_events.slice(start, stop)) - self._valuation_marks.extend(marks) + if len(marks) > 0: + self._valuation_marks.append(marks) return list(fills) fills: list[Fill] = [] diff --git a/src/ordersim/sim/cpp_matching_engine.py b/src/ordersim/sim/cpp_matching_engine.py index 6e1f547..c69b098 100644 --- a/src/ordersim/sim/cpp_matching_engine.py +++ b/src/ordersim/sim/cpp_matching_engine.py @@ -3,7 +3,7 @@ from decimal import Decimal from typing import Any -from ordersim.economics import ValuationMark +from ordersim.economics import CompiledValuationMarks from ordersim.replay.compiled_events import CompiledEventSlice from ordersim.sim.matching_engine import PriceLevel from ordersim.types import ( @@ -57,20 +57,26 @@ def apply_events_batch( def apply_events_batch_with_marks( self, events: CompiledEventSlice, - ) -> tuple[list[Fill], list[ValuationMark]]: + ) -> tuple[list[Fill], CompiledValuationMarks]: """Apply one compiled event slice and return fills plus midpoint marks.""" - fill_rows, mark_rows = self._core.apply_events_batch_with_marks( - events.ts_ns, - events.action, - events.side, - events.price_ticks, - events.size, - events.order_id, + fill_rows, mark_ts_ns, mark_mid_ticks_x2 = ( + self._core.apply_events_batch_with_marks( + events.ts_ns, + events.action, + events.side, + events.price_ticks, + events.size, + events.order_id, + ) + ) + return [self._fill_from_row(row) for row in fill_rows], ( + CompiledValuationMarks.from_bytes( + ts_ns=mark_ts_ns, + mid_ticks_x2=mark_mid_ticks_x2, + tick_size=self._tick_size, + ) ) - fills = [self._fill_from_row(row) for row in fill_rows] - marks = [self._valuation_mark_from_row(row) for row in mark_rows] - return fills, marks def apply_events_until_fill( self, @@ -177,16 +183,6 @@ def _fill_from_row(self, row: Any) -> Fill: ts_ns=row.ts_ns, ) - def _valuation_mark_from_row(self, row: Any) -> ValuationMark: - return ValuationMark( - ts_ns=row.ts_ns, - price=( - self._ticks_to_price(row.bid_ticks) - + self._ticks_to_price(row.ask_ticks) - ) - / 2, - ) - def cpp_execution_engine_available() -> bool: """Return whether the optional compiled extension can be imported.""" diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index 055ad5f..ba9b5e5 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -2,9 +2,15 @@ import pytest +from benchmarks.engine_throughput import run_batch_with_marks from benchmarks.replay_throughput import advance_to_end, gc_spec, run_replay from benchmarks.workloads import build_mixed_mbo_workload -from ordersim import MatchingEngine +from ordersim import ( + CompiledEventColumns, + CppMatchingEngine, + MatchingEngine, + cpp_execution_engine_available, +) def test_mixed_benchmark_workload_is_balanced() -> None: @@ -32,3 +38,17 @@ def test_replay_benchmark_uses_a_complete_replay_run() -> None: run_replay(events, execution_engine_factory=MatchingEngine) assert strategy_name == "strategy" + + +@pytest.mark.skipif( + not cpp_execution_engine_available(), + reason="optional C++ execution engine is not built", +) +def test_engine_benchmark_batch_with_marks_runs() -> None: + events = build_mixed_mbo_workload(cycles=1) + columns = CompiledEventColumns.from_events( + events, + tick_size=Decimal("0.10"), + ) + + run_batch_with_marks(CppMatchingEngine(tick_size=Decimal("0.10")), columns) diff --git a/tests/test_cpp_execution_engine.py b/tests/test_cpp_execution_engine.py index f5532c8..218cc3f 100644 --- a/tests/test_cpp_execution_engine.py +++ b/tests/test_cpp_execution_engine.py @@ -170,9 +170,9 @@ def test_cpp_execution_engine_applies_compiled_batches_with_marks() -> None: assert [(fill.order_id, fill.price, fill.ts_ns) for fill in fills] == [ (resting.order_id, Decimal("100.00"), 3), ] - assert [(mark.ts_ns, mark.price) for mark in marks] == [ - (2, Decimal("100.50")), - ] + assert list(marks.ts_ns) == [2] + assert list(marks.mid_ticks_x2) == [2010] + assert marks.tick_size == Decimal("0.10") def test_cpp_execution_engine_stops_compiled_batch_at_passive_fill() -> None: diff --git a/tests/test_economics.py b/tests/test_economics.py index a50d735..13590b4 100644 --- a/tests/test_economics.py +++ b/tests/test_economics.py @@ -1,6 +1,9 @@ from decimal import Decimal +import pytest + from ordersim import ( + CompiledValuationMarks, EquityPoint, Fill, InstrumentSpec, @@ -11,6 +14,12 @@ ) +def int64_bytes(values: tuple[int, ...]) -> bytes: + from array import array + + return array("q", values).tobytes() + + def gc_spec() -> InstrumentSpec: return InstrumentSpec( symbol="GC", @@ -120,3 +129,51 @@ def test_build_equity_curve_returns_no_points_without_marks() -> None: ) assert build_equity_curve(fills, (), gc_spec()) == () + + +def test_build_equity_curve_accepts_compact_valuation_marks() -> None: + fills = ( + Fill(order_id=1, side="buy", price=Decimal("100.0"), size=1, ts_ns=1), + Fill(order_id=2, side="sell", price=Decimal("100.0"), size=1, ts_ns=4), + ) + marks = CompiledValuationMarks.from_bytes( + ts_ns=int64_bytes((2, 3, 4)), + mid_ticks_x2=int64_bytes((2020, 1980, 2000)), + tick_size=Decimal("0.10"), + ) + + curve = build_equity_curve(fills, marks, gc_spec()) + + assert [point.mark_price for point in curve] == [ + Decimal("101.0"), + Decimal("99.0"), + Decimal("100.0"), + ] + assert curve[-1].equity == Decimal("-5.00") + + +def test_build_equity_curve_accepts_mixed_public_and_compact_marks() -> None: + marks = [ + ValuationMark(ts_ns=1, price=Decimal("100.0")), + CompiledValuationMarks.from_bytes( + ts_ns=int64_bytes((2,)), + mid_ticks_x2=int64_bytes((2010,)), + tick_size=Decimal("0.10"), + ), + ] + + curve = build_equity_curve((), marks, gc_spec()) + + assert [(point.ts_ns, point.mark_price) for point in curve] == [ + (1, Decimal("100.0")), + (2, Decimal("100.5")), + ] + + +def test_compact_valuation_marks_reject_mismatched_columns() -> None: + with pytest.raises(ValueError, match="equal length"): + CompiledValuationMarks.from_bytes( + ts_ns=int64_bytes((1, 2)), + mid_ticks_x2=int64_bytes((2000,)), + tick_size=Decimal("0.10"), + ) diff --git a/tests/test_replay.py b/tests/test_replay.py index 0a0d416..f8ea5d5 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -3,6 +3,7 @@ import pytest from ordersim import ( + CompiledValuationMarks, ConstantLatency, EmpiricalPlayback, InstrumentSpec, @@ -12,13 +13,18 @@ Replay, ReplayGateway, RestingOrder, - ValuationMark, ) from ordersim.fixtures.synthetic import SyntheticSource from ordersim.replay import simulator as replay_simulator from ordersim.types import OrderEvent +def int64_bytes(values: tuple[int, ...]) -> bytes: + from array import array + + return array("q", values).tobytes() + + def gc_spec() -> InstrumentSpec: return InstrumentSpec( symbol="GC", @@ -130,10 +136,12 @@ def __init__(self) -> None: def apply_events_batch_with_marks(self, events): self.batch_calls += 1 - return [], [ - ValuationMark(ts_ns=int(ts_ns), price=Decimal("100.5")) - for ts_ns in events.ts_ns - ] + mark_count = len(events.ts_ns) + return [], CompiledValuationMarks.from_bytes( + ts_ns=int64_bytes(tuple(int(ts_ns) for ts_ns in events.ts_ns)), + mid_ticks_x2=int64_bytes((2010,) * mark_count), + tick_size=Decimal("0.10"), + ) def apply_event(self, event): raise AssertionError("scalar event path should not be used")