@@ -0,0 +1,34 @@
package org.apache.wayang.api

import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.SelectOperator
import org.apache.wayang.core.plan.wayangplan.ElementaryOperator

import java.util.{Arrays, ArrayList => JArrayList}

/**
* DataFrame abstraction for Apache Wayang, specializing DataQuanta for [[Record]]s.
* The idea is the following: DataQuanta[T] is a good abstraction for both strongly typed structures and DataFrames.
* Taking Wayang-Spark as an example, DataQuanta can abstract both a JavaRDD (DataQuanta[Person])
* and a Dataset[Row], i.e. a DataFrame (DataQuanta[Record]).
*
* For this reason, a Wayang DataFrame can be a wrapper around a DataQuanta[[Record]].
* The DataFrame API will provide methods that take expressions as input instead of UDFs. This allows the new API
* to leverage modern engines (e.g. the Spark DataFrame API) and their advanced optimizations (e.g. predicate pushdown).
*
* In this draft, DataFrame extends DataQuanta, allowing the user to call DataFrame-style methods.
*/
class DataFrame private (operator: ElementaryOperator)(implicit planBuilder: PlanBuilder)
extends DataQuanta[Record](operator) {

/**
* Selects specific columns and returns a new DataFrame.
*/
def select(columns: String*): DataFrame = {
val cols = new JArrayList[String](Arrays.asList(columns: _*))
val selectOperator = new SelectOperator(cols)
this.connectTo(selectOperator, 0)
new DataFrame(selectOperator)
}
}
@@ -1245,6 +1245,20 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I

override def toString = s"DataQuanta[$output]"

/**
* Casts this DataQuanta to a DataFrame if the underlying type is Record;
* throws an exception otherwise.
*/
def asDataFrame(): DataFrame = {
if (scala.reflect.classTag[Out].runtimeClass == classOf[Record]) {
// Note: this assumes the instance was actually created as a DataFrame;
// a plain DataQuanta[Record] would need to be wrapped rather than cast.
this.asInstanceOf[DataFrame]
} else {
throw new UnsupportedOperationException(
s"Cannot cast DataQuanta[${scala.reflect.classTag[Out].runtimeClass.getSimpleName}] to DataFrame. " +
"Only DataQuanta[Record] is allowed."
)
}
}
}

/**
@@ -0,0 +1,26 @@
package org.apache.wayang.basic.data;

import java.util.List;

/**
* Represents a single row of data with an associated schema.
* @param fields The field values of this row, in schema order.
* @param schema The metadata describing the structure of the fields.
*/
public record Row(List<Object> fields, Schema schema) {

public int size() {
return fields != null ? fields.size() : 0;
}

public Object getAttribute(String attribute) {
int indexOfAttribute = schema.getIndexOfAttribute(attribute);
if (indexOfAttribute >= 0) {
return this.getAttributeAtIndex(indexOfAttribute);
}
return null;
}

private Object getAttributeAtIndex(int index) {
return fields != null ? fields.get(index) : null;
}
}
@@ -0,0 +1,26 @@
package org.apache.wayang.basic.data;

import org.apache.wayang.core.types.DataUnitType;

import java.util.List;
import java.util.Map;

/**
* This class describes the structure of a DataFrame [Record] as an ordered list of (attribute name, type) pairs.
*/
public record Schema(List<Map.Entry<String, DataUnitType<?>>> schema) {

public int getIndexOfAttribute(String columnName) {
if (columnName == null || schema == null) {
return -1;
}
int size = schema.size();
for (int i = 0; i < size; i++) {
if (schema.get(i).getKey().equals(columnName)) {
return i;
}
}
return -1; // Not found; a full implementation might throw an exception instead
}

}
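Below is a minimal sketch of how Schema and Row could be used together; the column names, the example values, and the DataUnitType.createBasic calls are illustrative assumptions rather than part of this draft.

```java
import org.apache.wayang.basic.data.Row;
import org.apache.wayang.basic.data.Schema;
import org.apache.wayang.core.types.DataUnitType;

import java.util.List;
import java.util.Map;

public class RowSchemaSketch {
    public static void main(String[] args) {
        // Describe a two-column frame: (name: String, age: Integer).
        List<Map.Entry<String, DataUnitType<?>>> columns = List.of(
                Map.entry("name", DataUnitType.createBasic(String.class)),
                Map.entry("age", DataUnitType.createBasic(Integer.class))
        );
        Schema schema = new Schema(columns);

        // A single row whose field order follows the schema.
        Row row = new Row(List.of("Alice", 42), schema);
        System.out.println(row.getAttribute("age"));     // 42
        System.out.println(row.getAttribute("missing")); // null: attribute not in the schema
    }
}
```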
@@ -0,0 +1,41 @@
package org.apache.wayang.basic.operators;

import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.basic.data.Record;

import java.util.ArrayList;

public class SelectOperator extends UnaryToUnaryOperator<Record, Record> {

/**
* When working with DataFrames, users write untyped expressions instead of functions,
* e.g. ds.filter($"age" > 21) instead of ds.filter(user => user.age > 21).
* For this reason, none of the new operators will rely on PredicateDescriptor, whose core is a UDF.
*
* SelectOperator takes plain Strings, as it is common for big data engines
* to offer this signature for the select method.
*
* Both the input and the output type are Record (due to the DataFrame's untyped architecture).
*/
protected final ArrayList<String> columns;

public SelectOperator(ArrayList<String> cols) {
super(DataSetType.createDefault(Record.class), DataSetType.createDefault(Record.class), true);
this.columns = cols;
}

public SelectOperator(SelectOperator that) {
super(that);
this.columns = that.getColumns();
}

private ArrayList<String> getColumns() {
return this.columns;
}

/**
TODO: cardinality estimation is not implemented yet; see the hedged sketch below.
*/

}
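As a hedged sketch of that missing piece, cardinality estimation for SelectOperator could follow the pattern of existing unary operators (e.g. MapOperator): a select is a projection, so the output cardinality equals the input cardinality. The method and constructor signatures below are assumed from those existing operators and are not part of this draft.

```java
// Hedged sketch, not part of the draft.
// Assumed imports: java.util.Optional, org.apache.wayang.core.api.Configuration,
// org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator,
// org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator
@Override
public Optional<CardinalityEstimator> createCardinalityEstimator(
        final int outputIndex, final Configuration configuration) {
    assert outputIndex == 0;
    return Optional.of(new DefaultCardinalityEstimator(
            1d,                                  // certainty: a projection never changes the record count
            1,                                   // one regular input
            this.isSupportingBroadcastInputs(),
            inputCards -> inputCards[0]          // output cardinality = input cardinality
    ));
}
```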
Expand Up @@ -61,7 +61,8 @@ public class Mappings {
new ZipWithIdMapping(),
new KafkaTopicSinkMapping(),
new KafkaTopicSourceMapping(),
new ParquetSinkMapping()
new ParquetSinkMapping(),
new SelectMapping()

);

@@ -0,0 +1,36 @@
package org.apache.wayang.spark.mapping;

import org.apache.wayang.basic.operators.SelectOperator;
import org.apache.wayang.core.mapping.*;
import org.apache.wayang.spark.operators.SparkSelectOperator;
import org.apache.wayang.spark.platform.SparkPlatform;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/**
* Maps SelectOperator to SparkSelectOperator; inspired by similar classes (e.g. FilterMapping).
*/
public class SelectMapping implements Mapping {
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
SparkPlatform.getInstance()
));
}

private SubplanPattern createSubplanPattern() {
OperatorPattern<SelectOperator> operatorPattern = new OperatorPattern<>(
"select", new SelectOperator((ArrayList<String>) null), false);
return SubplanPattern.createSingleton(operatorPattern);
}

private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<SelectOperator>(
(matchedOperator, epoch) -> new SparkSelectOperator(matchedOperator).at(epoch)
);
}
}
@@ -0,0 +1,67 @@
package org.apache.wayang.spark.operators;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.wayang.basic.operators.SelectOperator;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.spark.channels.DatasetChannel;
import org.apache.wayang.spark.execution.SparkExecutor;

import org.apache.spark.sql.Dataset;

import java.util.Collection;
import java.util.List;

/**
* This class exploits Spark's Dataset; in Spark, a DataFrame is nothing but a Dataset<Row>.
*/
public class SparkSelectOperator extends SelectOperator
implements SparkExecutionOperator {

public SparkSelectOperator(SelectOperator that) {
super(that);
}

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] inputs,
ChannelInstance[] outputs, SparkExecutor sparkExecutor,
OptimizationContext.OperatorContext operatorContext) {
assert inputs.length == this.getNumInputs();
assert outputs.length == this.getNumOutputs();

// in Spark, a DataFrame is nothing but a Dataset<Row>
final Dataset<Row> inputDs = this.obtainDataset(inputs[0], sparkExecutor); // get the input DataFrame, assuming an obtainDataset helper exists (see sketch below)
Column[] columns = this.columns.stream()
.map(functions::col)
.toArray(Column[]::new);
final Dataset<Row> outputDs = inputDs.select(columns);
((DatasetChannel.Instance) outputs[0]).accept(outputDs, sparkExecutor);

return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
}

@Override
public boolean containsAction() {
//this operator does not trigger the execution of the plan
return false;
}

/**
* The following methods are still TODO.
*/

@Override
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
return List.of();
}

@Override
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
return List.of();
}
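The evaluate method above assumes an obtainDataset helper; a possible sketch is shown below. The provideDataset accessor on DatasetChannel.Instance is an assumption made to mirror the accept call used for the output channel, not a confirmed API.

```java
// Hedged sketch of the helper assumed in evaluate(); provideDataset() on
// DatasetChannel.Instance is an assumed accessor mirroring accept().
private Dataset<Row> obtainDataset(ChannelInstance input, SparkExecutor sparkExecutor) {
    // sparkExecutor is kept to match the call site, although this sketch does not use it
    return ((DatasetChannel.Instance) input).provideDataset();
}
```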
}