@@ -71,7 +71,7 @@ public class TemplateRow extends TemplateBase
private static final OpOp1[] SUPPORTED_VECT_UNARY = new OpOp1[]{
OpOp1.EXP, OpOp1.SQRT, OpOp1.LOG, OpOp1.ABS, OpOp1.ROUND, OpOp1.CEIL, OpOp1.FLOOR, OpOp1.SIGN,
OpOp1.SIN, OpOp1.COS, OpOp1.TAN, OpOp1.ASIN, OpOp1.ACOS, OpOp1.ATAN, OpOp1.SINH, OpOp1.COSH, OpOp1.TANH,
OpOp1.CUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID};
OpOp1.CUMSUM, OpOp1.ROWCUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID};
private static final OpOp2[] SUPPORTED_VECT_BINARY = new OpOp2[]{
OpOp2.MULT, OpOp2.DIV, OpOp2.MINUS, OpOp2.PLUS, OpOp2.POW, OpOp2.MIN, OpOp2.MAX, OpOp2.XOR,
OpOp2.EQUAL, OpOp2.NOTEQUAL, OpOp2.LESS, OpOp2.LESSEQUAL, OpOp2.GREATER, OpOp2.GREATEREQUAL,
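
Adding OpOp1.ROWCUMSUM to SUPPORTED_VECT_UNARY lets the row-wise codegen template fuse rowcumsum like the other vector unary ops. For reference, a minimal sketch of the semantics the new op computes (a prefix sum within each row, independent across rows); the class and method names below are illustrative only and not part of SystemDS:

// Illustration only: reference semantics of a row-wise cumulative sum.
class RowCumsumReference {
    static double[][] rowCumsum(double[][] in) {
        double[][] out = new double[in.length][];
        for (int i = 0; i < in.length; i++) {
            out[i] = new double[in[i].length];
            double run = 0;
            for (int j = 0; j < in[i].length; j++) {
                run += in[i][j];   // running prefix sum within row i
                out[i][j] = run;
            }
        }
        return out;
    }
}
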
@@ -481,6 +481,7 @@ public static long getInstNFLOP(
costs = 40;
break;
case "ucumk+":
case "urowcumk+":
case "ucummin":
case "ucummax":
case "ucum*":
@@ -96,6 +96,7 @@ public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS,
String2BuiltinCode.put( "floor" , BuiltinCode.FLOOR);
String2BuiltinCode.put( "ucumk+" , BuiltinCode.CUMSUM);
String2BuiltinCode.put( "urowcumk+" , BuiltinCode.ROWCUMSUM);
String2BuiltinCode.put("rowCumsum", BuiltinCode.ROWCUMSUM);
String2BuiltinCode.put( "ucum*" , BuiltinCode.CUMPROD);
String2BuiltinCode.put( "ucumk+*", BuiltinCode.CUMSUMPROD);
String2BuiltinCode.put( "ucummin", BuiltinCode.CUMMIN);
@@ -145,6 +145,7 @@ public class GPUInstructionParser extends InstructionParser

// Cumulative Ops
String2GPUInstructionType.put( "ucumk+" , GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "urowcumk+", GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "ucum*" , GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "ucumk+*" , GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "ucummin" , GPUINSTRUCTION_TYPE.BuiltinUnary);
@@ -51,6 +51,8 @@ private CumulativeOffsetFEDInstruction(Operator op, CPOperand in1, CPOperand in2

if ("bcumoffk+".equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+"));
else if ("browcumoffk+".equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+"));
else if ("bcumoff*".equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucum*"));
else if ("bcumoff+*".equals(opcode))
@@ -76,7 +76,7 @@ public static UnaryMatrixFEDInstruction parseInstruction(String str) {
in.split(parts[1]);
out.split(parts[2]);
ValueFunction func = Builtin.getBuiltinFnObject(opcode);
if(Arrays.asList(new String[] {"ucumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"})
if(Arrays.asList(new String[] {"ucumk+", "urowcumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"})
.contains(opcode)) {
UnaryOperator op = new UnaryOperator(func, Integer.parseInt(parts[3]), Boolean.parseBoolean(parts[4]));
return new UnaryMatrixFEDInstruction(op, in, out, opcode, str);
@@ -92,6 +92,10 @@ public void processInstruction(ExecutionContext ec) {
LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_sum", mat,
_output.getName());
break;
case "urowcumk+":
LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "row_cumulative_sum", mat,
_output.getName());
break;
case "ucum*":
LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_prod", mat,
_output.getName());
@@ -19,7 +19,6 @@

package org.apache.sysds.runtime.instructions.spark;


import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -45,100 +44,199 @@ private CumulativeAggregateSPInstruction(AggregateUnaryOperator op, CPOperand in
super(SPType.CumsumAggregate, op, null, in1, out, null, opcode, istr);
}

public static CumulativeAggregateSPInstruction parseInstruction( String str ) {
String[] parts = InstructionUtils.getInstructionPartsWithValueType( str );
InstructionUtils.checkNumFields ( parts, 2 );
public static CumulativeAggregateSPInstruction parseInstruction(String str) {
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
// parts: opcode, in, out => 3 fields
InstructionUtils.checkNumFields(parts, 3);

String opcode = parts[0];
CPOperand in1 = new CPOperand(parts[1]);
CPOperand out = new CPOperand(parts[2]);

AggregateUnaryOperator aggun = InstructionUtils.parseCumulativeAggregateUnaryOperator(opcode);
return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str);
return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str);
}

@Override
public void processInstruction(ExecutionContext ec) {
SparkExecutionContext sec = (SparkExecutionContext)ec;
SparkExecutionContext sec = (SparkExecutionContext) ec;
DataCharacteristics mc = sec.getDataCharacteristics(input1.getName());

// get input
JavaPairRDD<MatrixIndexes, MatrixBlock> in =
sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName());

if ("urowcumk+".equals(getOpcode())) {
// rowcumsum: aggregate phase should output carry/end-values per block
processRowCumsumAggregate(sec, in, mc);
}
else {
// regular cumsum aggregate phase
processCumsum(sec, in, mc);
}
}

private void processRowCumsumAggregate(SparkExecutionContext sec, JavaPairRDD<MatrixIndexes, MatrixBlock> in, DataCharacteristics mc) {
Tuple2<JavaPairRDD<MatrixIndexes, MatrixBlock>, JavaPairRDD<MatrixIndexes, MatrixBlock>> res =
processRowCumsumWithEndValues(in);

JavaPairRDD<MatrixIndexes, MatrixBlock> endValues = res._2;

sec.setRDDHandleForVariable(output.getName(), endValues);
sec.addLineageRDD(output.getName(), input1.getName());

// output characteristics: same rows as input, but 1 column (per-row carry)
MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
mcOut.setCols(1);
sec.getDataCharacteristics(output.getName()).set(mcOut);
}

/**
* Helper for rowcumsum:
* returns (localRowCumsumBlocks, endValuesBlocks).
*/
public static Tuple2<JavaPairRDD<MatrixIndexes, MatrixBlock>, JavaPairRDD<MatrixIndexes, MatrixBlock>>
processRowCumsumWithEndValues(JavaPairRDD<MatrixIndexes, MatrixBlock> in) {

JavaPairRDD<MatrixIndexes, MatrixBlock> localRowCumsum =
in.mapToPair(new LocalRowCumsumFunction());

JavaPairRDD<MatrixIndexes, MatrixBlock> endValues =
localRowCumsum.mapToPair(new ExtractEndValuesFunction());

return new Tuple2<>(localRowCumsum, endValues);
}

/**
* Original cumsum aggregate phase (keep intact).
*/
private void processCumsum(SparkExecutionContext sec, JavaPairRDD<MatrixIndexes, MatrixBlock> in, DataCharacteristics mc) {
DataCharacteristics mcOut = new MatrixCharacteristics(mc);
long rlen = mc.getRows();
int blen = mc.getBlocksize();
mcOut.setRows((long)(Math.ceil((double)rlen/blen)));

//get input
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );

//execute unary aggregate (w/ implicit drop correction)
mcOut.setRows((long) (Math.ceil((double) rlen / blen)));

// execute unary aggregate (w/ implicit drop correction)
AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
JavaPairRDD<MatrixIndexes,MatrixBlock> out =
in.mapToPair(new RDDCumAggFunction(auop, rlen, blen));
//merge partial aggregates, adjusting for correct number of partitions
//as size can significant shrink (1K) but also grow (sparse-dense)
JavaPairRDD<MatrixIndexes, MatrixBlock> out =
in.mapToPair(new RDDCumAggFunction(auop, rlen, blen));

// merge partial aggregates, adjusting for correct number of partitions
int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
int minPar = (int) Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);
//put output handle in symbol table

// put output handle in symbol table
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
sec.getDataCharacteristics(output.getName()).set(mcOut);
}

private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
{

private static class LocalRowCumsumFunction
implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {

private static final long serialVersionUID = 123L;

private static final UnaryOperator ROWCUMSUM_OP =
new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+"));

@Override
public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> kv) {
MatrixIndexes idx = kv._1;
MatrixBlock inputBlock = kv._2;

MatrixBlock outBlock = inputBlock.unaryOperations(ROWCUMSUM_OP, new MatrixBlock());
return new Tuple2<>(idx, outBlock);
}
}


private static class ExtractEndValuesFunction
implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {

private static final long serialVersionUID = 123L;

@Override
public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> kv) {
MatrixIndexes idx = kv._1;
MatrixBlock cumsumBlock = kv._2;

int r = cumsumBlock.getNumRows();
int c = cumsumBlock.getNumColumns();
MatrixBlock endValuesBlock = new MatrixBlock(r, 1, false);

if (c > 0) {
int lastCol = c - 1;
for (int i = 0; i < r; i++) {
endValuesBlock.set(i, 0, cumsumBlock.get(i, lastCol));
}
}
else {
// degenerate case: empty block
for (int i = 0; i < r; i++) {
endValuesBlock.set(i, 0, 0.0);
}
}

return new Tuple2<>(idx, endValuesBlock);
}
}

private static class RDDCumAggFunction
implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {

private static final long serialVersionUID = 11324676268945117L;

private final AggregateUnaryOperator _op;
private UnaryOperator _uop = null;
private final long _rlen;
private final int _blen;
public RDDCumAggFunction( AggregateUnaryOperator op, long rlen, int blen ) {

public RDDCumAggFunction(AggregateUnaryOperator op, long rlen, int blen) {
_op = op;
_rlen = rlen;
_blen = blen;
}

@Override
public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )
throws Exception
{
public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception {
MatrixIndexes ixIn = arg0._1();
MatrixBlock blkIn = arg0._2();

MatrixIndexes ixOut = new MatrixIndexes();
MatrixBlock blkOut = new MatrixBlock();
//process instruction

// process instruction
AggregateUnaryOperator aop = _op;
if( aop.aggOp.increOp.fn instanceof PlusMultiply ) { //cumsumprod
if (aop.aggOp.increOp.fn instanceof PlusMultiply) { // cumsumprod
aop.indexFn.execute(ixIn, ixOut);
if( _uop == null )
if (_uop == null)
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+*"));
MatrixBlock t1 = blkIn.unaryOperations(_uop, new MatrixBlock());
MatrixBlock t2 = blkIn.slice(0, blkIn.getNumRows()-1, 1, 1, new MatrixBlock());
MatrixBlock t2 = blkIn.slice(0, blkIn.getNumRows() - 1, 1, 1, new MatrixBlock());
blkOut.reset(1, 2);
blkOut.set(0, 0, t1.get(t1.getNumRows()-1, 0));
blkOut.set(0, 0, t1.get(t1.getNumRows() - 1, 0));
blkOut.set(0, 1, t2.prod());
}
else { //general case
OperationsOnMatrixValues.performAggregateUnary( ixIn, blkIn, ixOut, blkOut, aop, _blen);
if( aop.aggOp.existsCorrection() )
else { // general case
OperationsOnMatrixValues.performAggregateUnary(ixIn, blkIn, ixOut, blkOut, aop, _blen);
if (aop.aggOp.existsCorrection())
blkOut.dropLastRowsOrColumns(aop.aggOp.correction);
}
//cumsum expand partial aggregates
long rlenOut = (long)Math.ceil((double)_rlen/_blen);
long rixOut = (long)Math.ceil((double)ixIn.getRowIndex()/_blen);
int rlenBlk = (int) Math.min(rlenOut-(rixOut-1)*_blen, _blen);

// cumsum expand partial aggregates
long rlenOut = (long) Math.ceil((double) _rlen / _blen);
long rixOut = (long) Math.ceil((double) ixIn.getRowIndex() / _blen);
int rlenBlk = (int) Math.min(rlenOut - (rixOut - 1) * _blen, _blen);
int clenBlk = blkOut.getNumColumns();
int posBlk = (int) ((ixIn.getRowIndex()-1) % _blen);
//construct sparse output blocks (single row in target block size)
int posBlk = (int) ((ixIn.getRowIndex() - 1) % _blen);

// construct sparse output blocks (single row in target block size)
MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, true);
blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true);
blkOut2.copy(posBlk, posBlk, 0, clenBlk - 1, blkOut, true);
ixOut.setIndexes(rixOut, ixOut.getColumnIndex());

//output new tuple

return new Tuple2<>(ixOut, blkOut2);
}
}
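
The rowcumsum aggregate phase above emits, per input block, a locally row-cumsummed block plus a one-column block of per-row end values (the carries). A downstream offset phase, not shown in this file, would then add to each block the summed carries of all blocks to its left in the same row of blocks. A minimal sketch of that per-block combination step, assuming the MatrixBlock get/set API used above; the class and helper names are hypothetical and not the PR's offset instruction:

import org.apache.sysds.runtime.matrix.data.MatrixBlock;

class RowCumsumOffsetSketch {
    // Hypothetical per-block offset step: add each row's accumulated carry
    // (sum of end values from all blocks left of this one) to every column.
    static MatrixBlock addRowCarry(MatrixBlock localRowCumsum, MatrixBlock carryLeft) {
        int r = localRowCumsum.getNumRows();
        int c = localRowCumsum.getNumColumns();
        MatrixBlock out = new MatrixBlock(r, c, false);
        for (int i = 0; i < r; i++) {
            double carry = (carryLeft != null) ? carryLeft.get(i, 0) : 0d;
            for (int j = 0; j < c; j++)
                out.set(i, j, localRowCumsum.get(i, j) + carry);
        }
        return out;
    }
}
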