Fix concurrent partition draining in write execution to prevent memory accumulation#45
Draft
Copilot wants to merge 69 commits into
Draft
Fix concurrent partition draining in write execution to prevent memory accumulation#45Copilot wants to merge 69 commits into
Copilot wants to merge 69 commits into
Conversation
fix: handle edge in schema adapter for single missing field
fix: use provided sessionState in method argument
fix: scan time was always 0 for merge metrics
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…dability-metrics Refactor upsert.rs for improved readability, idiomatic Rust, and meaningful metrics
This reverts commit 9a5a201.
… detection Instead of caching the conflicts DataFrame to work around DataFusion's Dictionary encoding schema mismatch, implement manual join logic: - Collect target DataFrame with join keys + file paths (small result) - Collect distinct source join keys (small result) - Perform join in memory using HashSet for efficient lookup - Extract file paths that have matching keys This avoids materializing large DataFrames while still handling the schema inconsistency by working entirely in memory on small, already-collected data. Memory impact: Only materializes join keys + file paths (one row per conflicting file), not full row data. Much more efficient than caching full DataFrames. Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
The previous approach incorrectly materialized the entire target DataFrame which could be billions of rows. The corrected approach: 1. Keeps target_df and source lazy (not materialized) 2. Performs inner join in DataFusion (lazy operation) 3. Selects only minimal columns (join keys + file path, not full rows) 4. Collects ONLY the join result which is small (only conflicting rows) Memory footprint: For a table with billions of rows but only thousands of conflicts, we materialize only thousands of rows with minimal columns, not billions of full rows. The join result is inherently small because it contains only rows where join keys match between source and target (actual conflicts). Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…rics - Changed extract_conflicting_filenames to extract_conflicts_dataframe to return a DataFrame - Added extract_file_paths_from_conflicts to extract file paths from the cached DataFrame - Cache the conflicts DataFrame for reuse in multiple places - Added num_conflicting_records field to UpsertMetrics - Count and report conflicting records in metrics - Updated tests to verify num_conflicting_records metric Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…sert' into copilot/extract-dataframe-for-upsert # Conflicts: # crates/core/src/operations/upsert.rs
…or-upsert Extract conflicts DataFrame and add conflicting records metric to upsert
…or-upsert Copilot/extract dataframe for upsert
Upsert performance and monitoring improvements
Move partition stream creation (execute call) inside spawned tasks to ensure all output partitions are drained concurrently. This prevents memory accumulation in repartition queues when some partitions are not actively being consumed. The fix applies to both the standard write path and the CDC write path. All upsert tests continue to pass. Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Copilot
AI
changed the title
[WIP] Fix concurrent output partition drainage for upsert operation
Fix concurrent partition draining in write execution to prevent memory accumulation
Dec 22, 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Partition streams were created sequentially before spawning async consumers, causing repartition producers to fill channels for idle partitions. This manifests as memory buildup in repartition queues with some partitions at 0 bytes while others explode.
Changes
execute(i, task_ctx)inside spawned tasks: Ensures all partition streams start concurrently rather than sequentially registering with repartition logic before consumption beginswrite_execution_plan_v2Before:
After:
Related Issue(s)
N/A
Documentation
N/A
Original prompt
💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.