Perf optimizations + polars 0.54 upgrade (with benchmarks) #1
Closed
ptournaris-ai wants to merge 4 commits into
…rop per-column Series clone
Replace the eager 'download whole object into Vec<u8> then ParquetReader::finish() then filter' path with LazyFrame::scan_parquet over s3://, pushing the configured filter and keep_num_of_records limit into the read so only matching row groups and needed columns are fetched and decoded. collect() runs in spawn_blocking to keep the async runtime free and avoid a nested-runtime panic. Credentials are resolved via the default aws-config provider chain (IRSA/IMDS/env/SSO) and handed to Polars, matching get_object auth.
Points dms-cdc-operator at the polars-0.54 fork branch via [patch.crates-io] until nikoshet/rust-dms-cdc-operator#41 is released (the DataframeOperator trait's DataFrame type pins our polars version).

polars 0.54 API migration:
- drop removed 'new_streaming'/'streaming' features, add 'regex' (contains_literal)
- LazyFrame::scan_parquet now takes PlRefPath
- DataFrame::get_columns() -> columns(); get_column_names_str() -> get_column_names()
- StringChunked/Int32 iteration: .into_iter() -> .iter()
- DataFrame::with_column takes Column, not Series (Column::from)
- DataFrame::new(cols) -> DataFrame::new(height, cols) (tests)

Verified: cargo check --workspace --all-targets clean; 14 transform-crate unit tests pass on 0.54. DB/S3 integration tests not run (need fixtures).
- benchmarks/microbench: criterion OLD-vs-NEW for the hot paths (sanitize, scan+filter, scan+slice, config cache) with equivalence tests gating timings
- benchmarks/s3-integration: end-to-end scan_parquet test against a local Floci/LocalStack S3 endpoint (docker-compose included)
- docs/BENCHMARKS.md: results table (~9.4x sanitize, ~4x record-reduction, ~4900x config) and how to reproduce
- workspace 'exclude' so the standalone harnesses stay out of the build graph
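The "equivalence tests gating timings" idea can be illustrated with a small, std-only sketch. It is not the code in benchmarks/microbench: `sanitize_old`, `sanitize_new`, `sample_column`, and `gated_timings` are hypothetical stand-ins, a plain `Vec<Option<String>>` replaces the polars column, and a single `Instant` measurement replaces criterion's statistics. The only point is the ordering: assert that both bodies produce identical output, and only then record timings.

```rust
use std::hint::black_box;
use std::time::{Duration, Instant};

// Hypothetical "OLD" body: rebuild every value row by row.
fn sanitize_old(col: &[Option<String>]) -> Vec<Option<String>> {
    let mut out = Vec::with_capacity(col.len());
    for value in col {
        out.push(value.as_ref().map(|s| s.replace('\0', "")));
    }
    out
}

// Hypothetical "NEW" body: rewrite only values that contain a NUL byte;
// all other values are cloned unchanged.
fn sanitize_new(col: &[Option<String>]) -> Vec<Option<String>> {
    col.iter()
        .map(|value| match value {
            Some(s) if s.contains('\0') => {
                Some(s.chars().filter(|c| *c != '\0').collect::<String>())
            }
            other => other.clone(),
        })
        .collect()
}

// Assumed test data: a mix of nulls, clean strings, and strings with NUL bytes.
fn sample_column(n: usize) -> Vec<Option<String>> {
    (0..n)
        .map(|i| match i % 3 {
            0 => None,
            1 => Some(format!("row-{i}")),
            _ => Some(format!("ro\0w-{i}")),
        })
        .collect()
}

// Asserts equivalence first, then times both bodies on the same input.
fn gated_timings(input: &[Option<String>]) -> (Duration, Duration) {
    assert_eq!(
        sanitize_old(input),
        sanitize_new(input),
        "NEW must match OLD before timings are trusted"
    );

    let start = Instant::now();
    black_box(sanitize_old(black_box(input)));
    let old = start.elapsed();

    let start = Instant::now();
    black_box(sanitize_new(black_box(input)));
    let new = start.elapsed();

    (old, new)
}

fn main() {
    let input = sample_column(200_000);
    let (old, new) = gated_timings(&input);
    println!("old: {old:?}, new: {new:?}");
}
```

If the two bodies ever diverge, the assertion fails before any number is printed, so a faster but incorrect rewrite cannot show up as a win.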
Author
Superseded by upstream PR bluegroundltd#43.
Summary
Performance optimizations for the anonymization hot path, plus the polars 0.48 → 0.54 upgrade, with a reproducible benchmark harness.

Four commits, reviewable in order.
Performance changes
- load_config_for did a disk read + TOML parse on every Parquet file; the config is now parsed once and shared as an Arc (process-wide cache).
- sanitize_null_bytes: replaced the row-by-row Vec<Option<String>> rebuild with a single columnar when/then/otherwise expression.
- Per-column Series clone: the transform loop now moves the produced Series into the DataFrame instead of cloning it.
- S3 read: replaced "download whole object → ParquetReader::finish() → filter afterwards" with LazyFrame::scan_parquet(s3://…) that pushes the configured filter and keep_num_of_records limit into the read, so only matching row groups/columns are fetched and decoded. collect() runs in spawn_blocking. Credentials are resolved via the default aws-config chain (IRSA/IMDS/env/SSO) and handed to polars, matching get_object auth.

Measured (medians, 2M rows; see docs/BENCHMARKS.md): roughly 9.4x on sanitize_null_bytes, 4x on the keep_num_of_records record reduction, and 4900x on config loading.

Each benchmark compares the exact OLD vs NEW function body; equivalence tests assert NEW is byte-identical to OLD before timings are trusted. The S3 scan path is additionally validated end-to-end against a local Floci/LocalStack S3 endpoint (benchmarks/s3-integration).

polars 0.48 → 0.54
Required API migration (mechanical):
- get_columns() → columns()
- get_column_names_str() → get_column_names()
- scan_parquet now takes PlRefPath
- ChunkedArray .into_iter() → .iter()
- with_column takes Column, not Series
- DataFrame::new(height, cols)
- AnyValue::Decimal(value, precision, scale)
- drop new_streaming/streaming features (add regex)
- MSRV → 1.88

Verification
cargo check --workspace --all-targets is clean on 0.54 (with submodules trimmed locally); 14 transform-crate unit tests and the microbench equivalence tests pass; the S3 integration test passes against Floci. DB/S3-backed integration tests that need the docker-compose fixtures were not run.
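For reviewers who want a picture of the config-cache change listed under Performance changes, here is a minimal std-only sketch of "parse once, share as an Arc". It is an illustration under assumptions, not the PR's code: `Config`, `RAW_CONFIG`, `parse_config`, `shared_config`, and `PARSE_COUNT` are hypothetical names, and a one-line key/value string stands in for the disk read and TOML parse.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

// Hypothetical config shape; the real struct is not shown in this PR text.
#[derive(Debug)]
struct Config {
    keep_num_of_records: Option<usize>,
}

// Stand-in for the file contents that would normally be read from disk.
const RAW_CONFIG: &str = "keep_num_of_records = 1000";

// Counts how many times the (assumed expensive) parse actually runs.
static PARSE_COUNT: AtomicUsize = AtomicUsize::new(0);

// Stand-in for "disk read + TOML parse": parses a single `key = value` line.
fn parse_config(raw: &str) -> Config {
    PARSE_COUNT.fetch_add(1, Ordering::SeqCst);
    let keep_num_of_records = raw
        .split_once('=')
        .and_then(|(_, value)| value.trim().parse::<usize>().ok());
    Config { keep_num_of_records }
}

// Process-wide cache: the first caller parses, every later caller
// receives a cheap clone of the same Arc.
fn shared_config() -> Arc<Config> {
    static CONFIG: OnceLock<Arc<Config>> = OnceLock::new();
    Arc::clone(CONFIG.get_or_init(|| Arc::new(parse_config(RAW_CONFIG))))
}

fn main() {
    // Simulate processing several Parquet files in a row.
    for file in ["a.parquet", "b.parquet", "c.parquet"] {
        let config = shared_config();
        println!("{file}: keep_num_of_records = {:?}", config.keep_num_of_records);
    }
    println!("parses performed: {}", PARSE_COUNT.load(Ordering::SeqCst));
}
```

`OnceLock::get_or_init` runs its closure at most once even under concurrent callers, which is what makes the per-file cost drop to an `Arc` clone in this sketch.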