#733 add numbers to duplicated sanitized cols (#734)
Conversation
…d retain original column metadata
Walkthrough

Reorders the JDBC decimal-correction load to run before sanitization; enhances DataFrame column sanitization to avoid case-insensitive name collisions by appending numeric suffixes and to preserve original column names and existing metadata; adds metadata parsing for CHAR/VARCHAR lengths; and expands unit tests for these behaviors.

Changes
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (inconclusive)
Actionable comments posted: 2
🧹 Nitpick comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala (1)
155-185: Tests look good, but consider adding mixed-case coverage.

The collision handling and metadata tests are comprehensive for lowercase scenarios. However, the case-sensitivity bug in `sanitizeDfColumns` (where `remove`/`add` operations don't use lowercase) wouldn't be caught by these tests, since all column names are lowercase.

Consider adding a test with mixed-case columns to validate case-insensitive collision detection:
🧪 Suggested additional test case
```scala
"handle case-insensitive name collisions" in {
  val df = List(("A", 1, 2)).toDF("Test_Column", "Test Column", "TEST COLUMN")

  val actualDf = sanitizeDfColumns(df, " ")

  // All three should produce unique names despite case differences
  val colNames = actualDf.schema.fields.map(_.name)
  assert(colNames.distinct.length == 3, "All columns should have unique names")
  assert(colNames.contains("Test_Column"))
  // The other two should get suffixes like Test_Column_1, Test_Column_2
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala` around lines 155 - 185, Add a new test in SparkUtilsSuite.scala (e.g., "handle case-insensitive name collisions") that constructs a DataFrame with mixed-case colliding column names (for example "Test_Column", "Test Column", "TEST COLUMN"), calls sanitizeDfColumns(...) and then verifies the resulting schema has distinct names (colNames.distinct.length == originalCount) and that the original base name (e.g., "Test_Column") is preserved with suffixes like "_1", "_2" for the others; reference sanitizeDfColumns and the test name to locate where to add it and reuse existing helpers (toDF, convertDataFrameToPrettyJSON or schema assertions) to assert uniqueness and expected suffixing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala`:
- Around line 169-189: The column-selection fails because schemaWithMetaData is
taken from the already-sanitized df while dfJdbcWithCustomSchema contains
original unsanitized column names; update the code inside the
JdbcSparkUtils.getCorrectedDecimalsSchema block to either (a) apply your same
sanitization routine to dfJdbcWithCustomSchema before selecting (so names match
schemaWithMetaData), or (b) build cols by reading original names from
dfJdbcWithCustomSchema.schema and aliasing them to the sanitized names from
schemaWithMetaData (use .as(sanitizedName, metadata) on col(originalName));
adjust the selection on dfJdbcWithCustomSchema accordingly so column names
referenced exist and metadata is reattached.
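Option (b) could be sketched roughly as follows. Note this is an illustrative fragment, not the project's actual code: `dfJdbcWithCustomSchema` and `schemaWithMetaData` are the names used in the comment above, and the positional pairing of the two schemas' fields is an assumption of this sketch.

```scala
import org.apache.spark.sql.functions.col

// Hypothetical sketch: alias the original (unsanitized) columns to the
// sanitized names while reattaching the metadata captured in
// schemaWithMetaData. Assumes both schemas list fields in the same order.
val cols = dfJdbcWithCustomSchema.schema.fields
  .zip(schemaWithMetaData.fields)
  .map { case (original, sanitized) =>
    col(original.name).as(sanitized.name, sanitized.metadata)
  }

df = dfJdbcWithCustomSchema.select(cols: _*)
```

This keeps a single reload of the JDBC source while still emitting sanitized names with metadata, at the cost of relying on positional alignment between the two schemas.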
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala`:
- Around line 145-154: The HashSet namesLowercase is populated with lowercase
strings but the code removes/adds using original-case names causing removals to
fail and future contains checks to break; in the block around uniqueName
(variables namesLowercase, srcName, trgName, getUniqueName) change the HashSet
interactions to use lowercase consistently (e.g., call toLowerCase on srcName
when removing and on newName or trgName when adding) so namesLowercase only ever
stores lowercase entries while leaving returned uniqueName value semantics
unchanged.
---
Nitpick comments:
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala`:
- Around line 155-185: Add a new test in SparkUtilsSuite.scala (e.g., "handle
case-insensitive name collisions") that constructs a DataFrame with mixed-case
colliding column names (for example "Test_Column", "Test Column", "TEST
COLUMN"), calls sanitizeDfColumns(...) and then verifies the resulting schema
has distinct names (colNames.distinct.length == originalCount) and that the
original base name (e.g., "Test_Column") is preserved with suffixes like "_1",
"_2" for the others; reference sanitizeDfColumns and the test name to locate
where to add it and reuse existing helpers (toDF, convertDataFrameToPrettyJSON
or schema assertions) to assert uniqueness and expected suffixing.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3b54f110-2142-4f3b-8175-535bd6e4fece
📒 Files selected for processing (3)
- pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala
Unit Test Coverage
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala`:
- Around line 164-176: The sanitization call (SparkUtils.sanitizeDfColumns) is
currently gated by the decimal-correction if-block so it only runs when
jdbcReaderConfig.correctDecimalsInSchema || correctDecimalsFixPrecision is true;
move the sanitization so it always runs for data queries. Specifically, keep the
existing decimal-correction logic using
JdbcSparkUtils.getCorrectedDecimalsSchema and the reload into df, but remove the
SparkUtils.sanitizeDfColumns call from inside that if and instead call
SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters) once after
the decimal-correction block when isDataQuery is true (so sanitize runs
regardless of the decimal flags).
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala`:
- Around line 406-409: The code currently captures only the length from
CHAR/VARCHAR metadata, losing the type keyword so
transformSchemaForCatalog/transformPrimitive can only emit VarcharType; update
the regex charVarcharLengthPattern to capture the type keyword (CHAR or VARCHAR)
as a group, change getLengthFromMetadata to return both the keyword and length
(or a small tuple/object) when CHAR_VARCHAR_METADATA_KEY is present, and update
transformPrimitive/transformSchemaForCatalog to check that captured keyword and
create CharType(n) when the keyword is CHAR and VarcharType(n) when VARCHAR,
preserving the original semantic distinction.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c04c69dd-5fe4-4952-84d1-95ff0802cb27
📒 Files selected for processing (3)
- pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala
```diff
 if (jdbcReaderConfig.correctDecimalsInSchema || jdbcReaderConfig.correctDecimalsFixPrecision) {
-  if (isDataQuery) {
-    df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
-  }
-
-  JdbcSparkUtils.getCorrectedDecimalsSchema(df, jdbcReaderConfig.correctDecimalsFixPrecision).foreach(schema =>
+  JdbcSparkUtils.getCorrectedDecimalsSchema(df, jdbcReaderConfig.correctDecimalsFixPrecision).foreach { schema =>
     df = spark
       .read
       .format("jdbc")
       .options(connectionOptions)
       .option("customSchema", schema)
       .load()
-  )
+  }
+
+  if (isDataQuery) {
+    df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
+  }
 }
```
Move sanitization out of the decimal-correction branch.
sanitizeDfColumns() now runs only when one of the decimal-correction flags is enabled. With both flags off, data queries stop sanitizing entirely, so the duplicate-safe renames and original_name metadata from this PR never get applied.
💡 Proposed fix

```diff
 if (jdbcReaderConfig.correctDecimalsInSchema || jdbcReaderConfig.correctDecimalsFixPrecision) {
   JdbcSparkUtils.getCorrectedDecimalsSchema(df, jdbcReaderConfig.correctDecimalsFixPrecision).foreach { schema =>
     df = spark
       .read
       .format("jdbc")
       .options(connectionOptions)
       .option("customSchema", schema)
       .load()
   }
-
-  if (isDataQuery) {
-    df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
-  }
+}
+
+if (isDataQuery) {
+  df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
+}
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```scala
if (jdbcReaderConfig.correctDecimalsInSchema || jdbcReaderConfig.correctDecimalsFixPrecision) {
  JdbcSparkUtils.getCorrectedDecimalsSchema(df, jdbcReaderConfig.correctDecimalsFixPrecision).foreach { schema =>
    df = spark
      .read
      .format("jdbc")
      .options(connectionOptions)
      .option("customSchema", schema)
      .load()
  }
}

if (isDataQuery) {
  df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala`
around lines 164 - 176, The sanitization call (SparkUtils.sanitizeDfColumns) is
currently gated by the decimal-correction if-block so it only runs when
jdbcReaderConfig.correctDecimalsInSchema || correctDecimalsFixPrecision is true;
move the sanitization so it always runs for data queries. Specifically, keep the
existing decimal-correction logic using
JdbcSparkUtils.getCorrectedDecimalsSchema and the reload into df, but remove the
SparkUtils.sanitizeDfColumns call from inside that if and instead call
SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters) once after
the decimal-correction block when isDataQuery is true (so sanitize runs
regardless of the decimal flags).
Closes #733
Summary by CodeRabbit
Bug Fixes
New Features
Tests