Fix concurrent partition draining in write execution to prevent memory accumulation #45

Draft
Copilot wants to merge 69 commits into upsert from copilot/fix-partition-drain-concurrency

Copilot AI commented Dec 22, 2025


Description

Partition streams were created sequentially, each one before its consumer task was spawned, so repartition producers could fill the channels of partitions that were not yet being polled. The symptom is memory buildup in the repartition queues: some partitions sit at 0 bytes while others grow without bound.
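
To make the symptom concrete, here is a small deterministic model of it. This is only a sketch, not DataFusion's repartition code: it assumes unbounded per-partition queues and a producer that routes one batch per tick round-robin, and the names `Drain` and `peak_buffered` are hypothetical.

```rust
use std::collections::VecDeque;

/// How the consumer side polls the output partitions in this toy model.
#[derive(Clone, Copy)]
enum Drain {
    /// Only one partition is polled at a time, in index order.
    Sequential,
    /// Every partition is polled on every tick.
    Concurrent,
}

/// Simulates a producer that round-robins `total` batches into `partitions`
/// unbounded queues and returns the peak number of batches buffered at once.
/// `partitions` must be greater than zero.
fn peak_buffered(partitions: usize, total: usize, mode: Drain) -> usize {
    let mut queues: Vec<VecDeque<usize>> = vec![VecDeque::new(); partitions];
    let mut produced = 0;
    let mut active = 0; // partition currently polled in Sequential mode
    let mut peak = 0;

    loop {
        // Producer: at most one batch per tick, routed round-robin.
        if produced < total {
            queues[produced % partitions].push_back(produced);
            produced += 1;
        }
        let buffered: usize = queues.iter().map(VecDeque::len).sum();
        peak = peak.max(buffered);

        // Consumers: each polled partition takes at most one batch per tick.
        match mode {
            Drain::Concurrent => {
                for queue in queues.iter_mut() {
                    let _ = queue.pop_front();
                }
            }
            Drain::Sequential => {
                let got = queues[active].pop_front();
                // Move on only once the active partition is exhausted.
                if got.is_none() && produced == total && active + 1 < partitions {
                    active += 1;
                }
            }
        }

        if produced == total && queues.iter().all(VecDeque::is_empty) {
            return peak;
        }
    }
}

fn main() {
    let sequential = peak_buffered(4, 100, Drain::Sequential);
    let concurrent = peak_buffered(4, 100, Drain::Concurrent);
    println!("sequential drain peak: {sequential} batches");
    println!("concurrent drain peak: {concurrent} batches");
}
```

With 4 partitions and 100 batches, the sequential mode peaks at 75 buffered batches (everything routed to the three idle partitions), while the concurrent mode never holds more than 1.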

Changes

  • Moved execute(i, task_ctx) inside the spawned tasks: each partition stream is now created by the task that consumes it, so all streams start concurrently instead of registering with the repartition logic one by one before consumption begins
  • Applied to both write paths: the standard write path and the CDC write path in write_execution_plan_v2

Before:

for i in 0..partition_count {
    // Stream is created on the calling task, one partition at a time
    let mut stream = inner_plan.execute(i, task_ctx.clone())?;  // Sequential registration
    tokio::task::spawn(async move { /* consume stream */ });
}

After:

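for i in 0..partition_count {
    // Assuming both are Arc handles: clone them so each task owns its own
    let inner_plan = inner_plan.clone();
    let task_ctx = task_ctx.clone();
    tokio::task::spawn(async move {
        // Stream is created inside the task that drains it
        let mut stream = inner_plan.execute(i, task_ctx)?;  // Concurrent execution
        /* consume stream, then return Ok(..) so that `?` type-checks */
    });
}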

Related Issue(s)

N/A

Documentation

N/A

Original prompt

Make sure you drain all output partitions concurrently for Upsert operation.
If your application (or the Delta upsert pipeline) consumes partitions sequentially, repartition producers can fill channels for “not currently drained” partitions, causing memory to accumulate in the repartition queues.

This matches the symptom of “some partitions at 0 bytes” while others explode: when some consumers aren’t polling, only the partitions being drained make progress, and the rest either build up in their queues or stall under backpressure.

Fix
Ensure you execute/collect all output partitions concurrently, not one-by-one.
If you’re using something like collect(partition) in a loop, replace with JoinSet / FuturesUnordered over all partitions and await them together (similar to what DataFusion’s own tests do in this file).
This is especially important if the upsert path pulls partitions from a single thread.
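
As a rough illustration of "await them together", here is a std-only analogue of the JoinSet approach. It swaps tokio tasks and DataFusion streams for OS threads and bounded `std::sync::mpsc` channels, so it is a sketch of the pattern rather than the upsert code; `spawn_producer`, `drain_all`, and `run` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Producer: routes `total` rows round-robin into bounded per-partition channels.
/// `send` blocks while the target channel is full, which models backpressure.
fn spawn_producer(senders: Vec<SyncSender<u64>>, total: u64) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for i in 0..total {
            let target = (i as usize) % senders.len();
            senders[target].send(i).expect("consumer hung up early");
        }
        // `senders` is dropped here, which closes every channel.
    })
}

/// Drains every partition on its own thread and returns per-partition row counts.
fn drain_all(receivers: Vec<Receiver<u64>>) -> Vec<usize> {
    // Collect the handles first so that all consumers are running
    // before the first join; joining inside the map would serialize them.
    let handles: Vec<thread::JoinHandle<usize>> = receivers
        .into_iter()
        .map(|rx| thread::spawn(move || rx.iter().count()))
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("consumer thread panicked"))
        .collect()
}

/// Wires one producer to `partitions` concurrently drained consumers.
fn run(partitions: usize, capacity: usize, total: u64) -> Vec<usize> {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..partitions)
        .map(|_| sync_channel::<u64>(capacity))
        .unzip();

    let producer = spawn_producer(senders, total);
    let counts = drain_all(receivers);
    producer.join().expect("producer thread panicked");
    counts
}

fn main() {
    // 4 partitions, 2 buffered rows per channel, 100 rows in total.
    println!("{:?}", run(4, 2, 100));
}
```

In this model, draining the receivers one after another instead would stall: the producer blocks on the first full channel of an idle partition while the consumer waits for more rows on the partition it is currently reading.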



alexwilcoxson-rel and others added 30 commits May 19, 2025 20:59
fix: handle edge in schema adapter for single missing field
fix: use provided sessionState in method argument
fix: scan time was always 0 for merge metrics
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…dability-metrics

Refactor upsert.rs for improved readability, idiomatic Rust, and meaningful metrics
adampolomski and others added 22 commits December 12, 2025 00:27
… detection

Instead of caching the conflicts DataFrame to work around DataFusion's Dictionary
encoding schema mismatch, implement manual join logic:
- Collect target DataFrame with join keys + file paths (small result)
- Collect distinct source join keys (small result)
- Perform join in memory using HashSet for efficient lookup
- Extract file paths that have matching keys

This avoids materializing large DataFrames while still handling the schema
inconsistency by working entirely in memory on small, already-collected data.

Memory impact: Only materializes join keys + file paths (one row per conflicting
file), not full row data. Much more efficient than caching full DataFrames.
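
The manual join described in this commit message can be sketched roughly as follows. This is a plain-Rust illustration on already-collected data, not the code in upsert.rs; `conflicting_files` and the sample keys and file names are hypothetical.

```rust
use std::collections::{BTreeSet, HashSet};

/// Returns the distinct file paths whose join key also appears in the source.
/// `target_rows` holds (join key, file path) pairs collected from the target;
/// `source_keys` holds the join keys collected from the source.
fn conflicting_files(target_rows: &[(&str, &str)], source_keys: &[&str]) -> BTreeSet<String> {
    // HashSet gives O(1) expected lookups and removes duplicate source keys.
    let distinct: HashSet<&str> = source_keys.iter().copied().collect();

    target_rows
        .iter()
        .filter(|row| distinct.contains(row.0))
        .map(|row| row.1.to_string())
        .collect() // BTreeSet: each file once, in sorted order
}

fn main() {
    let target = [
        ("k1", "a.parquet"),
        ("k2", "a.parquet"),
        ("k3", "b.parquet"),
        ("k4", "c.parquet"),
    ];
    let files = conflicting_files(&target, &["k2", "k4", "k9", "k2"]);
    println!("{files:?}");
}
```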

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
The previous approach incorrectly materialized the entire target DataFrame which
could be billions of rows. The corrected approach:

1. Keeps target_df and source lazy (not materialized)
2. Performs inner join in DataFusion (lazy operation)
3. Selects only minimal columns (join keys + file path, not full rows)
4. Collects ONLY the join result which is small (only conflicting rows)

Memory footprint: For a table with billions of rows but only thousands of
conflicts, we materialize only thousands of rows with minimal columns, not
billions of full rows.

The join result is inherently small because it contains only rows where join
keys match between source and target (actual conflicts).

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…rics

- Changed extract_conflicting_filenames to extract_conflicts_dataframe to return a DataFrame
- Added extract_file_paths_from_conflicts to extract file paths from the cached DataFrame
- Cache the conflicts DataFrame for reuse in multiple places
- Added num_conflicting_records field to UpsertMetrics
- Count and report conflicting records in metrics
- Updated tests to verify num_conflicting_records metric

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…sert' into copilot/extract-dataframe-for-upsert

# Conflicts:
#	crates/core/src/operations/upsert.rs
…or-upsert

Extract conflicts DataFrame and add conflicting records metric to upsert
…or-upsert

Copilot/extract dataframe for upsert
Upsert performance and monitoring improvements
Move partition stream creation (execute call) inside spawned tasks to ensure
all output partitions are drained concurrently. This prevents memory
accumulation in repartition queues when some partitions are not actively
being consumed.

The fix applies to both the standard write path and the CDC write path.
All upsert tests continue to pass.

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Copilot AI changed the title from "[WIP] Fix concurrent output partition drainage for upsert operation" to "Fix concurrent partition draining in write execution to prevent memory accumulation" on Dec 22, 2025
Copilot AI requested a review from adampolomski December 22, 2025 09:30
4 participants