diff --git a/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java b/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java index 955bf778b8b..fc430374599 100644 --- a/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java +++ b/src/main/java/org/apache/sysds/hops/codegen/template/TemplateRow.java @@ -71,7 +71,7 @@ public class TemplateRow extends TemplateBase private static final OpOp1[] SUPPORTED_VECT_UNARY = new OpOp1[]{ OpOp1.EXP, OpOp1.SQRT, OpOp1.LOG, OpOp1.ABS, OpOp1.ROUND, OpOp1.CEIL, OpOp1.FLOOR, OpOp1.SIGN, OpOp1.SIN, OpOp1.COS, OpOp1.TAN, OpOp1.ASIN, OpOp1.ACOS, OpOp1.ATAN, OpOp1.SINH, OpOp1.COSH, OpOp1.TANH, - OpOp1.CUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID}; + OpOp1.CUMSUM, OpOp1.ROWCUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID}; private static final OpOp2[] SUPPORTED_VECT_BINARY = new OpOp2[]{ OpOp2.MULT, OpOp2.DIV, OpOp2.MINUS, OpOp2.PLUS, OpOp2.POW, OpOp2.MIN, OpOp2.MAX, OpOp2.XOR, OpOp2.EQUAL, OpOp2.NOTEQUAL, OpOp2.LESS, OpOp2.LESSEQUAL, OpOp2.GREATER, OpOp2.GREATEREQUAL, diff --git a/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java b/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java index 0558e17c9cd..47261198c15 100644 --- a/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java +++ b/src/main/java/org/apache/sysds/resource/cost/CPCostUtils.java @@ -481,6 +481,7 @@ public static long getInstNFLOP( costs = 40; break; case "ucumk+": + case "urowcumk+": case "ucummin": case "ucummax": case "ucum*": diff --git a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java index 8e9aef94660..324b1acbcfc 100644 --- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java +++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java @@ -96,6 +96,7 @@ public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, String2BuiltinCode.put( "floor" , BuiltinCode.FLOOR); String2BuiltinCode.put( "ucumk+" , BuiltinCode.CUMSUM); String2BuiltinCode.put( "urowcumk+" , BuiltinCode.ROWCUMSUM); + String2BuiltinCode.put("rowCumsum", BuiltinCode.ROWCUMSUM); String2BuiltinCode.put( "ucum*" , BuiltinCode.CUMPROD); String2BuiltinCode.put( "ucumk+*", BuiltinCode.CUMSUMPROD); String2BuiltinCode.put( "ucummin", BuiltinCode.CUMMIN); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java index 4fca2ad0eae..cf767f3dc07 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/GPUInstructionParser.java @@ -145,6 +145,7 @@ public class GPUInstructionParser extends InstructionParser // Cumulative Ops String2GPUInstructionType.put( "ucumk+" , GPUINSTRUCTION_TYPE.BuiltinUnary); + String2GPUInstructionType.put( "urowcumk+", GPUINSTRUCTION_TYPE.BuiltinUnary); String2GPUInstructionType.put( "ucum*" , GPUINSTRUCTION_TYPE.BuiltinUnary); String2GPUInstructionType.put( "ucumk+*" , GPUINSTRUCTION_TYPE.BuiltinUnary); String2GPUInstructionType.put( "ucummin" , GPUINSTRUCTION_TYPE.BuiltinUnary); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java index 0e333285c0f..c4d1f308774 100644 --- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/CumulativeOffsetFEDInstruction.java @@ -51,6 +51,8 @@ private CumulativeOffsetFEDInstruction(Operator op, CPOperand in1, CPOperand in2 if ("bcumoffk+".equals(opcode)) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+")); + else if ("browcumoffk+".equals(opcode)) + _uop = new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+")); else if ("bcumoff*".equals(opcode)) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucum*")); else if ("bcumoff+*".equals(opcode)) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java index f125c01d4b0..61f9c4e06d0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java @@ -76,7 +76,7 @@ public static UnaryMatrixFEDInstruction parseInstruction(String str) { in.split(parts[1]); out.split(parts[2]); ValueFunction func = Builtin.getBuiltinFnObject(opcode); - if(Arrays.asList(new String[] {"ucumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"}) + if(Arrays.asList(new String[] {"ucumk+", "urowcumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"}) .contains(opcode)) { UnaryOperator op = new UnaryOperator(func, Integer.parseInt(parts[3]), Boolean.parseBoolean(parts[4])); return new UnaryMatrixFEDInstruction(op, in, out, opcode, str); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java index 0557ccc2791..250b0bf843a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/MatrixBuiltinGPUInstruction.java @@ -92,6 +92,10 @@ public void processInstruction(ExecutionContext ec) { LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_sum", mat, _output.getName()); break; + case "urowcumk+": + LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "row_cumulative_sum", mat, + _output.getName()); + break; case "ucum*": LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_prod", mat, _output.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java index b511ac2e257..be13ba98432 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeAggregateSPInstruction.java @@ -19,7 +19,6 @@ package org.apache.sysds.runtime.instructions.spark; - import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFunction; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; @@ -45,100 +44,199 @@ private CumulativeAggregateSPInstruction(AggregateUnaryOperator op, CPOperand in super(SPType.CumsumAggregate, op, null, in1, out, null, opcode, istr); } - public static CumulativeAggregateSPInstruction parseInstruction( String str ) { - String[] parts = 
InstructionUtils.getInstructionPartsWithValueType( str ); - InstructionUtils.checkNumFields ( parts, 2 ); + public static CumulativeAggregateSPInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + // parts: opcode, in, out => 3 fields + InstructionUtils.checkNumFields(parts, 3); + String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); + AggregateUnaryOperator aggun = InstructionUtils.parseCumulativeAggregateUnaryOperator(opcode); - return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str); + return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str); } - + @Override public void processInstruction(ExecutionContext ec) { - SparkExecutionContext sec = (SparkExecutionContext)ec; + SparkExecutionContext sec = (SparkExecutionContext) ec; DataCharacteristics mc = sec.getDataCharacteristics(input1.getName()); + + // get input + JavaPairRDD in = + sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); + + if ("urowcumk+".equals(getOpcode())) { + // rowcumsum: aggregate phase should output carry/end-values per block + processRowCumsumAggregate(sec, in, mc); + } + else { + // regular cumsum aggregate phase + processCumsum(sec, in, mc); + } + } + + private void processRowCumsumAggregate(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { + Tuple2, JavaPairRDD> res = + processRowCumsumWithEndValues(in); + + JavaPairRDD endValues = res._2; + + sec.setRDDHandleForVariable(output.getName(), endValues); + sec.addLineageRDD(output.getName(), input1.getName()); + + // output characteristics: same rows as input, but 1 column (per-row carry) + MatrixCharacteristics mcOut = new MatrixCharacteristics(mc); + mcOut.setCols(1); + sec.getDataCharacteristics(output.getName()).set(mcOut); + } + + /** + * Helper for rowcumsum: + * returns (localRowCumsumBlocks, endValuesBlocks). + */ + public static Tuple2, JavaPairRDD> + processRowCumsumWithEndValues(JavaPairRDD in) { + + JavaPairRDD localRowCumsum = + in.mapToPair(new LocalRowCumsumFunction()); + + JavaPairRDD endValues = + localRowCumsum.mapToPair(new ExtractEndValuesFunction()); + + return new Tuple2<>(localRowCumsum, endValues); + } + + /** + * Original cumsum aggregate phase (keep intact). 
+ */ + private void processCumsum(SparkExecutionContext sec, JavaPairRDD in, DataCharacteristics mc) { DataCharacteristics mcOut = new MatrixCharacteristics(mc); long rlen = mc.getRows(); int blen = mc.getBlocksize(); - mcOut.setRows((long)(Math.ceil((double)rlen/blen))); - - //get input - JavaPairRDD in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); - - //execute unary aggregate (w/ implicit drop correction) + mcOut.setRows((long) (Math.ceil((double) rlen / blen))); + + // execute unary aggregate (w/ implicit drop correction) AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr; - JavaPairRDD out = - in.mapToPair(new RDDCumAggFunction(auop, rlen, blen)); - //merge partial aggregates, adjusting for correct number of partitions - //as size can significant shrink (1K) but also grow (sparse-dense) + JavaPairRDD out = + in.mapToPair(new RDDCumAggFunction(auop, rlen, blen)); + + // merge partial aggregates, adjusting for correct number of partitions int numParts = SparkUtils.getNumPreferredPartitions(mcOut); - int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()); + int minPar = (int) Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks()); out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false); - - //put output handle in symbol table + + // put output handle in symbol table sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); sec.getDataCharacteristics(output.getName()).set(mcOut); } - private static class RDDCumAggFunction implements PairFunction, MatrixIndexes, MatrixBlock> - { + + private static class LocalRowCumsumFunction + implements PairFunction, MatrixIndexes, MatrixBlock> { + + private static final long serialVersionUID = 123L; + + private static final UnaryOperator ROWCUMSUM_OP = + new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+")); + + @Override + public Tuple2 call(Tuple2 kv) { + MatrixIndexes idx = kv._1; + MatrixBlock inputBlock = kv._2; + + MatrixBlock outBlock = inputBlock.unaryOperations(ROWCUMSUM_OP, new MatrixBlock()); + return new Tuple2<>(idx, outBlock); + } + } + + + private static class ExtractEndValuesFunction + implements PairFunction, MatrixIndexes, MatrixBlock> { + + private static final long serialVersionUID = 123L; + + @Override + public Tuple2 call(Tuple2 kv) { + MatrixIndexes idx = kv._1; + MatrixBlock cumsumBlock = kv._2; + + int r = cumsumBlock.getNumRows(); + int c = cumsumBlock.getNumColumns(); + MatrixBlock endValuesBlock = new MatrixBlock(r, 1, false); + + if (c > 0) { + int lastCol = c - 1; + for (int i = 0; i < r; i++) { + endValuesBlock.set(i, 0, cumsumBlock.get(i, lastCol)); + } + } + else { + // degenerate case: empty block + for (int i = 0; i < r; i++) { + endValuesBlock.set(i, 0, 0.0); + } + } + + return new Tuple2<>(idx, endValuesBlock); + } + } + + private static class RDDCumAggFunction + implements PairFunction, MatrixIndexes, MatrixBlock> { + private static final long serialVersionUID = 11324676268945117L; - + private final AggregateUnaryOperator _op; private UnaryOperator _uop = null; private final long _rlen; private final int _blen; - - public RDDCumAggFunction( AggregateUnaryOperator op, long rlen, int blen ) { + + public RDDCumAggFunction(AggregateUnaryOperator op, long rlen, int blen) { _op = op; _rlen = rlen; _blen = blen; } - + @Override - public Tuple2 call( Tuple2 arg0 ) - throws Exception - { + public Tuple2 call(Tuple2 arg0) throws Exception { 
MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); MatrixIndexes ixOut = new MatrixIndexes(); MatrixBlock blkOut = new MatrixBlock(); - - //process instruction + + // process instruction AggregateUnaryOperator aop = _op; - if( aop.aggOp.increOp.fn instanceof PlusMultiply ) { //cumsumprod + if (aop.aggOp.increOp.fn instanceof PlusMultiply) { // cumsumprod aop.indexFn.execute(ixIn, ixOut); - if( _uop == null ) + if (_uop == null) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+*")); MatrixBlock t1 = blkIn.unaryOperations(_uop, new MatrixBlock()); - MatrixBlock t2 = blkIn.slice(0, blkIn.getNumRows()-1, 1, 1, new MatrixBlock()); + MatrixBlock t2 = blkIn.slice(0, blkIn.getNumRows() - 1, 1, 1, new MatrixBlock()); blkOut.reset(1, 2); - blkOut.set(0, 0, t1.get(t1.getNumRows()-1, 0)); + blkOut.set(0, 0, t1.get(t1.getNumRows() - 1, 0)); blkOut.set(0, 1, t2.prod()); } - else { //general case - OperationsOnMatrixValues.performAggregateUnary( ixIn, blkIn, ixOut, blkOut, aop, _blen); - if( aop.aggOp.existsCorrection() ) + else { // general case + OperationsOnMatrixValues.performAggregateUnary(ixIn, blkIn, ixOut, blkOut, aop, _blen); + if (aop.aggOp.existsCorrection()) blkOut.dropLastRowsOrColumns(aop.aggOp.correction); } - - //cumsum expand partial aggregates - long rlenOut = (long)Math.ceil((double)_rlen/_blen); - long rixOut = (long)Math.ceil((double)ixIn.getRowIndex()/_blen); - int rlenBlk = (int) Math.min(rlenOut-(rixOut-1)*_blen, _blen); + + // cumsum expand partial aggregates + long rlenOut = (long) Math.ceil((double) _rlen / _blen); + long rixOut = (long) Math.ceil((double) ixIn.getRowIndex() / _blen); + int rlenBlk = (int) Math.min(rlenOut - (rixOut - 1) * _blen, _blen); int clenBlk = blkOut.getNumColumns(); - int posBlk = (int) ((ixIn.getRowIndex()-1) % _blen); - - //construct sparse output blocks (single row in target block size) + int posBlk = (int) ((ixIn.getRowIndex() - 1) % _blen); + + // construct sparse output blocks (single row in target block size) MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, true); - blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true); + blkOut2.copy(posBlk, posBlk, 0, clenBlk - 1, blkOut, true); ixOut.setIndexes(rixOut, ixOut.getColumnIndex()); - - //output new tuple + return new Tuple2<>(ixOut, blkOut2); } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index 61b61b15332..f7418ca44ad 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -19,7 +19,13 @@ package org.apache.sysds.runtime.instructions.spark; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; @@ -40,10 +46,8 @@ import org.apache.sysds.runtime.meta.DataCharacteristics; import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.UtilFunctions; -import scala.Tuple2; -import java.util.ArrayList; -import java.util.Iterator; +import scala.Tuple2; public class CumulativeOffsetSPInstruction extends BinarySPInstruction { 
private UnaryOperator _uop = null; @@ -51,7 +55,8 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction { private final double _initValue ; private final boolean _broadcast; - private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, double init, boolean broadcast, String opcode, String istr) { + private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, + double init, boolean broadcast, String opcode, String istr) { super(SPType.CumsumOffset, op, in1, in2, out, opcode, istr); if (Opcodes.BCUMOFFKP.toString().equals(opcode)) @@ -73,15 +78,18 @@ else if (Opcodes.BCUMOFFMAX.toString().equals(opcode)) _broadcast = broadcast; } - public static CumulativeOffsetSPInstruction parseInstruction ( String str ) { - String[] parts = InstructionUtils.getInstructionPartsWithValueType( str ); - InstructionUtils.checkNumFields(parts, 5); + public static CumulativeOffsetSPInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + // parts: opcode, in1, in2, out, init, broadcast => 6 fields + InstructionUtils.checkNumFields(parts, 6); + String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); CPOperand out = new CPOperand(parts[3]); double init = Double.parseDouble(parts[4]); boolean broadcast = Boolean.parseBoolean(parts[5]); + return new CumulativeOffsetSPInstruction(null, in1, in2, out, init, broadcast, opcode, str); } @@ -92,12 +100,17 @@ public void processInstruction(ExecutionContext ec) { DataCharacteristics mc2 = sec.getDataCharacteristics(input2.getName()); long rlen = mc2.getRows(); int blen = mc2.getBlocksize(); - + + if (Opcodes.BROWCUMOFFKP.toString().equals(getOpcode())) { + processRowCumsumOffsets(sec, mc1, mc2); + return; + } + //get and join inputs JavaPairRDD inData = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); JavaPairRDD> joined = null; boolean broadcast = _broadcast && !SparkUtils.isHashPartitioned(inData); - + if( broadcast ) { //broadcast offsets and broadcast join with data PartitionedBroadcast inAgg = sec.getBroadcastForVariable(input2.getName()); @@ -106,18 +119,18 @@ public void processInstruction(ExecutionContext ec) { else { //prepare aggregates (cumsplit of offsets) and repartition join with data joined = inData.join(sec - .getBinaryMatrixBlockRDDHandleForVariable(input2.getName()) - .flatMapToPair(new RDDCumSplitFunction(_initValue, rlen, blen))); + .getBinaryMatrixBlockRDDHandleForVariable(input2.getName()) + .flatMapToPair(new RDDCumSplitFunction(_initValue, rlen, blen))); } - + //execute cumulative offset (apply cumulative op w/ offsets) JavaPairRDD out = joined - .mapValues(new RDDCumOffsetFunction(_uop, _cumsumprod)); - + .mapValues(new RDDCumOffsetFunction(_uop, _cumsumprod)); + //put output handle in symbol table if( _cumsumprod ) sec.getDataCharacteristics(output.getName()) - .set(mc1.getRows(), 1, mc1.getBlocksize(), mc1.getBlocksize()); + .set(mc1.getRows(), 1, mc1.getBlocksize(), mc1.getBlocksize()); else //general case updateUnaryOutputDataCharacteristics(sec); sec.setRDDHandleForVariable(output.getName(), out); @@ -125,6 +138,117 @@ public void processInstruction(ExecutionContext ec) { sec.addLineage(output.getName(), input2.getName(), broadcast); } + /** + * Distributed rowcumsum offset application: + * - endValues: for each (rowBlock, colBlock), a (rowsInBlock x 1) column vector with the last value of each row + * - compute 
per-(rowBlock,colBlock) offsets via prefix-scan across colBlocks (within each rowBlock) + * - join offsets with localRowCumsum blocks and add offsets row-wise + * + * This matches the paper’s two-phase scan: local scan + carry propagation. + */ + public static JavaPairRDD processRowCumsumOffsetsDirectly( + JavaPairRDD localRowCumsum, + JavaPairRDD endValues) { + + // Group end-values by row-block, then sort by col-block and compute prefix offsets + JavaPairRDD>> groupedByRowBlock = endValues + .mapToPair(new PairFunction, Long, Tuple2>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2> call(Tuple2 t) { + long rowBlock = t._1.getRowIndex(); + long colBlock = t._1.getColumnIndex(); + return new Tuple2<>(rowBlock, new Tuple2<>(colBlock, t._2)); + } + }) + .groupByKey(); + + // Produce offsets per (rowBlock, colBlock) as a MatrixBlock (rowsInBlock x 1) + JavaPairRDD offsetsByBlock = groupedByRowBlock + .flatMapToPair(new PairFlatMapFunction>>, MatrixIndexes, MatrixBlock>() { + private static final long serialVersionUID = 1L; + + @Override + public Iterator> call(Tuple2>> t) { + long rowBlock = t._1; + + List> cols = new ArrayList<>(); + for (Tuple2 x : t._2) + cols.add(x); + + cols.sort(Comparator.comparingLong(Tuple2::_1)); + + int numRows = 0; + if (!cols.isEmpty()) + numRows = cols.get(0)._2.getNumRows(); + + double[] cumulative = new double[numRows]; + List> out = new ArrayList<>(cols.size()); + + for (Tuple2 cb : cols) { + long colBlock = cb._1; + MatrixBlock endBlock = cb._2; // (numRows x 1) + + // offsets for THIS block = cumulative sum of all previous blocks + MatrixBlock offsetBlock = new MatrixBlock(numRows, 1, false); + for (int i = 0; i < numRows; i++) + offsetBlock.set(i, 0, cumulative[i]); + + out.add(new Tuple2<>(new MatrixIndexes(rowBlock, colBlock), offsetBlock)); + + // update cumulative by adding this block’s end-values + for (int i = 0; i < numRows; i++) + cumulative[i] += endBlock.get(i, 0); + } + + return out.iterator(); + } + }); + + // Join local rowcumsum with offsets and add offsets row-wise + return localRowCumsum + .leftOuterJoin(offsetsByBlock) + .mapToPair(new PairFunction>>, MatrixIndexes, MatrixBlock>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2 call( + Tuple2>> t) { + + MatrixIndexes ix = t._1; + MatrixBlock local = t._2._1; + MatrixBlock off = t._2._2.isPresent() ? t._2._2.get() : null; + + int r = local.getNumRows(); + int c = local.getNumColumns(); + MatrixBlock out = new MatrixBlock(r, c, false); + + for (int i = 0; i < r; i++) { + double rowOffset = (off != null) ? 
off.get(i, 0) : 0.0; + for (int j = 0; j < c; j++) + out.set(i, j, local.get(i, j) + rowOffset); + } + + return new Tuple2<>(ix, out); + } + }); + } + + private void processRowCumsumOffsets(SparkExecutionContext sec, DataCharacteristics mc1, DataCharacteristics mc2) { + JavaPairRDD localRowCumsum = + sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); + JavaPairRDD endValues = + sec.getBinaryMatrixBlockRDDHandleForVariable(input2.getName()); + + JavaPairRDD out = processRowCumsumOffsetsDirectly(localRowCumsum, endValues); + + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageRDD(output.getName(), input2.getName()); + sec.getDataCharacteristics(output.getName()).set(mc1); + } + public double getInitValue() { return _initValue; } @@ -133,36 +257,38 @@ public boolean getBroadcast() { return _broadcast; } - private static class RDDCumSplitFunction implements PairFlatMapFunction, MatrixIndexes, MatrixBlock> + // --- existing generic cumsum offset machinery below (unchanged) --- + + private static class RDDCumSplitFunction implements PairFlatMapFunction, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = -8407407527406576965L; - + private double _initValue = 0; private int _blen = -1; private long _lastRowBlockIndex; - + public RDDCumSplitFunction( double initValue, long rlen, int blen ) { _initValue = initValue; _blen = blen; _lastRowBlockIndex = (long)Math.ceil((double)rlen/blen); } - + @Override - public Iterator> call( Tuple2 arg0 ) - throws Exception + public Iterator> call( Tuple2 arg0 ) + throws Exception { ArrayList> ret = new ArrayList<>(); - + MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); - + long rixOffset = (ixIn.getRowIndex()-1)*_blen; boolean firstBlk = (ixIn.getRowIndex() == 1); boolean lastBlk = (ixIn.getRowIndex() == _lastRowBlockIndex ); - - //introduce offsets w/ init value for first row - if( firstBlk ) { + + //introduce offsets w/ init value for first row + if( firstBlk ) { MatrixIndexes tmpix = new MatrixIndexes(1, ixIn.getColumnIndex()); MatrixBlock tmpblk = new MatrixBlock(1, blkIn.getNumColumns(), blkIn.isInSparseFormat()); if( _initValue != 0 ){ @@ -171,7 +297,7 @@ public Iterator> call( Tuple2(tmpix, tmpblk)); } - + //output splitting (shift by one), preaggregated offset used by subsequent block for( int i=0; i> call( Tuple2(tmpix, tmpblk)); } - + return ret.iterator(); } } - - private static class RDDCumSplitLookupFunction implements PairFunction, MatrixIndexes, Tuple2> + + private static class RDDCumSplitLookupFunction implements PairFunction, MatrixIndexes, Tuple2> { private static final long serialVersionUID = -2785629043886477479L; - + private final PartitionedBroadcast _pbc; private final double _initValue; private final int _blen; - + public RDDCumSplitLookupFunction(PartitionedBroadcast pbc, double initValue, long rlen, int blen) { _pbc = pbc; _initValue = initValue; _blen = blen; } - + @Override public Tuple2> call(Tuple2 arg0) throws Exception { MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); - + //compute block and row indexes long brix = UtilFunctions.computeBlockIndex(ixIn.getRowIndex()-1, _blen); int rix = UtilFunctions.computeCellInBlock(ixIn.getRowIndex()-1, _blen); - + //lookup offset row and return joined output MatrixBlock off = (ixIn.getRowIndex() == 1) ? 
new MatrixBlock(1, blkIn.getNumColumns(), _initValue) : - _pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix); + _pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix); return new Tuple2<>(ixIn, new Tuple2<>(blkIn,off)); } } - private static class RDDCumOffsetFunction implements Function, MatrixBlock> + private static class RDDCumOffsetFunction implements Function, MatrixBlock> { private static final long serialVersionUID = -5804080263258064743L; private final UnaryOperator _uop; private final boolean _cumsumprod; - + public RDDCumOffsetFunction(UnaryOperator uop, boolean cumsumprod) { _uop = uop; _cumsumprod = cumsumprod; @@ -231,17 +357,17 @@ public RDDCumOffsetFunction(UnaryOperator uop, boolean cumsumprod) { @Override public MatrixBlock call(Tuple2 arg0) throws Exception { //prepare inputs and outputs - MatrixBlock dblkIn = arg0._1(); //original data + MatrixBlock dblkIn = arg0._1(); //original data MatrixBlock oblkIn = arg0._2(); //offset row vector - + //allocate output block MatrixBlock blkOut = new MatrixBlock(dblkIn.getNumRows(), - _cumsumprod ? 1 : dblkIn.getNumColumns(), false); - + _cumsumprod ? 1 : dblkIn.getNumColumns(), false); + //blockwise cumagg computation, incl offset aggregation return LibMatrixAgg.cumaggregateUnaryMatrix(dblkIn, blkOut, _uop, - DataConverter.convertToDoubleVector(oblkIn, false, - ((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM)); + DataConverter.convertToDoubleVector(oblkIn, false, + ((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM)); } } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java index eebeef0b2ec..e316d848100 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java @@ -21,249 +21,74 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; -import org.apache.sysds.runtime.instructions.cp.KahanObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; -import scala.Serializable; import scala.Tuple2; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class UnaryMatrixSPInstruction extends UnarySPInstruction { protected UnaryMatrixSPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String instr) { super(SPType.Unary, op, in, out, opcode, instr); } - + public static UnarySPInstruction parseInstruction ( String str ) { CPOperand in = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN); CPOperand out = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN); String opcode = parseUnaryInstruction(str, in, out); return new 
UnaryMatrixSPInstruction( - InstructionUtils.parseUnaryOperator(opcode), in, out, opcode, str); + InstructionUtils.parseUnaryOperator(opcode), in, out, opcode, str); } - @Override + @Override public void processInstruction(ExecutionContext ec) { SparkExecutionContext sec = (SparkExecutionContext)ec; - - //get input - JavaPairRDD in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() ); - - //execute unary builtin operation - UnaryOperator uop = (UnaryOperator) _optr; - JavaPairRDD out = in.mapValues(new RDDMatrixBuiltinUnaryOp(uop)); - - //set output RDD - updateUnaryOutputDataCharacteristics(sec); - sec.setRDDHandleForVariable(output.getName(), out); - sec.addLineageRDD(output.getName(), input1.getName()); - - //FIXME: implement similar to cumsum through - // CumulativeAggregateSPInstruction (Spark) - // UnaryMatrixCPInstruction (local cumsum on aggregates) - // CumulativeOffsetSPInstruction (Spark) - if ( "urowcumk+".equals(getOpcode()) ) { - - JavaPairRDD< MatrixIndexes, Tuple2 > localRowcumsum = in.mapToPair( new LocalRowCumsumFunction() ); - - // Collect end-values of every block of every row for offset calc by grouping by global row index - JavaPairRDD< Long, Iterable> > rowEndValues = localRowcumsum - .mapToPair( tuple2 -> { - // get index of block - MatrixIndexes indexes = tuple2._1; - // get cum matrix block - MatrixBlock localRowcumsumBlock = tuple2._2._2; - - // get row and column block index - long rowBlockIndex = indexes.getRowIndex(); - long colBlockIndex = indexes.getColumnIndex(); - - // Save end value of every row of every block (if block is empty save 0) - double[] endValues = new double[ localRowcumsumBlock.getNumRows() ]; - - for ( int i = 0; i < localRowcumsumBlock.getNumRows(); i ++ ) { - if (localRowcumsumBlock.getNumColumns() > 0) - endValues[i] = localRowcumsumBlock.get(i, localRowcumsumBlock.getNumColumns() - 1); - else - endValues[i] = 0.0 ; - } - return new Tuple2<>(rowBlockIndex, new Tuple3<>(rowBlockIndex, colBlockIndex, endValues)); - } - ).groupByKey(); - // compute offset for every block - List< Tuple2 , double[]> > offsetList = rowEndValues - .flatMapToPair(tuple2 -> { - Long rowBlockIndex = tuple2._1; - List< Tuple3 > colValues = new ArrayList<>(); - for ( Tuple3 cv : tuple2._2 ) - colValues.add(cv); - - // sort blocks from one row by column index - colValues.sort(Comparator.comparing(Tuple3::_2)); + // get input + JavaPairRDD in = + sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName()); - // get number of rows of a block by counting amount of end (row) values of said block - int numberOfRows = 0; - if ( !colValues.isEmpty() ) { - Tuple3 firstTuple = colValues.get(0); - double[] lastValuesArray = firstTuple._3(); - numberOfRows = lastValuesArray.length; - } + // Only do distributed rowcumsum logic for the rowcumsum opcode. + // Otherwise do the default unary builtin blockwise operation. 
+ if ("urowcumk+".equals(getOpcode())) { - List, double[]>> blockOffsets = new ArrayList<>(); - double[] cumulativeOffsets = new double[numberOfRows]; - for (Tuple3 colValue : colValues) { - Long colBlockIndex = colValue._2(); - double[] endValues = colValue._3(); + // rowcumsum processing (distributed: aggregate + offsets) + Tuple2, JavaPairRDD> results = + CumulativeAggregateSPInstruction.processRowCumsumWithEndValues(in); - // copy current offsets - double[] currentOffsets = cumulativeOffsets.clone(); - - // and save block indexes with its offsets - blockOffsets.add( new Tuple2<>(new Tuple2<>(rowBlockIndex, colBlockIndex), currentOffsets) ); - - for ( int i = 0; i < numberOfRows && i < endValues.length; i++ ) { - cumulativeOffsets[i] += endValues[i]; - } - } - return blockOffsets.iterator(); - } - ).collect(); - - // convert list to map for easier access to offsets - Map< Tuple2, double[] > offsetMap = new HashMap<>(); - for (Tuple2, double[]> offset : offsetList) { - offsetMap.put(offset._1, offset._2); - } - - out = localRowcumsum.mapToPair( new FinalRowCumsumFunction(offsetMap)) ; + JavaPairRDD rowEndValues = + CumulativeOffsetSPInstruction.processRowCumsumOffsetsDirectly(results._1, results._2); updateUnaryOutputDataCharacteristics(sec); - sec.setRDDHandleForVariable(output.getName(), out); + sec.setRDDHandleForVariable(output.getName(), rowEndValues); sec.addLineageRDD(output.getName(), input1.getName()); } - } - - - - private static class LocalRowCumsumFunction implements PairFunction< Tuple2, MatrixIndexes, Tuple2 > { - private static final long serialVersionUID = 2388003441846068046L; - - @Override - public Tuple2< MatrixIndexes, Tuple2 > call(Tuple2 tuple2) { - - - MatrixBlock inputBlock = tuple2._2; - MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false ); - - - for ( int i = 0; i < inputBlock.getNumRows(); i++ ) { - - KahanObject kbuff = new KahanObject(0, 0); - KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); - - for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) { - - double val = inputBlock.get(i, j); - kplus.execute2(kbuff, val); - cumsumBlock.set(i, j, kbuff._sum); - } - } - // original index, original matrix and local cumsum block - return new Tuple2<>( tuple2._1, new Tuple2<>(inputBlock, cumsumBlock) ); - } - } - - - - - private static class FinalRowCumsumFunction implements PairFunction >, MatrixIndexes, MatrixBlock> { - private static final long serialVersionUID = -6738155890298916270L; - // map block indexes to the row offsets - private final Map< Tuple2, double[] > offsetMap; - - public FinalRowCumsumFunction(Map, double[]> offsetMap) { - this.offsetMap = offsetMap; - } - - - @Override - public Tuple2 call( Tuple2< MatrixIndexes, Tuple2 > tuple ) { - - MatrixIndexes indexes = tuple._1; - MatrixBlock inputBlock = tuple._2._1; - MatrixBlock localRowCumsumBlock = tuple._2._2; - - // key to get the row offset for this block - Tuple2 blockKey = new Tuple2<>( indexes.getRowIndex(), indexes.getColumnIndex()) ; - double[] offsets = offsetMap.get(blockKey); - - MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false ); - - - for ( int i = 0; i < inputBlock.getNumRows(); i++ ) { - - double rowOffset = 0.0; - if ( offsets != null && i < offsets.length ) { - rowOffset = offsets[i]; - } - - for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) { - double cumsumValue = localRowCumsumBlock.get(i, j); - cumsumBlock.set(i, j, cumsumValue + rowOffset); - } - } - - // block index 
and final cumsum block - return new Tuple2<>(indexes, cumsumBlock); - } - } + else { + // execute unary builtin operation (blockwise) + UnaryOperator uop = (UnaryOperator) _optr; + JavaPairRDD out = + in.mapValues(new RDDMatrixBuiltinUnaryOp(uop)); - - - // helper class - private static class Tuple3 implements Serializable { - - private static final long serialVersionUID = 123; - private final Type2 _2; - private final Type3 _3; - - - public Tuple3( Type1 _1, Type2 _2, Type3 _3 ) { - this._2 = _2; - this._3 = _3; - } - - public Type2 _2() { - return _2; - } - - public Type3 _3() { - return _3; + // set output RDD + updateUnaryOutputDataCharacteristics(sec); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); } } - private static class RDDMatrixBuiltinUnaryOp implements Function + private static class RDDMatrixBuiltinUnaryOp implements Function { private static final long serialVersionUID = -3128192099832877491L; - + private UnaryOperator _op = null; - + public RDDMatrixBuiltinUnaryOp(UnaryOperator u_op) { _op = u_op; } @@ -274,4 +99,3 @@ public MatrixBlock call(MatrixBlock arg0) throws Exception { } } } - diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml new file mode 100644 index 00000000000..4a2f18f2ff3 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge01_WideDense_100x200.dml @@ -0,0 +1,45 @@ +# Test-specific focus: +# Wide dense matrix spanning multiple column blocks. +# +# Why this matters: +# Row-wise cumulative sum in Spark can fail when carries are not propagated +# correctly across column blocks/partitions. This test checks continuity of +# cumulative sums across the full row width. +# +# Matrix construction: +# X[i,j] = i + 0.01*j +# +# Expected result (closed-form, no loops): +# E[i,j] = sum_{k=1..j} (i + 0.01*k) +# = j*i + 0.01 * j*(j+1)/2 +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. That helper exists only in the JUnit test harness. +# - Exec mode is controlled by CLI: -exec spark +# +# IMPORTANT: +# - The builtin function name is "rowcumsum" (lowercase), not "rowCumsum". +# - Do NOT source scripts/builtin/rowCumsum.dml because it does not exist. + +n = 100; m = 200; + +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# X[i,j] = i + 0.01*j +X = ri %*% matrix(1, 1, m) + 0.01 * (matrix(1, n, 1) %*% cj); + +# Compute row-wise cumulative sum (native builtin) +Y = rowcumsum(X); + +# Expected: E[i,j] = j*i + 0.01 * j*(j+1)/2 +J = matrix(1, n, 1) %*% cj; # j replicated down +E = J * (ri %*% matrix(1, 1, m)) + 0.01 * (J * (J + 1)) / 2; # closed form + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml new file mode 100644 index 00000000000..074ee33574a --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge02_WideAltSign_50x500.dml @@ -0,0 +1,41 @@ +# Test-specific focus: +# Wide matrix with alternating positive and negative values. +# +# Why this matters: +# Carry propagation errors often surface when signs alternate, because +# missing or duplicated carries cannot cancel out silently. +# +# Matrix construction: +# X[i,j] alternates sign by column and increases in magnitude with (i + j). 
+# +# Expected result: +# Computed via an explicit sequential loop over columns, serving as a +# trusted reference implementation. +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark + +n = 50; m = 500; +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# X[i,j] = (j%2==0 ? -1 : 1) * (i + j) +S = ((cj %% 2) * (-2) + 1); # j odd => +1, j even => -1 +X = (ri %*% matrix(1, 1, m) + matrix(1, n, 1) %*% cj) * (matrix(1, n, 1) %*% S); + +Y = rowcumsum(X); + +# Expected via explicit sequential loop over columns +E = matrix(0, n, m); +E[,1] = X[,1]; +for (j in 2:m) { + E[,j] = E[,j-1] + X[,j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml new file mode 100644 index 00000000000..b78ade4680a --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge03_Tall_2000x30.dml @@ -0,0 +1,44 @@ +# Test-specific focus: +# Very tall matrix with moderate width. +# +# Why this matters: +# Ensures correctness when: +# - Many row blocks are involved +# - Row-wise operations are distributed across Spark partitions +# +# This test verifies that: +# - Row-wise independence is preserved +# - No cross-row interference occurs during carry propagation +# +# Expected result: +# Computed via a sequential column-wise loop (trusted reference). +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark + +n = 2000; m = 30; +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# X[i,j] = sin(i) + j +# - sin(i) varies per row (helps detect cross-row interference) +# - + j ensures a predictable column-wise growth pattern +X = sin(ri %*% matrix(1, 1, m)) + (matrix(1, n, 1) %*% cj); + +# Row-wise cumulative sum (must be supported by your implementation) +Y = rowcumsum(X); + +# Expected via explicit sequential loop over columns +E = matrix(0, n, m); +E[,1] = X[,1]; +for (j in 2:m) { + E[,j] = E[,j-1] + X[,j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml new file mode 100644 index 00000000000..9ad65ca64aa --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge04_SparseImpulses_100x400.dml @@ -0,0 +1,53 @@ +# Test-specific focus: +# Sparse-like structure with isolated non-zero impulses. +# +# Why this matters: +# In sparse or near-sparse matrices, incorrect carry handling often +# manifests as: +# - Sudden jumps +# - Missing propagation after zero blocks +# +# Matrix construction: +# - Mostly zeros +# - Deterministic impulses at known columns and rows +# +# Expected behavior: +# Cumulative sum should only change at impulse locations and remain +# constant elsewhere. +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. 
Exec mode is controlled by CLI: -exec spark + n = 100; m = 400; +X = matrix(0, n, m); + +# Put impulses in a deterministic way +# Row 1: spikes of +10 every 50 columns +for (j in 1:m) { + if (j %% 50 == 0) { + X[1, j] = 10; + } +} + +# Row 50: ramp in first 100 columns (1..100) +for (j in 1:100) { + X[50, j] = j; +} + +# Row-wise cumulative sum (must be supported by your implementation) +Y = rowcumsum(X); + +# Expected via explicit sequential loop over columns +E = matrix(0, n, m); +E[, 1] = X[, 1]; +for (j in 2:m) { + E[, j] = E[, j-1] + X[, j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml new file mode 100644 index 00000000000..99c6da8daaf --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge05_NegHeavy_300x250.dml @@ -0,0 +1,44 @@ +# Test-specific focus: +# Negative-dominated values with increasing magnitude. +# +# Why this matters: +# Ensures numerical correctness when cumulative sums are decreasing +# and values span multiple column blocks. +# +# This test stresses: +# - Sign correctness (cumsum should keep decreasing) +# - Carry propagation across column blocks +# - No accidental abs() / truncation / overflow-ish behavior +# +# Expected result: +# Verified via a sequential loop reference (per-row prefix sum). +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark + +n = 300; m = 250; +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# X[i,j] = - (i*j) / 1000 +# Construct i replicated across columns and j replicated across rows, then multiply. +I = ri %*% matrix(1, 1, m); +J = matrix(1, n, 1) %*% cj; +X = - (I * J) / 1000; + +Y = rowcumsum(X); + +# Expected via explicit sequential loop over columns +E = matrix(0, n, m); +E[, 1] = X[, 1]; +for (j in 2:m) { + E[, j] = E[, j-1] + X[, j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml new file mode 100644 index 00000000000..21f93a36adc --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge06_FloatTiny_150x600.dml @@ -0,0 +1,33 @@ +# Test-specific focus: +# Floating-point stability with very small values. +# +# Why this matters: +# Distributed prefix sums can amplify floating-point error if +# offsets are applied incorrectly or in the wrong order. +# +# Matrix construction: +# Values on the order of 1e-6 with mixed signs. +# +# Expected behavior: +# Numerical error remains within tight tolerance (1e-10).
+ +# NOTE: do NOT call setExecMode() here; exec mode is controlled by CLI: -exec spark + n=150; m=600; +ri = matrix(seq(1,n), n, 1); +cj = matrix(seq(1,m), 1, m); + +# X[i,j] = 1e-6 * (i - j) +X = 1e-6 * ( (ri %*% matrix(1,1,m)) - (matrix(1,n,1) %*% cj) ); + +Y = rowcumsum(X); + +E = matrix(0, n, m); +E[,1] = X[,1]; +for(j in 2:m){ + E[,j] = E[,j-1] + X[,j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-10); # slightly stricter because values are tiny +write(pass, $1); \ No newline at end of file diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml new file mode 100644 index 00000000000..42fd0c9e461 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge07_BlockCarry_64x1024.dml @@ -0,0 +1,45 @@ +# Test-specific focus: +# Stress test for carry propagation across many column blocks. +# +# Why this matters: +# Spark row-wise prefix sums commonly break at block boundaries when +# carries are not propagated correctly. This test uses a very simple +# data pattern with a closed-form expected result, so any block-boundary +# error shows up immediately. +# +# Matrix construction: +# X[i,j] = i (constant across each row) +# +# Expected result (closed-form, no loops): +# E[i,j] = sum_{k=1..j} i = j * i +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. That helper exists only in the JUnit harness. +# - Exec mode is controlled by CLI: -exec spark + +n = 64; m = 1024; + +# Row index column vector i = 1..n +ri = matrix(seq(1, n), n, 1); + +# Construct X so every row i is constant: X[i,j] = i +X = ri %*% matrix(1, 1, m); + +# Compute row-wise cumulative sum +Y = rowcumsum(X); + +# Column index row vector j = 1..m +cj = matrix(seq(1, m), 1, m); +J = matrix(1, n, 1) %*% cj; # replicate j down the rows +R = ri %*% matrix(1, 1, m); # replicate i across columns + +# Expected: E[i,j] = j * i +E = J * R; + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml new file mode 100644 index 00000000000..b03474e1479 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge08_DeterministicHash_128x256.dml @@ -0,0 +1,46 @@ +# Test-specific focus: +# Pseudo-random but deterministic data pattern. +# +# Why this matters: +# Captures realistic, irregular data distributions while remaining +# fully reproducible. +# +# This test ensures: +# - No assumptions are made about monotonicity or smoothness +# - Carry propagation works for arbitrary values +# +# Expected result: +# Computed via trusted sequential loop. +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. That helper exists only in the JUnit harness.
+# - Exec mode is controlled by CLI: -exec spark + +n = 128; m = 256; + +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# deterministic pseudo-random-ish: +# X[i,j] = ((i*1103515245 + j*12345) %% 97) - 48 +A = (ri %*% matrix(1, 1, m)) * 1103515245; +B = (matrix(1, n, 1) %*% cj) * 12345; +X = ((A + B) %% 97) - 48; + +# row-wise cumsum (builtin name is lower-case in DML) +Y = rowcumsum(X); + +# Expected result via sequential loop +E = matrix(0, n, m); +E[, 1] = X[, 1]; +for (j in 2:m) { + E[, j] = E[, j-1] + X[, j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml new file mode 100644 index 00000000000..3cd29cd3900 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge09_SpotChecks_10x1000.dml @@ -0,0 +1,45 @@ +# Test-specific focus: +# Explicit spot-check validation with known exact values. +# +# Why this matters: +# Provides human-readable correctness checks that can be discussed +# and verified manually. +# +# This test: +# - Checks global correctness via max error +# - Verifies specific entries with analytically known results +# +# Useful for debugging and supervisor review. +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark + +n = 10; m = 1000; + +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +# X[i,j] = i + j +X = ri %*% matrix(1, 1, m) + matrix(1, n, 1) %*% cj; + +# Row-wise cumulative sum (builtin is lower-case in DML) +Y = rowcumsum(X); + +# Closed-form expected: +# E[i,j] = sum_{k=1..j} (i+k) = j*i + j*(j+1)/2 +J = matrix(1, n, 1) %*% cj; +E = J * (ri %*% matrix(1, 1, m)) + (J * (J + 1)) / 2; + +err = max(abs(Y - E)); + +# Spot checks (exact integers) +s1 = Y[1, 1]; # should be 1+1 = 2 +s2 = Y[1, 10]; # sum_{k=1..10}(1+k)= 10*1 + 55 = 65 +s3 = Y[3, 1000]; # 1000*3 + 1000*1001/2 = 3000 + 500500 = 503500 + +pass = (err < 1e-9) & (s1 == 2) & (s2 == 65) & (s3 == 503500); + +write(pass, $1); diff --git a/src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml b/src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml new file mode 100644 index 00000000000..6687a78c64f --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/RowCumsumLarge10_Regression_256x256.dml @@ -0,0 +1,43 @@ +# Test-specific focus: +# Regression-style test combining multiple patterns. +# +# Why this matters: +# Designed to catch subtle bugs introduced by future refactoring. +# +# Matrix characteristics: +# - Mixed signs +# - Row-dependent offsets +# - Moderate size across both dimensions +# +# Expected result: +# Sequential loop reference. +# +# Output: +# Writes TRUE/FALSE to $1 +# +# NOTE: +# - Do NOT call setExecMode() here. Exec mode is controlled by CLI: -exec spark + +n = 256; m = 256; + +# Create structured matrix with mixed sign and magnitude +ri = matrix(seq(1, n), n, 1); +cj = matrix(seq(1, m), 1, m); + +X = ((ri %*% matrix(1, 1, m)) - (matrix(1, n, 1) %*% cj)) / 10; +X = X + ((ri %% 7) %*% matrix(1, 1, m)); # add row-dependent offsets + +# Row-wise cumulative sum (builtin is lower-case in DML) +Y = rowcumsum(X); + +# Reference via loop +E = matrix(0, n, m); +E[, 1] = X[, 1]; +for (j in 2:m) { + E[, j] = E[, j-1] + X[, j]; +} + +err = max(abs(Y - E)); +pass = (err < 1e-9); + +write(pass, $1);
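
For reference (not part of the patch): the Spark path above implements rowcumsum as a two-phase scan — LocalRowCumsumFunction computes a local row-wise prefix sum per column block, ExtractEndValuesFunction keeps each row's last value as the block's carry, and processRowCumsumOffsetsDirectly prefix-scans those carries per row block and adds them back to the local results. The following minimal, self-contained Java sketch mirrors that arithmetic on plain arrays; the class name and the blockWidth parameter are illustrative assumptions only, not SystemDS APIs.

// Illustrative sketch of the two-phase row-wise cumulative sum (plain arrays, no SystemDS APIs).
public class RowCumsumSketch {
    public static double[][] rowCumsum(double[][] X, int blockWidth) {
        int n = X.length, m = X[0].length;
        double[][] Y = new double[n][m];
        int numBlocks = (m + blockWidth - 1) / blockWidth;

        // Phase 1: local row-wise scan within each column block,
        // remembering each row's last value per block (the carry).
        double[][] endValues = new double[n][numBlocks];
        for (int b = 0; b < numBlocks; b++) {
            int lo = b * blockWidth, hi = Math.min(m, lo + blockWidth);
            for (int i = 0; i < n; i++) {
                double acc = 0;
                for (int j = lo; j < hi; j++) {
                    acc += X[i][j];
                    Y[i][j] = acc;
                }
                endValues[i][b] = acc;
            }
        }

        // Phase 2: prefix-scan the carries across column blocks of the same row
        // and add the accumulated offset to every element of the current block.
        for (int i = 0; i < n; i++) {
            double carry = 0;
            for (int b = 0; b < numBlocks; b++) {
                int lo = b * blockWidth, hi = Math.min(m, lo + blockWidth);
                for (int j = lo; j < hi; j++)
                    Y[i][j] += carry;
                carry += endValues[i][b];
            }
        }
        return Y;
    }

    public static void main(String[] args) {
        double[][] X = {{1, 1, 1, 1, 1}, {2, -1, 2, -1, 2}};
        double[][] Y = rowCumsum(X, 2); // block width 2 forces cross-block carries
        // expected: row 0 -> 1 2 3 4 5; row 1 -> 2 1 3 2 4
        for (double[] row : Y)
            System.out.println(java.util.Arrays.toString(row));
    }
}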