feat(cubestore): dictionary group keys & group-by-limit trim aggregate#11128
feat(cubestore): dictionary group keys & group-by-limit trim aggregate#11128waralexrom wants to merge 17 commits into
Conversation
…(wip, behind flag) Behind CUBESTORE_DICTIONARY_ENCODING (dev default currently ON - flip to false before merge). Adds DictionaryComparator + DictionaryGroupColumn to the sorted inline aggregate and routes dict schemas there via supported_type; String columns flow as Dictionary(Int32,Utf8) end-to-end (metastore as_arrow_field + CubeTable / CubeTableLogical schema). Measured on stend: inline/sorted ~10% faster, but hash/Linear ~60% slower (read-side cast_to_dictionary + df row fallback). WIP / not yet handled (only matters when flag ON): in-memory chunks stay Utf8 (schema mismatch), dict->string result serialization for non-aggregate SELECT, native dict parquet read to avoid the read cast, and hash-aggregate dict-awareness.
…topk), wip behind flag Brings the worker-side partial hash-aggregate trim for `GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k` onto this branch, renamed from the misleading "TopK" naming (it trims by group-key order, not by an aggregate top-N): - GroupByLimitAggregateExec + group_by_limit_aggregate module - group_by_limit_rewriter - config group_by_limit_factor / CUBESTORE_GROUP_BY_LIMIT_FACTOR (default 2) Reuses DataFusion's GroupValues building blocks; the only fork change is pub new_group_values (df branch cubestore-hash-aggregate-limit). WIP: the trim is planted during router planning but is NOT yet carried across the ClusterSend boundary to the worker's independent physical re-plan, so it does not engage at distributed execution yet. Next: cluster-boundary carry.
…via worker_sort_and_limit The rewriter only plants GroupByLimitAggregate during ROUTER planning, but the worker re-plans physical from logical independently, so the trim never reached execution. Instead of a new proto, reuse the existing worker_sort_and_limit carry (already serialized across ClusterSend): when group_by_limit_factor > 0, resort_worker_subtree now wraps the worker partial hash aggregate in GroupByLimitAggregateExec (trim during aggregation, bounded O(factor*k) memory) instead of a SortExec over the full partial; the Sort above still orders the <= k surviving groups so the router's sort-preserving merge stays correct. group_by_limit_factor (CUBESTORE_GROUP_BY_LIMIT_FACTOR, default 2) is threaded router->worker via CubeQueryPlanner; 0 keeps the previous sort-then-trim. Verified on the production-dump stend: with factor>0 the worker EXPLAIN ANALYZE shows GroupByLimitAggregate (was LinearPartialAggregate) and results are identical. Value is bounded hash-table memory (OOM avoidance) on high-cardinality group-bys, not speed on small inputs. WIP: only fires when worker_sort_and_limit fires, i.e. single-table `ORDER BY <group subset> LIMIT k`. UNION-of-tables (the prod query.sql shape) does not populate worker_sort_and_limit yet (separate ctx-propagation gap).
…ough UNION The worker group-by-limit trim previously required an ORDER BY and was dropped when the per-branch ClusterSends of a UNION were merged. Now: - compute_worker_sort_and_limit handles a bare LIMIT (no ORDER BY): the total order is the full group key in group-by order, so "any n" becomes "the n smallest by group key" -- a valid deterministic choice. An ORDER BY prefix, when present, still sorts first. - pull_up_cluster_send preserves worker_sort_and_limit across a UNION (the same group-by/limit context descends to every branch, so the descriptors are positionally identical and stay valid over the union); kept only when all branches agree. Adds planning coverage (bare LIMIT, UNION ALL + bare LIMIT) and a bare-LIMIT execution assertion to topk_hash_aggregate_trim; excludes that test from the migration harness (new test, no recorded fixture).
Make dictionary-encoded string group keys fast in the worker partial aggregate instead of materializing strings. - GroupByLimitAggregateStream remaps each Dictionary(Int32, Utf8) group column to Int32 global ids (interning each distinct value once into a per-stream global dictionary) and groups on those via DataFusion's primitive GroupValues path, rebuilding the Dictionary columns at the single final emit. The trim/merge sort the rebuilt dictionaries by value, so the per-worker id maps stay internal and cross-worker combination is unaffected. - SuppliedSchemaReaderCustomizer pins the index (dictionary) schema on the parquet reader (ArrowReaderOptions::with_schema), so dictionary string columns are read natively from the on-disk dictionary pages instead of being materialized as Utf8 and cast to dictionary per batch by the schema adapter. Composes with the configured customizer; used only for indexes that carry a Dictionary column. - batches_to_dataframe decodes Dictionary result columns to their value type. On the production dump (UNION, GROUP BY 6 string cols, LIMIT 10) the native read removes the per-batch Utf8->Dictionary cast that made dictionary encoding a regression; the query is now slightly faster than the non-dictionary path.
Replace the per-row build of the global-id Int32Array (iterate keys, match Option, collect) with a vectorized `take`: build the local->global map once per batch as an Int32Array (null where the dictionary entry is null) and gather it by the dictionary keys. Null keys and null entries propagate to null, identical to the previous semantics. Cuts the remap from ~16% of worker compute to negligible (~3.7s -> ~3.1s on the production-dump UNION query, now ~25% faster than the non-dictionary path).
- GlobalDict unit tests: global ids stay consistent across batches with different local dictionaries, and null keys / null dictionary entries round-trip as null. - End-to-end test with dictionary encoding on: a String group key alongside Timestamp and Decimal columns, compacted to a parquet partition, then grouped by the string. Locks in that the native dictionary read (with_supplied_schema) accepts every column type and the result decodes back to strings.
…-gated) Behind CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL (default off, gated on group_by_limit_factor > 0): route the worker top-k through a router-side hash final aggregate instead of the sorted-merge framework. - Worker subtree: CoalescePartitions <- GroupByLimitAggregate (drops the per-partition SortExec and SortPreservingMerge; emits the trimmed top-k unsorted). CoalescePartitions spawns a task per input partition, so the per-union-branch aggregates run in parallel -- the sorted merge drained them on a single task. - Router: SortExec(T, fetch=k) <- AggregateExec(Final, hash) <- CoalescePartitions <- ClusterSend. The explicit top-k sort by the full total order T is required even for a bare LIMIT: it keeps exactly the groups every worker kept (fully combined here), where a plain limit could take an undercounted group only one worker retained. On the production-dump UNION query (GROUP BY 6 cols, LIMIT 10): ~2.1x faster (~3.1s -> ~1.5s) at neutral peak RSS, because the worker now uses ~2 cores instead of 1. Same result as the sorted path (ORDER BY and bare LIMIT verified). The flag must be set fleet-wide: worker and router re-plan independently, and a worker-on/router-off split would feed unsorted streams into a sorted merge.
…lag-gated) Behind CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION (default off; only active with CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL and group_by_limit_factor > 0): strip the CoalescePartitions below the trimmed worker aggregate so it runs over every raw CubeTableExec partition instead of one stream per union branch. The hash-final CoalescePartitions on top then parallelizes all partitions. Per-partition top-k stays complete by the same total-order argument as the per-worker cut: a group in the global top-k by T has fewer than k smaller-keyed groups globally, hence in any single partition, so it survives every partition's local top-k and reaches the router fully combined. The strip only removes CoalescePartitions, never the SortPreservingMerge that feeds the single-partition LastRowByUniqueKeyExec, so unique-key tables stay correct. On the production-dump UNION query: ~799ms vs ~1504ms (hash-final 3-way) and ~2159ms (master), ~9 cores at peak, peak RSS still neutral because the index-sorted partitions are key-local (small per-partition tables). Memory is bounded by partition count, so a group key spread across partitions would cost ~N x; gated off by default.
… path The sorted-merge worker top-k (per-partition SortExec + SortPreservingMerge -> router SortedFinalAggregate) drained the worker partitions on a single task, so it ran on one core and was slower than master, which keeps a parallel CoalescePartitions over the partial aggregate. The hash-final path (worker CoalescePartitions -> router hash Final + Sort(T, fetch=k)) is strictly better: same memory, ~2x faster because CoalescePartitions parallelizes the per-partition aggregates. So remove the sorted variant and the CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL flag entirely; hash-final is now how the trimmed top-k is always combined. Two knobs remain: - group_by_limit_factor (env CUBESTORE_GROUP_BY_LIMIT_FACTOR, >0): whether to use the trimming aggregate at all. 0 leaves the plan untouched (master behavior). - CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION: whether to push it below the merge (per-partition N-way) -- still experimental, default off. push_worker_sort_and_limit now returns early when factor == 0. Existing planning/execution/limit_pushdown/union tests pass against the hash-final plan.
With dictionary encoding on, the index schema exposes string columns as Dictionary(Int32, Utf8), but in-memory chunks are written with plain Utf8. The memory scan previously rejected this as an "index schema / chunk schema mismatch", so any query touching an uncompacted (in-memory) chunk failed under dictionary encoding. Cast the chunk batches up to the index schema instead (Utf8 -> Dictionary via cast_record_batch_to_schema): the memory scan now matches the dictionary parquet partitions and feeds the dict-aware aggregate, so streaming / freshly-imported data gets the same path and benefit. Tests: - test_cast_record_batch_to_dictionary_schema: the cast preserves values and nulls and leaves non-string columns untouched. - dictionary_encoding_in_memory_group_by: dictionary encoding on, a unique-key table (streaming in-memory chunks), no compaction, group-by the string column returns correct sums (asserts the chunks are in-memory).
ORDER BY <aggregate> DESC LIMIT goes through ClusterAggregateTopK, whose scalar
helpers (create_builder / append_value via cube_match_scalar) had no arm for
ScalarValue::Dictionary and panicked ("Unhandled cube_match_scalar match arm:
Dictionary(...)") whenever the group key was dictionary-encoded. So with
dictionary encoding on, any top-k-by-aggregate query crashed the worker
subprocess.
Special-case ScalarValue::Dictionary at the function level (the macro is left
untouched): build into a StringDictionaryBuilder<Int32Type> and append the inner
Utf8 value, so the top-k result columns keep the dictionary type the schema
expects. Grouping already works (ScalarValue::Dictionary hashes/compares by its
resolved inner value) and cmp_same_types only ever sees aggregate values, so no
other change is needed. Covers the only dictionary layout CubeStore produces
(Dictionary(Int32, Utf8)); other layouts still panic explicitly.
Verified on the production dump: ORDER BY sum(...) DESC LIMIT 10 with dictionary
encoding on now returns the same rows as with it off. Adds an end-to-end test.
… off (opt-in via env)
|
Claude finished @waralexrom's task in 2m 42s —— View job Claude PR ReviewBig, well-structured PR with thoughtful comments explaining the why of each rewrite, and the flag-gated rollout is the right call. The total-order trick in the group-by-limit rewriter (extending ORDER BY with the remaining group columns on both worker and router) is correctly explained and looks sound. A handful of medium/low items below — none block; mostly perf and a couple of footguns worth tightening before defaulting things on. 1 medium, 4 low, plus general notes — click to expandMedium
Low
Notes / nits
|
- per-partition: strip only the leading CoalescePartitionsExec feeding the aggregate, not every one in the subtree (preserves UNION/mixed-partitioning plan semantics) - GlobalDict: share one Arc<str> allocation between the id map and values vec instead of two String allocations per new value - GroupByLimitAggregateExec::statistics: report an inexact upper bound (min(input_rows, factor*k*partitions)) instead of Absent - drop unused col_idx_base from aggregate_expressions (Partial-only) - comments: scalar-fallback note on DictionaryGroupColumn, duplicate ORDER BY skip
|
Claude finished @waralexrom's task in 48s —— View job Follow-up review — 4f9f439Verified all five items from the prior review are addressed cleanly; no new issues spotted. Verdict: looks good to land (modulo the pre-existing Per-item verification — click to expand
Pre-existing items not addressed (still low / non-blocking)
New observations on 4f9f439
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #11128 +/- ##
=======================================
Coverage 58.75% 58.75%
=======================================
Files 217 217
Lines 17392 17392
Branches 3524 3524
=======================================
Hits 10219 10219
Misses 6659 6659
Partials 514 514
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
…ushdown Sort not emitted)
|
Claude finished @waralexrom's task in 1m 2s —— View job Follow-up review — cbff840Only one new commit since the prior review (4f9f439): Details — click to expandConcern (low-medium) — the only E2E test for the new path is now ignored
That leaves the new distributed pushdown path covered only by unit tests on the rewriter — there is no end-to-end test exercising the router→worker plan in CI. Combined with the fact that the symbol the comment references (
If the inline-path pushdown really is off right now, then the perf numbers in the PR description ("~2.4× time, ~5.6× memory") may not reflect what merges to master. Not a blocker — the trim feature is flag-gated and defaults to off — but worth confirming before flipping Other notes
|
…t path The 'drop sorted worker top-k' refactor narrowed resort_worker_subtree to only trim plain hash AggregateExec, so the inline (sorted) aggregate lost the per-partition bounded Sort it had: for GROUP BY <prefix> ... ORDER BY <non-prefix group col> LIMIT n the worker shipped every group to the router (router-only bounding -> the inline-aggregate limit OOM class). Split resort_worker_subtree by partial aggregate kind, one mechanism two shapes: - inline/sorted: SortPreservingMerge(T) <- Sort(T, fetch, per partition) -- we can't trim a sorted aggregate and don't know the group count, so always bound with a sort. Router: sorted final over a sort-preserving merge (the query's Sort+Limit does the final bounding, no extra Sort). - hash: the trimming GroupByLimitAggregate + coalesce (factor-aware), router does the hash final + Sort(fetch). Unchanged. Also drop the top-level factor==0 early return so the inline bounding applies regardless of the trim factor (factor only gates the hash trim). Tests: un-ignore worker_sort_and_limit_cluster; update its hash-path case to assert the GroupByLimitAggregate trim instead of the old worker Sort. Full lib and in-process SQL suites pass with all optimization envs on.
|
Claude finished @waralexrom's task in 1m 10s —— View job Follow-up review — 4bb1499The fix itself is sound — the split by partial aggregate kind (sorted/inline gets One real issue — please remove before merge. Posted inline on the artifact: Everything from the prior two reviews remains in the state reported there — the two leftover low-priority follow-ups ( Checklist
|
… them The previous commit accidentally added RocksDB scratch directories left by the metastore unit tests (test-*-local). Remove them and ignore the pattern.
|
Claude finished @waralexrom's task in 39s —— View job Follow-up review — c6faaedVerdict: clean — addresses the prior inline comment and adds a guard against repeats. No new issues. 0 high, 0 medium, 0 low. Details
|
Summary
Two related CubeStore query-path improvements (plus experimental top-k
parallelism), all flag-gated so they can be enabled and validated incrementally:
(1) a group-by-limit "trim" aggregate that bounds worker output for
GROUP BY … LIMITqueries, and (2) dictionary-encoded string group keys withnative dictionary parquet reads.
Changes
aggregate; engaged at distributed execution via
worker_sort_and_limit, andextended to bare
LIMITand throughUNION.group-by-limit aggregates, native
Dictionary(Int32, Utf8)parquet reads,dict-encoded in-memory chunks on scan, vectorized dict→global-id remap, and
dictionary handling in the aggregate top-k builders.
per-partition N-way; the sorted worker top-k path was removed so hash-final is
the single path.
CUBESTORE_GROUP_BY_LIMIT_FACTOR,CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION,CUBESTORE_DICTIONARY_ENCODING.top-k incl. null keys, group-key remap units).
Testing
cargo test(cubestore lib + sql-tests) green for the new aggregate / dict /top-k paths.
synthetic data: the trim cuts
GROUP BY … LIMIT~2.4× time and ~5.6× memory;dict helps the wired aggregate paths.
Notes / follow-ups (before enabling by default)
CUBESTORE_DICTIONARY_ENCODINGdev default tofalsebefore merge.(
LinearSingleAggregate); on that path dict falls back to row format andregresses memory — wire it before defaulting dict on.