Skip to content

feat: upsert#53

Open
mandrush wants to merge 55 commits into
mainfrom
upsert
Open

feat: upsert#53
mandrush wants to merge 55 commits into
mainfrom
upsert

Conversation

@mandrush

Copy link
Copy Markdown
Collaborator

Description

The description of the main changes of your pull request

Related Issue(s)

Documentation

adampolomski and others added 25 commits February 16, 2026 12:59
…ing execution

Remove cache calls on large DataFrames while keeping small result caching:
- Keep conflicts_df cache (small: join keys + file paths only)
- Remove implicit materializations from target_df, filtered_target_df, non_conflicting_target, and result_df
- All large DataFrames now use lazy streaming execution
- Add schema normalization (cast Dictionary to Utf8) for file path column to fix compatibility
- Add helper method find_conflicts_keys_only for clean anti-join logic

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
The target_df parameter was not used in the function body - it only selects
keys from self.source. Removed the parameter and updated the call site.

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
… detection

Instead of caching the conflicts DataFrame to work around DataFusion's Dictionary
encoding schema mismatch, implement manual join logic:
- Collect target DataFrame with join keys + file paths (small result)
- Collect distinct source join keys (small result)
- Perform join in memory using HashSet for efficient lookup
- Extract file paths that have matching keys

This avoids materializing large DataFrames while still handling the schema
inconsistency by working entirely in memory on small, already-collected data.

Memory impact: Only materializes join keys + file paths (one row per conflicting
file), not full row data. Much more efficient than caching full DataFrames.

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
The previous approach incorrectly materialized the entire target DataFrame which
could be billions of rows. The corrected approach:

1. Keeps target_df and source lazy (not materialized)
2. Performs inner join in DataFusion (lazy operation)
3. Selects only minimal columns (join keys + file path, not full rows)
4. Collects ONLY the join result which is small (only conflicting rows)

Memory footprint: For a table with billions of rows but only thousands of
conflicts, we materialize only thousands of rows with minimal columns, not
billions of full rows.

The join result is inherently small because it contains only rows where join
keys match between source and target (actual conflicts).

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…rics

- Changed extract_conflicting_filenames to extract_conflicts_dataframe to return a DataFrame
- Added extract_file_paths_from_conflicts to extract file paths from the cached DataFrame
- Cache the conflicts DataFrame for reuse in multiple places
- Added num_conflicting_records field to UpsertMetrics
- Count and report conflicting records in metrics
- Updated tests to verify num_conflicting_records metric

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants