Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,7 +84,11 @@
/** A version-agnostic PSC {@link ScanTableSource}. */
@Internal
public class PscDynamicSource
implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
implements
ScanTableSource,
SupportsReadingMetadata,
SupportsWatermarkPushDown,
SupportsProjectionPushDown {

private static final Logger LOG = LoggerFactory.getLogger(PscDynamicSource.class);

Expand Down Expand Up @@ -114,11 +122,29 @@ public class PscDynamicSource
/** Format for decoding values from PSC. */
protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;

/** Indices that determine the key fields and the target position in the produced row. */
protected final int[] keyProjection;
/**
* Projection paths for key fields. Each int[] is a path to a field:
* - [topLevelIndex] for top-level fields
* - [topLevelIndex, nestedIndex, ...] for nested fields within ROW types
* Used by formats that support nested projection (e.g., Thrift's PartialThriftDeserializer).
*/
protected int[][] keyProjection;

/**
* Projection paths for value fields. Each int[] is a path to a field:
* - [topLevelIndex] for top-level fields
* - [topLevelIndex, nestedIndex, ...] for nested fields within ROW types
* Used by formats that support nested projection (e.g., Thrift's PartialThriftDeserializer).
*/
protected int[][] valueProjection;

/** Indices that determine the value fields and the target position in the produced row. */
protected final int[] valueProjection;
// Query-specific projections (see SupportsProjectionPushDown).
// - *Format* projections are indices into physicalDataType (control what gets deserialized).
// - *Output* projections are indices into the projected physical row (control where fields land).
protected int[] keyFormatProjection;
protected int[] valueFormatProjection;
protected int[] keyOutputProjection;
protected int[] valueOutputProjection;

/** Prefix that needs to be removed from fields when constructing the physical data type. */
protected final @Nullable String keyPrefix;
Expand Down Expand Up @@ -206,6 +232,10 @@ private int getIntendedParallelism(StreamExecutionEnvironment execEnv) {
return scanParallelism != null ? scanParallelism : execEnv.getParallelism();
}

/**
* Backwards-compatible constructor that accepts int[] projections.
* Converts them to int[][] format internally.
*/
public PscDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
Expand All @@ -228,6 +258,55 @@ public PscDynamicSource(
boolean enableRescale,
@Nullable Double rateLimitRecordsPerSecond,
@Nullable Integer scanParallelism) {
this(
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
toNestedProjection(keyProjection),
toNestedProjection(valueProjection),
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
boundedMode,
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier,
sourceUidPrefix,
enableRescale,
rateLimitRecordsPerSecond,
scanParallelism);
}

/**
* Primary constructor that accepts int[][] projections for full nested field support.
*/
public PscDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[][] keyProjection,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One main question I still have here: in these keyProjection / valueProjection arrays for nested projection, the embedded indices are based on the field/sub-field indices in the DDL table schema definition, right? If so, doesn't the nested projection also rely on the field/sub-field indices of the table DDL schema being exactly the same as those of the Thrift schema? E.g., if the Thrift schema has fields 1: f1, 2: f2, 3: f3, 4: f4, and the table DDL has not been updated yet and only has the three fields 1: f1, 2: f2, 3: f4, then a projection on f1 and f4 produces the indices 1, 3. However, because the table DDL differs from the Thrift schema, the projection resolves to 1: f1 and 3: f3 — not what was expected.

The only reliable way to implement this is to base the nested projection on field/sub-field names instead of indices. Here is what I believe should be implemented: a) int[][] keyProjection and int[][] valueProjection are indices into the table DDL schema; b) we look up the list of field/sub-field names in the table DDL schema according to keyProjection and valueProjection; c) we convert those field/sub-field names to Thrift field/sub-field indices, which form the actual nested projection int[][] passed into the partial Thrift deserializer.

int[][] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<PscTopicUriPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
BoundedMode boundedMode,
Map<PscTopicUriPartition, Long> specificBoundedOffsets,
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier,
@Nullable String sourceUidPrefix,
boolean enableRescale,
@Nullable Double rateLimitRecordsPerSecond,
@Nullable Integer scanParallelism) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
Expand All @@ -241,6 +320,12 @@ public PscDynamicSource(
this.valueProjection =
Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
this.keyPrefix = keyPrefix;

// Default behavior: no projection pushdown, keep the DDL-level projections.
this.keyFormatProjection = getTopLevelIndices(this.keyProjection);
this.valueFormatProjection = getTopLevelIndices(this.valueProjection);
this.keyOutputProjection = getTopLevelIndices(this.keyProjection);
this.valueOutputProjection = getTopLevelIndices(this.valueProjection);
// Mutable attributes
this.producedDataType = physicalDataType;
this.metadataKeys = Collections.emptyList();
Expand Down Expand Up @@ -327,10 +412,20 @@ public ChangelogMode getChangelogMode() {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization =
createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
createDeserialization(
context,
keyDecodingFormat,
keyFormatProjection,
keyProjection,
keyPrefix);

final DeserializationSchema<RowData> valueDeserialization =
createDeserialization(context, valueDecodingFormat, valueProjection, null);
createDeserialization(
context,
valueDecodingFormat,
valueFormatProjection,
valueProjection,
null);

final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
Expand Down Expand Up @@ -456,6 +551,134 @@ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}

/**
 * Enables nested projection push-down: the planner may pass multi-element paths
 * (e.g. {@code b.x} inside a ROW column) to {@code applyProjection}, not only
 * top-level column indices.
 */
@Override
public boolean supportsNestedProjection() {
    return true;
}

/**
 * Applies the planner-chosen projection (see {@code SupportsProjectionPushDown}).
 *
 * <p>{@code projectedFields} is indexed by output position: entry {@code i} is the path of
 * the field that must land at position {@code i} of the projected row. A path is either
 * {@code [topLevelIndex]} for a top-level field or {@code [topLevelIndex, nestedIndex, ...]}
 * for a field nested inside a ROW column. Example for schema
 * {@code (a INT, b ROW<x STRING, y INT>, c BIGINT)}: {@code [[2], [1, 0]]} means
 * {@code SELECT c, b.x} — {@code c} at output position 0 and {@code b.x} at position 1.
 *
 * <p>The projection is split between the key and value sides (based on the DDL-level
 * key/value projections), recording per side:
 * <ul>
 *   <li>format projection — top-level indices into {@code physicalDataType} (controls what
 *       the format deserializes),</li>
 *   <li>nested paths — stored back into {@code keyProjection}/{@code valueProjection},</li>
 *   <li>output projection — target positions in the projected row.</li>
 * </ul>
 *
 * @param projectedFields projection paths, ordered by output position
 * @param producedDataType the already-projected physical output type computed by the planner
 */
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    Preconditions.checkNotNull(projectedFields, "Projected fields must not be null.");
    Preconditions.checkNotNull(producedDataType, "Produced data type must not be null.");

    final LogicalType physicalType = physicalDataType.getLogicalType();
    Preconditions.checkArgument(
            physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
    final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);

    // Track which top-level fields survive the projection (used to filter key/value sides).
    final boolean[] physicalFieldProjected = new boolean[physicalFieldCount];

    // Group projected paths by their top-level field index, preserving output positions.
    // Grouping (instead of one path per top-level index) is required because several nested
    // fields of the same parent (e.g. b.x and b.y) each need their own output slot.
    final Map<Integer, List<PathWithOutputPos>> pathsByTopLevelIndex = new LinkedHashMap<>();

    for (int outputPos = 0; outputPos < projectedFields.length; outputPos++) {
        final int[] path = projectedFields[outputPos];
        Preconditions.checkArgument(
                path != null && path.length >= 1,
                "Projection path must have at least one element but got: %s",
                Arrays.toString(path));
        final int physicalPos = path[0];
        Preconditions.checkArgument(
                physicalPos >= 0 && physicalPos < physicalFieldCount,
                "Projected field index out of bounds: %s",
                physicalPos);

        physicalFieldProjected[physicalPos] = true;
        pathsByTopLevelIndex
                .computeIfAbsent(physicalPos, k -> new ArrayList<>())
                .add(new PathWithOutputPos(path, outputPos));
    }

    // This sets the physical output type. Note that
    // SupportsReadingMetadata#applyReadableMetadata may overwrite producedDataType later
    // with appended metadata columns.
    this.producedDataType = producedDataType;

    // Compute both splits before overwriting any field: the value split must read the
    // original valueProjection (and the key split the original keyProjection).
    final ProjectionSplit keySplit =
            splitProjections(
                    getTopLevelIndices(keyProjection),
                    physicalFieldProjected,
                    pathsByTopLevelIndex);
    final ProjectionSplit valueSplit =
            splitProjections(
                    getTopLevelIndices(valueProjection),
                    physicalFieldProjected,
                    pathsByTopLevelIndex);

    this.keyFormatProjection = keySplit.format;
    this.keyProjection = keySplit.nested;
    this.keyOutputProjection = keySplit.output;

    this.valueFormatProjection = valueSplit.format;
    this.valueProjection = valueSplit.nested;
    this.valueOutputProjection = valueSplit.output;
}

/**
 * Computes the format/nested/output projections for one side (key or value): keeps the
 * side's original top-level fields that survived the projection and collects, in that
 * order, all their nested paths and output positions.
 */
private static ProjectionSplit splitProjections(
        int[] originalTopLevelIndices,
        boolean[] physicalFieldProjected,
        Map<Integer, List<PathWithOutputPos>> pathsByTopLevelIndex) {
    final List<Integer> formatList = new ArrayList<>();
    final List<int[]> nestedList = new ArrayList<>();
    final List<Integer> outputList = new ArrayList<>();
    for (int physicalPos : originalTopLevelIndices) {
        if (!physicalFieldProjected[physicalPos]) {
            continue; // this side's field was pruned by the query
        }
        formatList.add(physicalPos);
        final List<PathWithOutputPos> pathsWithPos = pathsByTopLevelIndex.get(physicalPos);
        if (pathsWithPos != null) {
            for (PathWithOutputPos pwp : pathsWithPos) {
                nestedList.add(pwp.path);
                outputList.add(pwp.outputPos);
            }
        }
    }
    final ProjectionSplit split = new ProjectionSplit();
    split.format = formatList.stream().mapToInt(Integer::intValue).toArray();
    split.nested = nestedList.toArray(new int[0][]);
    split.output = outputList.stream().mapToInt(Integer::intValue).toArray();
    return split;
}

/** Mutable value holder for the three projection arrays produced by one side's split. */
private static final class ProjectionSplit {
    int[] format;
    int[][] nested;
    int[] output;
}

/** Helper class to associate a projection path with its output position. */
private static class PathWithOutputPos {
    // Path to the projected field: [topLevelIndex] or [topLevelIndex, nestedIndex, ...].
    final int[] path;
    // Target position of this field in the projected output row.
    final int outputPos;

    PathWithOutputPos(int[] path, int outputPos) {
        this.path = path;
        this.outputPos = outputPos;
    }
}

/**
 * Converts a 1D projection (top-level field indices) to the 2D nested projection format,
 * wrapping each index {@code i} as a single-element path {@code [i]}.
 *
 * <p>A {@code null} input is passed through as {@code null} so that the primary
 * constructor's {@code Preconditions.checkNotNull} reports its descriptive error message,
 * matching the behavior of the original {@code int[]}-based constructor (otherwise the
 * delegating constructor would throw a bare NPE here before the precondition runs).
 *
 * @param projection top-level field indices, or {@code null}
 * @return one single-element path per index, or {@code null} if {@code projection} is null
 */
private static int[][] toNestedProjection(int[] projection) {
    if (projection == null) {
        return null;
    }
    final int[][] result = new int[projection.length][];
    for (int i = 0; i < projection.length; i++) {
        result[i] = new int[] {projection[i]};
    }
    return result;
}

/**
 * Extracts the distinct top-level indices (the first element of every path) from nested
 * projection paths, preserving first-seen order.
 *
 * @param projection nested projection paths; slot 0 of each path is its top-level index
 * @return the unique top-level indices in encounter order
 */
private static int[] getTopLevelIndices(int[][] projection) {
    // At most one distinct index per path; over-allocate and trim at the end.
    final int[] buffer = new int[projection.length];
    int count = 0;
    for (int[] path : projection) {
        final int topLevel = path[0];
        boolean alreadySeen = false;
        for (int i = 0; i < count; i++) {
            if (buffer[i] == topLevel) {
                alreadySeen = true;
                break;
            }
        }
        if (!alreadySeen) {
            buffer[count++] = topLevel;
        }
    }
    return Arrays.copyOf(buffer, count);
}

@Override
public DynamicTableSource copy() {
final PscDynamicSource copy =
Expand Down Expand Up @@ -484,6 +707,10 @@ public DynamicTableSource copy() {
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
copy.keyFormatProjection = keyFormatProjection;
copy.valueFormatProjection = valueFormatProjection;
copy.keyOutputProjection = keyOutputProjection;
copy.valueOutputProjection = valueOutputProjection;
return copy;
}

Expand All @@ -506,8 +733,8 @@ public boolean equals(Object o) {
&& Objects.equals(physicalDataType, that.physicalDataType)
&& Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
&& Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
&& Arrays.equals(keyProjection, that.keyProjection)
&& Arrays.equals(valueProjection, that.valueProjection)
&& Arrays.deepEquals(keyProjection, that.keyProjection)
&& Arrays.deepEquals(valueProjection, that.valueProjection)
&& Objects.equals(keyPrefix, that.keyPrefix)
&& Objects.equals(topicUris, that.topicUris)
&& Objects.equals(String.valueOf(topicUriPattern), String.valueOf(that.topicUriPattern))
Expand Down Expand Up @@ -535,8 +762,8 @@ public int hashCode() {
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
Arrays.hashCode(keyProjection),
Arrays.hashCode(valueProjection),
Arrays.deepHashCode(keyProjection),
Arrays.deepHashCode(valueProjection),
keyPrefix,
topicUris,
topicUriPattern,
Expand Down Expand Up @@ -685,16 +912,16 @@ private PscDeserializationSchema<RowData> createPscDeserializationSchema(
// adjust value format projection to include value format's metadata columns at the end
final int[] adjustedValueProjection =
IntStream.concat(
IntStream.of(valueProjection),
IntStream.of(valueOutputProjection),
IntStream.range(
keyProjection.length + valueProjection.length,
keyOutputProjection.length + valueOutputProjection.length,
adjustedPhysicalArity))
.toArray();

return new DynamicPscDeserializationSchema(
adjustedPhysicalArity,
keyDeserialization,
keyProjection,
keyOutputProjection,
valueDeserialization,
adjustedValueProjection,
hasMetadata,
Expand All @@ -706,12 +933,15 @@ private PscDeserializationSchema<RowData> createPscDeserializationSchema(
private @Nullable DeserializationSchema<RowData> createDeserialization(
Context context,
@Nullable DecodingFormat<DeserializationSchema<RowData>> format,
int[] projection,
int[] formatProjection,
int[][] nestedProjection,
@Nullable String prefix) {
if (format == null) {
return null;
}
DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
// Use nested projection for proper nested field pruning
DataType physicalFormatDataType =
Projection.of(nestedProjection).project(this.physicalDataType);
if (prefix != null) {
physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
}
Expand Down
Loading
Loading