From b360c015d133a702441264f48362a7bd492abbd9 Mon Sep 17 00:00:00 2001 From: Yaobin Chen Date: Mon, 2 Mar 2026 16:43:41 +0800 Subject: [PATCH 1/2] support accurate filter rows recording during query execution --- .../tsfile/read/common/block/TsBlockUtil.java | 25 ++++++- .../tsfile/read/filter/basic/Filter.java | 39 ++++++++++ .../tsfile/read/reader/IPageReader.java | 3 + .../page/AbstractAlignedPageReader.java | 75 +++++++++++++++++++ .../tsfile/read/reader/page/PageReader.java | 52 ++++++++++--- 5 files changed, 181 insertions(+), 13 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java index c107c1ef1..703a47be7 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java @@ -25,6 +25,7 @@ import org.apache.tsfile.read.reader.series.PaginationController; import java.util.Arrays; +import java.util.function.Consumer; public class TsBlockUtil { @@ -75,10 +76,32 @@ public static TsBlock applyFilterAndLimitOffsetToTsBlock( TsBlockBuilder builder, Filter pushDownFilter, PaginationController paginationController) { + return applyFilterAndLimitOffsetToTsBlock( + unFilteredBlock, builder, pushDownFilter, paginationController, null); + } + + public static TsBlock applyFilterAndLimitOffsetToTsBlock( + TsBlock unFilteredBlock, + TsBlockBuilder builder, + Filter pushDownFilter, + PaginationController paginationController, + Consumer filterRowsRecorder) { + boolean[] selection = new boolean[unFilteredBlock.getPositionCount()]; Arrays.fill(selection, true); - boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(selection, unFilteredBlock); + boolean[] keepCurrentRow = + filterRowsRecorder == null + ? 
pushDownFilter.satisfyTsBlock(selection, unFilteredBlock) + : pushDownFilter.satisfyTsBlock(selection, unFilteredBlock, filterRowsRecorder); + return buildFilteredTsBlock(unFilteredBlock, builder, keepCurrentRow, paginationController); + } + + private static TsBlock buildFilteredTsBlock( + TsBlock unFilteredBlock, + TsBlockBuilder builder, + boolean[] keepCurrentRow, + PaginationController paginationController) { // construct time column int readEndIndex = buildTimeColumnWithPagination( diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java index 5cac76db6..8dd031eeb 100755 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.Consumer; /** * A Filter is an executable expression tree describing the criteria for which records to keep when @@ -117,6 +118,44 @@ public abstract class Filter { */ public abstract boolean[] satisfyTsBlock(boolean[] selection, TsBlock tsBlock); + public final boolean[] satisfyTsBlock( + boolean[] selection, TsBlock tsBlock, Consumer filterRowsRecorder) { + + int inputCount = countSelectedRows(selection); + boolean[] result = satisfyTsBlock(selection, tsBlock); + int outputCount = countSelectedRows(result); + if (inputCount > outputCount) { + filterRowsRecorder.accept((long) (inputCount - outputCount)); + } + + return result; + } + + private static int countSelectedRows(boolean[] selection) { + if (selection == null) return 0; + int count = 0; + int length = selection.length; + int i = 0; + + // calculate multi times + for (; i < length - 7; i += 8) { + count += + (selection[i] ? 1 : 0) + + (selection[i + 1] ? 1 : 0) + + (selection[i + 2] ? 1 : 0) + + (selection[i + 3] ? 1 : 0) + + (selection[i + 4] ? 
1 : 0) + + (selection[i + 5] ? 1 : 0) + + (selection[i + 6] ? 1 : 0) + + (selection[i + 7] ? 1 : 0); + } + for (; i < length; i++) { + count += (selection[i] ? 1 : 0); + } + + return count; + } + /** * To examine whether the block can be skipped. * diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java index 1227fd012..55dcd5c18 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.List; +import java.util.function.Consumer; public interface IPageReader extends IMetadata { @@ -39,6 +40,8 @@ default BatchData getAllSatisfiedPageData() throws IOException { TsBlock getAllSatisfiedData() throws IOException; + TsBlock getAllSatisfiedData(Consumer filterRowsRecorder) throws IOException; + void addRecordFilter(Filter filter); // The 'modified' property is also true when a data type need to be modified in query and diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java index b3090a575..e678fa74a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java @@ -40,7 +40,9 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.function.Consumer; +import static java.util.Objects.requireNonNull; import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; public abstract class AbstractAlignedPageReader implements IPageReader { @@ -215,6 +217,55 @@ public TsBlock getAllSatisfiedData() throws IOException { unFilteredBlock, builder, 
pushDownFilter, paginationController); } + /** + * get all satisfied data while record the number of filter rows. if one tuple is not satisfy by + * the filter and is deleted at the same time, the tuple cannot be considered as a filtered data. + */ + @Override + public TsBlock getAllSatisfiedData(Consumer filterRowsRecorder) throws IOException { + requireNonNull(filterRowsRecorder, "filterRowsRecorder is null"); + long[] timeBatch = timePageReader.getNextTimeBatch(); + + if (allPageDataSatisfy()) { + buildResultWithoutAnyFilterAndDelete(timeBatch); + return builder.build(); + } + + long allFilteredRows = 0; + // if !filter.satisfy, discard this row + boolean[] keepCurrentRow = new boolean[timeBatch.length]; + boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy(); + if (globalTimeFilterAllSatisfy) { + Arrays.fill(keepCurrentRow, true); + } else { + // record the filtered rows number + long filteredRows = + updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(keepCurrentRow, timeBatch); + allFilteredRows += filteredRows; + } + + if (timePageReader.isModified()) { + // if one row is deleted, it can't be considered as the filtered row + long deletedAndFilteredRows = + updateKeepCurrentRowThroughDeletionWithRecord(keepCurrentRow, timeBatch); + allFilteredRows -= deletedAndFilteredRows; + } + if (allFilteredRows != 0) { + filterRowsRecorder.accept(allFilteredRows); + } + boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy(); + constructResult(keepCurrentRow, timeBatch, pushDownFilterAllSatisfy); + + TsBlock unFilteredBlock = builder.build(); + if (pushDownFilterAllSatisfy) { + // OFFSET & LIMIT has been consumed in buildTimeColumn + return unFilteredBlock; + } + builder.reset(); + return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( + unFilteredBlock, builder, pushDownFilter, paginationController, filterRowsRecorder); + } + private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws IOException { if 
(paginationController.hasCurOffset(timeBatch.length)) { paginationController.consumeOffset(timeBatch.length); @@ -264,6 +315,17 @@ private void updateKeepCurrentRowThroughGlobalTimeFilter( } } + private long updateKeepCurrentRowThroughGlobalTimeFilterWithRecord( + boolean[] keepCurrentRow, long[] timeBatch) { + + long filteredRows = 0; + for (int i = 0, n = timeBatch.length; i < n; i++) { + keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null); + filteredRows += keepCurrentRow[i] ? 0 : 1; + } + return filteredRows; + } + private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[] timeBatch) { for (int i = 0, n = timeBatch.length; i < n; i++) { if (keepCurrentRow[i]) { @@ -272,6 +334,19 @@ private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[ } } + private long updateKeepCurrentRowThroughDeletionWithRecord( + boolean[] keepCurrentRow, long[] timeBatch) { + long deletedAndFilteredRows = 0; + for (int i = 0, n = timeBatch.length; i < n; i++) { + if (keepCurrentRow[i]) { + keepCurrentRow[i] = !timePageReader.isDeleted(timeBatch[i]); + } else { + deletedAndFilteredRows += timePageReader.isDeleted(timeBatch[i]) ? 
1 : 0; + } + } + return deletedAndFilteredRows; + } + protected int buildTimeColumn( long[] timeBatch, boolean[] keepCurrentRow, boolean pushDownFilterAllSatisfy) { if (pushDownFilterAllSatisfy) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java index 2576706b2..06964d772 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.function.Consumer; import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; import static org.apache.tsfile.utils.Preconditions.checkArgument; @@ -212,6 +213,11 @@ public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { @Override public TsBlock getAllSatisfiedData() throws IOException { + return getAllSatisfiedData(null); + } + + @Override + public TsBlock getAllSatisfiedData(Consumer filterRowsRecorder) throws IOException { uncompressDataIfNecessary(); TsBlockBuilder builder; int initialExpectedEntries = (int) pageHeader.getStatistics().getCount(); @@ -223,14 +229,18 @@ public TsBlock getAllSatisfiedData() throws IOException { TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0); + long allFilteredRows = 0; boolean allSatisfy = recordFilter == null || recordFilter.allSatisfy(this); switch (dataType) { case BOOLEAN: while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); boolean aBoolean = valueDecoder.readBoolean(valueBuffer); - if (isDeleted(timestamp) - || (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, aBoolean))) { + if (isDeleted(timestamp)) { + continue; + } + if (!allSatisfy && 
!recordFilter.satisfyBoolean(timestamp, aBoolean)) { + allFilteredRows++; continue; } if (paginationController.hasCurOffset()) { @@ -252,8 +262,11 @@ public TsBlock getAllSatisfiedData() throws IOException { while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); int anInt = valueDecoder.readInt(valueBuffer); - if (isDeleted(timestamp) - || (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt))) { + if (isDeleted(timestamp)) { + continue; + } + if (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt)) { + allFilteredRows++; continue; } if (paginationController.hasCurOffset()) { @@ -275,8 +288,11 @@ public TsBlock getAllSatisfiedData() throws IOException { while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); long aLong = valueDecoder.readLong(valueBuffer); - if (isDeleted(timestamp) - || (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong))) { + if (isDeleted(timestamp)) { + continue; + } + if (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong)) { + allFilteredRows++; continue; } if (paginationController.hasCurOffset()) { @@ -297,8 +313,11 @@ public TsBlock getAllSatisfiedData() throws IOException { while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); float aFloat = valueDecoder.readFloat(valueBuffer); - if (isDeleted(timestamp) - || (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat))) { + if (isDeleted(timestamp)) { + continue; + } + if (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat)) { + allFilteredRows++; continue; } if (paginationController.hasCurOffset()) { @@ -319,8 +338,11 @@ public TsBlock getAllSatisfiedData() throws IOException { while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); double aDouble = valueDecoder.readDouble(valueBuffer); - if (isDeleted(timestamp) - || (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble))) { + if 
(isDeleted(timestamp)) { + continue; + } + if (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble)) { + allFilteredRows++; continue; } if (paginationController.hasCurOffset()) { @@ -344,8 +366,11 @@ public TsBlock getAllSatisfiedData() throws IOException { while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); Binary aBinary = valueDecoder.readBinary(valueBuffer); - if (isDeleted(timestamp) - || (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary))) { + if (isDeleted(timestamp)) { + continue; + } + if (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary)) { + allFilteredRows++; continue; } if (paginationController.hasCurOffset()) { @@ -365,6 +390,9 @@ public TsBlock getAllSatisfiedData() throws IOException { default: throw new UnSupportedDataTypeException(String.valueOf(dataType)); } + if (filterRowsRecorder != null && allFilteredRows > 0) { + filterRowsRecorder.accept(allFilteredRows); + } return builder.build(); } From 6c80a411699dc9323f175555ce47d67a4cb3620c Mon Sep 17 00:00:00 2001 From: Yaobin Chen Date: Wed, 18 Mar 2026 15:18:06 +0800 Subject: [PATCH 2/2] change the Consumer<Long> type to LongConsumer; support counting the filtered rows while constructing ChunkReader and initializing all page readers --- .../tsfile/read/common/block/TsBlockUtil.java | 4 +-- .../tsfile/read/filter/basic/Filter.java | 26 +++---------------- .../tsfile/read/reader/IPageReader.java | 14 ++++++++-- .../chunk/AbstractAlignedChunkReader.java | 19 +++++++++++--- .../reader/chunk/AbstractChunkReader.java | 6 ++++- .../read/reader/chunk/AlignedChunkReader.java | 24 +++++++++++++---- .../tsfile/read/reader/chunk/ChunkReader.java | 26 ++++++++++++++----- .../read/reader/chunk/TableChunkReader.java | 20 +++++++++++--- .../page/AbstractAlignedPageReader.java | 4 +-- .../tsfile/read/reader/page/PageReader.java | 4 +-- 10 files changed, 97 insertions(+), 50 deletions(-) diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java index 703a47be7..d65ca1722 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java @@ -25,7 +25,7 @@ import org.apache.tsfile.read.reader.series.PaginationController; import java.util.Arrays; -import java.util.function.Consumer; +import java.util.function.LongConsumer; public class TsBlockUtil { @@ -85,7 +85,7 @@ public static TsBlock applyFilterAndLimitOffsetToTsBlock( TsBlockBuilder builder, Filter pushDownFilter, PaginationController paginationController, - Consumer filterRowsRecorder) { + LongConsumer filterRowsRecorder) { boolean[] selection = new boolean[unFilteredBlock.getPositionCount()]; Arrays.fill(selection, true); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java index 8dd031eeb..bebe3a4c3 100755 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java @@ -43,7 +43,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; -import java.util.function.Consumer; +import java.util.function.LongConsumer; /** * A Filter is an executable expression tree describing the criteria for which records to keep when @@ -119,13 +119,13 @@ public abstract class Filter { public abstract boolean[] satisfyTsBlock(boolean[] selection, TsBlock tsBlock); public final boolean[] satisfyTsBlock( - boolean[] selection, TsBlock tsBlock, Consumer filterRowsRecorder) { + boolean[] selection, TsBlock tsBlock, LongConsumer filterRowsRecorder) { int inputCount = countSelectedRows(selection); boolean[] result = satisfyTsBlock(selection, tsBlock); int outputCount = 
countSelectedRows(result); if (inputCount > outputCount) { - filterRowsRecorder.accept((long) (inputCount - outputCount)); + filterRowsRecorder.accept((inputCount - outputCount)); } return result; @@ -134,25 +134,7 @@ public final boolean[] satisfyTsBlock( private static int countSelectedRows(boolean[] selection) { if (selection == null) return 0; int count = 0; - int length = selection.length; - int i = 0; - - // calculate multi times - for (; i < length - 7; i += 8) { - count += - (selection[i] ? 1 : 0) - + (selection[i + 1] ? 1 : 0) - + (selection[i + 2] ? 1 : 0) - + (selection[i + 3] ? 1 : 0) - + (selection[i + 4] ? 1 : 0) - + (selection[i + 5] ? 1 : 0) - + (selection[i + 6] ? 1 : 0) - + (selection[i + 7] ? 1 : 0); - } - for (; i < length; i++) { - count += (selection[i] ? 1 : 0); - } - + for (boolean b : selection) count += b ? 1 : 0; return count; } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java index 55dcd5c18..68ce7ef32 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java @@ -28,7 +28,7 @@ import java.io.IOException; import java.util.List; -import java.util.function.Consumer; +import java.util.function.LongConsumer; public interface IPageReader extends IMetadata { @@ -40,7 +40,17 @@ default BatchData getAllSatisfiedPageData() throws IOException { TsBlock getAllSatisfiedData() throws IOException; - TsBlock getAllSatisfiedData(Consumer filterRowsRecorder) throws IOException; + /** + * Reads all rows from the page that satisfy the current filter, while also recording the number + * of rows filtered out. + * + *

<p>This method behaves identically to {@link #getAllSatisfiedData()}, with the addition that it + * reports the count of rows that were discarded by the filter to the provided {@code + * filterRowsRecorder}. + * + * @param filterRowsRecorder receives the number of rows filtered out + */ + TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException; void addRecordFilter(Filter filter); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java index 22d743530..85073a456 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java @@ -39,6 +39,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.function.LongConsumer; public abstract class AbstractAlignedChunkReader extends AbstractChunkReader { // chunk header of the time column @@ -59,9 +60,13 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader { @SuppressWarnings("unchecked") AbstractAlignedChunkReader( - Chunk timeChunk, List valueChunkList, long readStopTime, Filter queryFilter) + Chunk timeChunk, + List valueChunkList, + long readStopTime, + Filter queryFilter, + LongConsumer filterRowsRecorder) throws IOException { - super(readStopTime, queryFilter); + super(readStopTime, queryFilter, filterRowsRecorder); this.timeChunkHeader = timeChunk.getHeader(); this.timeChunkDataBuffer = timeChunk.getData(); this.timeDeleteIntervalList = timeChunk.getDeleteIntervalList(); @@ -163,8 +168,14 @@ protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader) { } protected boolean pageCanSkip(PageHeader pageHeader) { - return queryFilter != null - && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), 
pageHeader.getEndTime()); + if (queryFilter != null + && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime())) { + if (filterRowsRecorder != null) { + this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount()); + } + return true; + } + return false; } private void skipCurrentPage(PageHeader timePageHeader, List valuePageHeader) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java index 627ace07e..caaa58751 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; +import java.util.function.LongConsumer; public abstract class AbstractChunkReader implements IChunkReader { @@ -44,11 +45,14 @@ public abstract class AbstractChunkReader implements IChunkReader { // any filter, no matter value filter or time filter protected final Filter queryFilter; + protected LongConsumer filterRowsRecorder; + protected final List pageReaderList = new LinkedList<>(); - protected AbstractChunkReader(long readStopTime, Filter filter) { + protected AbstractChunkReader(long readStopTime, Filter filter, LongConsumer filterRowsRecorder) { this.readStopTime = readStopTime; this.queryFilter = filter; + this.filterRowsRecorder = filterRowsRecorder; } /** judge if has next page whose page header satisfies the filter. 
*/ diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java index e7879734c..a7a4723bd 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java @@ -32,22 +32,36 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.LongConsumer; public class AlignedChunkReader extends AbstractAlignedChunkReader { public AlignedChunkReader( - Chunk timeChunk, List valueChunkList, long readStopTime, Filter queryFilter) + Chunk timeChunk, + List valueChunkList, + long readStopTime, + Filter queryFilter, + LongConsumer filterRowsRecorder) throws IOException { - super(timeChunk, valueChunkList, readStopTime, queryFilter); + super(timeChunk, valueChunkList, readStopTime, queryFilter, filterRowsRecorder); } public AlignedChunkReader(Chunk timeChunk, List valueChunkList) throws IOException { - this(timeChunk, valueChunkList, Long.MIN_VALUE, null); + this(timeChunk, valueChunkList, Long.MIN_VALUE, null, null); } public AlignedChunkReader(Chunk timeChunk, List valueChunkList, Filter queryFilter) throws IOException { - this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter); + this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null); + } + + public AlignedChunkReader( + Chunk timeChunk, + List valueChunkList, + Filter queryFilter, + LongConsumer filterRowsRecorder) + throws IOException { + this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, filterRowsRecorder); } /** @@ -56,7 +70,7 @@ public AlignedChunkReader(Chunk timeChunk, List valueChunkList, Filter qu */ public AlignedChunkReader(Chunk timeChunk, List valueChunkList, long readStopTime) throws IOException { - this(timeChunk, valueChunkList, readStopTime, null); + this(timeChunk, valueChunkList, 
readStopTime, null, null); } @Override diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java index 806a3b818..c2b2301d7 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java @@ -37,6 +37,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.LongConsumer; public class ChunkReader extends AbstractChunkReader { @@ -47,8 +48,9 @@ public class ChunkReader extends AbstractChunkReader { private final EncryptParameter encryptParam; @SuppressWarnings("unchecked") - public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) { - super(readStopTime, queryFilter); + public ChunkReader( + Chunk chunk, long readStopTime, Filter queryFilter, LongConsumer filterRowsRecorder) { + super(readStopTime, queryFilter, filterRowsRecorder); this.chunkHeader = chunk.getHeader(); this.chunkDataBuffer = chunk.getData(); this.deleteIntervalList = chunk.getDeleteIntervalList(); @@ -57,11 +59,15 @@ public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) { } public ChunkReader(Chunk chunk) throws IOException { - this(chunk, Long.MIN_VALUE, null); + this(chunk, Long.MIN_VALUE, null, null); } public ChunkReader(Chunk chunk, Filter queryFilter) { - this(chunk, Long.MIN_VALUE, queryFilter); + this(chunk, Long.MIN_VALUE, queryFilter, null); + } + + public ChunkReader(Chunk chunk, Filter queryFilter, LongConsumer filterRowsRecorder) { + this(chunk, Long.MIN_VALUE, queryFilter, filterRowsRecorder); } /** @@ -69,7 +75,7 @@ public ChunkReader(Chunk chunk, Filter queryFilter) { * filtering out pages whose endTime is less than current timestamp. 
*/ public ChunkReader(Chunk chunk, long readStopTime) { - this(chunk, readStopTime, null); + this(chunk, readStopTime, null, null); } private void initAllPageReaders(Statistics chunkStatistic) { @@ -99,8 +105,14 @@ private void initAllPageReaders(Statistics chunkStatisti } private boolean pageCanSkip(PageHeader pageHeader) { - return queryFilter != null - && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime()); + if (queryFilter != null + && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime())) { + if (filterRowsRecorder != null) { + this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount()); + } + return true; + } + return false; } protected boolean pageDeleted(PageHeader pageHeader) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java index cc156e324..32bdbf1d2 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java @@ -32,20 +32,34 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.LongConsumer; // difference with AlignedChunkReader is that TableChunkReader works for TableScan and keep all null // rows public class TableChunkReader extends AbstractAlignedChunkReader { public TableChunkReader( - Chunk timeChunk, List valueChunkList, long readStopTime, Filter queryFilter) + Chunk timeChunk, + List valueChunkList, + long readStopTime, + Filter queryFilter, + LongConsumer filterRowsRecorder) throws IOException { - super(timeChunk, valueChunkList, readStopTime, queryFilter); + super(timeChunk, valueChunkList, readStopTime, queryFilter, filterRowsRecorder); } public TableChunkReader(Chunk timeChunk, List valueChunkList, Filter queryFilter) throws IOException { - this(timeChunk, 
valueChunkList, Long.MIN_VALUE, queryFilter); + this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null); + } + + public TableChunkReader( + Chunk timeChunk, + List valueChunkList, + Filter queryFilter, + LongConsumer filterRowsRecorder) + throws IOException { + this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, filterRowsRecorder); } @Override diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java index e678fa74a..2a95472b6 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java @@ -40,7 +40,7 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; +import java.util.function.LongConsumer; import static java.util.Objects.requireNonNull; import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; @@ -222,7 +222,7 @@ public TsBlock getAllSatisfiedData() throws IOException { * the filter and is deleted at the same time, the tuple cannot be considered as a filtered data. 
*/ @Override - public TsBlock getAllSatisfiedData(Consumer filterRowsRecorder) throws IOException { + public TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException { requireNonNull(filterRowsRecorder, "filterRowsRecorder is null"); long[] timeBatch = timePageReader.getNextTimeBatch(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java index 06964d772..2db020cf8 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java @@ -44,7 +44,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; +import java.util.function.LongConsumer; import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; import static org.apache.tsfile.utils.Preconditions.checkArgument; @@ -217,7 +217,7 @@ public TsBlock getAllSatisfiedData() throws IOException { } @Override - public TsBlock getAllSatisfiedData(Consumer filterRowsRecorder) throws IOException { + public TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException { uncompressDataIfNecessary(); TsBlockBuilder builder; int initialExpectedEntries = (int) pageHeader.getStatistics().getCount();