Skip to content

feat(cubestore): dictionary group keys & group-by-limit trim aggregate#11128

Open
waralexrom wants to merge 17 commits into
masterfrom
cubestore-dicts
Open

feat(cubestore): dictionary group keys & group-by-limit trim aggregate#11128
waralexrom wants to merge 17 commits into
masterfrom
cubestore-dicts

Conversation

@waralexrom

Copy link
Copy Markdown
Member

Summary

Two related CubeStore query-path improvements (plus experimental top-k
parallelism), all flag-gated so they can be enabled and validated incrementally:
(1) a group-by-limit "trim" aggregate that bounds worker output for
GROUP BY … LIMIT queries, and (2) dictionary-encoded string group keys with
native dictionary parquet reads.

Changes

  • Group-by-limit trim aggregate — ported/renamed from the topk partial
    aggregate; engaged at distributed execution via worker_sort_and_limit, and
    extended to bare LIMIT and through UNION.
  • Dictionary encoding — dict-aware group keys for the inline (sorted) and
    group-by-limit aggregates, native Dictionary(Int32, Utf8) parquet reads,
    dict-encoded in-memory chunks on scan, vectorized dict→global-id remap, and
    dictionary handling in the aggregate top-k builders.
  • Experimental worker top-k (flag-gated) — parallel hash-final and
    per-partition N-way; the sorted worker top-k path was removed so hash-final is
    the single path.
  • Config flagsCUBESTORE_GROUP_BY_LIMIT_FACTOR,
    CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION, CUBESTORE_DICTIONARY_ENCODING.
  • Tests — sql-tests + compaction.rs (dict native/in-memory group-by, dict
    top-k incl. null keys, group-key remap units).

Testing

  • cargo test (cubestore lib + sql-tests) green for the new aggregate / dict /
    top-k paths.
  • Benchmarked on a local 6-worker stend vs ClickHouse on identical 24M-row
    synthetic data: the trim cuts GROUP BY … LIMIT ~2.4× time and ~5.6× memory;
    dict helps the wired aggregate paths.

Notes / follow-ups (before enabling by default)

  • Flip the CUBESTORE_DICTIONARY_ENCODING dev default to false before merge.
  • Dictionary encoding is not yet wired into the plain hash aggregate
    (LinearSingleAggregate); on that path dict falls back to row format and
    regresses memory — wire it before defaulting dict on.
  • Per-partition / parallel top-k remain experimental (flag-gated).
  • Next: validate on real customer data.

…(wip, behind flag)

Behind CUBESTORE_DICTIONARY_ENCODING (dev default currently ON - flip to false
before merge). Adds DictionaryComparator + DictionaryGroupColumn to the sorted
inline aggregate and routes dict schemas there via supported_type; String columns
flow as Dictionary(Int32,Utf8) end-to-end (metastore as_arrow_field + CubeTable /
CubeTableLogical schema). Measured on stend: inline/sorted ~10% faster, but
hash/Linear ~60% slower (read-side cast_to_dictionary + df row fallback).

WIP / not yet handled (only matters when flag ON): in-memory chunks stay Utf8
(schema mismatch), dict->string result serialization for non-aggregate SELECT,
native dict parquet read to avoid the read cast, and hash-aggregate dict-awareness.
…topk), wip behind flag

Brings the worker-side partial hash-aggregate trim for
`GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k` onto this
branch, renamed from the misleading "TopK" naming (it trims by group-key order,
not by an aggregate top-N):
- GroupByLimitAggregateExec + group_by_limit_aggregate module
- group_by_limit_rewriter
- config group_by_limit_factor / CUBESTORE_GROUP_BY_LIMIT_FACTOR (default 2)

Reuses DataFusion's GroupValues building blocks; the only fork change is
pub new_group_values (df branch cubestore-hash-aggregate-limit).

WIP: the trim is planted during router planning but is NOT yet carried across
the ClusterSend boundary to the worker's independent physical re-plan, so it
does not engage at distributed execution yet. Next: cluster-boundary carry.
…via worker_sort_and_limit

The rewriter only plants GroupByLimitAggregate during ROUTER planning, but the
worker re-plans physical from logical independently, so the trim never reached
execution. Instead of a new proto, reuse the existing worker_sort_and_limit
carry (already serialized across ClusterSend): when group_by_limit_factor > 0,
resort_worker_subtree now wraps the worker partial hash aggregate in
GroupByLimitAggregateExec (trim during aggregation, bounded O(factor*k) memory)
instead of a SortExec over the full partial; the Sort above still orders the
<= k surviving groups so the router's sort-preserving merge stays correct.
group_by_limit_factor (CUBESTORE_GROUP_BY_LIMIT_FACTOR, default 2) is threaded
router->worker via CubeQueryPlanner; 0 keeps the previous sort-then-trim.

Verified on the production-dump stend: with factor>0 the worker EXPLAIN ANALYZE
shows GroupByLimitAggregate (was LinearPartialAggregate) and results are
identical. Value is bounded hash-table memory (OOM avoidance) on
high-cardinality group-bys, not speed on small inputs.

WIP: only fires when worker_sort_and_limit fires, i.e. single-table
`ORDER BY <group subset> LIMIT k`. UNION-of-tables (the prod query.sql shape)
does not populate worker_sort_and_limit yet (separate ctx-propagation gap).
…ough UNION

The worker group-by-limit trim previously required an ORDER BY and was dropped
when the per-branch ClusterSends of a UNION were merged. Now:

- compute_worker_sort_and_limit handles a bare LIMIT (no ORDER BY): the total
  order is the full group key in group-by order, so "any n" becomes "the n
  smallest by group key" -- a valid deterministic choice. An ORDER BY prefix,
  when present, still sorts first.
- pull_up_cluster_send preserves worker_sort_and_limit across a UNION (the same
  group-by/limit context descends to every branch, so the descriptors are
  positionally identical and stay valid over the union); kept only when all
  branches agree.

Adds planning coverage (bare LIMIT, UNION ALL + bare LIMIT) and a bare-LIMIT
execution assertion to topk_hash_aggregate_trim; excludes that test from the
migration harness (new test, no recorded fixture).
Make dictionary-encoded string group keys fast in the worker partial aggregate
instead of materializing strings.

- GroupByLimitAggregateStream remaps each Dictionary(Int32, Utf8) group column to
  Int32 global ids (interning each distinct value once into a per-stream global
  dictionary) and groups on those via DataFusion's primitive GroupValues path,
  rebuilding the Dictionary columns at the single final emit. The trim/merge sort
  the rebuilt dictionaries by value, so the per-worker id maps stay internal and
  cross-worker combination is unaffected.
- SuppliedSchemaReaderCustomizer pins the index (dictionary) schema on the parquet
  reader (ArrowReaderOptions::with_schema), so dictionary string columns are read
  natively from the on-disk dictionary pages instead of being materialized as Utf8
  and cast to dictionary per batch by the schema adapter. Composes with the
  configured customizer; used only for indexes that carry a Dictionary column.
- batches_to_dataframe decodes Dictionary result columns to their value type.

On the production dump (UNION, GROUP BY 6 string cols, LIMIT 10) the native read
removes the per-batch Utf8->Dictionary cast that made dictionary encoding a
regression; the query is now slightly faster than the non-dictionary path.
Replace the per-row build of the global-id Int32Array (iterate keys, match
Option, collect) with a vectorized `take`: build the local->global map once per
batch as an Int32Array (null where the dictionary entry is null) and gather it by
the dictionary keys. Null keys and null entries propagate to null, identical to
the previous semantics. Cuts the remap from ~16% of worker compute to negligible
(~3.7s -> ~3.1s on the production-dump UNION query, now ~25% faster than the
non-dictionary path).
- GlobalDict unit tests: global ids stay consistent across batches with different
  local dictionaries, and null keys / null dictionary entries round-trip as null.
- End-to-end test with dictionary encoding on: a String group key alongside
  Timestamp and Decimal columns, compacted to a parquet partition, then grouped
  by the string. Locks in that the native dictionary read (with_supplied_schema)
  accepts every column type and the result decodes back to strings.
…-gated)

Behind CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL (default off, gated on
group_by_limit_factor > 0): route the worker top-k through a router-side hash
final aggregate instead of the sorted-merge framework.

- Worker subtree: CoalescePartitions <- GroupByLimitAggregate (drops the
  per-partition SortExec and SortPreservingMerge; emits the trimmed top-k
  unsorted). CoalescePartitions spawns a task per input partition, so the
  per-union-branch aggregates run in parallel -- the sorted merge drained them
  on a single task.
- Router: SortExec(T, fetch=k) <- AggregateExec(Final, hash) <-
  CoalescePartitions <- ClusterSend. The explicit top-k sort by the full total
  order T is required even for a bare LIMIT: it keeps exactly the groups every
  worker kept (fully combined here), where a plain limit could take an
  undercounted group only one worker retained.

On the production-dump UNION query (GROUP BY 6 cols, LIMIT 10): ~2.1x faster
(~3.1s -> ~1.5s) at neutral peak RSS, because the worker now uses ~2 cores
instead of 1. Same result as the sorted path (ORDER BY and bare LIMIT verified).

The flag must be set fleet-wide: worker and router re-plan independently, and a
worker-on/router-off split would feed unsorted streams into a sorted merge.
…lag-gated)

Behind CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION (default off; only active with
CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL and group_by_limit_factor > 0): strip the
CoalescePartitions below the trimmed worker aggregate so it runs over every raw
CubeTableExec partition instead of one stream per union branch. The hash-final
CoalescePartitions on top then parallelizes all partitions.

Per-partition top-k stays complete by the same total-order argument as the
per-worker cut: a group in the global top-k by T has fewer than k smaller-keyed
groups globally, hence in any single partition, so it survives every partition's
local top-k and reaches the router fully combined. The strip only removes
CoalescePartitions, never the SortPreservingMerge that feeds the single-partition
LastRowByUniqueKeyExec, so unique-key tables stay correct.

On the production-dump UNION query: ~799ms vs ~1504ms (hash-final 3-way) and
~2159ms (master), ~9 cores at peak, peak RSS still neutral because the
index-sorted partitions are key-local (small per-partition tables). Memory is
bounded by partition count, so a group key spread across partitions would cost
~N x; gated off by default.
… path

The sorted-merge worker top-k (per-partition SortExec + SortPreservingMerge ->
router SortedFinalAggregate) drained the worker partitions on a single task, so
it ran on one core and was slower than master, which keeps a parallel
CoalescePartitions over the partial aggregate. The hash-final path (worker
CoalescePartitions -> router hash Final + Sort(T, fetch=k)) is strictly better:
same memory, ~2x faster because CoalescePartitions parallelizes the per-partition
aggregates. So remove the sorted variant and the CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL
flag entirely; hash-final is now how the trimmed top-k is always combined.

Two knobs remain:
- group_by_limit_factor (env CUBESTORE_GROUP_BY_LIMIT_FACTOR, >0): whether to use
  the trimming aggregate at all. 0 leaves the plan untouched (master behavior).
- CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION: whether to push it below the merge
  (per-partition N-way) -- still experimental, default off.

push_worker_sort_and_limit now returns early when factor == 0. Existing
planning/execution/limit_pushdown/union tests pass against the hash-final plan.
With dictionary encoding on, the index schema exposes string columns as
Dictionary(Int32, Utf8), but in-memory chunks are written with plain Utf8. The
memory scan previously rejected this as an "index schema / chunk schema
mismatch", so any query touching an uncompacted (in-memory) chunk failed under
dictionary encoding. Cast the chunk batches up to the index schema instead
(Utf8 -> Dictionary via cast_record_batch_to_schema): the memory scan now matches
the dictionary parquet partitions and feeds the dict-aware aggregate, so
streaming / freshly-imported data gets the same path and benefit.

Tests:
- test_cast_record_batch_to_dictionary_schema: the cast preserves values and
  nulls and leaves non-string columns untouched.
- dictionary_encoding_in_memory_group_by: dictionary encoding on, a unique-key
  table (streaming in-memory chunks), no compaction, group-by the string column
  returns correct sums (asserts the chunks are in-memory).
ORDER BY <aggregate> DESC LIMIT goes through ClusterAggregateTopK, whose scalar
helpers (create_builder / append_value via cube_match_scalar) had no arm for
ScalarValue::Dictionary and panicked ("Unhandled cube_match_scalar match arm:
Dictionary(...)") whenever the group key was dictionary-encoded. So with
dictionary encoding on, any top-k-by-aggregate query crashed the worker
subprocess.

Special-case ScalarValue::Dictionary at the function level (the macro is left
untouched): build into a StringDictionaryBuilder<Int32Type> and append the inner
Utf8 value, so the top-k result columns keep the dictionary type the schema
expects. Grouping already works (ScalarValue::Dictionary hashes/compares by its
resolved inner value) and cmp_same_types only ever sees aggregate values, so no
other change is needed. Covers the only dictionary layout CubeStore produces
(Dictionary(Int32, Utf8)); other layouts still panic explicitly.

Verified on the production dump: ORDER BY sum(...) DESC LIMIT 10 with dictionary
encoding on now returns the same rows as with it off. Adds an end-to-end test.
@waralexrom waralexrom requested a review from a team as a code owner June 20, 2026 10:47
@github-actions github-actions Bot added cube store Issues relating to Cube Store rust Pull requests that update Rust code labels Jun 20, 2026
@claude

claude Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 2m 42s —— View job


Claude PR Review

Big, well-structured PR with thoughtful comments explaining the why of each rewrite, and the flag-gated rollout is the right call. The total-order trick in the group-by-limit rewriter (extending ORDER BY with the remaining group columns on both worker and router) is correctly explained and looks sound. A handful of medium/low items below — none block; mostly perf and a couple of footguns worth tightening before defaulting things on.

1 medium, 4 low, plus general notes — click to expand

Medium

  • strip_coalesce_partitions is too aggressiverust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs:470. When CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION=true, you recursively drop every CoalescePartitionsExec in the worker subtree below the partial aggregate, not just the one directly feeding it. If the subtree ever grows a coalesce inserted to satisfy a child's single-partition input requirement (e.g. in a UNION branch with mixed partitioning), removing all of them silently changes plan semantics. Narrow this to peeling at most one immediate CoalescePartitionsExec directly under the aggregate's input.

Low

  • GlobalDict::intern_value allocates the key string twice on every new valuerust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/dict_remap.rs:34-42. self.values.push(v.to_string()); self.value_to_id.insert(v.to_string(), id); does two String allocations and two hashes per first insertion. Use the Entry API and clone the inserted String into values, or store Arc<str> and share one allocation between the map key and the vec entry. Per-batch cost is O(distinct values), so this is the hot bit of the dict path.

  • aggregate_expressions ignores _col_idx_baserust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/group_by_limit_aggregate_stream.rs:247. The parameter is unused and the function only handles AggregateMode::Partial. Either drop the parameter or document that this is Partial-only and the offset is intentionally unused; today it just looks like a copy-paste leftover from the DF Final path.

  • Into<Field> for &Column defaulting to dictionary_encoding=false is fragilerust/cubestore/cubestore/src/metastore/mod.rs:592-596. Existing call sites that go through .into() silently use plain Utf8, while CubeTable::try_new is the only consumer using the new as_arrow_field(true) path. If any later code path goes through .into() for plan-relevant schemas (e.g. another scan, a partition write path, an IPC roundtrip), it will diverge from the index schema and the parquet "supplied schema" customizer won't help. Worth grepping for Field::from(&col) / (&col).into() and either porting them to as_arrow_field or removing the Into shim.

  • DictionaryGroupColumn::vectorized_equal_to/append are scalar fallbacksrust/cubestore/cubestore/src/queryplanner/inline_aggregate/dictionary_group_column.rs:68-86. Acknowledged in the PR description for the hash path. For the sorted inline path the comparator handles the hot row-by-row work, so this is fine here; just call it out in a code comment so a future reader doesn't assume the vectorized methods are a faster path.

Notes / nits

  • GroupByLimitAggregateExec::statistics returns Precision::Absent even though cardinality_effect = LowerEqual. A trivial upper bound (min(input.statistics.num_rows, factor * k) after factor > 0) would help downstream planners that bail on Absent.

  • emit_all_trimmed builds the full state batch before trimming — necessary because the trim is a top-k over the partial state, but if g is very large vs factor*k, you allocate a transient state batch you immediately discard. Not a correctness issue; just worth a perf-watch in the rollout.

  • group_by_limit_rewriter::analyze silently skips duplicate ORDER BY columns (if used[idx] { continue; }) — correct (a duplicate adds no info to the total order), but a debug log or a one-line comment would save a future reader from re-deriving why.

  • Tests look good: the GlobalDict null-key / cross-batch coverage and SortedGroupValues::sorted_group_values_dictionary_cross_batch (different local dicts across batches) are the right invariants to pin. The push_sorted_partial_aggregate_below_merge and add_limit_to_workers tests in distributed_partial_aggregate.rs cover the duplicate-key-across-partition contract well. Consider an integration-level test for the worker_sort_and_limit end-to-end path with CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION=true to lock in the strip-coalesce behavior.

  • CLAUDE.md compliance: changes are confined to /rust/cubestore per the repo's monorepo layout; no docs added to legacy /docs. No .github/workflows touched. Good.

  • Flag defaults: CUBESTORE_DICTIONARY_ENCODING=false, CUBESTORE_GROUP_BY_LIMIT_FACTOR=0, CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION=false — matches the PR description and the e8a2a90 commit. The PR notes flag that dict still regresses on the plain hash aggregate path — wiring that before flipping CUBESTORE_DICTIONARY_ENCODING on by default (as the PR description already says) is the right gate.

· `cubestore-dicts`

- per-partition: strip only the leading CoalescePartitionsExec feeding the
  aggregate, not every one in the subtree (preserves UNION/mixed-partitioning
  plan semantics)
- GlobalDict: share one Arc<str> allocation between the id map and values vec
  instead of two String allocations per new value
- GroupByLimitAggregateExec::statistics: report an inexact upper bound
  (min(input_rows, factor*k*partitions)) instead of Absent
- drop unused col_idx_base from aggregate_expressions (Partial-only)
- comments: scalar-fallback note on DictionaryGroupColumn, duplicate ORDER BY skip
@claude

claude Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 48s —— View job


Follow-up review — 4f9f439

Verified all five items from the prior review are addressed cleanly; no new issues spotted. Verdict: looks good to land (modulo the pre-existing Into<Field> shim point, which is low and only relevant once the dict flag flips to default-on).

Per-item verification — click to expand
Prior item Status Notes
strip_coalesce_partitions too aggressive (medium) ✅ Fixed Renamed to strip_leading_coalesce_partitions; now only peels the immediate CoalescePartitionsExec chain feeding the aggregate, not the whole subtree. distributed_partial_aggregate.rs:470-480. Preserves UNION/mixed-partitioning semantics.
GlobalDict::intern_value double allocation (low) ✅ Fixed Now Arc<str> shared between value_to_id map key and values vec — one allocation per new value. dict_remap.rs:34-46.
Unused _col_idx_base (low) ✅ Fixed Parameter dropped; doc comment now explains the function is Partial-only. group_by_limit_aggregate_stream.rs:243-249.
DictionaryGroupColumn scalar-fallback note (low) ✅ Fixed Comment added at dictionary_group_column.rs:68-70 explaining sort-path uses the comparator.
statistics() = Absent (nit) ✅ Fixed Now reports Precision::Inexact(min(input_rows, factor * k * partitions)) with a factor == 0 guard. group_by_limit_aggregate/mod.rs:195-210. The partitions.max(1) and saturating_mul chain are the right defensive choices.
Duplicate-ORDER-BY skip rationale (nit) ✅ Fixed One-line comment added at group_by_limit_rewriter.rs:112.

Pre-existing items not addressed (still low / non-blocking)

  • Into<Field> for &Column defaulting to dictionary_encoding=falsemetastore/mod.rs:592-596. Still a footgun once dict defaults to on; worth a follow-up audit of Field::from(&col) / (&col).into() call sites before flipping CUBESTORE_DICTIONARY_ENCODING to true.
  • emit_all_trimmed builds full state batch before trimming — perf-watch only, not a correctness issue.
  • Integration test for worker_sort_and_limit with CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION=true — would lock in the new strip-leading-coalesce behavior; nice to have as a follow-up.

New observations on 4f9f439

  • The statistics() cap factor * k * partitions is a per-exec output bound — looks right for an exchange-input planner consumer. Precision::Inexact over Exact is the safe choice since the trim only enforces an upper bound (groups can be fewer if the input has fewer distinct keys).
  • strip_leading_coalesce_partitions no longer returns Result; the previous error path was unreachable anyway since the recursion only walked with_new_children-safe nodes. Simpler signature is a small win.
· `cubestore-dicts`

@codecov

codecov Bot commented Jun 20, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.75%. Comparing base (88e57d4) to head (c6faaed).
⚠️ Report is 1 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff           @@
##           master   #11128   +/-   ##
=======================================
  Coverage   58.75%   58.75%           
=======================================
  Files         217      217           
  Lines       17392    17392           
  Branches     3524     3524           
=======================================
  Hits        10219    10219           
  Misses       6659     6659           
  Partials      514      514           
Flag Coverage Δ
cube-backend 58.75% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@claude

claude Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 1m 2s —— View job


Follow-up review — cbff840

Only one new commit since the prior review (4f9f439): cbff840 adds #[ignore] to worker_sort_and_limit_cluster in rust/cubestore/cubestore/src/sql/mod.rs:6341-6344. One concern worth flagging before merge — otherwise nothing new.

Details — click to expand

Concern (low-medium) — the only E2E test for the new path is now ignored

worker_sort_and_limit_cluster was added earlier in this PR specifically to lock in the worker-side Sort(fetch) pushdown that push_worker_sort_and_limit (distributed_partial_aggregate.rs:288) is supposed to emit. The ignore reason says the worker-side Sort(fetch) "is not emitted on the inline path in this cluster setup; re-enable once compute_worker_sort_and_limit produces it again."

That leaves the new distributed pushdown path covered only by unit tests on the rewriter — there is no end-to-end test exercising the router→worker plan in CI. Combined with the fact that the symbol the comment references (compute_worker_sort_and_limit) doesn't appear anywhere in the tree (only push_worker_sort_and_limit does), this reads less like "the test setup is flaky" and more like "the inline-path pushdown silently regressed and we're shipping without it." Worth one of:

  • Brief investigation before merge to confirm the pushdown is still firing in non-test setups (does EXPLAIN ANALYZE on a real cluster still show the Sort(fetch=…) under the partial aggregate?), or
  • Tracking issue with a clearer description of what changed and what would need to happen to restore it.

If the inline-path pushdown really is off right now, then the perf numbers in the PR description ("~2.4× time, ~5.6× memory") may not reflect what merges to master. Not a blocker — the trim feature is flag-gated and defaults to off — but worth confirming before flipping CUBESTORE_GROUP_BY_LIMIT_FACTOR on by default.

Fix this →

Other notes

  • Ignore message itself is well-formed (multi-line concat, clear rationale, recoverable wording) — better than a bare #[ignore].
  • All previously raised review items remain in the state reported in the 4f9f439 review (none have regressed).
  • The two remaining pre-existing low-priority items (Into<Field> shim audit before flipping dict default-on; emit_all_trimmed builds full state batch before trim) still stand as follow-ups, not blockers.
· `cubestore-dicts`

…t path

The 'drop sorted worker top-k' refactor narrowed resort_worker_subtree to only
trim plain hash AggregateExec, so the inline (sorted) aggregate lost the
per-partition bounded Sort it had: for GROUP BY <prefix> ... ORDER BY <non-prefix
group col> LIMIT n the worker shipped every group to the router (router-only
bounding -> the inline-aggregate limit OOM class).

Split resort_worker_subtree by partial aggregate kind, one mechanism two shapes:
- inline/sorted: SortPreservingMerge(T) <- Sort(T, fetch, per partition) -- we
  can't trim a sorted aggregate and don't know the group count, so always bound
  with a sort. Router: sorted final over a sort-preserving merge (the query's
  Sort+Limit does the final bounding, no extra Sort).
- hash: the trimming GroupByLimitAggregate + coalesce (factor-aware), router does
  the hash final + Sort(fetch). Unchanged.

Also drop the top-level factor==0 early return so the inline bounding applies
regardless of the trim factor (factor only gates the hash trim).

Tests: un-ignore worker_sort_and_limit_cluster; update its hash-path case to
assert the GroupByLimitAggregate trim instead of the old worker Sort. Full lib
and in-process SQL suites pass with all optimization envs on.
@claude

claude Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 1m 10s —— View job


Follow-up review — 4bb1499

The fix itself is sound — the split by partial aggregate kind (sorted/inline gets SortPreservingMerge ← Sort(fetch, per-partition) ← PartialAggregate; hash keeps the trim path), and dropping the top-level factor == 0 early return so inline bounding always applies regardless of trim factor, both read correctly against the regression the commit message describes. The new lex_ordering_from_cols helper and the is_hash decision via locate_partial_aggregate are clean. The un-ignored worker_sort_and_limit_cluster now covers both shapes.

One real issue — please remove before merge. Posted inline on the artifact: 4bb1499 accidentally committed two RocksDB scratch directories (test-create_partitions-local/ and test-get_active_partitions_for_indexes-local/ under rust/cubestore/cubestore/), totalling 14 files. These are runtime output of the metastore unit tests (random IDENTITY UUID, MANIFEST/LOG/OPTIONS with embedded timestamps) — not intentional fixtures. git rm -r both, then add a gitignore pattern (e.g. cubestore/test-*-local/ in rust/cubestore/.gitignore) so future runs can't repeat it.

Everything from the prior two reviews remains in the state reported there — the two leftover low-priority follow-ups (Into<Field> shim audit before flipping dict default-on; emit_all_trimmed allocating the full state batch before trim) still stand as non-blockers.

Checklist
  • Read commits since prior review (4bb1499)
  • Verified inline-path sort restoration logic in distributed_partial_aggregate.rs
  • Confirmed test un-ignore in sql/mod.rs
  • Spotted accidentally committed RocksDB scratch dirs and posted inline
  • Re-checked deduplication against existing review threads (none on file)
· `cubestore-dicts`

Comment thread rust/cubestore/cubestore/test-create_partitions-local/metastore/IDENTITY Outdated
… them

The previous commit accidentally added RocksDB scratch directories left by the
metastore unit tests (test-*-local). Remove them and ignore the pattern.
@claude

claude Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Claude finished @waralexrom's task in 39s —— View job


Follow-up review — c6faaed

Verdict: clean — addresses the prior inline comment and adds a guard against repeats. No new issues. 0 high, 0 medium, 0 low.

Details
  • Scratch dirs removed: 14 files deleted across rust/cubestore/cubestore/test-create_partitions-local/ and rust/cubestore/cubestore/test-get_active_partitions_for_indexes-local/ (git diff 4bb1499..HEAD --stat confirms 890 deletions).
  • Gitignore guard added (rust/cubestore/.gitignore:12-15):
    /cubestore/test-*-local/
    /cubestore/test-*-upstream/
    
    Anchored to the crate root with a leading / (right call — won't accidentally swallow similarly-named paths elsewhere), and the -upstream variant covers the matching RocksDB upstream scratch dir the metastore tests can also leave behind. Comment on the rule explains why, which is the kind of comment the repo style guide actually wants.
  • Prior inline review thread resolved.
  • All previously raised items from the 4bb1499 / cbff840 / 4f9f439 reviews remain in the same state — the two outstanding low-priority follow-ups (audit Into<Field> shim before flipping dict default-on; emit_all_trimmed allocates full state batch before trim) are still non-blockers.
· `cubestore-dicts`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cube store Issues relating to Cube Store rust Pull requests that update Rust code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant