Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tsfile.read.reader.series.PaginationController;

import java.util.Arrays;
import java.util.function.LongConsumer;

public class TsBlockUtil {

Expand Down Expand Up @@ -75,10 +76,32 @@ public static TsBlock applyFilterAndLimitOffsetToTsBlock(
TsBlockBuilder builder,
Filter pushDownFilter,
PaginationController paginationController) {
return applyFilterAndLimitOffsetToTsBlock(
unFilteredBlock, builder, pushDownFilter, paginationController, null);
}

public static TsBlock applyFilterAndLimitOffsetToTsBlock(
TsBlock unFilteredBlock,
TsBlockBuilder builder,
Filter pushDownFilter,
PaginationController paginationController,
LongConsumer filterRowsRecorder) {

boolean[] selection = new boolean[unFilteredBlock.getPositionCount()];
Arrays.fill(selection, true);
boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(selection, unFilteredBlock);
boolean[] keepCurrentRow =
filterRowsRecorder == null
? pushDownFilter.satisfyTsBlock(selection, unFilteredBlock)
: pushDownFilter.satisfyTsBlock(selection, unFilteredBlock, filterRowsRecorder);

return buildFilteredTsBlock(unFilteredBlock, builder, keepCurrentRow, paginationController);
}

private static TsBlock buildFilteredTsBlock(
TsBlock unFilteredBlock,
TsBlockBuilder builder,
boolean[] keepCurrentRow,
PaginationController paginationController) {
// construct time column
int readEndIndex =
buildTimeColumnWithPagination(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.LongConsumer;

/**
* A Filter is an executable expression tree describing the criteria for which records to keep when
Expand Down Expand Up @@ -117,6 +118,26 @@ public abstract class Filter {
*/
public abstract boolean[] satisfyTsBlock(boolean[] selection, TsBlock tsBlock);

public final boolean[] satisfyTsBlock(
boolean[] selection, TsBlock tsBlock, LongConsumer filterRowsRecorder) {

int inputCount = countSelectedRows(selection);
boolean[] result = satisfyTsBlock(selection, tsBlock);
int outputCount = countSelectedRows(result);
if (inputCount > outputCount) {
filterRowsRecorder.accept((inputCount - outputCount));
}

return result;
}

private static int countSelectedRows(boolean[] selection) {
if (selection == null) return 0;
int count = 0;
for (boolean b : selection) count += b ? 1 : 0;
return count;
}

/**
* To examine whether the block can be skipped.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.LongConsumer;

public interface IPageReader extends IMetadata {

Expand All @@ -39,6 +40,18 @@ default BatchData getAllSatisfiedPageData() throws IOException {

TsBlock getAllSatisfiedData() throws IOException;

/**
* Reads all rows from the page that satisfy the current filter, while also recording the number
* of rows filtered out.
*
* <p>This method behaves identically to {@link #getAllSatisfiedData()}, with the addition that it
* reports the count of rows that were discarded by the filter to the provided {@code
* filterRowsRecorder}.
*
* @param filterRowsRecorder, receives the number of rows filtered out;
*/
TsBlock getAllSatisfiedData(LongConsumer filterRowsRecorder) throws IOException;

void addRecordFilter(Filter filter);

// The 'modified' property is also true when a data type need to be modified in query and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
// chunk header of the time column
Expand All @@ -59,9 +60,13 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {

@SuppressWarnings("unchecked")
AbstractAlignedChunkReader(
Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter)
Chunk timeChunk,
List<Chunk> valueChunkList,
long readStopTime,
Filter queryFilter,
LongConsumer filterRowsRecorder)
throws IOException {
super(readStopTime, queryFilter);
super(readStopTime, queryFilter, filterRowsRecorder);
this.timeChunkHeader = timeChunk.getHeader();
this.timeChunkDataBuffer = timeChunk.getData();
this.timeDeleteIntervalList = timeChunk.getDeleteIntervalList();
Expand Down Expand Up @@ -163,8 +168,14 @@ protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader) {
}

protected boolean pageCanSkip(PageHeader pageHeader) {
return queryFilter != null
&& !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime());
if (queryFilter != null
&& !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime())) {
if (filterRowsRecorder != null) {
this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount());
}
return true;
}
return false;
}

private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader> valuePageHeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.function.LongConsumer;

public abstract class AbstractChunkReader implements IChunkReader {

Expand All @@ -44,11 +45,14 @@ public abstract class AbstractChunkReader implements IChunkReader {
// any filter, no matter value filter or time filter
protected final Filter queryFilter;

protected LongConsumer filterRowsRecorder;

protected final List<IPageReader> pageReaderList = new LinkedList<>();

protected AbstractChunkReader(long readStopTime, Filter filter) {
protected AbstractChunkReader(long readStopTime, Filter filter, LongConsumer filterRowsRecorder) {
this.readStopTime = readStopTime;
this.queryFilter = filter;
this.filterRowsRecorder = filterRowsRecorder;
}

/** judge if has next page whose page header satisfies the filter. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,36 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.LongConsumer;

public class AlignedChunkReader extends AbstractAlignedChunkReader {

public AlignedChunkReader(
Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter)
Chunk timeChunk,
List<Chunk> valueChunkList,
long readStopTime,
Filter queryFilter,
LongConsumer filterRowsRecorder)
throws IOException {
super(timeChunk, valueChunkList, readStopTime, queryFilter);
super(timeChunk, valueChunkList, readStopTime, queryFilter, filterRowsRecorder);
}

public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList) throws IOException {
this(timeChunk, valueChunkList, Long.MIN_VALUE, null);
this(timeChunk, valueChunkList, Long.MIN_VALUE, null, null);
}

public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter queryFilter)
throws IOException {
this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null);
}

public AlignedChunkReader(
Chunk timeChunk,
List<Chunk> valueChunkList,
Filter queryFilter,
LongConsumer filterRowsRecorder)
throws IOException {
this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
}

/**
Expand All @@ -56,7 +70,7 @@ public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter qu
*/
public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime)
throws IOException {
this(timeChunk, valueChunkList, readStopTime, null);
this(timeChunk, valueChunkList, readStopTime, null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.LongConsumer;

public class ChunkReader extends AbstractChunkReader {

Expand All @@ -47,8 +48,9 @@ public class ChunkReader extends AbstractChunkReader {
private final EncryptParameter encryptParam;

@SuppressWarnings("unchecked")
public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) {
super(readStopTime, queryFilter);
public ChunkReader(
Chunk chunk, long readStopTime, Filter queryFilter, LongConsumer filterRowsRecorder) {
super(readStopTime, queryFilter, filterRowsRecorder);
this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();
this.deleteIntervalList = chunk.getDeleteIntervalList();
Expand All @@ -57,19 +59,23 @@ public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) {
}

public ChunkReader(Chunk chunk) throws IOException {
this(chunk, Long.MIN_VALUE, null);
this(chunk, Long.MIN_VALUE, null, null);
}

public ChunkReader(Chunk chunk, Filter queryFilter) {
this(chunk, Long.MIN_VALUE, queryFilter);
this(chunk, Long.MIN_VALUE, queryFilter, null);
}

public ChunkReader(Chunk chunk, Filter queryFilter, LongConsumer filterRowsRecorder) {
this(chunk, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
}

/**
* Constructor of ChunkReader by timestamp. This constructor is used to accelerate queries by
* filtering out pages whose endTime is less than current timestamp.
*/
public ChunkReader(Chunk chunk, long readStopTime) {
this(chunk, readStopTime, null);
this(chunk, readStopTime, null, null);
}

private void initAllPageReaders(Statistics<? extends Serializable> chunkStatistic) {
Expand Down Expand Up @@ -99,8 +105,14 @@ private void initAllPageReaders(Statistics<? extends Serializable> chunkStatisti
}

private boolean pageCanSkip(PageHeader pageHeader) {
return queryFilter != null
&& !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime());
if (queryFilter != null
&& !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime())) {
if (filterRowsRecorder != null) {
this.filterRowsRecorder.accept(pageHeader.getStatistics().getCount());
}
return true;
}
return false;
}

protected boolean pageDeleted(PageHeader pageHeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,34 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.LongConsumer;

// difference with AlignedChunkReader is that TableChunkReader works for TableScan and keep all null
// rows
public class TableChunkReader extends AbstractAlignedChunkReader {

public TableChunkReader(
Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter)
Chunk timeChunk,
List<Chunk> valueChunkList,
long readStopTime,
Filter queryFilter,
LongConsumer filterRowsRecorder)
throws IOException {
super(timeChunk, valueChunkList, readStopTime, queryFilter);
super(timeChunk, valueChunkList, readStopTime, queryFilter, filterRowsRecorder);
}

public TableChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter queryFilter)
throws IOException {
this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, null);
}

public TableChunkReader(
Chunk timeChunk,
List<Chunk> valueChunkList,
Filter queryFilter,
LongConsumer filterRowsRecorder)
throws IOException {
this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter, filterRowsRecorder);
}

@Override
Expand Down
Loading
Loading