From 1f115a582cfd5c61eebd7df955717967f40531db Mon Sep 17 00:00:00 2001 From: Tibor Date: Tue, 19 May 2026 20:38:56 +0200 Subject: [PATCH] Use compiled batches in replay advancement --- benchmarks/engine_throughput.py | 8 +-- cpp/matching_engine_cpp.cpp | 53 ++++++++++++++++++++ docs/benchmarks.md | 19 ++++++-- docs/execution-engines.md | 4 +- src/ordersim/replay/simulator.py | 27 +++++++--- src/ordersim/sim/cpp_matching_engine.py | 29 +++++++++++ tests/test_cpp_execution_engine.py | 44 +++++++++++++++++ tests/test_replay.py | 65 +++++++++++++++++++++++++ 8 files changed, 230 insertions(+), 19 deletions(-) diff --git a/benchmarks/engine_throughput.py b/benchmarks/engine_throughput.py index 96ba813..f499bbf 100644 --- a/benchmarks/engine_throughput.py +++ b/benchmarks/engine_throughput.py @@ -36,7 +36,7 @@ def events_per_second(self) -> float: def run_scalar(engine: ExecutionEngine, events: Sequence[MBOEvent]) -> None: - """Apply one event at a time through the public scalar engine API.""" + """Apply one event at a time through the public compatibility API.""" for event in events: engine.apply_event(event) @@ -137,7 +137,7 @@ def main() -> None: return cpp_scalar = measure( - "CppMatchingEngine scalar", + "CppMatchingEngine per-event", lambda: run_scalar(CppMatchingEngine(tick_size=TICK_SIZE), events), event_count=len(events), repeats=args.repeats, @@ -158,8 +158,8 @@ def main() -> None: python_eps = results[0].events_per_second scalar_speedup = cpp_scalar.events_per_second / python_eps batch_speedup = cpp_batch.events_per_second / python_eps - print(f"scalar C++ speedup vs Python {scalar_speedup:>7.2f}x") - print(f"batch C++ speedup vs Python {batch_speedup:>7.2f}x") + print(f"per-event C++ speedup vs Python {scalar_speedup:>7.2f}x") + print(f"batch C++ speedup vs Python {batch_speedup:>7.2f}x") if __name__ == "__main__": diff --git a/cpp/matching_engine_cpp.cpp b/cpp/matching_engine_cpp.cpp index 008f97c..d5b3a79 100644 --- a/cpp/matching_engine_cpp.cpp +++ b/cpp/matching_engine_cpp.cpp @@ -17,6 +17,12 @@ struct BatchBuffers { const int64_t* order_id; }; +struct MarkRow { + int64_t ts_ns; + int64_t bid_ticks; + int64_t ask_ticks; +}; + template const T* checked_buffer( const py::buffer& buffer, @@ -133,6 +139,47 @@ std::vector apply_events_batch( return fills; } +py::tuple apply_events_batch_with_marks( + MatchingEngineCpp& engine, + const py::buffer& ts_ns, + const py::buffer& action, + const py::buffer& side, + const py::buffer& price_ticks, + const py::buffer& size, + const py::buffer& order_id +) { + const BatchBuffers batch = read_batch_buffers( + ts_ns, + action, + side, + price_ticks, + size, + order_id + ); + + std::vector fills; + std::vector marks; + + for (py::ssize_t i = 0; i < batch.n; ++i) { + const auto event_fills = engine.apply_event_code( + batch.ts_ns[i], + static_cast(batch.action[i]), + static_cast(batch.side[i]), + batch.price_ticks[i], + batch.size[i], + batch.order_id[i] + ); + fills.insert(fills.end(), event_fills.begin(), event_fills.end()); + + const auto [bid_ticks, ask_ticks] = engine.book_top(); + if (bid_ticks >= 0 && ask_ticks >= 0) { + marks.push_back(MarkRow{batch.ts_ns[i], bid_ticks, ask_ticks}); + } + } + + return py::make_tuple(fills, marks); +} + py::tuple apply_events_until_fill( MatchingEngineCpp& engine, const py::buffer& ts_ns, @@ -181,10 +228,16 @@ PYBIND11_MODULE(_matching_engine_cpp, module) { .def_readonly("size", &FillRow::size) .def_readonly("ts_ns", &FillRow::ts_ns); + py::class_(module, "MarkRow") + .def_readonly("ts_ns", &MarkRow::ts_ns) + .def_readonly("bid_ticks", &MarkRow::bid_ticks) + .def_readonly("ask_ticks", &MarkRow::ask_ticks); + py::class_(module, "MatchingEngineCpp") .def(py::init<>()) .def("apply_event", &MatchingEngineCpp::apply_event) .def("apply_events_batch", &apply_events_batch) + .def("apply_events_batch_with_marks", &apply_events_batch_with_marks) .def("apply_events_until_fill", &apply_events_until_fill) .def("place_limit", &MatchingEngineCpp::place_limit) .def("place_market", &MatchingEngineCpp::place_market) diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 716b00c..a891139 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -32,9 +32,13 @@ This benchmark measures event ingestion through the execution engine itself: | Path | What it measures | |---|---| | `MatchingEngine` scalar | `apply_event(MBOEvent)` on the Python reference engine | -| `CppMatchingEngine` scalar | `apply_event(MBOEvent)` one event at a time | +| `CppMatchingEngine` per-event | `apply_event(MBOEvent)` one event at a time for compatibility checks | | `CppMatchingEngine` batch | `apply_events_batch(...)` over precompiled primitive columns | +The per-event C++ number is a diagnostic baseline, not the intended fast path. +The useful compiled path is batched: Python hands C++ a contiguous slice and C++ +advances internally. + The batch result excludes `CompiledEventColumns.from_events(...)` construction. That conversion is meant to happen once before repeated compiled-engine runs; including it would answer a different question. @@ -57,11 +61,15 @@ It compares the explicit Python reference engine with the default engine chosen by `Replay(...)`. Full replay is slower than direct engine ingestion because it also performs the work that makes `ordersim` inspectable: -- event-by-event replay advancement; - per-event valuation marks when both sides of the book exist; - fill-ledger and equity-curve assembly; - strategy-facing gateway calls. +When the default C++ engine is available, ordinary replay advances each requested +time slice through compiled columns and returns both fills and valuation marks. +The Python reference engine remains event-by-event because it is the readable +behavioral model. + ## Interpreting Results Direct engine throughput answers, "how quickly can the engine consume already @@ -78,9 +86,10 @@ keeps the future boundary-batched path from rebuilding columns inside ## What This Exposes -The intended next performance step is boundary-batched replay. In that design, -the compiled engine can advance independently through market-data events until -the next point where Python must observe or decide: +The compiled replay path already advances requested time slices internally while +preserving fills and valuation marks. The next performance step is stricter +boundary-batched replay. In that design, the compiled engine can advance +independently until the next point where Python must observe or decide: - the strategy asks to advance only up to a timestamp; - a passive fill occurs and strategy logic may need to react; diff --git a/docs/execution-engines.md b/docs/execution-engines.md index b367dac..3727b0e 100644 --- a/docs/execution-engines.md +++ b/docs/execution-engines.md @@ -88,8 +88,8 @@ public API remains exact-`Decimal`. For callers that already hold normalized events in memory, the wrapper also exposes a compiled batch-ingest path. It accepts primitive columns derived from the same `MBOEvent` schema and returns passive fills without changing the public -matching semantics. Ordinary `Replay(...)` still applies one event at a time so -it can record the per-event valuation marks that build the default equity curve. +matching semantics. Ordinary `Replay(...)` uses the compiled batch path when it +can also receive the valuation marks needed to build the default equity curve. `Replay(...)` precompiles its immutable event stream once and shares that column view with each strategy run, so future compiled replay paths do not need to rebuild primitive columns inside `run_many(...)`. diff --git a/src/ordersim/replay/simulator.py b/src/ordersim/replay/simulator.py index c79b4eb..08e2184 100644 --- a/src/ordersim/replay/simulator.py +++ b/src/ordersim/replay/simulator.py @@ -1,5 +1,6 @@ """Small replay runner built around the reference matching engine.""" +from bisect import bisect_right from collections.abc import Callable, MutableSequence from dataclasses import dataclass from typing import Any @@ -99,6 +100,7 @@ def _init( compiled_events: CompiledEventColumns | None = None, ) -> None: self._events = events + self._event_timestamps = tuple(event.ts_ns for event in events) self._compiled_events = compiled_events self._engine = engine or python_execution_engine_factory() self._latency_model = latency_model or default_latency_model_factory() @@ -126,14 +128,10 @@ def advance_to(self, ts_ns: int) -> list[Fill]: raise ValueError("cannot move replay time backwards") fills: list[Fill] = [] - while ( - self._cursor < len(self._events) - and self._events[self._cursor].ts_ns <= ts_ns - ): - event = self._events[self._cursor] - fills.extend(self._engine.apply_event(event)) - self._cursor += 1 - self._record_valuation_mark(event.ts_ns) + stop = bisect_right(self._event_timestamps, ts_ns, lo=self._cursor) + if stop > self._cursor: + fills.extend(self._advance_events(self._cursor, stop)) + self._cursor = stop self._now_ns = ts_ns self._engine.advance_time(ts_ns) @@ -204,6 +202,19 @@ def _advance_to_venue_receipt(self) -> None: sample = self._latency_model.sample(self._now_ns) self.advance_to(self._now_ns + sample.entry_ns) + def _advance_events(self, start: int, stop: int) -> list[Fill]: + apply_compiled = getattr(self._engine, "apply_events_batch_with_marks", None) + if self._compiled_events is not None and apply_compiled is not None: + fills, marks = apply_compiled(self._compiled_events.slice(start, stop)) + self._valuation_marks.extend(marks) + return list(fills) + + fills: list[Fill] = [] + for event in self._events[start:stop]: + fills.extend(self._engine.apply_event(event)) + self._record_valuation_mark(event.ts_ns) + return fills + def _record_valuation_mark(self, ts_ns: int) -> None: bid, ask = self._engine.book_top() if bid is None or ask is None: diff --git a/src/ordersim/sim/cpp_matching_engine.py b/src/ordersim/sim/cpp_matching_engine.py index cbbf390..6e1f547 100644 --- a/src/ordersim/sim/cpp_matching_engine.py +++ b/src/ordersim/sim/cpp_matching_engine.py @@ -3,6 +3,7 @@ from decimal import Decimal from typing import Any +from ordersim.economics import ValuationMark from ordersim.replay.compiled_events import CompiledEventSlice from ordersim.sim.matching_engine import PriceLevel from ordersim.types import ( @@ -53,6 +54,24 @@ def apply_events_batch( ) return [self._fill_from_row(row) for row in rows] + def apply_events_batch_with_marks( + self, + events: CompiledEventSlice, + ) -> tuple[list[Fill], list[ValuationMark]]: + """Apply one compiled event slice and return fills plus midpoint marks.""" + + fill_rows, mark_rows = self._core.apply_events_batch_with_marks( + events.ts_ns, + events.action, + events.side, + events.price_ticks, + events.size, + events.order_id, + ) + fills = [self._fill_from_row(row) for row in fill_rows] + marks = [self._valuation_mark_from_row(row) for row in mark_rows] + return fills, marks + def apply_events_until_fill( self, events: CompiledEventSlice, @@ -158,6 +177,16 @@ def _fill_from_row(self, row: Any) -> Fill: ts_ns=row.ts_ns, ) + def _valuation_mark_from_row(self, row: Any) -> ValuationMark: + return ValuationMark( + ts_ns=row.ts_ns, + price=( + self._ticks_to_price(row.bid_ticks) + + self._ticks_to_price(row.ask_ticks) + ) + / 2, + ) + def cpp_execution_engine_available() -> bool: """Return whether the optional compiled extension can be imported.""" diff --git a/tests/test_cpp_execution_engine.py b/tests/test_cpp_execution_engine.py index 36b90ec..f5532c8 100644 --- a/tests/test_cpp_execution_engine.py +++ b/tests/test_cpp_execution_engine.py @@ -131,6 +131,50 @@ def test_cpp_execution_engine_applies_compiled_event_batches() -> None: ] +def test_cpp_execution_engine_applies_compiled_batches_with_marks() -> None: + events = ( + MBOEvent( + ts_ns=1, + action="add", + side="bid", + price=Decimal("100.0"), + size=1, + order_id=1, + ), + MBOEvent( + ts_ns=2, + action="add", + side="ask", + price=Decimal("101.0"), + size=1, + order_id=2, + ), + MBOEvent( + ts_ns=3, + action="trade", + side="bid", + price=Decimal("100.0"), + size=2, + order_id=3, + ), + ) + columns = CompiledEventColumns.from_events(events, tick_size=Decimal("0.10")) + engine = CppMatchingEngine(tick_size=Decimal("0.10")) + + resting = engine.place_limit(side="buy", price=Decimal("100.0"), size=1) + fills, marks = engine.apply_events_batch_with_marks( + columns.slice(0, len(events)) + ) + + assert resting.order_id is not None + assert [(fill.order_id, fill.price, fill.ts_ns) for fill in fills] == [ + (resting.order_id, Decimal("100.00"), 3), + ] + assert [(mark.ts_ns, mark.price) for mark in marks] == [ + (2, Decimal("100.50")), + ] + + def test_cpp_execution_engine_stops_compiled_batch_at_passive_fill() -> None: events = ( MBOEvent( diff --git a/tests/test_replay.py b/tests/test_replay.py index d35bf56..0a0d416 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -12,6 +12,7 @@ Replay, ReplayGateway, RestingOrder, + ValuationMark, ) from ordersim.fixtures.synthetic import SyntheticSource from ordersim.replay import simulator as replay_simulator @@ -121,6 +122,70 @@ def spy_from_events(cls, events, *, tick_size): assert calls == 1 +def test_replay_uses_compiled_batch_path_when_engine_supports_it() -> None: + class BatchEngine: + def __init__(self) -> None: + self.batch_calls = 0 + self.last_advanced_to = 0 + + def apply_events_batch_with_marks(self, events): + self.batch_calls += 1 + return [], [ + ValuationMark(ts_ns=int(ts_ns), price=Decimal("100.5")) + for ts_ns in events.ts_ns + ] + + def apply_event(self, event): + raise AssertionError("scalar event path should not be used") + + def advance_time(self, ts_ns: int) -> None: + self.last_advanced_to = ts_ns + + def place_limit(self, side, price, size, tif="GTC"): + raise AssertionError("order placement is not used in this test") + + def place_market(self, side, size): + raise AssertionError("order placement is not used in this test") + + def cancel(self, order_id): + raise AssertionError("order cancellation is not used in this test") + + def book_top(self): + return Decimal("100.0"), Decimal("101.0") + + def book_depth(self, levels): + return (), () + + def position(self): + return 0 + + def own_orders(self): + return () + + created: list[BatchEngine] = [] + + def factory() -> BatchEngine: + engine = BatchEngine() + created.append(engine) + return engine + + replay = Replay( + data=SyntheticSource.small_mbo(), + instrument=gc_spec(), + execution_engine_factory=factory, + ) + + def strategy(gateway) -> None: + gateway.advance_to(1_000_000_200) + + result = replay.run(strategy) + + assert len(created) == 1 + assert created[0].batch_calls == 1 + assert created[0].last_advanced_to == 1_000_000_200 + assert result.equity_curve[-1].mark_price == Decimal("100.5") + + def test_replay_gateway_exposes_book_depth() -> None: replay = Replay(data=SyntheticSource.small_mbo(), instrument=gc_spec())