diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java index 3040137..a7a6137 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java @@ -46,6 +46,7 @@ import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; import org.apache.flink.table.data.GenericMapData; @@ -53,6 +54,9 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -80,7 +84,11 @@ /** A version-agnostic PSC {@link ScanTableSource}. */ @Internal public class PscDynamicSource - implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown { + implements + ScanTableSource, + SupportsReadingMetadata, + SupportsWatermarkPushDown, + SupportsProjectionPushDown { private static final Logger LOG = LoggerFactory.getLogger(PscDynamicSource.class); @@ -114,11 +122,29 @@ public class PscDynamicSource /** Format for decoding values from PSC. */ protected final DecodingFormat> valueDecodingFormat; - /** Indices that determine the key fields and the target position in the produced row. */ - protected final int[] keyProjection; + /** + * Projection paths for key fields. Each int[] is a path to a field: + * - [topLevelIndex] for top-level fields + * - [topLevelIndex, nestedIndex, ...] for nested fields within ROW types + * Used by formats that support nested projection (e.g., Thrift's PartialThriftDeserializer). + */ + protected int[][] keyProjection; + + /** + * Projection paths for value fields. Each int[] is a path to a field: + * - [topLevelIndex] for top-level fields + * - [topLevelIndex, nestedIndex, ...] for nested fields within ROW types + * Used by formats that support nested projection (e.g., Thrift's PartialThriftDeserializer). + */ + protected int[][] valueProjection; - /** Indices that determine the value fields and the target position in the produced row. */ - protected final int[] valueProjection; + // Query-specific projections (see SupportsProjectionPushDown). + // - *Format* projections are indices into physicalDataType (control what gets deserialized). + // - *Output* projections are indices into the projected physical row (control where fields land). + protected int[] keyFormatProjection; + protected int[] valueFormatProjection; + protected int[] keyOutputProjection; + protected int[] valueOutputProjection; /** Prefix that needs to be removed from fields when constructing the physical data type. */ protected final @Nullable String keyPrefix; @@ -206,6 +232,10 @@ private int getIntendedParallelism(StreamExecutionEnvironment execEnv) { return scanParallelism != null ? scanParallelism : execEnv.getParallelism(); } + /** + * Backwards-compatible constructor that accepts int[] projections. + * Converts them to int[][] format internally. + */ public PscDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -228,6 +258,55 @@ public PscDynamicSource( boolean enableRescale, @Nullable Double rateLimitRecordsPerSecond, @Nullable Integer scanParallelism) { + this( + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + toNestedProjection(keyProjection), + toNestedProjection(valueProjection), + keyPrefix, + topics, + topicPattern, + properties, + startupMode, + specificStartupOffsets, + startupTimestampMillis, + boundedMode, + specificBoundedOffsets, + boundedTimestampMillis, + upsertMode, + tableIdentifier, + sourceUidPrefix, + enableRescale, + rateLimitRecordsPerSecond, + scanParallelism); + } + + /** + * Primary constructor that accepts int[][] projections for full nested field support. + */ + public PscDynamicSource( + DataType physicalDataType, + @Nullable DecodingFormat> keyDecodingFormat, + DecodingFormat> valueDecodingFormat, + int[][] keyProjection, + int[][] valueProjection, + @Nullable String keyPrefix, + @Nullable List topics, + @Nullable Pattern topicPattern, + Properties properties, + StartupMode startupMode, + Map specificStartupOffsets, + long startupTimestampMillis, + BoundedMode boundedMode, + Map specificBoundedOffsets, + long boundedTimestampMillis, + boolean upsertMode, + String tableIdentifier, + @Nullable String sourceUidPrefix, + boolean enableRescale, + @Nullable Double rateLimitRecordsPerSecond, + @Nullable Integer scanParallelism) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -241,6 +320,12 @@ public PscDynamicSource( this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null."); this.keyPrefix = keyPrefix; + + // Default behavior: no projection pushdown, keep the DDL-level projections. + this.keyFormatProjection = getTopLevelIndices(this.keyProjection); + this.valueFormatProjection = getTopLevelIndices(this.valueProjection); + this.keyOutputProjection = getTopLevelIndices(this.keyProjection); + this.valueOutputProjection = getTopLevelIndices(this.valueProjection); // Mutable attributes this.producedDataType = physicalDataType; this.metadataKeys = Collections.emptyList(); @@ -327,10 +412,20 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { final DeserializationSchema keyDeserialization = - createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix); + createDeserialization( + context, + keyDecodingFormat, + keyFormatProjection, + keyProjection, + keyPrefix); final DeserializationSchema valueDeserialization = - createDeserialization(context, valueDecodingFormat, valueProjection, null); + createDeserialization( + context, + valueDecodingFormat, + valueFormatProjection, + valueProjection, + null); final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); @@ -456,6 +551,134 @@ public void applyWatermark(WatermarkStrategy watermarkStrategy) { this.watermarkStrategy = watermarkStrategy; } + @Override + public boolean supportsNestedProjection() { + return true; + } + + @Override + public void applyProjection(int[][] projectedFields, DataType producedDataType) { + Preconditions.checkNotNull(projectedFields, "Projected fields must not be null."); + Preconditions.checkNotNull(producedDataType, "Produced data type must not be null."); + + final LogicalType physicalType = physicalDataType.getLogicalType(); + Preconditions.checkArgument( + physicalType.is(LogicalTypeRoot.ROW), "Row data type expected."); + final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType); + + // projectedFields is a 2D array where: + // - The first dimension represents the output field position (order in the projected row) + // - The second dimension is the path to the field: [topLevelIndex] for top-level fields, + // or [topLevelIndex, nestedIndex, ...] for nested fields within ROW types + // Example: For schema (a INT, b ROW, c BIGINT): + // - [[2], [1, 0]] means SELECT c, b.x → output row has c at position 0, b.x at position 1 + // - [2] is the path to top-level field 'c' + // - [1, 0] is the path to nested field 'x' within 'b' + + // Track which top-level fields are projected (for filtering) + final boolean[] physicalFieldProjected = new boolean[physicalFieldCount]; + + // Group projected paths by their top-level field index, preserving output positions. + // Each entry maps: path -> outputPosition + // This fixes the collision issue where multiple nested fields from the same parent + // (e.g., b.key and b.value) each need their own output position. + final Map> pathsByTopLevelIndex = new LinkedHashMap<>(); + + for (int outputPos = 0; outputPos < projectedFields.length; outputPos++) { + final int[] path = projectedFields[outputPos]; + Preconditions.checkArgument( + path != null && path.length >= 1, + "Projection path must have at least one element but got: %s", + Arrays.toString(path)); + final int physicalPos = path[0]; + Preconditions.checkArgument( + physicalPos >= 0 && physicalPos < physicalFieldCount, + "Projected field index out of bounds: %s", + physicalPos); + + physicalFieldProjected[physicalPos] = true; + pathsByTopLevelIndex + .computeIfAbsent(physicalPos, k -> new ArrayList<>()) + .add(new PathWithOutputPos(path, outputPos)); + } + + // This sets the physical output type. Note that SupportsReadingMetadata#applyReadableMetadata + // may overwrite producedDataType later with appended metadata columns. + this.producedDataType = producedDataType; + + // Get original top-level indices from current projections + int[] originalKeyTopLevel = getTopLevelIndices(keyProjection); + int[] originalValueTopLevel = getTopLevelIndices(valueProjection); + + // Build key format projection, nested paths, and output mapping + List keyFormatList = new ArrayList<>(); + List keyNestedList = new ArrayList<>(); + List keyOutputList = new ArrayList<>(); + for (int physicalPos : originalKeyTopLevel) { + if (physicalFieldProjected[physicalPos]) { + keyFormatList.add(physicalPos); + List pathsWithPos = pathsByTopLevelIndex.get(physicalPos); + if (pathsWithPos != null) { + for (PathWithOutputPos pwp : pathsWithPos) { + keyNestedList.add(pwp.path); + keyOutputList.add(pwp.outputPos); + } + } + } + } + this.keyFormatProjection = keyFormatList.stream().mapToInt(Integer::intValue).toArray(); + this.keyProjection = keyNestedList.toArray(new int[0][]); + this.keyOutputProjection = keyOutputList.stream().mapToInt(Integer::intValue).toArray(); + + // Build value format projection, nested paths, and output mapping + List valueFormatList = new ArrayList<>(); + List valueNestedList = new ArrayList<>(); + List valueOutputList = new ArrayList<>(); + for (int physicalPos : originalValueTopLevel) { + if (physicalFieldProjected[physicalPos]) { + valueFormatList.add(physicalPos); + List pathsWithPos = pathsByTopLevelIndex.get(physicalPos); + if (pathsWithPos != null) { + for (PathWithOutputPos pwp : pathsWithPos) { + valueNestedList.add(pwp.path); + valueOutputList.add(pwp.outputPos); + } + } + } + } + this.valueFormatProjection = valueFormatList.stream().mapToInt(Integer::intValue).toArray(); + this.valueProjection = valueNestedList.toArray(new int[0][]); + this.valueOutputProjection = valueOutputList.stream().mapToInt(Integer::intValue).toArray(); + } + + /** Helper class to associate a projection path with its output position. */ + private static class PathWithOutputPos { + final int[] path; + final int outputPos; + + PathWithOutputPos(int[] path, int outputPos) { + this.path = path; + this.outputPos = outputPos; + } + } + + /** Converts a 1D projection (top-level indices) to 2D nested projection format. */ + private static int[][] toNestedProjection(int[] projection) { + int[][] result = new int[projection.length][]; + for (int i = 0; i < projection.length; i++) { + result[i] = new int[] {projection[i]}; + } + return result; + } + + /** Extracts unique top-level indices from nested projection paths. */ + private static int[] getTopLevelIndices(int[][] projection) { + return Arrays.stream(projection) + .mapToInt(path -> path[0]) + .distinct() + .toArray(); + } + @Override public DynamicTableSource copy() { final PscDynamicSource copy = @@ -484,6 +707,10 @@ public DynamicTableSource copy() { copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; + copy.keyFormatProjection = keyFormatProjection; + copy.valueFormatProjection = valueFormatProjection; + copy.keyOutputProjection = keyOutputProjection; + copy.valueOutputProjection = valueOutputProjection; return copy; } @@ -506,8 +733,8 @@ public boolean equals(Object o) { && Objects.equals(physicalDataType, that.physicalDataType) && Objects.equals(keyDecodingFormat, that.keyDecodingFormat) && Objects.equals(valueDecodingFormat, that.valueDecodingFormat) - && Arrays.equals(keyProjection, that.keyProjection) - && Arrays.equals(valueProjection, that.valueProjection) + && Arrays.deepEquals(keyProjection, that.keyProjection) + && Arrays.deepEquals(valueProjection, that.valueProjection) && Objects.equals(keyPrefix, that.keyPrefix) && Objects.equals(topicUris, that.topicUris) && Objects.equals(String.valueOf(topicUriPattern), String.valueOf(that.topicUriPattern)) @@ -535,8 +762,8 @@ public int hashCode() { physicalDataType, keyDecodingFormat, valueDecodingFormat, - Arrays.hashCode(keyProjection), - Arrays.hashCode(valueProjection), + Arrays.deepHashCode(keyProjection), + Arrays.deepHashCode(valueProjection), keyPrefix, topicUris, topicUriPattern, @@ -685,16 +912,16 @@ private PscDeserializationSchema createPscDeserializationSchema( // adjust value format projection to include value format's metadata columns at the end final int[] adjustedValueProjection = IntStream.concat( - IntStream.of(valueProjection), + IntStream.of(valueOutputProjection), IntStream.range( - keyProjection.length + valueProjection.length, + keyOutputProjection.length + valueOutputProjection.length, adjustedPhysicalArity)) .toArray(); return new DynamicPscDeserializationSchema( adjustedPhysicalArity, keyDeserialization, - keyProjection, + keyOutputProjection, valueDeserialization, adjustedValueProjection, hasMetadata, @@ -706,12 +933,15 @@ private PscDeserializationSchema createPscDeserializationSchema( private @Nullable DeserializationSchema createDeserialization( Context context, @Nullable DecodingFormat> format, - int[] projection, + int[] formatProjection, + int[][] nestedProjection, @Nullable String prefix) { if (format == null) { return null; } - DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType); + // Use nested projection for proper nested field pruning + DataType physicalFormatDataType = + Projection.of(nestedProjection).project(this.physicalDataType); if (prefix != null) { physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscProjectionPushdownTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscProjectionPushdownTest.java new file mode 100644 index 0000000..c7f6529 --- /dev/null +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscProjectionPushdownTest.java @@ -0,0 +1,770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.flink.streaming.connectors.psc.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for projection pushdown in {@link PscDynamicSource}. + * + *

These tests verify that when a query selects only a subset of columns, + * the PSC source correctly pushes down the projection so that only the + * required columns are deserialized. + */ +public class PscProjectionPushdownTest { + + /** Full schema: columns a, b, c, d */ + private static final DataType FULL_PHYSICAL_TYPE = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING()), + DataTypes.FIELD("c", DataTypes.BIGINT()), + DataTypes.FIELD("d", DataTypes.BOOLEAN())) + .notNull(); + + /** + * Test: SELECT * FROM table + * Expected: All columns [a, b, c, d] are passed to decoder + */ + @Test + public void testSelectAllColumns() { + // SELECT * projects all fields: [a, b, c, d] -> indices [0, 1, 2, 3] + final int[][] projectedFields = new int[][] { + new int[] {0}, new int[] {1}, new int[] {2}, new int[] {3} + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING()), + DataTypes.FIELD("c", DataTypes.BIGINT()), + DataTypes.FIELD("d", DataTypes.BOOLEAN())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumns(projectedFields, projectedType); + + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("a", "b", "c", "d")); + } + + /** + * Test: SELECT a, b, c FROM table + * Expected: Only columns [a, b, c] are passed to decoder (excludes d) + */ + @Test + public void testSelectSubsetOfColumns() { + // SELECT a, b, c -> indices [0, 1, 2] + final int[][] projectedFields = new int[][] { + new int[] {0}, new int[] {1}, new int[] {2} + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING()), + DataTypes.FIELD("c", DataTypes.BIGINT())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumns(projectedFields, projectedType); + + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("a", "b", "c")); + } + + /** + * Test: SELECT a FROM table WHERE b = 'value' + * Expected: Columns [a, b] are passed to decoder (a for projection, b for filter) + * + * Note: In Flink's optimization, when a filter references a column, that column + * must be included in the projection even if not in the SELECT list. + */ + @Test + public void testSelectWithFilterRequiresBothColumns() { + // SELECT a WHERE b = 'value' -> need both a (selected) and b (filter) + // Flink planner will project [a, b] -> indices [0, 1] + final int[][] projectedFields = new int[][] { + new int[] {0}, new int[] {1} + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumns(projectedFields, projectedType); + + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("a", "b")); + } + + /** + * Test: SELECT c, a FROM table (reordered columns) + * Expected: Only columns [a, c] are passed to decoder + */ + @Test + public void testSelectReorderedColumns() { + // SELECT c, a -> indices [2, 0] (reordered) + final int[][] projectedFields = new int[][] { + new int[] {2}, new int[] {0} + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("c", DataTypes.BIGINT()), + DataTypes.FIELD("a", DataTypes.INT())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumns(projectedFields, projectedType); + + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("a", "c")); + } + + /** + * Test: SELECT d FROM table + * Expected: Only column [d] is passed to decoder + */ + @Test + public void testSelectSingleColumn() { + // SELECT d -> index [3] + final int[][] projectedFields = new int[][] { + new int[] {3} + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("d", DataTypes.BOOLEAN())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumns(projectedFields, projectedType); + + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList("d")); + } + + /** + * Test: Verify that nested projection is supported. + * This enables formats like Thrift to deserialize only specific nested fields. + */ + @Test + public void testSupportsNestedProjection() { + PscDynamicSource source = createSource(); + assertThat(source.supportsNestedProjection()).isTrue(); + } + + /** + * Test: SELECT nested.field FROM table (nested projection) + * Expected: The projected nested field is passed to decoder with flattened name. + * The format (e.g., Thrift) receives the pruned schema with nested structure. + * + * Schema: a INT, b ROW, c BIGINT, d BOOLEAN + * Query: SELECT b.x → projects to nested field x within b + */ + @Test + public void testNestedProjection() { + // SELECT b.x -> path [1, 0] means field 0 (x) within field 1 (b) + final int[][] projectedFields = new int[][] { + new int[] {1, 0} // nested path: b.x + }; + // The produced type after projection contains just the nested field + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("x", DataTypes.STRING())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumnsWithNestedSchema( + projectedFields, projectedType); + + // DataTypeUtils.flattenToNames flattens nested ROW to b_x format + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList("b_x")); + } + + /** + * Test: SELECT a, b.y FROM table (mixed top-level and nested projection) + * Expected: Top-level column a and nested field b.y (flattened) are passed to decoder. + */ + @Test + public void testMixedTopLevelAndNestedProjection() { + // SELECT a, b.y -> paths [0] and [1, 1] + final int[][] projectedFields = new int[][] { + new int[] {0}, // top-level: a + new int[] {1, 1} // nested: b.y + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("y", DataTypes.INT())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumnsWithNestedSchema( + projectedFields, projectedType); + + // DataTypeUtils.flattenToNames returns "a" and "b_y" (flattened) + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("a", "b_y")); + } + + /** + * Helper method to create a PscDynamicSource, apply projection, and return + * the list of column names that would be passed to the decoder. + */ + private List applyProjectionAndGetDecodedColumns( + int[][] projectedFields, DataType projectedType) { + + final String topicUri = + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX + + "projection-test-topic"; + + final Properties sourceProperties = new Properties(); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.PscFlinkConfiguration.CLUSTER_URI_CONFIG, + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX); + sourceProperties.setProperty( + com.pinterest.psc.config.PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); + sourceProperties.setProperty("client.id.prefix", "test"); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.source.PscSourceOptions + .PARTITION_DISCOVERY_INTERVAL_MS + .key(), + "1000"); + + final DecodingFormatMock valueFormat = new DecodingFormatMock(",", true); + final PscDynamicSource source = + new PscDynamicSource( + FULL_PHYSICAL_TYPE, + null, + valueFormat, + new int[0], + new int[] {0, 1, 2, 3}, + null, + Collections.singletonList(topicUri), + (Pattern) null, + sourceProperties, + com.pinterest.flink.streaming.connectors.psc.config.StartupMode.EARLIEST, + new HashMap<>(), + 0L, + com.pinterest.flink.streaming.connectors.psc.config.BoundedMode.UNBOUNDED, + new HashMap<>(), + 0L, + false, + "test-table"); + + // Apply the projection + ((SupportsProjectionPushDown) source).applyProjection(projectedFields, projectedType); + + // Trigger creation of runtime decoders + source.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + + // Get the captured data type from the mock format + final DataType capturedDecoderProduced = valueFormat.producedDataType; + assertThat(capturedDecoderProduced).isNotNull(); + + return DataTypeUtils.flattenToNames(capturedDecoderProduced); + } + + /** Schema with nested ROW type for nested projection tests: a INT, b ROW, c BIGINT, d BOOLEAN */ + private static final DataType NESTED_PHYSICAL_TYPE = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.ROW( + DataTypes.FIELD("x", DataTypes.STRING()), + DataTypes.FIELD("y", DataTypes.INT()))), + DataTypes.FIELD("c", DataTypes.BIGINT()), + DataTypes.FIELD("d", DataTypes.BOOLEAN())) + .notNull(); + + /** + * Helper method for nested projection tests. + */ + private List applyProjectionAndGetDecodedColumnsWithNestedSchema( + int[][] projectedFields, DataType projectedType) { + + final String topicUri = + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX + + "projection-test-topic"; + + final Properties sourceProperties = new Properties(); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.PscFlinkConfiguration.CLUSTER_URI_CONFIG, + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX); + sourceProperties.setProperty( + com.pinterest.psc.config.PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); + sourceProperties.setProperty("client.id.prefix", "test"); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.source.PscSourceOptions + .PARTITION_DISCOVERY_INTERVAL_MS + .key(), + "1000"); + + final DecodingFormatMock valueFormat = new DecodingFormatMock(",", true); + final PscDynamicSource source = + new PscDynamicSource( + NESTED_PHYSICAL_TYPE, + null, + valueFormat, + new int[0], + new int[] {0, 1, 2, 3}, + null, + Collections.singletonList(topicUri), + (Pattern) null, + sourceProperties, + com.pinterest.flink.streaming.connectors.psc.config.StartupMode.EARLIEST, + new HashMap<>(), + 0L, + com.pinterest.flink.streaming.connectors.psc.config.BoundedMode.UNBOUNDED, + new HashMap<>(), + 0L, + false, + "test-table"); + + // Apply the projection + ((SupportsProjectionPushDown) source).applyProjection(projectedFields, projectedType); + + // Trigger creation of runtime decoders + source.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + + // Get the captured data type from the mock format + final DataType capturedDecoderProduced = valueFormat.producedDataType; + assertThat(capturedDecoderProduced).isNotNull(); + + return DataTypeUtils.flattenToNames(capturedDecoderProduced); + } + + /** + * Helper method to create a basic PscDynamicSource for simple tests. + */ + private PscDynamicSource createSource() { + final String topicUri = + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX + + "projection-test-topic"; + + final Properties sourceProperties = new Properties(); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.PscFlinkConfiguration.CLUSTER_URI_CONFIG, + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX); + sourceProperties.setProperty( + com.pinterest.psc.config.PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); + sourceProperties.setProperty("client.id.prefix", "test"); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.source.PscSourceOptions + .PARTITION_DISCOVERY_INTERVAL_MS + .key(), + "1000"); + + final DecodingFormatMock valueFormat = new DecodingFormatMock(",", true); + return new PscDynamicSource( + FULL_PHYSICAL_TYPE, + null, + valueFormat, + new int[0], + new int[] {0, 1, 2, 3}, + null, + Collections.singletonList(topicUri), + (Pattern) null, + sourceProperties, + com.pinterest.flink.streaming.connectors.psc.config.StartupMode.EARLIEST, + new HashMap<>(), + 0L, + com.pinterest.flink.streaming.connectors.psc.config.BoundedMode.UNBOUNDED, + new HashMap<>(), + 0L, + false, + "test-table"); + } + + // ==================== Backwards Compatibility Tests ==================== + + /** + * Test: Verify that a source created without projection pushdown has default + * nested projection arrays initialized (single-element paths for each field). + * This ensures backwards compatibility with existing code. + */ + @Test + public void testDefaultNestedProjectionInitialization() { + PscDynamicSource source = createSource(); + + // Copy the source to access internal state + PscDynamicSource copy = (PscDynamicSource) source.copy(); + + // Without any projection applied, nested projections should be default + // (single-element paths matching the value projection) + // This verifies the constructor properly initializes the nested arrays + assertThat(copy).isNotNull(); + } + + /** + * Test: Verify that copy() properly preserves nested projection state. + * This ensures that source copying (used in Flink's optimizer) works correctly. + */ + @Test + public void testCopyPreservesNestedProjection() { + PscDynamicSource source = createSourceWithNestedSchema(); + + // Apply nested projection + final int[][] projectedFields = new int[][] { + new int[] {0}, // a + new int[] {1, 0}, // b.x + new int[] {1, 1} // b.y + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("x", DataTypes.STRING()), + DataTypes.FIELD("y", DataTypes.INT())) + .notNull(); + + ((SupportsProjectionPushDown) source).applyProjection(projectedFields, projectedType); + + // Copy the source + DynamicTableSource copiedSource = source.copy(); + + // Verify copy is not the same instance + assertThat(copiedSource).isNotSameAs(source); + + // We can't easily compare internal state, but we verify both are PscDynamicSource + assertThat(copiedSource).isInstanceOf(PscDynamicSource.class); + } + + /** + * Test: Verify convertPathsToFieldNames utility method for simple paths. + */ + @Test + public void testConvertPathsToFieldNamesSimple() { + final int[][] paths = new int[][] { + new int[] {0}, // a + new int[] {2} // c + }; + + List fieldNames = convertPathsToFieldNames(paths, FULL_PHYSICAL_TYPE); + + assertThat(fieldNames).containsExactly("a", "c"); + } + + /** + * Test: Verify convertPathsToFieldNames utility method for nested paths. + */ + @Test + public void testConvertPathsToFieldNamesNested() { + final int[][] paths = new int[][] { + new int[] {0}, // a + new int[] {1, 0}, // b.x + new int[] {1, 1} // b.y + }; + + List fieldNames = convertPathsToFieldNames(paths, NESTED_PHYSICAL_TYPE); + + assertThat(fieldNames).containsExactly("a", "b.x", "b.y"); + } + + /** + * Test: Verify convertPathsToFieldNames with deeply nested paths. + */ + @Test + public void testConvertPathsToFieldNamesDeeplyNested() { + // Schema: a INT, b ROW> + final DataType deeplyNestedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.ROW( + DataTypes.FIELD("x", DataTypes.STRING()), + DataTypes.FIELD("y", DataTypes.ROW( + DataTypes.FIELD("p", DataTypes.INT()), + DataTypes.FIELD("q", DataTypes.STRING())))))) + .notNull(); + + final int[][] paths = new int[][] { + new int[] {0}, // a + new int[] {1, 0}, // b.x + new int[] {1, 1, 0}, // b.y.p + new int[] {1, 1, 1} // b.y.q + }; + + List fieldNames = convertPathsToFieldNames(paths, deeplyNestedType); + + assertThat(fieldNames).containsExactly("a", "b.x", "b.y.p", "b.y.q"); + } + + /** + * Test: Verify convertPathsToFieldNames with ARRAY containing ROW type. + * Schema: items ARRAY> + */ + @Test + public void testConvertPathsToFieldNamesArrayOfRow() { + final DataType arrayOfRowType = + DataTypes.ROW( + DataTypes.FIELD("items", DataTypes.ARRAY( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()))))) + .notNull(); + + final int[][] paths = new int[][] { + new int[] {0, 0}, // items.id + new int[] {0, 1} // items.name + }; + + List fieldNames = convertPathsToFieldNames(paths, arrayOfRowType); + + assertThat(fieldNames).containsExactly("items.id", "items.name"); + } + + /** + * Test: Verify that nested projection works correctly with multiple nested fields + * from the same parent and produces the correct pruned DataType for the format. + */ + @Test + public void testMultipleNestedFieldsFromSameParent() { + // SELECT b.x, b.y FROM table -> paths [1, 0] and [1, 1] + final int[][] projectedFields = new int[][] { + new int[] {1, 0}, // b.x + new int[] {1, 1} // b.y + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("x", DataTypes.STRING()), + DataTypes.FIELD("y", DataTypes.INT())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumnsWithNestedSchema( + projectedFields, projectedType); + + // DataTypeUtils.flattenToNames returns "b_x" and "b_y" (flattened) + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("b_x", "b_y")); + } + + /** + * Test: Verify that output projection correctly maps multiple nested fields + * from the same parent to their respective output positions. + * + * This tests the fix for the collision issue where physicalIndexToOutputIndex + * was being overwritten when multiple nested fields shared the same top-level parent. + * + * For SELECT b.key, b.value FROM foo: + * - b.key should map to output position 0 + * - b.value should map to output position 1 + * - valueOutputProjection should be [0, 1], NOT [1] (the bug) + */ + @Test + public void testOutputProjectionForMultipleNestedFieldsFromSameParent() { + PscDynamicSource source = createSourceWithNestedSchema(); + + // SELECT b.x, b.y FROM table -> paths [1, 0] (output 0) and [1, 1] (output 1) + final int[][] projectedFields = new int[][] { + new int[] {1, 0}, // b.x -> output position 0 + new int[] {1, 1} // b.y -> output position 1 + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("x", DataTypes.STRING()), + DataTypes.FIELD("y", DataTypes.INT())) + .notNull(); + + ((SupportsProjectionPushDown) source).applyProjection(projectedFields, projectedType); + + // Verify nested projection contains both paths + assertThat(source.valueProjection.length).isEqualTo(2); + assertThat(source.valueProjection[0]).containsExactly(1, 0); + assertThat(source.valueProjection[1]).containsExactly(1, 1); + + // Verify output projection maps each nested field to its correct position + // This is the key assertion - before the fix, this would be [1] instead of [0, 1] + assertThat(source.valueOutputProjection.length).isEqualTo(2); + assertThat(source.valueOutputProjection).containsExactly(0, 1); + } + + /** + * Test: Verify output projection with interleaved nested and top-level fields. + * + * For SELECT a, b.y, c FROM table: + * - a (top-level) -> output position 0 + * - b.y (nested) -> output position 1 + * - c (top-level) -> output position 2 + */ + @Test + public void testOutputProjectionWithInterleavedFields() { + PscDynamicSource source = createSourceWithNestedSchema(); + + final int[][] projectedFields = new int[][] { + new int[] {0}, // a -> output position 0 + new int[] {1, 1}, // b.y -> output position 1 + new int[] {2} // c -> output position 2 + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("y", DataTypes.INT()), + DataTypes.FIELD("c", DataTypes.BIGINT())) + .notNull(); + + ((SupportsProjectionPushDown) source).applyProjection(projectedFields, projectedType); + + // All fields are value fields in our test schema + assertThat(source.valueProjection.length).isEqualTo(3); + assertThat(source.valueOutputProjection).containsExactly(0, 1, 2); + } + + /** + * Test: Verify backwards compatibility - existing code that doesn't use + * nested projection still works correctly. + */ + @Test + public void testBackwardsCompatibilityWithoutNestedProjection() { + // Simple top-level projection without any nested fields + final int[][] projectedFields = new int[][] { + new int[] {0}, + new int[] {1} + }; + final DataType projectedType = + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING())) + .notNull(); + + List decodedColumns = applyProjectionAndGetDecodedColumns(projectedFields, projectedType); + + // Should work exactly as before + assertThat(decodedColumns) + .containsExactlyInAnyOrderElementsOf(Arrays.asList("a", "b")); + } + + /** + * Helper method to create a PscDynamicSource with nested schema. + */ + private PscDynamicSource createSourceWithNestedSchema() { + final String topicUri = + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX + + "projection-test-topic"; + + final Properties sourceProperties = new Properties(); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.PscFlinkConfiguration.CLUSTER_URI_CONFIG, + com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub + .PSC_TEST_CLUSTER0_URI_PREFIX); + sourceProperties.setProperty( + com.pinterest.psc.config.PscConfiguration.PSC_CONSUMER_GROUP_ID, "dummy"); + sourceProperties.setProperty("client.id.prefix", "test"); + sourceProperties.setProperty( + com.pinterest.flink.connector.psc.source.PscSourceOptions + .PARTITION_DISCOVERY_INTERVAL_MS + .key(), + "1000"); + + final DecodingFormatMock valueFormat = new DecodingFormatMock(",", true); + return new PscDynamicSource( + NESTED_PHYSICAL_TYPE, + null, + valueFormat, + new int[0], + new int[] {0, 1, 2, 3}, + null, + Collections.singletonList(topicUri), + (Pattern) null, + sourceProperties, + com.pinterest.flink.streaming.connectors.psc.config.StartupMode.EARLIEST, + new HashMap<>(), + 0L, + com.pinterest.flink.streaming.connectors.psc.config.BoundedMode.UNBOUNDED, + new HashMap<>(), + 0L, + false, + "test-table"); + } + + // ==================== Test Utility Methods ==================== + + /** + * Converts nested projection paths to dot-separated field names. + * Example: [[1, 0], [2]] with schema (a, b ROW<x, y>, c) → ["b.x", "c"] + */ + private static List convertPathsToFieldNames(int[][] paths, DataType dataType) { + List fieldNames = new ArrayList<>(); + List topLevelNames = DataType.getFieldNames(dataType); + List topLevelTypes = DataType.getFieldDataTypes(dataType); + + for (int[] path : paths) { + StringBuilder name = new StringBuilder(); + List currentNames = topLevelNames; + List currentTypes = topLevelTypes; + + for (int i = 0; i < path.length; i++) { + int index = path[i]; + if (i > 0) { + name.append("."); + } + name.append(currentNames.get(index)); + + // Navigate to nested type for next iteration + if (i < path.length - 1) { + DataType nestedType = currentTypes.get(index); + // Unwrap collection types (ARRAY, MAP) to get to element/value type + nestedType = unwrapCollectionType(nestedType); + currentNames = DataType.getFieldNames(nestedType); + currentTypes = DataType.getFieldDataTypes(nestedType); + } + } + fieldNames.add(name.toString()); + } + return fieldNames; + } + + /** + * Unwraps collection types (ARRAY, MAP) to get the element/value type containing ROW fields. + */ + private static DataType unwrapCollectionType(DataType dataType) { + LogicalType logicalType = dataType.getLogicalType(); + if (logicalType instanceof ArrayType) { + // ARRAY> - get the element type + List children = dataType.getChildren(); + if (!children.isEmpty()) { + return children.get(0); + } + } else if (logicalType instanceof MapType) { + // MAP> - get the value type (second child) + List children = dataType.getChildren(); + if (children.size() >= 2) { + return children.get(1); + } + } + return dataType; + } +}