diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala
new file mode 100644
index 000000000..6ca4e3761
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataFrame.scala
@@ -0,0 +1,36 @@
+package org.apache.wayang.api
+
+import org.apache.wayang.basic.data.Record
+import org.apache.wayang.basic.operators.SelectOperator
+import org.apache.wayang.core.plan.wayangplan.ElementaryOperator
+
+import java.util.{Arrays, ArrayList => JArrayList}
+
+/**
+ * DataFrame abstraction for Apache Wayang, specializing DataQuanta for [[Record]](s).
+ * The idea is the following: DataQuanta is a good abstraction for both hard-typed structures and Dataframes.
+ * Taking Wayang-Spark as example, DataQuanta can abstract both JavaRdd (DataQuanta[Person]);
+ * and Dataset[Row] i.e. DataFrame (DataQuanta[Record]).
+ *
+ * For this reason, it is possible for a Wayang DataFrame to be a wrapper around a DataQuanta[[Record]].
+ * The DataFrame API provides methods that take expressions as input instead of UDFs. This allows the new API
+ * to leverage modern engines (e.g. Spark Dataframe) with their advanced optimizations (e.g. predicate pushdown).
+ *
+ * In this draft, DataFrame extends DataQuanta, allowing the user to call DataFrame-style methods.
+ *
+ * The constructor is package-private (rather than private) so that other API classes
+ * in org.apache.wayang.api can wrap an existing Record-typed plan into a DataFrame.
+ */
+class DataFrame private[api] (operator: ElementaryOperator)(implicit planBuilder: PlanBuilder)
+  extends DataQuanta[Record](operator) {
+
+  /**
+   * Selects specific columns and returns a new [[DataFrame]].
+   */
+  def select(columns: String*): DataFrame = {
+    // Java-friendly column list for the platform-agnostic basic operator.
+    val cols = new JArrayList[String](Arrays.asList(columns: _*))
+    val selectOperator = new SelectOperator(cols)
+    this.connectTo(selectOperator, 0)
+    new DataFrame(selectOperator)
+  }
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index 2a2f60cc0..c5d97a18a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -1245,6 +1245,23 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
 
   override def toString = s"DataQuanta[$output]"
 
+  /**
+   * Casts or converts this [[DataQuanta]] to a [[DataFrame]].
+   *
+   * Succeeds only when the data quantum type is [[Record]]; otherwise an
+   * `UnsupportedOperationException` is thrown. A plain `DataQuanta[Record]` that was not
+   * constructed as a `DataFrame` is re-wrapped instead of cast: `asInstanceOf[DataFrame]`
+   * on such an instance would fail with a `ClassCastException` at runtime.
+   */
+  def asDataFrame(): DataFrame = this match {
+    case df: DataFrame => df // already a DataFrame: reuse it
+    case _ if scala.reflect.classTag[Out].runtimeClass == classOf[Record] =>
+      // Wrap the same operator; NOTE(review): assumes outputIndex 0 — confirm for multi-output operators.
+      new DataFrame(this.operator)
+    case _ =>
+      throw new UnsupportedOperationException(
+        s"Cannot cast DataQuanta[${scala.reflect.classTag[Out].runtimeClass.getSimpleName}] to DataFrame. " +
+          "Only DataQuanta[Record] is allowed.")
+  }
 }
 
 /**
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java
new file mode 100644
index 000000000..f3a0237d5
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Row.java
@@ -0,0 +1,35 @@
+package org.apache.wayang.basic.data;
+
+import java.util.List;
+
+/**
+ * Represents a single row of data with an associated schema.
+ *
+ * @param fields the attribute values, positionally aligned with the schema
+ * @param schema the metadata describing the structure of the fields
+ */
+public record Row(List<Object> fields, Schema schema) {
+
+    /** @return the number of fields, or 0 when the row carries none. */
+    public int size() {
+        return fields != null ? fields.size() : 0;
+    }
+
+    /**
+     * Looks up an attribute value by name.
+     *
+     * @param attribute the column name, resolved via the schema
+     * @return the value, or {@code null} when the attribute is unknown
+     */
+    public Object getAttribute(String attribute) {
+        int indexOfAttribute = schema.getIndexOfAttribute(attribute);
+        if (indexOfAttribute >= 0) {
+            return this.getAttributeAtIndex(indexOfAttribute);
+        }
+        return null;
+    }
+
+    private Object getAttributeAtIndex(int index) {
+        return fields != null ? fields.get(index) : null;
+    }
+}
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java
new file mode 100644
index 000000000..78f0c7948
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Schema.java
@@ -0,0 +1,33 @@
+package org.apache.wayang.basic.data;
+
+import org.apache.wayang.core.types.DataUnitType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes the structure of a DataFrame Record: an ordered list of
+ * (attribute name, data unit type) pairs.
+ * NOTE(review): generic parameters reconstructed from usage — confirm against the intended schema entry type.
+ */
+public record Schema(List<Map.Entry<String, Class<? extends DataUnitType<?>>>> schema) {
+
+    /**
+     * Resolves a column name to its position in this schema.
+     *
+     * @param columnName the attribute to look up
+     * @return its index, or -1 when it cannot be resolved
+     */
+    public int getIndexOfAttribute(String columnName) {
+        if (columnName == null || schema == null) {
+            return -1;
+        }
+        int size = schema.size();
+        for (int i = 0; i < size; i++) {
+            if (schema.get(i).getKey().equals(columnName)) {
+                return i;
+            }
+        }
+        return -1; // Not found; a real implementation might throw instead.
+    }
+}
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java
new file mode 100644
index 000000000..afd9fc85d
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/SelectOperator.java
@@ -0,0 +1,43 @@
+package org.apache.wayang.basic.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
+import org.apache.wayang.core.types.DataSetType;
+
+import java.util.ArrayList;
+
+/**
+ * Projects {@link Record}s onto a set of named columns.
+ * <p>
+ * When working with DataFrames, the user writes untyped expressions instead of functions,
+ * e.g. {@code ds.filter($"age" > 21)} instead of {@code ds.filter(user -> user.age > 21)}.
+ * For this reason none of the new operators exploits a PredicateDescriptor, whose 'core' is a UDF.
+ * SelectOperator exploits Strings, as it is common for big data engines to have this
+ * signature for the select method.
+ * <p>
+ * Both input and output type is {@link Record} (due to the DataFrame's untyped architecture).
+ */
+public class SelectOperator extends UnaryToUnaryOperator<Record, Record> {
+
+    /** Columns to project onto; may be {@code null} in mapping-pattern instances. */
+    protected final ArrayList<String> columns;
+
+    public SelectOperator(ArrayList<String> cols) {
+        super(DataSetType.createDefault(Record.class), DataSetType.createDefault(Record.class), true);
+        this.columns = cols;
+    }
+
+    /**
+     * Copy constructor; takes a null-safe defensive copy so that copies
+     * do not share the mutable column list.
+     */
+    public SelectOperator(SelectOperator that) {
+        super(that);
+        this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
+    }
+
+    protected ArrayList<String> getColumns() {
+        return this.columns;
+    }
+
+    // TODO: cardinality-estimation region (cf. the other basic operators).
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 1e3642a9a..cdfaa64be 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -61,7 +61,8 @@ public class Mappings {
             new ZipWithIdMapping(),
             new KafkaTopicSinkMapping(),
             new KafkaTopicSourceMapping(),
-            new ParquetSinkMapping()
+            new ParquetSinkMapping(),
+            new SelectMapping()
     );
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java
new file mode 100644
index 000000000..d2cde74e8
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/SelectMapping.java
@@ -0,0 +1,39 @@
+package org.apache.wayang.spark.mapping;
+
+import org.apache.wayang.basic.operators.SelectOperator;
+import org.apache.wayang.core.mapping.*;
+import org.apache.wayang.spark.operators.SparkSelectOperator;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Maps {@link SelectOperator} to {@link SparkSelectOperator}
+ * (inspired by similar classes, e.g. FilterMapping).
+ */
+public class SelectMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                SparkPlatform.getInstance()
+        ));
+    }
+
+    /** Matches a single basic SelectOperator; the null column list marks a pattern-only instance. */
+    private SubplanPattern createSubplanPattern() {
+        OperatorPattern<SelectOperator> operatorPattern = new OperatorPattern<>(
+                "select", new SelectOperator((ArrayList<String>) null), false);
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<SelectOperator>(
+                (matchedOperator, epoch) -> new SparkSelectOperator(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java
new file mode 100644
index 000000000..161492f8b
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkSelectOperator.java
@@ -0,0 +1,66 @@
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.wayang.basic.operators.SelectOperator;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.spark.channels.DatasetChannel;
+import org.apache.wayang.spark.execution.SparkExecutor;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Spark execution operator for {@link SelectOperator}; exploits Spark's
+ * {@link Dataset} — in Spark, a DataFrame is nothing but a {@code Dataset<Row>}.
+ */
+public class SparkSelectOperator extends SelectOperator implements SparkExecutionOperator {
+
+    public SparkSelectOperator(SelectOperator that) {
+        super(that);
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            SparkExecutor sparkExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == this.getNumInputs();
+        assert outputs.length == this.getNumOutputs();
+
+        // Get the input DataFrame; assumes an obtainDataset helper exists — TODO confirm.
+        final Dataset<Row> inputDs = this.obtainDataset(inputs[0], sparkExecutor);
+        Column[] cols = this.columns.stream()
+                .map(functions::col)
+                .toArray(Column[]::new);
+        final Dataset<Row> outputDs = inputDs.select(cols);
+        ((DatasetChannel.Instance) outputs[0]).accept(outputDs, sparkExecutor);
+
+        return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
+    }
+
+    @Override
+    public boolean containsAction() {
+        // This operator does not trigger the execution of the plan.
+        return false;
+    }
+
+    // Following methods are obviously TODO: declare the actual DatasetChannel descriptors.
+    @Override
+    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
+        return List.of();
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
+        return List.of();
+    }
+}