diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
index c107c1ef1..d65ca1722 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java
@@ -25,6 +25,7 @@
import org.apache.tsfile.read.reader.series.PaginationController;
import java.util.Arrays;
+import java.util.function.LongConsumer;
public class TsBlockUtil {
@@ -75,10 +76,32 @@ public static TsBlock applyFilterAndLimitOffsetToTsBlock(
TsBlockBuilder builder,
Filter pushDownFilter,
PaginationController paginationController) {
+ return applyFilterAndLimitOffsetToTsBlock(
+ unFilteredBlock, builder, pushDownFilter, paginationController, null);
+ }
+
+ public static TsBlock applyFilterAndLimitOffsetToTsBlock(
+ TsBlock unFilteredBlock,
+ TsBlockBuilder builder,
+ Filter pushDownFilter,
+ PaginationController paginationController,
+ LongConsumer filterRowsRecorder) {
+
boolean[] selection = new boolean[unFilteredBlock.getPositionCount()];
Arrays.fill(selection, true);
- boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(selection, unFilteredBlock);
+ boolean[] keepCurrentRow =
+ filterRowsRecorder == null
+ ? pushDownFilter.satisfyTsBlock(selection, unFilteredBlock)
+ : pushDownFilter.satisfyTsBlock(selection, unFilteredBlock, filterRowsRecorder);
+ return buildFilteredTsBlock(unFilteredBlock, builder, keepCurrentRow, paginationController);
+ }
+
+ private static TsBlock buildFilteredTsBlock(
+ TsBlock unFilteredBlock,
+ TsBlockBuilder builder,
+ boolean[] keepCurrentRow,
+ PaginationController paginationController) {
// construct time column
int readEndIndex =
buildTimeColumnWithPagination(
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java
index 5cac76db6..bebe3a4c3 100755
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/filter/basic/Filter.java
@@ -43,6 +43,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.function.LongConsumer;
/**
* A Filter is an executable expression tree describing the criteria for which records to keep when
@@ -117,6 +118,26 @@ public abstract class Filter {
*/
public abstract boolean[] satisfyTsBlock(boolean[] selection, TsBlock tsBlock);
+ public final boolean[] satisfyTsBlock(
+ boolean[] selection, TsBlock tsBlock, LongConsumer filterRowsRecorder) {
+
+ int inputCount = countSelectedRows(selection);
+ boolean[] result = satisfyTsBlock(selection, tsBlock);
+ int outputCount = countSelectedRows(result);
+ if (inputCount > outputCount) {
+ filterRowsRecorder.accept((inputCount - outputCount));
+ }
+
+ return result;
+ }
+
+ private static int countSelectedRows(boolean[] selection) {
+ if (selection == null) return 0;
+ int count = 0;
+ for (boolean b : selection) count += b ? 1 : 0;
+ return count;
+ }
+
/**
* To examine whether the block can be skipped.
*
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java
index 1227fd012..68ce7ef32 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/IPageReader.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.function.LongConsumer;
public interface IPageReader extends IMetadata {
@@ -39,6 +40,18 @@ default BatchData getAllSatisfiedPageData() throws IOException {
TsBlock getAllSatisfiedData() throws IOException;
+ /**
+ * Reads all rows from the page that satisfy the current filter, while also recording the number
+ * of rows filtered out.
+ *
+ *
This method behaves identically to {@link #getAllSatisfiedData()}, with the addition that it
+ * reports the count of rows that were discarded by the filter to the provided {@code
+ * filterRowsRecorder}.
+ *
+ * @param filterRowsRecorder, receives the number of rows filtered out;
+ */
+ TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException;
+
void addRecordFilter(Filter filter);
// The 'modified' property is also true when a data type need to be modified in query and
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
index 22d743530..85073a456 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
@@ -39,6 +39,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.LongConsumer;
public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
// chunk header of the time column
@@ -59,9 +60,13 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
@SuppressWarnings("unchecked")
AbstractAlignedChunkReader(
- Chunk timeChunk, List valueChunkList, long readStopTime, Filter queryFilter)
+ Chunk timeChunk,
+ List valueChunkList,
+ long readStopTime,
+ Filter queryFilter,
+ LongConsumer filterRowsRecorder)
throws IOException {
- super(readStopTime, queryFilter);
+ super(readStopTime, queryFilter, filterRowsRecorder);
this.timeChunkHeader = timeChunk.getHeader();
this.timeChunkDataBuffer = timeChunk.getData();
this.timeDeleteIntervalList = timeChunk.getDeleteIntervalList();
@@ -163,8 +168,14 @@ protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader) {
}
protected boolean pageCanSkip(PageHeader pageHeader) {
- return queryFilter != null
- && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime());
+ if (queryFilter != null
+ && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime())) {
+ if (filterRowsRecorder != null) {
+ this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount());
+ }
+ return true;
+ }
+ return false;
}
private void skipCurrentPage(PageHeader timePageHeader, List valuePageHeader) {
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
index 627ace07e..caaa58751 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import java.util.function.LongConsumer;
public abstract class AbstractChunkReader implements IChunkReader {
@@ -44,11 +45,14 @@ public abstract class AbstractChunkReader implements IChunkReader {
// any filter, no matter value filter or time filter
protected final Filter queryFilter;
+ protected LongConsumer filterRowsRecorder;
+
protected final List pageReaderList = new LinkedList<>();
- protected AbstractChunkReader(long readStopTime, Filter filter) {
+ protected AbstractChunkReader(long readStopTime, Filter filter, LongConsumer filterRowsRecorder) {
this.readStopTime = readStopTime;
this.queryFilter = filter;
+ this.filterRowsRecorder = filterRowsRecorder;
}
/** judge if has next page whose page header satisfies the filter. */
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
index e7879734c..a7a4723bd 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -32,22 +32,36 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.function.LongConsumer;
public class AlignedChunkReader extends AbstractAlignedChunkReader {
public AlignedChunkReader(
- Chunk timeChunk, List valueChunkList, long readStopTime, Filter queryFilter)
+ Chunk timeChunk,
+ List valueChunkList,
+ long readStopTime,
+ Filter queryFilter,
+ LongConsumer filterRowsRecorder)
throws IOException {
- super(timeChunk, valueChunkList, readStopTime, queryFilter);
+ super(timeChunk, valueChunkList, readStopTime, queryFilter, filterRowsRecorder);
}
public AlignedChunkReader(Chunk timeChunk, List valueChunkList) throws IOException {
- this(timeChunk, valueChunkList, Long.MIN_VALUE, null);
+ this(timeChunk, valueChunkList, Long.MIN_VALUE, null, null);
}
public AlignedChunkReader(Chunk timeChunk, List valueChunkList, Filter queryFilter)
throws IOException {
- this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
+ this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null);
+ }
+
+ public AlignedChunkReader(
+ Chunk timeChunk,
+ List valueChunkList,
+ Filter queryFilter,
+ LongConsumer filterRowsRecorder)
+ throws IOException {
+ this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
}
/**
@@ -56,7 +70,7 @@ public AlignedChunkReader(Chunk timeChunk, List valueChunkList, Filter qu
*/
public AlignedChunkReader(Chunk timeChunk, List valueChunkList, long readStopTime)
throws IOException {
- this(timeChunk, valueChunkList, readStopTime, null);
+ this(timeChunk, valueChunkList, readStopTime, null, null);
}
@Override
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
index 806a3b818..c2b2301d7 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
@@ -37,6 +37,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.function.LongConsumer;
public class ChunkReader extends AbstractChunkReader {
@@ -47,8 +48,9 @@ public class ChunkReader extends AbstractChunkReader {
private final EncryptParameter encryptParam;
@SuppressWarnings("unchecked")
- public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) {
- super(readStopTime, queryFilter);
+ public ChunkReader(
+ Chunk chunk, long readStopTime, Filter queryFilter, LongConsumer filterRowsRecorder) {
+ super(readStopTime, queryFilter, filterRowsRecorder);
this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();
this.deleteIntervalList = chunk.getDeleteIntervalList();
@@ -57,11 +59,15 @@ public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) {
}
public ChunkReader(Chunk chunk) throws IOException {
- this(chunk, Long.MIN_VALUE, null);
+ this(chunk, Long.MIN_VALUE, null, null);
}
public ChunkReader(Chunk chunk, Filter queryFilter) {
- this(chunk, Long.MIN_VALUE, queryFilter);
+ this(chunk, Long.MIN_VALUE, queryFilter, null);
+ }
+
+ public ChunkReader(Chunk chunk, Filter queryFilter, LongConsumer filterRowsRecorder) {
+ this(chunk, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
}
/**
@@ -69,7 +75,7 @@ public ChunkReader(Chunk chunk, Filter queryFilter) {
* filtering out pages whose endTime is less than current timestamp.
*/
public ChunkReader(Chunk chunk, long readStopTime) {
- this(chunk, readStopTime, null);
+ this(chunk, readStopTime, null, null);
}
private void initAllPageReaders(Statistics extends Serializable> chunkStatistic) {
@@ -99,8 +105,14 @@ private void initAllPageReaders(Statistics extends Serializable> chunkStatisti
}
private boolean pageCanSkip(PageHeader pageHeader) {
- return queryFilter != null
- && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime());
+ if (queryFilter != null
+ && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime())) {
+ if (filterRowsRecorder != null) {
+ this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount());
+ }
+ return true;
+ }
+ return false;
}
protected boolean pageDeleted(PageHeader pageHeader) {
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
index cc156e324..32bdbf1d2 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
@@ -32,20 +32,34 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.function.LongConsumer;
// difference with AlignedChunkReader is that TableChunkReader works for TableScan and keep all null
// rows
public class TableChunkReader extends AbstractAlignedChunkReader {
public TableChunkReader(
- Chunk timeChunk, List valueChunkList, long readStopTime, Filter queryFilter)
+ Chunk timeChunk,
+ List valueChunkList,
+ long readStopTime,
+ Filter queryFilter,
+ LongConsumer filterRowsRecorder)
throws IOException {
- super(timeChunk, valueChunkList, readStopTime, queryFilter);
+ super(timeChunk, valueChunkList, readStopTime, queryFilter, filterRowsRecorder);
}
public TableChunkReader(Chunk timeChunk, List valueChunkList, Filter queryFilter)
throws IOException {
- this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
+ this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null);
+ }
+
+ public TableChunkReader(
+ Chunk timeChunk,
+ List valueChunkList,
+ Filter queryFilter,
+ LongConsumer filterRowsRecorder)
+ throws IOException {
+ this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
}
@Override
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
index b3090a575..2a95472b6 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
@@ -40,7 +40,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.function.LongConsumer;
+import static java.util.Objects.requireNonNull;
import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
public abstract class AbstractAlignedPageReader implements IPageReader {
@@ -215,6 +217,55 @@ public TsBlock getAllSatisfiedData() throws IOException {
unFilteredBlock, builder, pushDownFilter, paginationController);
}
+ /**
+ * get all satisfied data while record the number of filter rows. if one tuple is not satisfy by
+ * the filter and is deleted at the same time, the tuple cannot be considered as a filtered data.
+ */
+ @Override
+ public TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException {
+ requireNonNull(filterRowsRecorder, "filterRowsRecorder is null");
+ long[] timeBatch = timePageReader.getNextTimeBatch();
+
+ if (allPageDataSatisfy()) {
+ buildResultWithoutAnyFilterAndDelete(timeBatch);
+ return builder.build();
+ }
+
+ long allFilteredRows = 0;
+ // if !filter.satisfy, discard this row
+ boolean[] keepCurrentRow = new boolean[timeBatch.length];
+ boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy();
+ if (globalTimeFilterAllSatisfy) {
+ Arrays.fill(keepCurrentRow, true);
+ } else {
+ // record the filtered rows number
+ long filteredRows =
+ updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(keepCurrentRow, timeBatch);
+ allFilteredRows += filteredRows;
+ }
+
+ if (timePageReader.isModified()) {
+ // if one row is deleted, it can't be considered as the filtered row
+ long deletedAndFilteredRows =
+ updateKeepCurrentRowThroughDeletionWithRecord(keepCurrentRow, timeBatch);
+ allFilteredRows -= deletedAndFilteredRows;
+ }
+ if (allFilteredRows != 0) {
+ filterRowsRecorder.accept(allFilteredRows);
+ }
+ boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy();
+ constructResult(keepCurrentRow, timeBatch, pushDownFilterAllSatisfy);
+
+ TsBlock unFilteredBlock = builder.build();
+ if (pushDownFilterAllSatisfy) {
+ // OFFSET & LIMIT has been consumed in buildTimeColumn
+ return unFilteredBlock;
+ }
+ builder.reset();
+ return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
+ unFilteredBlock, builder, pushDownFilter, paginationController, filterRowsRecorder);
+ }
+
private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws IOException {
if (paginationController.hasCurOffset(timeBatch.length)) {
paginationController.consumeOffset(timeBatch.length);
@@ -264,6 +315,17 @@ private void updateKeepCurrentRowThroughGlobalTimeFilter(
}
}
+ private long updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(
+ boolean[] keepCurrentRow, long[] timeBatch) {
+
+ long filteredRows = 0;
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
+ filteredRows += keepCurrentRow[i] ? 0 : 1;
+ }
+ return filteredRows;
+ }
+
private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[] timeBatch) {
for (int i = 0, n = timeBatch.length; i < n; i++) {
if (keepCurrentRow[i]) {
@@ -272,6 +334,19 @@ private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[
}
}
+ private long updateKeepCurrentRowThroughDeletionWithRecord(
+ boolean[] keepCurrentRow, long[] timeBatch) {
+ long deletedAndFilteredRows = 0;
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ if (keepCurrentRow[i]) {
+ keepCurrentRow[i] = !timePageReader.isDeleted(timeBatch[i]);
+ } else {
+ deletedAndFilteredRows += timePageReader.isDeleted(timeBatch[i]) ? 1 : 0;
+ }
+ }
+ return deletedAndFilteredRows;
+ }
+
protected int buildTimeColumn(
long[] timeBatch, boolean[] keepCurrentRow, boolean pushDownFilterAllSatisfy) {
if (pushDownFilterAllSatisfy) {
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
index 2576706b2..2db020cf8 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
@@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.function.LongConsumer;
import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
import static org.apache.tsfile.utils.Preconditions.checkArgument;
@@ -212,6 +213,11 @@ public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
@Override
public TsBlock getAllSatisfiedData() throws IOException {
+ return getAllSatisfiedData(null);
+ }
+
+ @Override
+ public TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException {
uncompressDataIfNecessary();
TsBlockBuilder builder;
int initialExpectedEntries = (int) pageHeader.getStatistics().getCount();
@@ -223,14 +229,18 @@ public TsBlock getAllSatisfiedData() throws IOException {
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
+ long allFilteredRows = 0;
boolean allSatisfy = recordFilter == null || recordFilter.allSatisfy(this);
switch (dataType) {
case BOOLEAN:
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (isDeleted(timestamp)
- || (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, aBoolean))) {
+ if (isDeleted(timestamp)) {
+ continue;
+ }
+ if (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, aBoolean)) {
+ allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
@@ -252,8 +262,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
int anInt = valueDecoder.readInt(valueBuffer);
- if (isDeleted(timestamp)
- || (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt))) {
+ if (isDeleted(timestamp)) {
+ continue;
+ }
+ if (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt)) {
+ allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
@@ -275,8 +288,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
long aLong = valueDecoder.readLong(valueBuffer);
- if (isDeleted(timestamp)
- || (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong))) {
+ if (isDeleted(timestamp)) {
+ continue;
+ }
+ if (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong)) {
+ allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
@@ -297,8 +313,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
float aFloat = valueDecoder.readFloat(valueBuffer);
- if (isDeleted(timestamp)
- || (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat))) {
+ if (isDeleted(timestamp)) {
+ continue;
+ }
+ if (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat)) {
+ allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
@@ -319,8 +338,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
double aDouble = valueDecoder.readDouble(valueBuffer);
- if (isDeleted(timestamp)
- || (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble))) {
+ if (isDeleted(timestamp)) {
+ continue;
+ }
+ if (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble)) {
+ allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
@@ -344,8 +366,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (isDeleted(timestamp)
- || (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary))) {
+ if (isDeleted(timestamp)) {
+ continue;
+ }
+ if (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary)) {
+ allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
@@ -365,6 +390,9 @@ public TsBlock getAllSatisfiedData() throws IOException {
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
+ if (filterRowsRecorder != null && allFilteredRows > 0) {
+ filterRowsRecorder.accept(allFilteredRows);
+ }
return builder.build();
}