diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java index 003703f86a4..eb2a5a356b7 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java @@ -65,7 +65,7 @@ public abstract class AColGroup implements Serializable { /** Public super types of compression ColGroups supported */ public static enum CompressionType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, LinearFunctional; + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, DDCLZW, LinearFunctional; public boolean isDense() { return this == DDC || this == CONST || this == DDCFOR || this == DDCFOR; @@ -86,7 +86,7 @@ public boolean isSDC() { * Protected such that outside the ColGroup package it should be unknown which specific subtype is used. */ protected static enum ColGroupType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DeltaDDC, + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DDCLZW, DeltaDDC, LinearFunctional; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java index ac4defcabd5..a3fdf1fc89f 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java @@ -1112,13 +1112,13 @@ protected boolean allowShallowIdentityRightMult() { public AColGroup convertToDeltaDDC() { int numCols = _colIndexes.size(); int numRows = _data.size(); - + DblArrayCountHashMap map = new DblArrayCountHashMap(Math.max(numRows, 64)); double[] rowDelta = new double[numCols]; double[] prevRow = new double[numCols]; DblArray dblArray = new DblArray(rowDelta); int[] rowToDictId = new int[numRows]; - + double[] dictVals = _dict.getValues(); for(int i = 0; i < numRows; i++) { @@ -1134,13 +1134,13 @@ public AColGroup convertToDeltaDDC() { prevRow[j] = val; } } - + rowToDictId[i] = map.increment(dblArray); } - + if(map.size() == 0) return new ColGroupEmpty(_colIndexes); - + ACount[] vals = map.extractValues(); final int nVals = vals.length; final double[] dictValues = new double[nVals * numCols]; @@ -1153,7 +1153,7 @@ public AColGroup convertToDeltaDDC() { oldIdToNewId[dac.id] = i; idx += numCols; } - + DeltaDictionary deltaDict = new DeltaDictionary(dictValues, numCols); AMapToData newData = MapToFactory.create(numRows, nVals); for(int i = 0; i < numRows; i++) { @@ -1162,4 +1162,7 @@ public AColGroup convertToDeltaDDC() { return ColGroupDeltaDDC.create(_colIndexes, deltaDict, newData, null); } + public AColGroup convertToDDCLZW() { + return ColGroupDDCLZW.create(_colIndexes, _dict, _data, null); + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java new file mode 100644 index 00000000000..f5d5112f263 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.compress.colgroup; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.*; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P; +import org.apache.sysds.runtime.compress.colgroup.dictionary.*; +import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; +import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; +import org.apache.sysds.runtime.compress.colgroup.scheme.DDCLZWScheme; +import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.compress.estim.EstimationFactors; +import org.apache.sysds.runtime.compress.estim.encoding.IEncode; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.functionobjects.*; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import shaded.parquet.it.unimi.dsi.fastutil.ints.IntArrayList; +import shaded.parquet.it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap; + +/** + * Class to encapsulate information about a column group that is encoded with dense dictionary encoding (DDC) whose + * mapping vector is additionally LZW compressed. Idea: + * <ul> + * <li>DDCLZW stores the mapping vector exclusively in compressed form.</li> + * <li>No persistent MapToData cache is maintained.</li> + * <li>Sequential operations decode on-the-fly, while operations requiring random access explicitly materialize and + * fall back to DDC.</li> + * </ul>
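+ * <p> + * For example, a DDC group is converted into this format via ColGroupDDC.convertToDDCLZW(), and convertToDDC() + * materializes the mapping again for operations that need random access.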
+ */ +public class ColGroupDDCLZW extends APreAgg implements IMapToDataGroup { + private static final long serialVersionUID = -5769772089913918987L; + + private final int[] _dataLZW; // LZW compressed representation of the mapping + private final int _nRows; // Number of rows in the mapping vector + private final int _nUnique; // Number of unique values in the mapping vector + + // Builds a packed 64-bit key for a (prefixCode(w), nextSymbol(k)) pair used in the LZW dictionary. + private static long packKey(int prefixCode, int nextSymbol) { + return (((long) prefixCode) << 32) | (nextSymbol & 0xffffffffL); + } + + // Compresses a mapping (AMapToData) into an LZW-compressed array of int codes. + private static int[] compress(AMapToData data) { + if(data == null) + throw new IllegalArgumentException("Invalid input: data is null"); + + final int nRows = data.size(); + if(nRows <= 0) { + throw new IllegalArgumentException("Invalid input: data has no rows"); + } + + final int nUnique = data.getUnique(); + if(nUnique <= 0) { + throw new IllegalArgumentException("Invalid input: data has no unique values"); + } + + // Fast-path: single symbol + if(nRows == 1) + return new int[] {data.getIndex(0)}; + + // LZW dictionary. Maps (prefixCode, nextSymbol) -> newCode. + // Using fastutil keeps lookups fast. (TODO improve time/space complexity) + final Long2IntLinkedOpenHashMap dict = new Long2IntLinkedOpenHashMap(1 << 16); + dict.defaultReturnValue(-1); + + // Output buffer (heuristic capacity; avoids frequent reallocs) + final IntArrayList out = new IntArrayList(Math.max(16, nRows / 2)); + + // Codes {0,...,nUnique - 1} are reserved for the original symbols. + int nextCode = nUnique; + + // Initialize w with the first input symbol. + // AMapToData stores dictionary indices, not actual data values. + // Since indices reference positions in an IDictionary, they are always in the valid index range 0 to nUnique-1. + int w = data.getIndex(0); + + // Process the remaining input symbols. + // Example: _data = [2,0,2,3,0,2,1,0,2]. + for(int i = 1; i < nRows; i++) { + final int k = data.getIndex(i); // next input symbol + + if(k < 0 || k >= nUnique) + throw new IllegalArgumentException("Symbol out of range: " + k + " (nUnique=" + nUnique + ")"); + + final long key = packKey(w, k); // encode (w,k) into a long key + + int wk = dict.get(key); // check whether the phrase (w,k) already exists in the dictionary + if(wk != -1) { + w = wk; // (w,k) exists in the dictionary, so replace w by its code and continue. + } + else { + // (w,k) does not exist in the dictionary: output the current phrase, add the new phrase, restart at k. + out.add(w); + dict.put(key, nextCode++); + w = k; // Start new phrase with k + } + } + + out.add(w); + return out.toIntArray(); + } + + // Unpack the upper 32 bits (w) of a (w,k) key pair. + private static int unpackfirst(long key) { + return (int) (key >>> 32); + } + + // Unpack the lower 32 bits (k) of a (w,k) key pair. + private static int unpacksecond(long key) { + return (int) (key); + } + + // Append a symbol to the end of an int array. + private static int[] packint(int[] arr, int last) { + int[] result = Arrays.copyOf(arr, arr.length + 1); + result[arr.length] = last; + return result; + } + + // Reconstruct the phrase for an LZW code by walking its (prefix, symbol) chain.
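+ // Illustrative trace (not part of the original patch): with nUnique = 4, compress([2,0,2,3,0,2,1,0,2]) emits + // the codes [2,0,2,3,5,1,5] while building the phrases 4=(2,0), 5=(0,2), 6=(2,3), 7=(3,0), 8=(5,1), 9=(1,0); + // the decoder rebuilds the same phrase table, e.g. code 5 expands via 5=(0,2) back into the symbols [0,2].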
+ private static int[] unpack(int code, int nUnique, Map<Integer, Long> dict) { + // Base symbol (implicit alphabet) + if(code < nUnique) + return new int[] {code}; + + Stack<Integer> stack = new Stack<>(); + int c = code; + + while(c >= nUnique) { + Long key = dict.get(c); + if(key == null) + throw new IllegalStateException("Missing dictionary entry for code: " + c); + + int symbol = unpacksecond(key); + stack.push(symbol); + c = unpackfirst(key); + } + + // Base symbol. + stack.push(c); + int[] outarray = new int[stack.size()]; + int i = 0; + // Write the symbols to the output in the correct order. + while(!stack.isEmpty()) { + outarray[i++] = stack.pop(); + } + return outarray; + } + + // Decompresses the full LZW-compressed vector into its original AMapToData form. + private static AMapToData decompressFull(int[] codes, int nUnique, int nRows) { + return decompress(codes, nUnique, nRows, nRows); + } + + private final class LZWMappingIterator { + private final Map<Integer, Long> dict = new HashMap<>(); // LZW-dictionary. Maps code -> (prefixCode, nextSymbol). + private int lzwIndex = 0; // Current position in the LZW-compressed mapping (_dataLZW). + private int mapIndex = 0; // Number of mapping symbols returned so far. + private int nextCode = _nUnique; // Next free LZW code. + private int[] currentPhrase = null; // Current phrase being decoded from the LZW-compressed mapping. + private int currentPhraseIndex = 0; // Next position in the current phrase to return. + private int[] oldPhrase = null; // Previous phrase. + private int oldCode = -1; // Previous code. + + LZWMappingIterator() { + lzwIndex = 1; // First code consumed during initialization. + oldCode = _dataLZW[0]; // Decode the first code into the initial phrase. + oldPhrase = unpack(oldCode, _nUnique, dict); + currentPhrase = oldPhrase; + currentPhraseIndex = 0; + mapIndex = 0; // No mapping symbols have been returned yet. + } + + // True if there are more mapping symbols to decode. + boolean hasNext() { + return mapIndex < _nRows; + } + + /*void skip(int k) { + for(int i = 0; i < k; i++) + next(); + }*/ + + int next() { + if(!hasNext()) + throw new NoSuchElementException(); + + // If the current phrase still has symbols, return the next symbol from it. + if(currentPhraseIndex < currentPhrase.length) { + mapIndex++; + return currentPhrase[currentPhraseIndex++]; + } + + // Otherwise, decode the next code into a new phrase. + if(lzwIndex >= _dataLZW.length) + throw new IllegalStateException("Invalid LZW index: " + lzwIndex); + + final int key = _dataLZW[lzwIndex++]; + + final int[] next; + if(key < _nUnique || dict.containsKey(key)) { + next = unpack(key, _nUnique, + dict); // Normal case: The code is either a base symbol or already present in the dictionary. + } + else { + next = packint(oldPhrase, oldPhrase[0]); // KwKwK special case: next = oldPhrase + firstSymbol(oldPhrase). + } + + // Add new phrase to dictionary: nextCode -> (oldCode, firstSymbol(next)). + dict.put(nextCode++, packKey(oldCode, next[0])); + + // Advance decoder state. + oldCode = key; + oldPhrase = next; + + // Start returning symbols from the newly decoded phrase. + currentPhrase = next; + currentPhraseIndex = 0; + + mapIndex++; + return currentPhrase[currentPhraseIndex++]; + } + } + + // Decompresses an LZW-compressed vector into its original AMapToData form, up to the given index (exclusive). + private static AMapToData decompress(int[] codes, int nUnique, int nRows, int index) { + // Validate input arguments.
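+ // Note: the codes must stem from compress() with the same nUnique. A partial decode to index still replays the + // code stream from the beginning, so the cost grows with index; there is no random access into the stream.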
+ if(codes == null) + throw new IllegalArgumentException("codes is null"); + if(codes.length == 0) + throw new IllegalArgumentException("codes is empty"); + if(nUnique <= 0) + throw new IllegalArgumentException("Invalid alphabet size: " + nUnique); + if(nRows <= 0) { + throw new IllegalArgumentException("Invalid nRows: " + nRows); + } + if(index > nRows) { + throw new IllegalArgumentException("Index is larger than the data length: " + index); + } + + // Return an empty map if index is zero. + if(index == 0) + return MapToFactory.create(0, nUnique); + + // Maps: code -> packKey(prefixCode, lastSymbolOfPhrase). + // Base symbols (0..nUnique-1) are implicit and not stored here. + final Map<Integer, Long> dict = new HashMap<>(); + + // Output mapping that will be reconstructed. + AMapToData out = MapToFactory.create(index, nUnique); + int outPos = 0; // Current write position in the output mapping. + + // Decode the first code. The first code always expands to a valid phrase without needing + // any dictionary entries. + int old = codes[0]; + int[] oldPhrase = unpack(old, nUnique, dict); + + for(int v : oldPhrase) { + if(outPos == index) + break; + out.set(outPos++, v); + } + + // Next free dictionary code. Codes 0..nUnique-1 are reserved for base symbols. + int nextCode = nUnique; + + // Process remaining codes. + for(int i = 1; i < codes.length; i++) { + int key = codes[i]; + + int[] next; + if(key < nUnique || dict.containsKey(key)) { + // Normal case: The code is either a base symbol or already present in the dictionary. + next = unpack(key, nUnique, dict); + } + else { + // KwKwK special case: The current code refers to a phrase that is being defined right now. + // next = oldPhrase + first(oldPhrase). + int first = oldPhrase[0]; + next = packint(oldPhrase, first); + } + + // Append the reconstructed phrase to the output mapping. + for(int v : next) { + if(outPos == index) + // Stop immediately once done. + return out; + out.set(outPos++, v); + } + + // Add new phrase to dictionary: nextCode -> (old, firstSymbol(next)). + final int first = next[0]; + dict.put(nextCode++, packKey(old, first)); + + // Advance. + old = key; + oldPhrase = next; + } + + // Safety check: the decoder must produce exactly index symbols. + if(outPos != index) + throw new IllegalStateException("Decompression length mismatch: got " + outPos + " expected " + index); + + // Return the reconstructed mapping. + return out; + } + + // Build constructor: Used when creating a new DDCLZW instance during compression/build time. + private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) { + super(colIndexes, dict, cachedCounts); + + // Derive metadata + _nRows = data.size(); + _nUnique = dict.getNumberOfValues(colIndexes.size()); + + // Compress mapping to LZW + _dataLZW = compress(data); + + if(CompressedMatrixBlock.debug) { + if(getNumValues() == 0) + throw new DMLCompressionException("Invalid construction with empty dictionary"); + if(_nRows == 0) + throw new DMLCompressionException("Invalid length of the data: nRows is zero");
is zero"); + if(data.getUnique() != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException( + "Invalid map to dict Map has:" + data.getUnique() + " while dict has " + + dict.getNumberOfValues(colIndexes.size())); + int[] c = getCounts(); + if(c.length != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException("Invalid DDC Construction"); + data.verify(); + } + } + + // Read Constructor: Used when creating this group from a serialized form (e.g., reading a compressed matrix from disk/memory stream). (TODO) + private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, int[] dataLZW, int nRows, int nUnique, + int[] cachedCounts) { + super(colIndexes, dict, cachedCounts); + + _dataLZW = dataLZW; + _nRows = nRows; + _nUnique = nUnique; + + if(CompressedMatrixBlock.debug) { + if(getNumValues() == 0) + throw new DMLCompressionException("Invalid construction with empty dictionary"); + if(_nRows <= 0) + throw new DMLCompressionException("Invalid length of the data. is zero"); + if(_nUnique != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException("Invalid map to dict Map has:" + _nUnique + " while dict has " + + dict.getNumberOfValues(colIndexes.size())); + int[] c = getCounts(); + if(c.length != dict.getNumberOfValues(colIndexes.size())) + throw new DMLCompressionException("Invalid DDC Construction"); + } + } + + // Factory method for creating a column group. (AColGroup g = ColGroupDDCLZW.create(...);) + public static AColGroup create(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) { + if(dict == null) + return new ColGroupEmpty(colIndexes); + else if(data.getUnique() == 1) + return ColGroupConst.create(colIndexes, dict); + else + return new ColGroupDDCLZW(colIndexes, dict, data, cachedCounts); + } + + public AColGroup convertToDDC() { + final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, _nRows); + final int[] counts = getCounts(); // may be null depending on your group + return ColGroupDDC.create(_colIndexes, _dict, map, counts); + } + + public AColGroup convertToDDC(int index) { + final AMapToData map = decompress(_dataLZW, _nUnique, _nRows, index); + final int[] counts = getCounts(); // may be null depending on your group + return ColGroupDDC.create(_colIndexes, _dict, map, counts); + } + + // Deserialize ColGroupDDCLZW object in binary stream. + public static ColGroupDDCLZW read(DataInput in) throws IOException { + final IColIndex colIndexes = ColIndexFactory.read(in); + final IDictionary dict = DictionaryFactory.read(in); + + // Metadata for lzw mapping. + final int nRows = in.readInt(); + final int nUnique = in.readInt(); + + // Read compressed mapping array. + final int len = in.readInt(); + if(len < 0) + throw new IOException("Invalid LZW data length: " + len); + + final int[] dataLZW = new int[len]; + for(int i = 0; i < len; i++) + dataLZW[i] = in.readInt(); + + // cachedCounts currently not serialized (mirror ColGroupDDC.read which passes null) + return new ColGroupDDCLZW(colIndexes, dict, dataLZW, nRows, nUnique, null); + } + + // Serialize a ColGroupDDC-object into binary stream. + @Override + public void write(DataOutput out) throws IOException { + _colIndexes.write(out); + _dict.write(out); + out.writeInt(_nRows); + out.writeInt(_nUnique); + out.writeInt(_dataLZW.length); // TODO: correct ? 
+ for(int i : _dataLZW) + out.writeInt(i); + } + + @Override + public double getIdx(int r, int colIdx) { + if(r < 0 || r >= _nRows) + throw new DMLRuntimeException("Row index out of bounds"); + + if(colIdx < 0 || colIdx >= _colIndexes.size()) + throw new DMLRuntimeException("Column index out of bounds"); + + final LZWMappingIterator it = new LZWMappingIterator(); + int dictIdx = -1; + for(int i = 0; i <= r; i++) { + dictIdx = it.next(); + } + return _dict.getValue(dictIdx, colIdx, _colIndexes.size()); + } + + @Override + public CompressionType getCompType() { + return CompressionType.DDCLZW; + } + + @Override + protected ColGroupType getColGroupType() { + return ColGroupType.DDCLZW; + } + + @Override + public boolean containsValue(double pattern) { + return _dict.containsValue(pattern); + } + + @Override + public double getCost(ComputationCostEstimator e, int nRows) { + final int nVals = getNumValues(); + final int nCols = getNumCols(); + return e.getCost(nRows, nRows, nCols, nVals, _dict.getSparsity()); + } + + @Override + public ICLAScheme getCompressionScheme() { + // TODO: not implemented in ColGroupDDCFor - should we create it there as well? Content: nCols as in DDC. + return DDCLZWScheme.create(this); + } + + @Override + protected int numRowsToMultiply() { + return _nRows; + } + + @Override + protected AColGroup copyAndSet(IColIndex colIndexes, IDictionary newDictionary) { + return new ColGroupDDCLZW(colIndexes, newDictionary, _dataLZW, _nRows, _nUnique, getCachedCounts()); + } + + @Override + public long getExactSizeOnDisk() { + long ret = super.getExactSizeOnDisk(); + ret += 4; // _nRows size + ret += 4; // _nUnique size + ret += 4; // dataLZW.length + ret += (long) _dataLZW.length * 4; // LZW codes + return ret; + } + + @Override + public AMapToData getMapToData() { + return decompressFull(_dataLZW, _nUnique, _nRows); + } + + @Override + public boolean sameIndexStructure(AColGroupCompressed that) { + return that instanceof ColGroupDDCLZW && ((ColGroupDDCLZW) that)._dataLZW == this._dataLZW; + } + + @Override + protected double computeMxx(double c, Builtin builtin) { + return _dict.aggregate(c, builtin); + } + + @Override + protected void computeColMxx(double[] c, Builtin builtin) { + _dict.aggregateCols(c, builtin, _colIndexes); + } + + @Override + public AColGroup sliceRows(int rl, int ru) { + try { + if(rl < 0 || ru > _nRows) + throw new DMLRuntimeException("Invalid slice range: " + rl + " - " + ru); + + final int len = ru - rl; + if(len == 0) + return new ColGroupEmpty(_colIndexes); + + final int[] slicedMapping = new int[len]; + + final LZWMappingIterator it = new LZWMappingIterator(); + + for(int i = 0; i < rl; i++) + it.next(); + + for(int i = rl; i < ru; i++) + slicedMapping[i - rl] = it.next(); + + AMapToData slicedMappingAMapToData = MapToFactory.create(len, _nUnique); + for(int i = 0; i < len; i++) { + slicedMappingAMapToData.set(i, slicedMapping[i]); + } + + return new ColGroupDDCLZW(_colIndexes, _dict, slicedMappingAMapToData, null); + } + catch(Exception e) { + throw new DMLRuntimeException("Failed to slice out sub part DDCLZW: " + rl + ", " + ru, e); + } + } + + @Override + protected void decompressToDenseBlockTransposedSparseDictionary(DenseBlock db, int rl, int ru, SparseBlock sb) { + LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) { + it.next(); + } + + for(int i = rl; i < ru; i++) { + final int vr = it.next(); + if(sb.isEmpty(vr)) + continue; + final int apos = sb.pos(vr); + final int alen = sb.size(vr) + apos; + final int[] aix =
sb.indexes(vr); + final double[] aval = sb.values(vr); + for(int j = apos; j < alen; j++) { + final int rowOut = _colIndexes.get(aix[j]); + final double[] c = db.values(rowOut); + final int off = db.pos(rowOut); + c[off + i] += aval[j]; + } + } + } + + @Override + protected void decompressToDenseBlockTransposedDenseDictionary(DenseBlock db, int rl, int ru, double[] dict) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.decompressToDenseBlockTransposedDenseDictionary(db, rl, ru, dict); // Possible implementation with iterator. + } + + @Override + protected void decompressToSparseBlockTransposedSparseDictionary(SparseBlockMCSR sbr, SparseBlock sb, int nColOut) { + int[] colCounts = _dict.countNNZZeroColumns(getCounts()); + for(int j = 0; j < _colIndexes.size(); j++) + sbr.allocate(_colIndexes.get(j), colCounts[j]); + + LZWMappingIterator it = new LZWMappingIterator(); // Replace data.getIndex with the iterator. + + for(int i = 0; i < _nRows; i++) { + int di = it.next(); + if(sb.isEmpty(di)) + continue; + + final int apos = sb.pos(di); + final int alen = sb.size(di) + apos; + final int[] aix = sb.indexes(di); + final double[] aval = sb.values(di); + + for(int j = apos; j < alen; j++) { + sbr.append(_colIndexes.get(aix[j]), i, aval[j]); + } + } + } + + @Override + protected void decompressToSparseBlockTransposedDenseDictionary(SparseBlockMCSR db, double[] dict, int nColOut) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.decompressToSparseBlockTransposedDenseDictionary(db, dict, nColOut); // Possible implementation with iterator. + } + + @Override + protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, + SparseBlock sb) { + LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) { + it.next(); // Skip to rl.
+ } + + for(int r = rl, offT = rl + offR; r < ru; r++, offT++) { + final int vr = it.next(); + if(sb.isEmpty(vr)) + continue; + final double[] c = db.values(offT); + final int off = db.pos(offT) + offC; + _colIndexes.decompressToDenseFromSparse(sb, vr, off, c); + } + } + + @Override + protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, + double[] values) { + final int nCol = _colIndexes.size(); + final LZWMappingIterator it = new LZWMappingIterator(); + + for(int i = 0; i < rl; i++) { + it.next(); + } + + if(db.isContiguous() && nCol == db.getDim(1) && offC == 0) { + final int nColOut = db.getDim(1); + final double[] c = db.values(0); + + for(int i = rl; i < ru; i++) { + final int dictIdx = it.next(); + final int rowIndex = dictIdx * nCol; + final int rowBaseOff = (i + offR) * nColOut; + + for(int j = 0; j < nCol; j++) + c[rowBaseOff + j] = values[rowIndex + j]; + } + } + else { + for(int i = rl, offT = rl + offR; i < ru; i++, offT++) { + final double[] c = db.values(offT); + final int off = db.pos(offT) + offC; + final int dictIdx = it.next(); + final int rowIndex = dictIdx * nCol; + + for(int j = 0; j < nCol; j++) { + final int colIdx = _colIndexes.get(j); + c[off + colIdx] = values[rowIndex + j]; + } + } + } + } + + @Override + protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC, + SparseBlock sb) { + LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) { + it.next(); + } + + for(int r = rl, offT = rl + offR; r < ru; r++, offT++) { + final int vr = it.next(); + if(sb.isEmpty(vr)) + continue; + final int apos = sb.pos(vr); + final int alen = sb.size(vr) + apos; + final int[] aix = sb.indexes(vr); + final double[] aval = sb.values(vr); + for(int j = apos; j < alen; j++) + ret.append(offT, offC + _colIndexes.get(aix[j]), aval[j]); + } + } + + @Override + protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC, + double[] values) { + decompressToSparseBlockDenseDictionary(ret, rl, ru, offR, offC, values, _colIndexes.size()); + } + + protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC, + double[] values, int nCol) { + LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) { + it.next(); + } + + for(int i = rl, offT = rl + offR; i < ru; i++, offT++) { + final int rowIndex = it.next() * nCol; + for(int j = 0; j < nCol; j++) + ret.append(offT, _colIndexes.get(j) + offC, values[rowIndex + j]); + } + } + + @Override // TODO: Implement! Pays off with LZW! + public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) { + convertToDDC().leftMultByMatrixNoPreAgg(matrix, result, rl, ru, cl, cu); // Fallback to DDC.
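+ // A dedicated implementation could stream the mapping once per row block via LZWMappingIterator instead of + // materializing the full DDC group; the conversion fallback is kept for correctness until then.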
+ } + + @Override + public AColGroup scalarOperation(ScalarOperator op) { + if((op.fn instanceof Plus || op.fn instanceof Minus)) { + final double v0 = op.executeScalar(0); + if(v0 == 0) + return this; + } + + return new ColGroupDDCLZW(_colIndexes, _dict.applyScalarOp(op), _dataLZW, _nRows, _nUnique, getCachedCounts()); + } + + @Override + public AColGroup unaryOperation(UnaryOperator op) { + return new ColGroupDDCLZW(_colIndexes, _dict.applyUnaryOp(op), _dataLZW, _nRows, _nUnique, getCachedCounts()); + } + + @Override + public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) { + IDictionary ret = _dict.binOpLeft(op, v, _colIndexes); + + AMapToData data = decompressFull(_dataLZW, _nUnique, _nRows); + return create(getColIndices(), ret, data, getCachedCounts()); + } + + @Override + public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + return g.binaryRowOpRight(op, v, isRowSafe); + } + + public int[] appendDataLZWMap(int[] dataLZW) { + int[] newDataLZW = new int[_dataLZW.length + dataLZW.length]; + System.arraycopy(_dataLZW, 0, newDataLZW, 0, _dataLZW.length); + System.arraycopy(dataLZW, 0, newDataLZW, _dataLZW.length, dataLZW.length); + return newDataLZW; + } + + @Override + public AColGroup append(AColGroup g) { + if(g instanceof ColGroupDDCLZW) { + if(g.getColIndices().equals(_colIndexes)) { + ColGroupDDCLZW gDDCLZW = (ColGroupDDCLZW) g; + if(gDDCLZW._dict.equals(_dict)) { + if(_nUnique == gDDCLZW._nUnique) { + int[] mergedMap = new int[this._nRows + gDDCLZW._nRows]; + + LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < this._nRows; i++) { + mergedMap[i] = it.next(); + } + + LZWMappingIterator gLZWit = gDDCLZW.new LZWMappingIterator(); + for(int i = this._nRows; i < mergedMap.length; i++) { + mergedMap[i] = gLZWit.next(); + } + + AMapToData mergedDataAMap = MapToFactory.create(mergedMap.length, _nUnique); + int mergedDataAMapPos = 0; + + for(int j : mergedMap) { + mergedDataAMap.set(mergedDataAMapPos++, j); + } + + int[] mergedDataAMapCompressed = compress(mergedDataAMap); + + return new ColGroupDDCLZW(_colIndexes, _dict, mergedDataAMapCompressed, mergedMap.length, + _nUnique, null); + } + else + LOG.warn("Not same unique values therefore not appending DDCLZW\n" + _nUnique + "\n\n" + + gDDCLZW._nUnique); + } + else + LOG.warn("Not same Dictionaries therefore not appending DDCLZW\n" + _dict + "\n\n" + gDDCLZW._dict); + } + else + LOG.warn( + "Not same columns therefore not appending DDCLZW\n" + _colIndexes + "\n\n" + g.getColIndices()); + } + else + LOG.warn("Not DDCLZW but " + g.getClass().getSimpleName() + ", therefore not appending DDCLZW"); + return null; + } + + // TODO: adjust according to contract, "this shall only be appended once". 
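+ // Note on appending: two LZW code streams cannot simply be concatenated, because each stream builds its phrase + // dictionary incrementally from its own input; both mappings are therefore materialized and re-compressed.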
+ @Override + protected AColGroup appendNInternal(AColGroup[] g, int blen, int rlen) { + /*throw new NotImplementedException();*/ + int[] mergedMap = new int[rlen]; + int mergedMapPos = 0; + + for(int i = 1; i < g.length; i++) { + if(!_colIndexes.equals(g[i]._colIndexes)) { + LOG.warn("Not same columns therefore not appending DDCLZW\n" + _colIndexes + "\n\n" + g[i]._colIndexes); + return null; + } + + if(!(g[i] instanceof ColGroupDDCLZW)) { + LOG.warn("Not DDCLZW but " + g[i].getClass().getSimpleName() + ", therefore not appending DDCLZW"); + return null; + } + + final ColGroupDDCLZW gDDCLZW = (ColGroupDDCLZW) g[i]; + if(!gDDCLZW._dict.equals(_dict)) { + LOG.warn("Not same Dictionaries therefore not appending DDCLZW\n" + _dict + "\n\n" + gDDCLZW._dict); + return null; + } + if(!(_nUnique == gDDCLZW._nUnique)) { + LOG.warn( + "Not same unique values therefore not appending DDCLZW\n" + _nUnique + "\n\n" + gDDCLZW._nUnique); + return null; + } + } + + for(AColGroup group : g) { + ColGroupDDCLZW gDDCLZW = (ColGroupDDCLZW) group; + + LZWMappingIterator gLZWit = gDDCLZW.new LZWMappingIterator(); + for(int j = 0; j < gDDCLZW._nRows; j++) + mergedMap[mergedMapPos++] = gLZWit.next(); + } + + AMapToData mergedDataAMap = MapToFactory.create(rlen, _nUnique); + int mergedDataAMapPos = 0; + + for(int k = 0; k < rlen; k++) { + mergedDataAMap.set(k, mergedMap[k]); + } + + int[] mergedDataAMapCompressed = compress(mergedDataAMap); + + return new ColGroupDDCLZW(_colIndexes, _dict, mergedDataAMapCompressed, rlen, _nUnique, null); + } + + @Override + public AColGroup recompress() { + return this; // A new or the same column group depending on optimization goal. (Description DDC) + } + + @Override + public CompressedSizeInfoColGroup getCompressionInfo(int nRow) { + try { + IEncode enc = getEncoding(); + EstimationFactors ef = new EstimationFactors(_nUnique, _nRows, _nRows, _dict.getSparsity()); + return new CompressedSizeInfoColGroup(_colIndexes, ef, estimateInMemorySize(), getCompType(), enc); + } + catch(Exception e) { + throw new DMLCompressionException(this.toString(), e); + } + } + + @Override + protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) { + return new ColGroupDDCLZW(newColIndex, _dict.reorder(reordering), _dataLZW, _nRows, _nUnique, + getCachedCounts()); + } + + @Override // Correct ? + public void sparseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) { + final SparseBlock sb = selection.getSparseBlock(); + final SparseBlock retB = ret.getSparseBlock(); + for(int r = rl; r < ru; r++) { + if(sb.isEmpty(r)) + continue; + final int sPos = sb.pos(r); + final int rowCompressed = sb.indexes(r)[sPos]; // column index with 1 + decompressToSparseBlock(retB, rowCompressed, rowCompressed + 1, r - rowCompressed, 0); + } + } + + @Override // Correct ? 
+ protected void denseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) { + // morph(CompressionType.UNCOMPRESSED, _data.size()).sparseSelection(selection, ret, rl, ru);; + final SparseBlock sb = selection.getSparseBlock(); + final DenseBlock retB = ret.getDenseBlock(); + for(int r = rl; r < ru; r++) { + if(sb.isEmpty(r)) + continue; + final int sPos = sb.pos(r); + final int rowCompressed = sb.indexes(r)[sPos]; // column index with 1 + decompressToDenseBlock(retB, rowCompressed, rowCompressed + 1, r - rowCompressed, 0); + } + } + + @Override + public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + return g.splitReshape(multiplier, nRow, nColOrg); // Fallback to ddc. No splitReshapeDDCLZW implemented. + } + + // Not sure here. + @Override + protected boolean allowShallowIdentityRightMult() { + throw new NotImplementedException(); + } + + @Override + protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, IDictionary preAgg) { + if(preAgg == null) + return null; + else + return new ColGroupDDCLZW(colIndexes, preAgg, _dataLZW, _nRows, _nUnique, getCachedCounts()); + } + + @Override + public void preAggregateDense(MatrixBlock m, double[] preAgg, int rl, int ru, int cl, int cu) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.preAggregateDense(m, preAgg, rl, ru, cl, cu); // Fallback to ddc. + } + + @Override + public void preAggregateSparse(SparseBlock sb, double[] preAgg, int rl, int ru, int cl, int cu) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.preAggregateSparse(sb, preAgg, rl, ru, cl, cu); // Fallback to ddc. + } + + @Override + protected void preAggregateThatDDCStructure(ColGroupDDC that, Dictionary ret) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.preAggregateThatDDCStructure(that, ret); // Fallback to ddc. + } + + @Override + protected void preAggregateThatSDCZerosStructure(ColGroupSDCZeros that, Dictionary ret) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.preAggregateThatSDCZerosStructure(that, ret); // Fallback to ddc. + } + + @Override + protected void preAggregateThatSDCSingleZerosStructure(ColGroupSDCSingleZeros that, Dictionary ret) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.preAggregateThatSDCSingleZerosStructure(that, ret); // Fallback to ddc. + + } + + @Override + protected void preAggregateThatRLEStructure(ColGroupRLE that, Dictionary ret) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.preAggregateThatRLEStructure(that, ret); // Fallback to ddc. + + } + + @Override + public void leftMMIdentityPreAggregateDense(MatrixBlock that, MatrixBlock ret, int rl, int ru, int cl, int cu) { + ColGroupDDC g = (ColGroupDDC) convertToDDC(); + g.leftMMIdentityPreAggregateDense(that, ret, rl, ru, cl, cu); // Fallback to ddc. 
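+ // Note: each preAggregate*/leftMM fallback above converts to DDC on every call; caching one converted DDC group + // would amortize the repeated decompression (left as a possible optimization).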
+ } + + @Override + protected int[] getCounts(int[] out) { + AMapToData data = decompressFull(_dataLZW, _nUnique, _nRows); + final int[] counts = data.getCounts(); + // Fill the provided array, mirroring the contract of ColGroupDDC.getCounts. + System.arraycopy(counts, 0, out, 0, counts.length); + return out; + } + + @Override + protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) { + final LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) + it.next(); + + for(int rix = rl; rix < ru; rix++) + c[rix] += preAgg[it.next()]; + } + + @Override + protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) { + final LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) + it.next(); + + for(int i = rl; i < ru; i++) + c[i] = builtin.execute(c[i], preAgg[it.next()]); + } + + @Override + protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) { + final LZWMappingIterator it = new LZWMappingIterator(); + for(int i = 0; i < rl; i++) + it.next(); + + for(int rix = rl; rix < ru; rix++) + c[rix] *= preAgg[it.next()]; + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWScheme.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWScheme.java new file mode 100644 index 00000000000..f4bc4a023fa --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWScheme.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.compress.colgroup.scheme; + +import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; + +public abstract class DDCLZWScheme extends DDCScheme { + // TODO: private int nUnique; too data-specific - is it useful at all? + + protected DDCLZWScheme(IColIndex cols) { + super(cols); + } + + public static DDCLZWScheme create(ColGroupDDCLZW g) { + return g.getNumCols() == 1 ? new DDCLZWSchemeSC(g) : new DDCLZWSchemeMC(g); + } + + public static DDCLZWScheme create(IColIndex cols) { + return cols.size() == 1 ?
new DDCLZWSchemeSC(cols) : new DDCLZWSchemeMC(cols); + } + +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeMC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeMC.java new file mode 100644 index 00000000000..4d4f6509d47 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeMC.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.compress.colgroup.scheme; + +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW; +import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; +import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; +import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection; +import org.apache.sysds.runtime.compress.utils.ACount; +import org.apache.sysds.runtime.compress.utils.DblArray; +import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.Pair; + +public class DDCLZWSchemeMC extends DDCLZWScheme { + //private DDCSchemeMC ddcscheme; + private final DblArray emptyRow; + + private final DblArrayCountHashMap map; + + private DDCLZWSchemeMC(IColIndex cols, DblArrayCountHashMap map) { + super(cols); + this.map = map; + this.emptyRow = new DblArray(new double[cols.size()]); + } + + protected DDCLZWSchemeMC(ColGroupDDCLZW g) { + super(g.getColIndices()); + this.lastDict = g.getDictionary(); + final MatrixBlock mbDict = lastDict.getMBDict(this.cols.size()).getMatrixBlock(); + final int dictRows = mbDict.getNumRows(); + final int dictCols = mbDict.getNumColumns(); + + // Read the dictionary rows and materialize the value map.
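+ // Each distinct dictionary row is counted once, seeding the map with the tuples already present in the group.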
+ map = new DblArrayCountHashMap(dictRows * 2); + final ReaderColumnSelection r = ReaderColumnSelection.createReader(mbDict, // + ColIndexFactory.create(dictCols), false, 0, dictRows); + + DblArray d = null; + while((d = r.nextRow()) != null) + map.increment(d); + + emptyRow = new DblArray(new double[dictCols]); + } + + protected DDCLZWSchemeMC(IColIndex cols) { + super(cols); + final int nCol = cols.size(); + this.map = new DblArrayCountHashMap(4); + this.emptyRow = new DblArray(new double[nCol]); + } + + @Override + protected AColGroup encodeV(MatrixBlock data, IColIndex columns) { + final int nRow = data.getNumRows(); + final ReaderColumnSelection reader = ReaderColumnSelection.createReader(// + data, columns, false, 0, nRow); + return encode(data, reader, nRow, columns); + } + + @Override + protected AColGroup encodeVT(MatrixBlock data, IColIndex columns) { + final int nRow = data.getNumColumns(); + final ReaderColumnSelection reader = ReaderColumnSelection.createReader(// + data, columns, true, 0, nRow); + return encode(data, reader, nRow, columns); + } + + private AColGroup encode(MatrixBlock data, ReaderColumnSelection reader, int nRow, IColIndex columns) { + final AMapToData d = MapToFactory.create(nRow, map.size()); + DblArray cellVals; + ACount<DblArray> emptyIdx = map.getC(emptyRow); + if(emptyIdx == null) { + while((cellVals = reader.nextRow()) != null) { + final int row = reader.getCurrentRowIndex(); + final int id = map.getId(cellVals); + d.set(row, id); + } + } + else { + int r = 0; + while((cellVals = reader.nextRow()) != null) { + final int row = reader.getCurrentRowIndex(); + if(row != r) { + while(r < row) + d.set(r++, emptyIdx.id); + } + final int id = map.getId(cellVals); + d.set(row, id); + r++; + } + while(r < nRow) + d.set(r++, emptyIdx.id); + } + if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size()) + lastDict = DictionaryFactory.create(map, columns.size(), false, data.getSparsity()); + return ColGroupDDCLZW.create(columns, lastDict, d, null); + } + + @Override + protected ICLAScheme updateV(MatrixBlock data, IColIndex columns) { + final int nRow = data.getNumRows(); + final ReaderColumnSelection reader = ReaderColumnSelection.createReader(// + data, columns, false, 0, nRow); + return update(data, reader, nRow, columns); + } + + private ICLAScheme update(MatrixBlock data, ReaderColumnSelection reader, int nRow, IColIndex columns) { + DblArray d = null; + int r = 0; + while((d = reader.nextRow()) != null) { + final int cr = reader.getCurrentRowIndex(); + if(cr != r) { + map.increment(emptyRow, cr - r); + r = cr; + } + map.increment(d); + r++; + } + if(r < nRow) + map.increment(emptyRow, nRow - r); + + return this; + } + + @Override + protected ICLAScheme updateVT(MatrixBlock data, IColIndex columns) { + final int nRow = data.getNumColumns(); + final ReaderColumnSelection reader = ReaderColumnSelection.createReader(// + data, columns, true, 0, nRow); + return update(data, reader, nRow, columns); + } + + @Override + protected Pair<ICLAScheme, AColGroup> tryUpdateAndEncodeT(MatrixBlock data, IColIndex columns) { + final int nRow = data.getNumColumns(); + final ReaderColumnSelection reader = ReaderColumnSelection.createReader(// + data, columns, true, 0, nRow); + return tryUpdateAndEncode(data, reader, nRow, columns); + } + + private Pair<ICLAScheme, AColGroup> tryUpdateAndEncode(MatrixBlock data, ReaderColumnSelection reader, int nRow, + IColIndex columns) { + final AMapToData d = MapToFactory.create(nRow, map.size()); + int max = d.getUpperBoundValue(); + + DblArray cellVals;
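+ // Rows skipped by the reader are implicit all-zero rows; when the empty tuple is present in the map, + // the gaps are filled with its id below.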
ACount<DblArray> emptyIdx = map.getC(emptyRow); + if(emptyIdx == null) { + while((cellVals = reader.nextRow()) != null) { + final int row = reader.getCurrentRowIndex(); + final int id = map.increment(cellVals); + if(id > max) + throw new DMLCompressionException("Failed update and encode with " + max + " possible values"); + d.set(row, id); + } + } + else { + int r = 0; + while((cellVals = reader.nextRow()) != null) { + final int row = reader.getCurrentRowIndex(); + if(row != r) { + map.increment(emptyRow, row - r); + while(r < row) + d.set(r++, emptyIdx.id); + } + final int id = map.increment(cellVals); + if(id > max) + throw new DMLCompressionException( + "Failed update and encode with " + max + " possible values" + map + " " + map.size()); + d.set(row, id); + r++; + } + if(r < nRow) + map.increment(emptyRow, nRow - r); + while(r < nRow) + d.set(r++, emptyIdx.id); + } + if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size()) + lastDict = DictionaryFactory.create(map, columns.size(), false, data.getSparsity()); + + AColGroup g = ColGroupDDCLZW.create(columns, lastDict, d, null); + ICLAScheme s = this; + return new Pair<>(s, g); + } + + @Override + public ACLAScheme clone() { + return new DDCLZWSchemeMC(cols, map.clone()); + } + + @Override + protected final Object getMap() { + return map; + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeSC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeSC.java new file mode 100644 index 00000000000..ea0bf681810 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeSC.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.compress.colgroup.scheme; + +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW; +import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; +import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; +import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; +import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.Pair; + +public class DDCLZWSchemeSC extends DDCLZWScheme { + + // TODO: This is a preliminary version - the code is largely the same as in DDCSchemeSC. + // Check whether an abstract parent class is possible, or whether storing a DDCSchemeSC as an attribute works. + + private final DoubleCountHashMap map; + + private DDCLZWSchemeSC(IColIndex cols, DoubleCountHashMap map) { + super(cols); + this.map = map; + } + + protected DDCLZWSchemeSC(ColGroupDDCLZW g) { + super(g.getColIndices()); + this.lastDict = g.getDictionary(); + int unique = lastDict.getNumberOfValues(1); + map = new DoubleCountHashMap(unique); + for(int i = 0; i < unique; i++) + map.increment(lastDict.getValue(i)); + } + + protected DDCLZWSchemeSC(IColIndex cols) { + super(cols); + this.map = new DoubleCountHashMap(4); + } + + @Override + protected AColGroup encodeV(MatrixBlock data, IColIndex columns) { + if(data.isEmpty()) + return new
ColGroupEmpty(columns); + final int nRow = data.getNumRows(); + + final AMapToData d = MapToFactory.create(nRow, map.size()); + + encode(data, d, cols.get(0)); + if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size()) + lastDict = DictionaryFactory.create(map); + + return ColGroupDDCLZW.create(columns, lastDict, d, null); + } + private void encodeSparse(MatrixBlock data, AMapToData d, int col) { + final int nRow = data.getNumRows(); + final SparseBlock sb = data.getSparseBlock(); + for(int i = 0; i < nRow; i++) + d.set(i, map.getId(sb.get(i, col))); + + } + private void encode(MatrixBlock data, AMapToData d, int col) { + if(data.isInSparseFormat()) + encodeSparse(data, d, col); + else if(data.getDenseBlock().isContiguous()) + encodeDense(data, d, col); + else + encodeGeneric(data, d, col); + } + + private void encodeDense(final MatrixBlock data, final AMapToData d, final int col) { + final int nRow = data.getNumRows(); + final double[] vals = data.getDenseBlockValues(); + final int nCol = data.getNumColumns(); + final int max = nRow * nCol; // guaranteed lower than intmax. + for(int i = 0, off = col; off < max; i++, off += nCol) + d.set(i, map.getId(vals[off])); + } + + private void encodeGeneric(MatrixBlock data, AMapToData d, int col) { + final int nRow = data.getNumRows(); + final DenseBlock db = data.getDenseBlock(); + for(int i = 0; i < nRow; i++) { + final double[] c = db.values(i); + final int off = db.pos(i) + col; + d.set(i, map.getId(c[off])); + } + } + + @Override + protected AColGroup encodeVT(MatrixBlock data, IColIndex columns) { + if(data.isEmpty()) + return new ColGroupEmpty(columns); + final int nRow = data.getNumColumns(); + + final AMapToData d = MapToFactory.create(nRow, map.size()); + + encodeT(data, d, cols.get(0)); + if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size()) + lastDict = DictionaryFactory.create(map); + + return ColGroupDDCLZW.create(columns, lastDict, d, null); + } + + private void encodeT(MatrixBlock data, AMapToData d, int col) { + if(data.isInSparseFormat()) + encodeSparseT(data, d, col); + else + encodeDenseT(data, d, col); + } + + private void encodeSparseT(MatrixBlock data, AMapToData d, int col) { + final SparseBlock sb = data.getSparseBlock(); + d.fill(map.getId(0.0)); + if(!sb.isEmpty(col)) { + int apos = sb.pos(col); + final int[] aix = sb.indexes(col); + final int alen = sb.size(col) + apos; + final double[] aval = sb.values(col); + while(apos < alen) { + final double v = aval[apos]; + final int idx = aix[apos++]; + d.set(idx, map.getId(v)); + } + } + } + + private void encodeDenseT(MatrixBlock data, AMapToData d, int col) { + final DenseBlock db = data.getDenseBlock(); + final double[] vals = db.values(col); + final int nCol = data.getNumColumns(); + for(int i = 0, off = db.pos(col); i < nCol; i++, off++) + d.set(i, map.getId(vals[off])); + } + + @Override + protected ICLAScheme updateV(MatrixBlock data, IColIndex columns) { + if(data.isEmpty()) + map.increment(0.0, data.getNumRows()); + else if(data.isInSparseFormat()) + updateSparse(data, columns.get(0)); + else if(data.getDenseBlock().isContiguous()) + updateDense(data, columns.get(0)); + else + updateGeneric(data, columns.get(0)); + + return this; + } + + private ICLAScheme updateSparse(MatrixBlock data, int col) { + final int nRow = data.getNumRows(); + final SparseBlock sb = data.getSparseBlock(); + for(int i = 0; i < nRow; i++) + map.increment(sb.get(i, col)); + return this; + } + + private ICLAScheme updateDense(MatrixBlock data, int 
col) { + final int nRow = data.getNumRows(); + final double[] vals = data.getDenseBlockValues(); + final int nCol = data.getNumColumns(); + final int max = nRow * nCol; // guaranteed lower than intmax. + for(int off = col; off < max; off += nCol) + map.increment(vals[off]); + return this; + } + + private ICLAScheme updateGeneric(MatrixBlock data, int col) { + final int nRow = data.getNumRows(); + final DenseBlock db = data.getDenseBlock(); + for(int i = 0; i < nRow; i++) { + final double[] c = db.values(i); + final int off = db.pos(i) + col; + map.increment(c[off]); + } + return this; + } + + @Override + protected ICLAScheme updateVT(MatrixBlock data, IColIndex columns) { + if(data.isEmpty()) + map.increment(0.0, data.getNumColumns()); + else if(data.isInSparseFormat()) + updateSparseT(data, columns.get(0)); + else // dense and generic can be handled together if transposed + updateDenseT(data, columns.get(0)); + + return this; + } + + private void updateDenseT(MatrixBlock data, int col) { + final DenseBlock db = data.getDenseBlock(); + final double[] vals = db.values(col); + final int nCol = data.getNumColumns(); + for(int i = 0, off = db.pos(col); i < nCol; i++, off++) + map.increment(vals[off]); + } + + private void updateSparseT(MatrixBlock data, int col) { + final SparseBlock sb = data.getSparseBlock(); + + if(!sb.isEmpty(col)) { + int apos = sb.pos(col); + final int alen = sb.size(col) + apos; + final double[] aval = sb.values(col); + map.increment(0.0, data.getNumColumns() - (alen - apos)); // count the implicit zeros of this column + while(apos < alen) + map.increment(aval[apos++]); + } + else + map.increment(0.0, data.getNumColumns()); + } + + @Override + public DDCLZWSchemeSC clone() { + return new DDCLZWSchemeSC(cols, map.clone()); + } + + @Override + protected final Object getMap() { + return map; + } + + // TODO: strictly required? + @Override + protected Pair<ICLAScheme, AColGroup> tryUpdateAndEncode(MatrixBlock data, IColIndex columns) { + if(data.isEmpty()) { + map.increment(0.0, data.getNumRows()); + return new Pair<>(this, new ColGroupEmpty(columns)); + } + final int nRow = data.getNumRows(); + + final AMapToData d = MapToFactory.create(nRow, map.size()); + + encodeAndUpdate(data, d, cols.get(0)); + if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size()) + lastDict = DictionaryFactory.create(map); + + return new Pair<>(this, ColGroupDDCLZW.create(columns, lastDict, d, null)); + } + + private void encodeAndUpdate(MatrixBlock data, AMapToData d, int col) { + final int max = d.getUpperBoundValue(); + if(data.isInSparseFormat()) + encodeAndUpdateSparse(data, d, col, max); + else if(data.getDenseBlock().isContiguous()) + encodeAndUpdateDense(data, d, col, max); + else + encodeAndUpdateGeneric(data, d, col, max); + } + + private void encodeAndUpdateSparse(MatrixBlock data, AMapToData d, int col, int max) { + final int nRow = data.getNumRows(); + final SparseBlock sb = data.getSparseBlock(); + + for(int i = 0; i < nRow; i++) { + int id = map.increment(sb.get(i, col)); + if(id > max) + throw new DMLCompressionException("Failed update and encode with " + max + " possible values"); + d.set(i, id); + } + } + + private void encodeAndUpdateDense(final MatrixBlock data, final AMapToData d, final int col, int max) { + final int nRow = data.getNumRows(); + final double[] vals = data.getDenseBlockValues(); + final int nCol = data.getNumColumns(); + final int end = nRow * nCol; // guaranteed lower than intmax.
+ for(int i = 0, off = col; off < end; i++, off += nCol) { + int id = map.increment(vals[off]); + if(id > max) + throw new DMLCompressionException("Failed update and encode with " + max + " possible values"); + d.set(i, id); + } + } + + private void encodeAndUpdateGeneric(MatrixBlock data, AMapToData d, int col, int max) { + final int nRow = data.getNumRows(); + final DenseBlock db = data.getDenseBlock(); + for(int i = 0; i < nRow; i++) { + final double[] c = db.values(i); + final int off = db.pos(i) + col; + int id = map.increment(c[off]); + if(id > max) + throw new DMLCompressionException("Failed update and encode with " + max + " possible values"); + d.set(i, id); + } + } + + @Override + protected Pair tryUpdateAndEncodeT(MatrixBlock data, IColIndex columns) { + if(data.isEmpty()) + return new Pair<>(this, new ColGroupEmpty(columns)); + final int nRow = data.getNumColumns(); + + final AMapToData d = MapToFactory.create(nRow, map.size()); + + encodeAndUpdateT(data, d, cols.get(0)); + if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size()) + lastDict = DictionaryFactory.create(map); + + return new Pair<>(this, ColGroupDDCLZW.create(columns, lastDict, d, null)); + } + + private void encodeAndUpdateT(MatrixBlock data, AMapToData d, int col) { + if(data.isInSparseFormat()) + encodeAndUpdateSparseT(data, d, col); + else + encodeAndUpdateDenseT(data, d, col); + } + + private void encodeAndUpdateSparseT(MatrixBlock data, AMapToData d, int col) { + final SparseBlock sb = data.getSparseBlock(); + if(!sb.isEmpty(col)) { + int apos = sb.pos(col); + final int[] aix = sb.indexes(col); + final int alen = sb.size(col) + apos; + d.fill(map.increment(0.0, data.getNumColumns() - alen - apos)); + final double[] aval = sb.values(col); + while(apos < alen) { + final double v = aval[apos]; + final int idx = aix[apos++]; + d.set(idx, map.increment(v)); + } + } + else + d.fill(map.increment(0.0, data.getNumColumns())); + } + + private void encodeAndUpdateDenseT(MatrixBlock data, AMapToData d, int col) { + final DenseBlock db = data.getDenseBlock(); + final double[] vals = db.values(col); + final int nCol = data.getNumColumns(); + for(int i = 0, off = db.pos(col); i < nCol; i++, off++) + d.set(i, map.increment(vals[off])); + } + + +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java index df353931c0b..0b60f2fb092 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java @@ -112,7 +112,7 @@ public CompressedSizeInfoColGroup(IColIndex columns, EstimationFactors facts, /** * Create empty or const. - * + * * @param columns columns * @param nRows number of rows * @param ct The type intended either Empty or Const @@ -170,7 +170,7 @@ public double getMinSize() { /** * Note cardinality is the same as number of distinct values. - * + * * @return cardinality or number of distinct values. */ public int getNumVals() { @@ -179,7 +179,7 @@ public int getNumVals() { /** * Number of offsets, or number of non zero values. - * + * * @return Number of non zeros or number of values. */ public int getNumOffs() { @@ -264,6 +264,11 @@ private static double getCompressionSize(IColIndex cols, CompressionType ct, Est nv = fact.numVals + (fact.numOffs < fact.numRows ? 
 return ColGroupSizes.estimateInMemorySizeDDC(numCols, contiguousColumns, nv, fact.numRows,
 fact.tupleSparsity, fact.lossy);
+ case DDCLZW:
+ // DDCLZW uses a DDC-like structure (dictionary + mapping). We estimate it as DDC for now.
+ nv = fact.numVals + (fact.numOffs < fact.numRows ? 1 : 0);
+ return ColGroupSizes.estimateInMemorySizeDDC(numCols, contiguousColumns, nv, fact.numRows,
+ fact.tupleSparsity, fact.lossy);
 case RLE:
 return ColGroupSizes.estimateInMemorySizeRLE(numCols, contiguousColumns, fact.numVals, fact.numRuns,
 fact.numRows, fact.tupleSparsity, fact.lossy);
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCLZWTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCLZWTest.java
new file mode 100644
index 00000000000..27359e84ec6
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCLZWTest.java
@@ -0,0 +1,1011 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.*;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.estim.ComEstExact;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.functionobjects.*;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ColGroupDDCLZWTest {
+ protected static final Log LOG = LogFactory.getLog(ColGroupDDCLZWTest.class.getName());
+
+ /**
+ * Creates a sample DDC group for unit tests.
+ */
+ private ColGroupDDC createTestDDC(int[] mapping, int nCols, int nUnique) {
+ IColIndex colIndexes = ColIndexFactory.create(nCols);
+
+ double[] dictValues = new double[nUnique * nCols];
+ for(int i = 0; i < nUnique; i++) {
+ for(int c = 0; c < nCols; c++) {
+ dictValues[i * nCols + c] = (i + 1) * 10.0 + c;
+ }
+ }
+ Dictionary dict = Dictionary.create(dictValues);
+
+ AMapToData data = MapToFactory.create(mapping.length, nUnique);
+ for(int i = 0; i < mapping.length; i++) {
+ data.set(i, mapping[i]);
+ }
+
+ AColGroup result = ColGroupDDC.create(colIndexes, dict, data, null);
+ assertTrue("The result is of class '" + result.getClass() + "'", result instanceof ColGroupDDC);
'" + result.getClass() + "'", result instanceof ColGroupDDC); + return (ColGroupDDC) result; + } + + /** + * Asserts that two maps are identical + */ + private void assertMapsEqual(AMapToData expected, AMapToData actual) { + assertEquals("Size mismatch", expected.size(), actual.size()); + assertEquals("Unique count mismatch", expected.getUnique(), actual.getUnique()); + + for(int i = 0; i < expected.size(); i++) { + assertEquals("Mapping mismatch at row " + i, expected.getIndex(i), actual.getIndex(i)); + } + } + + /** + * Applies DDCLZW compression/decompression and asserts that it's left unchanged + */ + private void assertLosslessCompression(ColGroupDDC original) { + // Compress + AColGroup compressed = original.convertToDDCLZW(); + assertNotNull("Compression returned null", compressed); + assertTrue(compressed instanceof ColGroupDDCLZW); + + // Decompress + ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) compressed; + AColGroup decompressed = ddclzw.convertToDDC(); + assertNotNull("Decompression returned null", decompressed); + assertTrue(decompressed instanceof ColGroupDDC); + + // Assert + ColGroupDDC result = (ColGroupDDC) decompressed; + + AMapToData d1 = original.getMapToData(); + AMapToData d2 = result.getMapToData(); + + assertMapsEqual(d1, d2); + assertEquals("Column indices mismatch", original.getColIndices(), result.getColIndices()); + + assertEquals("Size mismatch", d1.size(), d2.size()); + assertEquals("Unique count mismatch", d1.getUnique(), d2.getUnique()); + + for(int i = 0; i < d1.size(); i++) { + assertEquals("Mapping mismatch at row " + i, d1.getIndex(i), d2.getIndex(i)); + } + } + + /** + * Asserts "partial decompression" up to the `index` + */ + private void assertPartialDecompression(ColGroupDDCLZW ddclzw, AMapToData original, int index) { + ColGroupDDC partial = (ColGroupDDC) ddclzw.convertToDDC(index); + AMapToData partialMap = partial.getMapToData(); + + assertEquals("Partial size incorrect", index, partialMap.size()); + + for(int i = 0; i < index; i++) { + assertEquals("Partial map mismatch at " + i, original.getIndex(i), partialMap.getIndex(i)); + } + } + + /** + * Asserts if the slice operation matches DDC's slice + */ + private void assertSlice(ColGroupDDCLZW ddclzw, ColGroupDDC originalDDC, int low, int high) { + AColGroup sliced = ddclzw.sliceRows(low, high); + assertTrue(sliced instanceof ColGroupDDCLZW); + + ColGroupDDCLZW ddclzwSlice = (ColGroupDDCLZW) sliced; + ColGroupDDC ddcSlice = (ColGroupDDC) ddclzwSlice.convertToDDC(); + ColGroupDDC expectedSlice = (ColGroupDDC) originalDDC.sliceRows(low, high); + + assertMapsEqual(expectedSlice.getMapToData(), ddcSlice.getMapToData()); + } + + @Test + public void testConvertToDDCLZWBasicNew() { + int[] src = new int[] {0, 0, 2, 0, 2, 1, 0, 2, 1, 0, 2, 2, 0, 2, 1, 0, 2, 1, 0, 2, 1, 0, 1, 2, 0, 1, 2, 0, 1, 1, + 0, 1, 2, 0, 1, 2, 0, 1, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 1, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, + 2, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 2, 0, 2, 1, 0, 2, 1, 0, 2, 2, 0, 2, 1, 0, 2, 1, 0, 2, 0, 0, 0, 0, 0, 1}; + + // Create DDC with 2 columns, 3 unique values + ColGroupDDC ddc = createTestDDC(src, 2, 3); + + assertLosslessCompression(ddc); + + ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW(); + assertPartialDecompression(ddclzw, ddc.getMapToData(), 101); + assertSlice(ddclzw, ddc, 3, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testPartialDecompressionOutOfBounds() { + int[] src = new int[] {1, 3, 4, 4, 3, 2, 3, 4, 1, 4, 4, 4, 4, 1, 4, 1, 4, 1, 4, 0, 1, 3, 4, 4, 3, 
+ 4, 4, 4, 4, 1, 4, 1, 4, 1, 4, 0,};
+
+ ColGroupDDC ddc = createTestDDC(src, 3, 5);
+
+ assertLosslessCompression(ddc);
+
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+ assertPartialDecompression(ddclzw, ddc.getMapToData(), 40);
+ assertPartialDecompression(ddclzw, ddc.getMapToData(), 41); // Should throw out of bounds
+ }
+
+ @Test
+ public void testLengthTwo() {
+ int[] src = new int[] {0, 1};
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 2);
+
+ assertLosslessCompression(ddc);
+
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+ assertPartialDecompression(ddclzw, ddc.getMapToData(), 0);
+ assertPartialDecompression(ddclzw, ddc.getMapToData(), 2);
+ }
+
+ @Test
+ public void testConvertToDDCLZWBasic() {
+ // TODO: use the new comparison helper methods
+ IColIndex colIndexes = ColIndexFactory.create(2);
+ double[] dictValues = new double[] {10.0, 20.0, 11.0, 21.0, 12.0, 22.0};
+ Dictionary dict = Dictionary.create(dictValues);
+
+ int[] src = new int[] {
+ // repeating base pattern
+ 0, 0, 2, 0, 2, 1, 0, 2, 1, 0, 2, 2, 0, 2, 1, 0, 2, 1, 0, 2,
+ // variation / shifted pattern
+ 1, 0, 1, 2, 0, 1, 2, 0, 1, 1, 0, 1, 2, 0, 1, 2, 0, 1,
+ // longer runs (good for phrase growth)
+ 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1,
+ // mixed noise
+ 2, 1, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1,
+ // repeating tail (tests dictionary reuse)
+ 2, 0, 2, 1, 0, 2, 1, 0, 2, 2, 0, 2, 1, 0, 2, 1, 0, 2, 0, 0, 0, 0, 0, 1};
+
+ final int nRows = src.length;
+ final int nUnique = 3;
+ AMapToData data = MapToFactory.create(nRows, nUnique);
+ for(int i = 0; i < nRows; i++)
+ data.set(i, src[i]);
+
+ ColGroupDDC ddc = (ColGroupDDC) ColGroupDDC.create(colIndexes, dict, data, null);
+ AColGroup result = ddc.convertToDDCLZW();
+
+ assertNotNull(result);
+ assertTrue(result instanceof ColGroupDDCLZW);
+
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) result;
+ AColGroup ddclzwDecompressed = ddclzw.convertToDDC();
+
+ assertNotNull(ddclzwDecompressed);
+ assertTrue(ddclzwDecompressed instanceof ColGroupDDC);
+
+ ColGroupDDC ddc2 = (ColGroupDDC) ddclzwDecompressed;
+
+ AMapToData d1 = ddc.getMapToData();
+ AMapToData d2 = ddc2.getMapToData();
+
+ assertEquals(d1.size(), d2.size());
+ assertEquals(d1.getUnique(), d2.getUnique());
+ for(int i = 0; i < d1.size(); i++)
+ assertEquals("mapping mismatch at row " + i, d1.getIndex(i), d2.getIndex(i));
+
+ assertEquals(ddc.getColIndices(), ddc2.getColIndices());
+
+ // Test partial decompression:
+ // `index` is the number of entries to decode (0 to index-1)
+ int index = 10;
+ ColGroupDDC ddcIndex = (ColGroupDDC) ddclzw.convertToDDC(index);
+
+ AMapToData d3 = ddcIndex.getMapToData();
+ assertEquals(index, d3.size());
+ assertEquals(ddc.getColIndices(), ddcIndex.getColIndices());
+
+ for(int i = 0; i < index; i++) {
+ assertEquals(d1.getIndex(i), d3.getIndex(i));
+ }
+
+ // Test sliceRows
+ int low = 3;
+ int high = 10;
+ AColGroup slice = ddclzw.sliceRows(low, high);
+ if(slice instanceof ColGroupDDCLZW ddclzwslice) {
+ ColGroupDDC ddcSlice = (ColGroupDDC) ddclzwslice.convertToDDC();
+ ColGroupDDC ddcSlice2 = (ColGroupDDC) ddc.sliceRows(low, high);
+
+ AMapToData d4 = ddcSlice.getMapToData();
+ AMapToData d5 = ddcSlice2.getMapToData();
+
+ assertEquals(d5.size(), d4.size());
+ assertEquals(d5.getUnique(), d4.getUnique());
+
+ for(int i = 0; i < d4.size(); i++)
+ assertEquals("mapping mismatch at row " + i, d4.getIndex(i), d5.getIndex(i));
+ }
+
+ // Compute RowSums
+ // double[] sumsddc = new double[high - low];
+ //ddc.computeColSums(sumsddc, low, high, );
+ }
+
+ @Test
+ public void testGetIdxFirstElement() {
+ int[] src = new int[] {0, 1, 2, 1, 0};
+ ColGroupDDC ddc = createTestDDC(src, 2, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ double expected = ddc.getIdx(0, 0);
+ assertEquals(expected, ddclzw.getIdx(0, 0), 0.0001);
+ }
+
+ @Test
+ public void testGetIdxLastElement() {
+ int[] src = new int[] {0, 1, 2, 1, 0};
+ ColGroupDDC ddc = createTestDDC(src, 2, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ int lastRow = src.length - 1;
+ double expected = ddc.getIdx(lastRow, 1);
+ assertEquals(expected, ddclzw.getIdx(lastRow, 1), 0.0001);
+ }
+
+ @Test
+ public void testGetIdxAllElements() {
+ int[] src = new int[] {0, 1, 2, 1, 0, 2, 1};
+ ColGroupDDC ddc = createTestDDC(src, 3, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ for(int row = 0; row < src.length; row++) {
+ for(int col = 0; col < 3; col++) { // check all three columns
+ double expected = ddc.getIdx(row, col);
+ double actual = ddclzw.getIdx(row, col);
+ assertEquals("Mismatch at [" + row + "," + col + "]", expected, actual, 0.0001);
+ }
+ }
+ }
+
+ @Test
+ public void testGetIdxWithRepeatingPattern() {
+ int[] src = new int[] {0, 1, 0, 1, 0, 1, 0, 1};
+ ColGroupDDC ddc = createTestDDC(src, 1, 2);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ double expected = ddc.getIdx(3, 0);
+ assertEquals(expected, ddclzw.getIdx(3, 0), 0.0001);
+ }
+
+ @Test(expected = DMLRuntimeException.class)
+ public void testGetIdxRowOutOfBoundsNegative() {
+ int[] src = new int[] {0, 1, 2};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ ddclzw.getIdx(-1, 0);
+ }
+
+ @Test(expected = DMLRuntimeException.class)
+ public void testGetIdxRowOutOfBounds() {
+ int[] src = new int[] {0, 1, 2};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ ddclzw.getIdx(10, 0);
+ }
+
+ @Test(expected = DMLRuntimeException.class)
+ public void testGetIdxColOutOfBoundsNegative() {
+ int[] src = new int[] {0, 1, 2};
+ ColGroupDDC ddc = createTestDDC(src, 3, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ ddclzw.getIdx(0, -1);
+ }
+
+ @Test(expected = DMLRuntimeException.class)
+ public void testGetIdxColOutOfBounds() {
+ int[] src = new int[] {0, 1, 2};
+ ColGroupDDC ddc = createTestDDC(src, 3, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ ddclzw.getIdx(0, 10);
+ }
+
+ @Test
+ public void testSliceRowsSingleRow() {
+ int[] src = new int[] {0, 1, 2, 1, 0, 2, 1};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertSlice(ddclzw, ddc, 3, 4);
+ }
+
+ @Test
+ public void testSliceRowsMiddleRange() {
+ int[] src = new int[] {0, 1, 2, 0, 1, 2, 0, 1, 2, 0};
+ ColGroupDDC ddc = createTestDDC(src, 2, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertSlice(ddclzw, ddc, 2, 7);
+ }
+
+ @Test
+ public void testSliceRowsEntireRange() {
+ int[] src = new int[] {0, 1, 0, 1, 2};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertSlice(ddclzw, ddc, 0, src.length);
+ }
+
+ @Test
+ public void testSliceRowsBeginning() {
+ int[] src = new int[] {0, 1, 2, 1, 0, 2};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertSlice(ddclzw, ddc, 0, 3);
+ }
+
+ @Test
+ public void testSliceRowsEnd() {
+ int[] src = new int[] {0, 1, 2, 1, 0, 2};
+ ColGroupDDC ddc = createTestDDC(src, 2, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertSlice(ddclzw, ddc, 3, 6);
+ }
+
+ @Test
+ public void testSliceRowsWithLongRuns() {
+ int[] src = new int[30];
+ Arrays.fill(src, 0, 10, 0);
+ Arrays.fill(src, 10, 20, 1);
+ Arrays.fill(src, 20, 30, 2);
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertSlice(ddclzw, ddc, 5, 25);
+ }
+
+ @Test
+ public void testCreateWithNullDictionary() {
+ IColIndex colIndexes = ColIndexFactory.create(1);
+ int[] src = new int[] {0, 1, 2};
+ AMapToData data = MapToFactory.create(3, 3);
+ for(int i = 0; i < 3; i++) {
+ data.set(i, src[i]);
+ }
+
+ AColGroup result = ColGroupDDCLZW.create(colIndexes, null, data, null);
+ assertTrue("Should create ColGroupEmpty", result instanceof ColGroupEmpty);
+ }
+
+ @Test
+ public void testCreateWithSingleUnique() {
+ IColIndex colIndexes = ColIndexFactory.create(1);
+ double[] dictValues = new double[] {42.0};
+ Dictionary dict = Dictionary.create(dictValues);
+
+ int[] src = new int[] {0, 0, 0, 0};
+ AMapToData data = MapToFactory.create(4, 1);
+ for(int i = 0; i < 4; i++) {
+ data.set(i, src[i]);
+ }
+
+ AColGroup result = ColGroupDDCLZW.create(colIndexes, dict, data, null);
+ assertTrue("Should create ColGroupConst", result instanceof ColGroupConst);
+ }
+
+ @Test
+ public void testCreateValidDDCLZW() {
+ int[] src = new int[] {0, 1, 0, 1, 2};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+
+ AColGroup result = ddc.convertToDDCLZW();
+ assertTrue("Should create ColGroupDDCLZW", result instanceof ColGroupDDCLZW);
+ }
+
+ @Test
+ public void testCreateWithMultipleColumns() {
+ int[] src = new int[] {0, 1, 2, 1, 0};
+ ColGroupDDC ddc = createTestDDC(src, 3, 3);
+
+ AColGroup result = ddc.convertToDDCLZW();
+ assertTrue("Should create ColGroupDDCLZW", result instanceof ColGroupDDCLZW);
+ }
+
+ @Test
+ public void testSameNumber() {
+ int[] src = new int[20];
+ Arrays.fill(src, 2);
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ assertLosslessCompression(ddc);
+ }
+
+ @Test
+ public void testAlternatingNumbers() {
+ int[] src = new int[30];
+ for(int i = 0; i < src.length; i++) {
+ src[i] = i % 2;
+ }
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 2);
+ assertLosslessCompression(ddc);
+ }
+
+ @Test
+ public void testLongPatterns() {
+ int[] src = new int[50];
+ Arrays.fill(src, 0, 15, 0);
+ Arrays.fill(src, 15, 30, 1);
+ Arrays.fill(src, 30, 45, 2);
+ Arrays.fill(src, 45, 50, 0);
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ assertLosslessCompression(ddc);
+ }
+
+ @Test
+ public void testSameIndexStructure() {
+ int[] src = new int[] {0, 1, 0, 1};
+ ColGroupDDC ddc = createTestDDC(src, 1, 2);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertTrue("Same object should have same structure", ddclzw.sameIndexStructure(ddclzw));
+ }
+
+ @Test
+ public void testSameIndexStructureDifferent() {
+ int[] src = new int[] {0, 1, 0, 1};
+
+ ColGroupDDC ddc1 = createTestDDC(src, 1, 2);
+ ColGroupDDC ddc2 = createTestDDC(src, 1, 2);
+
+ ColGroupDDCLZW ddclzw1 = (ColGroupDDCLZW) ddc1.convertToDDCLZW();
+ ColGroupDDCLZW ddclzw2 = (ColGroupDDCLZW) ddc2.convertToDDCLZW();
+
+ // Different objects have different _dataLZW arrays
+ assertFalse("Different objects should have different structure", ddclzw1.sameIndexStructure(ddclzw2));
+ }
+
+ @Test
+ public void testSameIndexStructureDdcLzw() {
+ int[] src = new int[] {0, 1, 2, 1, 0};
+ ColGroupDDC ddc = createTestDDC(src, 1, 3);
+ ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+ assertFalse("Different types should not have same structure", ddclzw.sameIndexStructure(ddc));
+ }
+
+ @Test
+ public void testRepetitiveData() {
+ int[] src = new int[] {0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1,
+ 1};
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 2);
+ assertLosslessCompression(ddc);
+ }
+
+ @Test
+ public void testNoRepetition() {
+ int[] src = new int[20];
+ for(int i = 0; i < src.length; i++) {
+ src[i] = i;
+ }
+
+ ColGroupDDC ddc = createTestDDC(src, 1, 20);
+ assertLosslessCompression(ddc);
+ }
+
+ public void testDecompressToDenseBlock(double[][] data, boolean isTransposed) {
+ if(isTransposed) {
+ throw new NotImplementedException("DDCLZW compression for transposed matrices not yet implemented");
+ }
+
+ MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+
+ final int numCols = mbt.getNumColumns();
+ final int numRows = mbt.getNumRows();
+ IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+ try {
+ CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+ .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false");
+ CompressionSettings cs = csb.create();
+
+ final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getColGroupInfo(colIndexes);
+ CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+ AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+ MatrixBlock ret = new MatrixBlock(numRows, numCols, false);
+ ret.allocateDenseBlock();
+ cg.decompressToDenseBlock(ret.getDenseBlock(), 0, numRows);
+
+ MatrixBlock expected = DataConverter.convertToMatrixBlock(data);
+ assertArrayEquals(expected.getDenseBlockValues(), ret.getDenseBlockValues(), 0.01);
+
+ }
+ catch(NotImplementedException e) {
+ throw e;
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new DMLRuntimeException("Failed construction : " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ @Test
+ public void testDecompressToDenseBlockSingleColumn() {
+ testDecompressToDenseBlock(new double[][] {{1, 2, 3, 4, 5}}, false);
+ }
+
+ @Test(expected = NotImplementedException.class)
+ public void testDecompressToDenseBlockSingleColumnTransposed() {
+ testDecompressToDenseBlock(new double[][] {{1}, {2}, {3}, {4}, {5}}, true);
+ }
+
+ @Test
+ public void testDecompressToDenseBlockTwoColumns() {
+ testDecompressToDenseBlock(new double[][] {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}}, false);
+ }
+
+ @Test(expected = NotImplementedException.class)
+ public void testDecompressToDenseBlockTwoColumnsTransposed() {
+ testDecompressToDenseBlock(new double[][] {{1, 2, 3, 4, 5}, {1, 1, 1, 1, 1}}, true);
+ }
+
+ public void testDecompressToDenseBlockPartialRange(double[][] data, boolean isTransposed, int rl, int ru) {
+ if(isTransposed) {
+ throw new NotImplementedException("DDCLZW compression for transposed matrices not yet implemented");
+ }
+
+ MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+
+ final int numCols = mbt.getNumColumns();
+ final int numRows = mbt.getNumRows();
+ IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+ try {
+ CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+ .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false");
+ CompressionSettings cs = csb.create();
+
+ final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getColGroupInfo(colIndexes);
+ CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+ AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+ assertTrue("Column group should be DDCLZW, not Const", cg instanceof ColGroupDDCLZW);
+
+ MatrixBlock ret = new MatrixBlock(numRows, numCols, false);
+ ret.allocateDenseBlock();
+ cg.decompressToDenseBlock(ret.getDenseBlock(), rl, ru);
+
+ MatrixBlock expected = DataConverter.convertToMatrixBlock(data);
+ for(int i = rl; i < ru; i++) {
+ for(int j = 0; j < numCols; j++) {
+ assertEquals(expected.get(i, j), ret.get(i, j), 0.01);
+ }
+ }
+
+ }
+ catch(NotImplementedException e) {
+ throw e;
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new DMLRuntimeException("Failed partial range decompression : " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ @Test
+ public void testDecompressToDenseBlockPartialRangeSingleColumn() {
+ testDecompressToDenseBlockPartialRange(new double[][] {{1}, {2}, {3}, {4}, {5}}, false, 2, 5);
+ }
+
+ @Test
+ public void testDecompressToDenseBlockPartialRangeTwoColumns() {
+ testDecompressToDenseBlockPartialRange(new double[][] {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}}, false, 1, 4);
+ }
+
+ @Test
+ public void testDecompressToDenseBlockPartialRangeFromMiddle() {
+ testDecompressToDenseBlockPartialRange(new double[][] {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}}, false,
+ 3, 6);
+ }
+
+ @Test
+ public void testSerializationSingleColumn() throws IOException {
+ double[][] data = {{1}, {2}, {3}, {4}, {5}};
+ MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+ final int numCols = mbt.getNumColumns();
+ final int numRows = mbt.getNumRows();
+ IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+ CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+ .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false");
+ CompressionSettings cs = csb.create();
+
+ final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getColGroupInfo(colIndexes);
+ CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+ AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+ assertTrue("Original should be ColGroupDDCLZW", cg instanceof ColGroupDDCLZW);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ ColGroupIO.writeGroups(dos, Collections.singletonList(cg));
+ assertEquals(cg.getExactSizeOnDisk() + 4, bos.size());
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ DataInputStream dis = new DataInputStream(bis);
+ AColGroup deserialized = ColGroupIO.readGroups(dis, numRows).get(0);
+
+ assertTrue("Deserialized should be ColGroupDDCLZW", deserialized instanceof ColGroupDDCLZW);
+ assertEquals("Compression type should match", cg.getCompType(), deserialized.getCompType());
+ assertEquals("Exact size on disk should match", cg.getExactSizeOnDisk(), deserialized.getExactSizeOnDisk());
+
+ MatrixBlock originalDecompressed = new MatrixBlock(numRows, numCols, false);
+ originalDecompressed.allocateDenseBlock();
+ cg.decompressToDenseBlock(originalDecompressed.getDenseBlock(), 0, numRows);
+
+ MatrixBlock deserializedDecompressed = new MatrixBlock(numRows, numCols, false);
+ deserializedDecompressed.allocateDenseBlock();
+ deserialized.decompressToDenseBlock(deserializedDecompressed.getDenseBlock(), 0, numRows);
+
+ for(int i = 0; i < numRows; i++) {
+ for(int j = 0; j < numCols; j++) {
+ assertEquals(originalDecompressed.get(i, j), deserializedDecompressed.get(i, j), 0.01);
+ }
+ }
+ }
+
+ @Test
+ public void testSerializationTwoColumns() throws IOException {
+ double[][] data = {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}};
+ MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+ final int numCols = mbt.getNumColumns();
+ final int numRows = mbt.getNumRows();
+ IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+ CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+ .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DeltaDDC)).setPreferDeltaEncoding(true)
+ .setTransposeInput("false");
+ CompressionSettings cs = csb.create();
+
+ final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getDeltaColGroupInfo(colIndexes);
+ CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+ AColGroup original = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+ assertTrue("Original should be ColGroupDeltaDDC", original instanceof ColGroupDeltaDDC);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ ColGroupIO.writeGroups(dos, Collections.singletonList(original));
+ assertEquals(original.getExactSizeOnDisk() + 4, bos.size());
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ DataInputStream dis = new DataInputStream(bis);
+ AColGroup deserialized = ColGroupIO.readGroups(dis, numRows).get(0);
+
+ assertTrue("Deserialized should be ColGroupDeltaDDC", deserialized instanceof ColGroupDeltaDDC);
+ assertEquals("Compression type should match", original.getCompType(), deserialized.getCompType());
+ assertEquals("Exact size on disk should match", original.getExactSizeOnDisk(),
+ deserialized.getExactSizeOnDisk());
+
+ MatrixBlock originalDecompressed = new MatrixBlock(numRows, numCols, false);
+ originalDecompressed.allocateDenseBlock();
+ original.decompressToDenseBlock(originalDecompressed.getDenseBlock(), 0, numRows);
+
+ MatrixBlock deserializedDecompressed = new MatrixBlock(numRows, numCols, false);
+ deserializedDecompressed.allocateDenseBlock();
+ deserialized.decompressToDenseBlock(deserializedDecompressed.getDenseBlock(), 0, numRows);
+
+ for(int i = 0; i < numRows; i++) {
+ for(int j = 0; j < numCols; j++) {
+ assertEquals(originalDecompressed.get(i, j), deserializedDecompressed.get(i, j), 0.01);
+ }
+ }
+ }
+
+ private AColGroup compressForTest(double[][] data) {
+ MatrixBlock mb = DataConverter.convertToMatrixBlock(data);
+ IColIndex colIndexes = ColIndexFactory.create(data[0].length);
+ CompressionSettings cs = new CompressionSettingsBuilder().setValidCompressions(
+ EnumSet.of(AColGroup.CompressionType.DDCLZW)).create();
+
+ final CompressedSizeInfoColGroup cgi = new ComEstExact(mb, cs).getDeltaColGroupInfo(colIndexes);
+ CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+ return ColGroupFactory.compressColGroups(mb, csi, cs, 1).get(0);
+ }
+
+ @Test
+ public void testScalarEquals() {
+ double[][] data = {{0}, {1}, {2}, {3}, {0}};
+ AColGroup cg = compressForTest(data);
+ assertTrue(cg instanceof ColGroupDeltaDDC);
+
+ ScalarOperator op = new RightScalarOperator(Equals.getEqualsFnObject(), 0.0);
+ AColGroup res = cg.scalarOperation(op);
+
+ MatrixBlock ret = new MatrixBlock(5,
1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(1.0, ret.get(0, 0), 0.0); + assertEquals(0.0, ret.get(1, 0), 0.0); + assertEquals(0.0, ret.get(2, 0), 0.0); + assertEquals(0.0, ret.get(3, 0), 0.0); + assertEquals(1.0, ret.get(4, 0), 0.0); + } + + @Test + public void testScalarGreaterThan() { + double[][] data = {{0}, {1}, {2}, {3}, {0}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + ScalarOperator op = new RightScalarOperator(GreaterThan.getGreaterThanFnObject(), 1.5); + AColGroup res = cg.scalarOperation(op); + + MatrixBlock ret = new MatrixBlock(5, 1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(0.0, ret.get(0, 0), 0.0); + assertEquals(0.0, ret.get(1, 0), 0.0); + assertEquals(1.0, ret.get(2, 0), 0.0); + assertEquals(1.0, ret.get(3, 0), 0.0); + assertEquals(0.0, ret.get(4, 0), 0.0); + } + + @Test + public void testScalarPlus() { + double[][] data = {{1}, {2}, {3}, {4}, {5}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + ScalarOperator op = new RightScalarOperator(Plus.getPlusFnObject(), 10.0); + AColGroup res = cg.scalarOperation(op); + assertTrue("Should remain DeltaDDC after shift", res instanceof ColGroupDeltaDDC); + + MatrixBlock ret = new MatrixBlock(5, 1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(11.0, ret.get(0, 0), 0.0); + assertEquals(12.0, ret.get(1, 0), 0.0); + assertEquals(13.0, ret.get(2, 0), 0.0); + assertEquals(14.0, ret.get(3, 0), 0.0); + assertEquals(15.0, ret.get(4, 0), 0.0); + } + + @Test + public void testScalarMinus() { + double[][] data = {{11}, {12}, {13}, {14}, {15}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + ScalarOperator op = new RightScalarOperator(Minus.getMinusFnObject(), 10.0); + AColGroup res = cg.scalarOperation(op); + assertTrue("Should remain DeltaDDC after shift", res instanceof ColGroupDeltaDDC); + + MatrixBlock ret = new MatrixBlock(5, 1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(1.0, ret.get(0, 0), 0.0); + assertEquals(2.0, ret.get(1, 0), 0.0); + assertEquals(3.0, ret.get(2, 0), 0.0); + assertEquals(4.0, ret.get(3, 0), 0.0); + assertEquals(5.0, ret.get(4, 0), 0.0); + } + + @Test + public void testUnaryOperationSqrt() { + double[][] data = {{1}, {4}, {9}, {16}, {25}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + UnaryOperator op = new UnaryOperator(Builtin.getBuiltinFnObject(Builtin.BuiltinCode.SQRT)); + AColGroup res = cg.unaryOperation(op); + + MatrixBlock ret = new MatrixBlock(5, 1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(1.0, ret.get(0, 0), 0.01); + assertEquals(2.0, ret.get(1, 0), 0.01); + assertEquals(3.0, ret.get(2, 0), 0.01); + assertEquals(4.0, ret.get(3, 0), 0.01); + assertEquals(5.0, ret.get(4, 0), 0.01); + } + + @Test + public void testScalarEqualsMultiColumn() { + double[][] data = {{0, 1}, {1, 2}, {2, 3}, {3, 4}, {0, 1}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + ScalarOperator op = new RightScalarOperator(Equals.getEqualsFnObject(), 0.0); + AColGroup res = cg.scalarOperation(op); + + MatrixBlock ret = new MatrixBlock(5, 2, false); + ret.allocateDenseBlock(); + 
res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(1.0, ret.get(0, 0), 0.0); + assertEquals(0.0, ret.get(0, 1), 0.0); + assertEquals(0.0, ret.get(1, 0), 0.0); + assertEquals(0.0, ret.get(1, 1), 0.0); + assertEquals(0.0, ret.get(2, 0), 0.0); + assertEquals(0.0, ret.get(2, 1), 0.0); + assertEquals(0.0, ret.get(3, 0), 0.0); + assertEquals(0.0, ret.get(3, 1), 0.0); + assertEquals(1.0, ret.get(4, 0), 0.0); + assertEquals(0.0, ret.get(4, 1), 0.0); + } + + @Test + public void testScalarMultiply() { + double[][] data = {{1}, {2}, {3}, {4}, {5}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + ScalarOperator op = new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0); + AColGroup res = cg.scalarOperation(op); + + MatrixBlock ret = new MatrixBlock(5, 1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(2.0, ret.get(0, 0), 0.0); + assertEquals(4.0, ret.get(1, 0), 0.0); + assertEquals(6.0, ret.get(2, 0), 0.0); + assertEquals(8.0, ret.get(3, 0), 0.0); + assertEquals(10.0, ret.get(4, 0), 0.0); + } + + @Test + public void testScalarDivide() { + double[][] data = {{2}, {4}, {6}, {8}, {10}}; + AColGroup cg = compressForTest(data); + assertTrue(cg instanceof ColGroupDeltaDDC); + + ScalarOperator op = new RightScalarOperator(Divide.getDivideFnObject(), 2.0); + AColGroup res = cg.scalarOperation(op); + + MatrixBlock ret = new MatrixBlock(5, 1, false); + ret.allocateDenseBlock(); + res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5); + + assertEquals(1.0, ret.get(0, 0), 0.0); + assertEquals(2.0, ret.get(1, 0), 0.0); + assertEquals(3.0, ret.get(2, 0), 0.0); + assertEquals(4.0, ret.get(3, 0), 0.0); + assertEquals(5.0, ret.get(4, 0), 0.0); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java index 0f04cfc9c27..834e453c9e8 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDDCTest.java @@ -25,9 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysds.runtime.compress.colgroup.AColGroup; -import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC; -import org.apache.sysds.runtime.compress.colgroup.ColGroupDeltaDDC; +import org.apache.sysds.runtime.compress.colgroup.*; import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;