From 8266e32fcbea70d6aeedb9c43ec1b4b7c6c7f908 Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Wed, 11 Mar 2026 10:49:19 +0100 Subject: [PATCH 1/7] a draft for a DF API --- .../org/apache/wayang/api/DataFrame.scala | 29 ++++++++ .../scala/org/apache/wayang/api/Row.scala | 23 +++++++ .../basic/operators/SelectOperator.java | 35 ++++++++++ .../apache/wayang/spark/mapping/Mappings.java | 3 +- .../wayang/spark/mapping/SelectMapping.java | 38 +++++++++++ .../spark/operators/SparkSelectOperator.java | 66 +++++++++++++++++++ 6 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala create mode 100644 wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java create mode 100644 wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java create mode 100644 wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala new file mode 100644 index 000000000..786ed5dbb --- /dev/null +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala @@ -0,0 +1,29 @@ +package org.apache.wayang.api + +import org.apache.wayang.basic.operators.SelectOperator + +/** + * DataFrame abstraction for Apache Wayang, specializing DataQuanta for Row types. + * The idea is the following: DataQuanta[] is a good abstraction for both typed and untyped data structures. + * Taking spark as example, DQ can abstract both hard-typed JavaRdd (e.g. DataQuanta[Person]) and an untyped Dataset[Row] (i.e. DataFrame) + * (so, DataQuanta[Row]). 
+ * For this reason it is possible for a Wayang DataFrame to be a wrapper around a DataQuanta[Row]. + * Row's core is list of untyped (Any) elements and a schema + * that allows to associate names of columns to both elements of row and their actual type. + * Taking Spark as example, a DataQuanta[Row] is translated into a Dataset[Row]. + */ +class DataFrame(df: DataQuanta[Row]) { + + /** + * Selects specific columns based on the provided input strings. + * @return A new DataFrame containing only the selected columns. + */ + def select(columns: String*): DataFrame = { + val selectOperator = new SelectOperator(columns) + this.df.connectTo(selectOperator, 0) + implicit val pb: PlanBuilder = df.planBuilder + val dq =new DataQuanta[Row](selectOperator) + new DataFrame(dq) + } + +} \ No newline at end of file diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala new file mode 100644 index 000000000..acf9e3044 --- /dev/null +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala @@ -0,0 +1,23 @@ +package org.apache.wayang.api + +/** + * Represents a single row of data with an associated schema. + * @param schema The metadata describing the structure of the fields. + */ +case class Row(fields: List[Any], schema: Schema = null) { + + val size: Int = fields.size + + def getFields: List[Any] = fields + + def get(index: Int): Any = fields(index) + + def getSchema: Schema = schema + +} + +/** + * Of course a Schema is more complicated than the following Map, what follows is useful to + * communicate the core of the Schema. 
+ */ +case class Schema (columnNames: Map[String, Class[_]]) \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java new file mode 100644 index 000000000..4f021cc65 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java @@ -0,0 +1,35 @@ +package org.apache.wayang.basic.operators; + +import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator; + +import java.util.ArrayList; + +public class SelectOperator extends UnaryToUnaryOperator { + + /** + * About the following property and the properties of new Operators for DF API: + * + * When working with DataFrames, user is forced to write untyped expressions instead + * of hard typed ones. e.g.: ds.filter($"age" > 21) instead of ds.filter(user => user.age > 21). + * For this reason none of the new Operators will exploit PredicateDescriptor, whose 'core' is a UDF. + * + * Regarding SelectOperator, it might have a String (maybe wrapped) that will be exploited + * by the engine chosen for the actual execution. (it is common for big data + * engines to have this signature for the select method). Other Operator (e.g. filter) + * will need an untyped expression instead of PredicateDescriptor + * + * The untyped architecture of DF makes the compilation time less prone to find errors. + * However, big data engines exploit this in order to increase performances. For these reasons, in order to have a proper + * DF API, it is necessary to create new implementations of existing Operators (e.g. FilterOperator) that will + * exploit different backends (e.g. currently there is SparkFilterOperator that exploits JavaRDD, + * while it would be optimal for a DF API to exploit Dataset (i.e. DataFrame) instead of JavaRDD). 
+ * + * Note that both In an Out type is Row (due to DF's untyped architecture) + */ + protected final ArrayList columns; + + /** + Obviously, this class lacks of constructors but most importantly it lacks a region for cardinality estimation. + */ + +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java index 1e3642a9a..cdfaa64be 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -61,7 +61,8 @@ public class Mappings { new ZipWithIdMapping(), new KafkaTopicSinkMapping(), new KafkaTopicSourceMapping(), - new ParquetSinkMapping() + new ParquetSinkMapping(), + new SelectMapping() ); diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java new file mode 100644 index 000000000..2e011aac5 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java @@ -0,0 +1,38 @@ +package org.apache.wayang.spark.mapping; + +import org.apache.wayang.basic.operators.ParquetSink; +import org.apache.wayang.core.mapping.*; +import org.apache.wayang.spark.operators.SelectOperator; +import org.apache.wayang.spark.operators.SparkSelectOperator; +import org.apache.wayang.spark.platform.SparkPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * inspired by similar classes (e.g. 
FilterMapping) + */ +public class SelectMapping implements Mapping { + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern operatorPattern = new OperatorPattern<>( + //SelectOperator lacks constructor + "select", new SelectOperator(), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new SparkSelectOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java new file mode 100644 index 000000000..fcd0b2b4c --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java @@ -0,0 +1,66 @@ +package org.apache.wayang.spark.operators; + +import org.apache.spark.sql.Column; +import org.apache.spark.sql.functions; +import org.apache.wayang.basic.operators.SelectOperator; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.DatasetChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +/** + * This class exploits Spark DataSet intead of JavaRDD, in Spark a DataFrame is nothing but a DataSet + */ +import org.apache.spark.sql.Dataset; +import 
org.apache.spark.sql.Row; +import java.util.Collection; +import java.util.List; + +public class SparkSelectOperator extends SelectOperator + implements SparkExecutionOperator{ + + + /** + * evaluate function may work as follows (inspired ny SparkParquetSink and SparkFiterOperator) + */ + @Override + public Tuple, Collection> evaluate(ChannelInstance[] inputs, + ChannelInstance[] outputs, SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == this.getNumInputs(); + assert outputs.length == this.getNumOutputs(); + + //in Spark, a DF is nothing but a Dataset + final Dataset inputDs = this.obtainDataset(inputs[0], sparkExecutor); //get the input DF from inputs, assuming I have obtainDataset + Column[] columns = this.columns.stream() + .map(functions::col) + .toArray(Column[]::new); + final Dataset outputDs = inputDs.select(columns); + ((DatasetChannel.Instance) outputs[0]).accept(outputDs, sparkExecutor); + + return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); + } + + /** + * TODO... 
+ */ + + @Override + public boolean containsAction() { + return false; + } + + @Override + public List getSupportedInputChannels(int index) { + return List.of(); + } + + @Override + public List getSupportedOutputChannels(int index) { + return List.of(); + } +} From 0c65aa2577ed6eed0bac292a7277e45cef56c70d Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Wed, 11 Mar 2026 15:38:55 +0100 Subject: [PATCH 2/7] small fix --- .../java/org/apache/wayang/spark/mapping/SelectMapping.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java index 2e011aac5..ac782159b 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java @@ -1,8 +1,7 @@ package org.apache.wayang.spark.mapping; -import org.apache.wayang.basic.operators.ParquetSink; +import org.apache.wayang.basic.operators.SelectOperator; import org.apache.wayang.core.mapping.*; -import org.apache.wayang.spark.operators.SelectOperator; import org.apache.wayang.spark.operators.SparkSelectOperator; import org.apache.wayang.spark.platform.SparkPlatform; @@ -23,7 +22,7 @@ public Collection getTransformations() { } private SubplanPattern createSubplanPattern() { - OperatorPattern operatorPattern = new OperatorPattern<>( + OperatorPattern operatorPattern = new OperatorPattern<>( //SelectOperator lacks constructor "select", new SelectOperator(), false ); From 7b3ce80eaabe9eafecbff95d37686f060f6e0d5b Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Fri, 13 Mar 2026 09:46:53 +0100 Subject: [PATCH 3/7] refining --- .../org/apache/wayang/api/DataFrame.scala | 8 +++++++- .../org/apache/wayang/basic/data}/Row.scala | 2 +- 
.../wayang/basic/operators/SelectOperator.java | 18 +++++++++++++++++- .../wayang/spark/mapping/SelectMapping.java | 6 +++--- .../spark/operators/SparkSelectOperator.java | 6 ++++-- 5 files changed, 32 insertions(+), 8 deletions(-) rename {wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api => wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data}/Row.scala (93%) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala index 786ed5dbb..edb4d1b02 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala @@ -1,7 +1,11 @@ package org.apache.wayang.api +import org.apache.wayang.basic.data.Row import org.apache.wayang.basic.operators.SelectOperator +import java.util +import java.util.{ArrayList => JArrayList} + /** * DataFrame abstraction for Apache Wayang, specializing DataQuanta for Row types. * The idea is the following: DataQuanta[] is a good abstraction for both typed and untyped data structures. @@ -19,7 +23,9 @@ class DataFrame(df: DataQuanta[Row]) { * @return A new DataFrame containing only the selected columns. 
*/ def select(columns: String*): DataFrame = { - val selectOperator = new SelectOperator(columns) + val cols = new JArrayList[String](util.Arrays.asList(columns: _*)) + //still need to create a proper constructor + val selectOperator = new SelectOperator(cols) this.df.connectTo(selectOperator, 0) implicit val pb: PlanBuilder = df.planBuilder val dq =new DataQuanta[Row](selectOperator) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala similarity index 93% rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala rename to wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala index acf9e3044..610385607 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/Row.scala +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala @@ -1,4 +1,4 @@ -package org.apache.wayang.api +package org.apache.wayang.basic.data /** * Represents a single row of data with an associated schema. 
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java index 4f021cc65..7d616b7ca 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java @@ -1,6 +1,8 @@ package org.apache.wayang.basic.operators; +import org.apache.wayang.basic.data.Row; import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator; +import org.apache.wayang.core.types.DataSetType; import java.util.ArrayList; @@ -28,8 +30,22 @@ public class SelectOperator extends UnaryToUnaryOperator { */ protected final ArrayList columns; + public SelectOperator(ArrayList cols, DataSetType type) { + super(type, type, true); + this.columns = cols; + } + + public SelectOperator(SelectOperator that) { + super(that); + this.columns = that.getColumns(); + } + + private ArrayList getColumns() { + return this.columns; + } + /** - Obviously, this class lacks of constructors but most importantly it lacks a region for cardinality estimation. + Obviously, this class lacks a region for cardinality estimation. 
*/ } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java index ac782159b..f7efb4a05 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java @@ -1,7 +1,9 @@ package org.apache.wayang.spark.mapping; +import org.apache.wayang.basic.data.Row; import org.apache.wayang.basic.operators.SelectOperator; import org.apache.wayang.core.mapping.*; +import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.spark.operators.SparkSelectOperator; import org.apache.wayang.spark.platform.SparkPlatform; @@ -23,9 +25,7 @@ public Collection getTransformations() { private SubplanPattern createSubplanPattern() { OperatorPattern operatorPattern = new OperatorPattern<>( - //SelectOperator lacks constructor - "select", new SelectOperator(), false - ); + "select", new SelectOperator(null, DataSetType.createDefault(Row.class)), false); return SubplanPattern.createSingleton(operatorPattern); } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java index fcd0b2b4c..5b2d89bc0 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java @@ -1,6 +1,7 @@ package org.apache.wayang.spark.operators; import org.apache.spark.sql.Column; +import org.apache.spark.sql.Row; import org.apache.spark.sql.functions; import org.apache.wayang.basic.operators.SelectOperator; import org.apache.wayang.core.optimizer.OptimizationContext; @@ -16,14 +17,15 @@ * This class exploits 
Spark DataSet intead of JavaRDD, in Spark a DataFrame is nothing but a DataSet */ import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import java.util.Collection; import java.util.List; public class SparkSelectOperator extends SelectOperator implements SparkExecutionOperator{ - + public SparkSelectOperator(SelectOperator that) { + super(that); + } /** * evaluate function may work as follows (inspired ny SparkParquetSink and SparkFiterOperator) */ From 593936f965180d05c5e6c5cb193273f1e403f3a2 Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Fri, 13 Mar 2026 18:18:39 +0100 Subject: [PATCH 4/7] refining --- .../org/apache/wayang/api/DataFrame.scala | 6 +++++ .../org/apache/wayang/basic/data/Row.java | 26 +++++++++++++++++++ .../org/apache/wayang/basic/data/Row.scala | 23 ---------------- .../org/apache/wayang/basic/data/Schema.java | 26 +++++++++++++++++++ .../spark/operators/SparkSelectOperator.java | 3 ++- 5 files changed, 60 insertions(+), 24 deletions(-) create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java delete mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala create mode 100644 wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala index edb4d1b02..879e4d0e1 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala @@ -32,4 +32,10 @@ class DataFrame(df: DataQuanta[Row]) { new DataFrame(dq) } + /** + * schema operator will be a sink as it triggers the execution of the plan. 
+ */ + + + } \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java new file mode 100644 index 000000000..f3a0237d5 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java @@ -0,0 +1,26 @@ +package org.apache.wayang.basic.data; + +import java.util.List; + +/** + * Represents a single row of data with an associated schema. + * @param schema The metadata describing the structure of the fields. + */ +public record Row(List fields, Schema schema) { + + public int size() { + return fields != null ? fields.size() : 0; + } + + public Object getAttribute(String attribute) { + int indexOfAttribute = schema.getIndexOfAttribute(attribute); + if (indexOfAttribute >= 0) { + return this.getAttributeAtIndex(indexOfAttribute); + } + return null; + } + + private Object getAttributeAtIndex(int index) { + return fields != null ? fields.get(index) : null; + } +} \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala deleted file mode 100644 index 610385607..000000000 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.scala +++ /dev/null @@ -1,23 +0,0 @@ -package org.apache.wayang.basic.data - -/** - * Represents a single row of data with an associated schema. - * @param schema The metadata describing the structure of the fields. - */ -case class Row(fields: List[Any], schema: Schema = null) { - - val size: Int = fields.size - - def getFields: List[Any] = fields - - def get(index: Int): Any = fields(index) - - def getSchema: Schema = schema - -} - -/** - * Of course a Schema is more complicated than the following Map, what follows is useful to - * communicate the core of the Schema. 
- */ -case class Schema (columnNames: Map[String, Class[_]]) \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java new file mode 100644 index 000000000..dab1883a7 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java @@ -0,0 +1,26 @@ +package org.apache.wayang.basic.data; + +import org.apache.wayang.core.types.DataUnitType; + +import java.util.List; +import java.util.Map; + +/** + * This class can describe the structure of a Row in a DataFrame. + */ +public record Schema(List>> schema) { + + public int getIndexOfAttribute(String columnName) { + if (columnName == null || schema == null) { + return -1; + } + int size = schema.size(); + for (int i = 0; i < size; i++) { + if (schema.get(i).getKey().equals(columnName)) { + return i; + } + } + return -1; // Not found + } + +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java index 5b2d89bc0..274908712 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java @@ -14,7 +14,7 @@ import org.apache.wayang.spark.execution.SparkExecutor; /** - * This class exploits Spark DataSet intead of JavaRDD, in Spark a DataFrame is nothing but a DataSet + * This class exploits Spark DataSet instead of JavaRDD, in Spark a DataFrame is nothing but a DataSet */ import org.apache.spark.sql.Dataset; import java.util.Collection; @@ -53,6 +53,7 @@ public Tuple, Collection> eval @Override public boolean containsAction() { + //this operator does not trigger the execution of the plan return false; } From 
7aa66bd798c7fe6b6d3b31ff25b0d44916af3e26 Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Wed, 18 Mar 2026 12:36:25 +0100 Subject: [PATCH 5/7] refining --- .../org/apache/wayang/api/DataFrame.scala | 15 ++++---------- .../org/apache/wayang/basic/data/Schema.java | 2 +- .../basic/operators/SelectOperator.java | 20 +++++++++++-------- .../wayang/spark/mapping/SelectMapping.java | 5 ++--- .../spark/operators/SparkSelectOperator.java | 2 +- 5 files changed, 20 insertions(+), 24 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala index 879e4d0e1..f27d78c6e 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala @@ -9,10 +9,11 @@ import java.util.{ArrayList => JArrayList} /** * DataFrame abstraction for Apache Wayang, specializing DataQuanta for Row types. * The idea is the following: DataQuanta[] is a good abstraction for both typed and untyped data structures. - * Taking spark as example, DQ can abstract both hard-typed JavaRdd (e.g. DataQuanta[Person]) and an untyped Dataset[Row] (i.e. DataFrame) - * (so, DataQuanta[Row]). + * Taking spark as example, DataQuanta currently abstracts the hard-typed JavaRdd (e.g. DataQuanta[Person]); + * however it is also able to abstract untyped Dataset[Row] i.e. DataFrame (so, DataQuanta[Row]). + * * For this reason it is possible for a Wayang DataFrame to be a wrapper around a DataQuanta[Row]. - * Row's core is list of untyped (Any) elements and a schema + * Row's core is list of untyped (Object) elements and a schema * that allows to associate names of columns to both elements of row and their actual type. * Taking Spark as example, a DataQuanta[Row] is translated into a Dataset[Row]. 
*/ @@ -24,18 +25,10 @@ class DataFrame(df: DataQuanta[Row]) { */ def select(columns: String*): DataFrame = { val cols = new JArrayList[String](util.Arrays.asList(columns: _*)) - //still need to create a proper constructor val selectOperator = new SelectOperator(cols) this.df.connectTo(selectOperator, 0) implicit val pb: PlanBuilder = df.planBuilder val dq =new DataQuanta[Row](selectOperator) new DataFrame(dq) } - - /** - * schema operator will be a sink as it triggers the execution of the plan. - */ - - - } \ No newline at end of file diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java index dab1883a7..c99fef9a5 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java @@ -20,7 +20,7 @@ public int getIndexOfAttribute(String columnName) { return i; } } - return -1; // Not found + return -1; // Not found, of course in a real implementation an exception would be thrown } } diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java index 7d616b7ca..7553f1988 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java @@ -17,21 +17,25 @@ public class SelectOperator extends UnaryToUnaryOperator { * * Regarding SelectOperator, it might have a String (maybe wrapped) that will be exploited * by the engine chosen for the actual execution. (it is common for big data - * engines to have this signature for the select method). Other Operator (e.g. 
filter) - * will need an untyped expression instead of PredicateDescriptor + * engines to have this signature for the select method). Also other Operators (e.g. filter) + * will need an untyped expression instead of PredicateDescriptor. * * The untyped architecture of DF makes the compilation time less prone to find errors. - * However, big data engines exploit this in order to increase performances. For these reasons, in order to have a proper - * DF API, it is necessary to create new implementations of existing Operators (e.g. FilterOperator) that will - * exploit different backends (e.g. currently there is SparkFilterOperator that exploits JavaRDD, - * while it would be optimal for a DF API to exploit Dataset (i.e. DataFrame) instead of JavaRDD). + * However, big data engines exploit this in order to increase performances + * (see Predicate Pushdown). For these reasons, in order to have a proper + * DF API, it is necessary to create new versions of existing Operators that will + * be based on untyped expressions instead of UDF and will be able to properly exploit + * the most performative backends. An example: + * SparkFilterOperator extends FilterOperator whose core is an udf and exploits JavaRDD, + * instead Spark_Df_FilterOperator extends Filter_Df_Operator whose core is an untyped expression + * and exploits Spark Dataframe. 
* * Note that both In an Out type is Row (due to DF's untyped architecture) */ protected final ArrayList columns; - public SelectOperator(ArrayList cols, DataSetType type) { - super(type, type, true); + public SelectOperator(ArrayList cols) { + super(DataSetType.createDefault(Row.class), DataSetType.createDefault(Row.class), true); this.columns = cols; } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java index f7efb4a05..d2cde74e8 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java @@ -1,12 +1,11 @@ package org.apache.wayang.spark.mapping; -import org.apache.wayang.basic.data.Row; import org.apache.wayang.basic.operators.SelectOperator; import org.apache.wayang.core.mapping.*; -import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.spark.operators.SparkSelectOperator; import org.apache.wayang.spark.platform.SparkPlatform; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -25,7 +24,7 @@ public Collection getTransformations() { private SubplanPattern createSubplanPattern() { OperatorPattern operatorPattern = new OperatorPattern<>( - "select", new SelectOperator(null, DataSetType.createDefault(Row.class)), false); + "select", new SelectOperator((ArrayList) null), false); return SubplanPattern.createSingleton(operatorPattern); } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java index 274908712..63cb8ef81 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java +++ 
b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java @@ -14,7 +14,7 @@ import org.apache.wayang.spark.execution.SparkExecutor; /** - * This class exploits Spark DataSet instead of JavaRDD, in Spark a DataFrame is nothing but a DataSet + * This class exploits Spark Dataset, in Spark a DataFrame is nothing but a Dataset */ import org.apache.spark.sql.Dataset; import java.util.Collection; From 98e7a8e24b6b0a91a0a7dc6ff4a130ce2db9c1aa Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Sat, 21 Mar 2026 16:16:30 +0100 Subject: [PATCH 6/7] refining --- .../org/apache/wayang/api/DataFrame.scala | 38 +++++++++---------- .../org/apache/wayang/api/DataQuanta.scala | 14 +++++++ .../basic/operators/SelectOperator.java | 30 ++++----------- .../spark/operators/SparkSelectOperator.java | 12 +++--- 4 files changed, 46 insertions(+), 48 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala index f27d78c6e..6ca4e3761 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala @@ -1,34 +1,34 @@ package org.apache.wayang.api -import org.apache.wayang.basic.data.Row +import org.apache.wayang.basic.data.Record import org.apache.wayang.basic.operators.SelectOperator +import org.apache.wayang.core.plan.wayangplan.ElementaryOperator -import java.util -import java.util.{ArrayList => JArrayList} +import java.util.{Arrays, ArrayList => JArrayList} /** - * DataFrame abstraction for Apache Wayang, specializing DataQuanta for Row types. - * The idea is the following: DataQuanta[] is a good abstraction for both typed and untyped data structures. - * Taking spark as example, DataQuanta currently abstracts the hard-typed JavaRdd (e.g. 
DataQuanta[Person]); - * however it is also able to abstract untyped Dataset[Row] i.e. DataFrame (so, DataQuanta[Row]). + * DataFrame abstraction for Apache Wayang, specializing DataQuanta for [[Record]](s). + * The idea is the following: DataQuanta[] is a good abstraction for both hard-typed structures and Dataframes. + * Taking Wayang-Spark as an example, DataQuanta can abstract both JavaRdd (DataQuanta[Person]); + * and Dataset[Row] i.e. DataFrame (DataQuanta[Record]). + * + * For this reason, it is possible for a Wayang DataFrame to be a wrapper around a DataQuanta[[Record]]. + * DataFrame API will provide methods that take expressions as input instead of UDFs. This allows the new API + * to leverage modern engines (e.g. Spark Dataframe) with their advanced optimizations (e.g. Predicate Pushdown). + * + * In this draft, DataFrame extends DataQuanta allowing the user to call Dataframe-style methods. * - * For this reason it is possible for a Wayang DataFrame to be a wrapper around a DataQuanta[Row]. - * Row's core is list of untyped (Object) elements and a schema - * that allows to associate names of columns to both elements of row and their actual type. - * Taking Spark as example, a DataQuanta[Row] is translated into a Dataset[Row]. */ -class DataFrame(df: DataQuanta[Row]) { +class DataFrame private (operator: ElementaryOperator)(implicit planBuilder: PlanBuilder) + extends DataQuanta[Record](operator) { /** - * Selects specific columns based on the provided input strings. - * @return A new DataFrame containing only the selected columns. + * Selects specific columns and returns a new DataFrame.
*/ def select(columns: String*): DataFrame = { - val cols = new JArrayList[String](util.Arrays.asList(columns: _*)) + val cols = new JArrayList[String](Arrays.asList(columns: _*)) val selectOperator = new SelectOperator(cols) - this.df.connectTo(selectOperator, 0) - implicit val pb: PlanBuilder = df.planBuilder - val dq =new DataQuanta[Row](selectOperator) - new DataFrame(dq) + this.connectTo(selectOperator, 0) + new DataFrame(selectOperator) } } \ No newline at end of file diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala index 2a2f60cc0..c5d97a18a 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala @@ -1245,6 +1245,20 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I override def toString = s"DataQuanta[$output]" + /** + * Casts this DataQuanta to a DataFrame if the underlying type is Record + * Throws an exception otherwise. + */ + def asDataFrame(): DataFrame = { + if (scala.reflect.classTag[Out].runtimeClass == classOf[Record]) { + this.asInstanceOf[DataFrame] + } else { + throw new UnsupportedOperationException( + s"Cannot cast DataQuanta[${scala.reflect.classTag[Out].runtimeClass.getSimpleName}] to DataFrame. " + + "Only DataQuanta[Record] is allowed." 
+ ) + } + } } /** diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java index 7553f1988..afd9fc85d 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java @@ -1,41 +1,27 @@ package org.apache.wayang.basic.operators; -import org.apache.wayang.basic.data.Row; import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator; import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.basic.data.Record; import java.util.ArrayList; -public class SelectOperator extends UnaryToUnaryOperator { +public class SelectOperator extends UnaryToUnaryOperator { /** - * About the following property and the properties of new Operators for DF API: - * - * When working with DataFrames, user is forced to write untyped expressions instead - * of hard typed ones. e.g.: ds.filter($"age" > 21) instead of ds.filter(user => user.age > 21). + * When working with DataFrames, the user writes untyped expressions instead of functions. + * e.g.: ds.filter($"age" > 21) instead of ds.filter(user => user.age > 21). * For this reason none of the new Operators will exploit PredicateDescriptor, whose 'core' is a UDF. * - * Regarding SelectOperator, it might have a String (maybe wrapped) that will be exploited - * by the engine chosen for the actual execution. (it is common for big data - * engines to have this signature for the select method). Also other Operators (e.g. filter) - * will need an untyped expression instead of PredicateDescriptor. - * - * The untyped architecture of DF makes the compilation time less prone to find errors. - * However, big data engines exploit this in order to increase performances - * (see Predicate Pushdown).
For these reasons, in order to have a proper - * DF API, it is necessary to create new versions of existing Operators that will - * be based on untyped expressions instead of UDF and will be able to properly exploit - * the most performative backends. An example: - * SparkFilterOperator extends FilterOperator whose core is an udf and exploits JavaRDD, - * instead Spark_Df_FilterOperator extends Filter_Df_Operator whose core is an untyped expression - * and exploits Spark Dataframe. + * SelectOperator might exploit Strings as it is common for big data engines + * to have this signature for the select method. * - * Note that both In an Out type is Row (due to DF's untyped architecture) + * Both the In and Out types are [[Record]] (due to DF's untyped architecture) */ protected final ArrayList columns; public SelectOperator(ArrayList cols) { - super(DataSetType.createDefault(Row.class), DataSetType.createDefault(Row.class), true); + super(DataSetType.createDefault(org.apache.wayang.basic.data.Record.class), DataSetType.createDefault(org.apache.wayang.basic.data.Record.class), true); this.columns = cols; } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java index 63cb8ef81..161492f8b 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java @@ -26,9 +26,7 @@ public class SparkSelectOperator extends SelectOperator public SparkSelectOperator(SelectOperator that) { super(that); } - /** - * evaluate function may work as follows (inspired ny SparkParquetSink and SparkFiterOperator) - */ + @Override public Tuple, Collection> evaluate(ChannelInstance[] inputs, ChannelInstance[] outputs, SparkExecutor sparkExecutor, @@ -47,16 +45,16 @@ public Tuple,
Collection> eval return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); } - /** - * TODO... - */ - @Override public boolean containsAction() { //this operator does not trigger the execution of the plan return false; } + /** + * The following methods are still TODO + */ + @Override public List getSupportedInputChannels(int index) { return List.of(); From 2d4645184e563891134807c60711a4a8c3939772 Mon Sep 17 00:00:00 2001 From: Carlo Maria Proietti Date: Fri, 27 Mar 2026 20:04:17 +0100 Subject: [PATCH 7/7] correction of schema class --- .../src/main/java/org/apache/wayang/basic/data/Schema.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java index c99fef9a5..78f0c7948 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java @@ -6,9 +6,9 @@ import java.util.Map; /** - * This class can describe the structure of a Row in a DataFrame. + * This class can describe the structure of a DataFrame {@link Record}. */ -public record Schema(List>> schema) { +public record Schema(List>> schema) { public int getIndexOfAttribute(String columnName) { if (columnName == null || schema == null) {