Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async-query-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {
id 'jacoco'
id 'antlr'
id 'com.diffplug.spotless' version '6.22.0'
id 'com.github.johnrengelman.shadow'
id 'com.gradleup.shadow'
}

repositories {
Expand Down
13 changes: 7 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ allprojects {
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.4.4"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:5.3.4"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:5.3.4"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:2.18.2"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:2.18.2"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:2.18.2"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.2"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2"
resolutionStrategy.force "com.fasterxml.jackson:jackson-bom:2.18.6"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:2.18.6"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:2.18.6"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:2.18.6"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.6"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.6"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.6"
resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5'
resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0'
resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.storage.TableScanOperator;

/**
* A plan node which blocks issuing a request in {@link #open} and getting results in {@link
Expand Down Expand Up @@ -51,4 +52,22 @@ public ExecutionEngine.Schema schema() {
public void open() {
// no-op, no search should be invoked.
}

/**
* Force cleanup of server-side resources. When a cursor is explicitly closed, any underlying
* table scan must release its resources (e.g. PIT) unconditionally, even if pagination is not
* complete.
*/
@Override
public void close() {
forceCloseChildren(input);
}

private void forceCloseChildren(PhysicalPlan node) {
if (node instanceof TableScanOperator) {
((TableScanOperator) node).forceClose();
} else {
node.getChild().forEach(this::forceCloseChildren);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ public List<PhysicalPlan> getChild() {
return Collections.emptyList();
}

/**
* Force cleanup of server-side resources (e.g. PIT, scroll) regardless of pagination state. Used
* when the client explicitly closes a cursor mid-pagination. Default implementation delegates to
* {@link #close()}.
*/
public void forceClose() {
close();
}

/**
* Explain the execution plan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.storage.TableScanOperator;

@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
public class CursorCloseOperatorTest {
Expand All @@ -36,11 +37,23 @@ public void open_is_not_propagated() {
}

@Test
public void close_is_propagated() {
var child = mock(PhysicalPlan.class);
public void close_calls_forceClose_on_table_scan() {
var child = mock(TableScanOperator.class);
var plan = new CursorCloseOperator(child);
plan.close();
verify(child).close();
verify(child).forceClose();
verify(child, never()).close();
}

@Test
public void close_traverses_tree_to_find_table_scan() {
var scan = mock(TableScanOperator.class);
// Wrap the scan in a regular PhysicalPlan node
var middle = mock(PhysicalPlan.class);
org.mockito.Mockito.when(middle.getChild()).thenReturn(java.util.List.of(scan));
var plan = new CursorCloseOperator(middle);
plan.close();
verify(scan).forceClose();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.Numbers;
import org.opensearch.common.geo.GeoPoint;
Expand All @@ -25,7 +27,7 @@
/** The Implementation of Content to represent {@link JsonNode}. */
@RequiredArgsConstructor
public class OpenSearchJsonContent implements Content {

private static final Logger LOG = LogManager.getLogger();
private final JsonNode value;

@Override
Expand Down Expand Up @@ -156,6 +158,9 @@ public Pair<Double, Double> geoValue() {
GeoUtils.parseGeoPoint(parser, point, true);
return Pair.of(point.getLat(), point.getLon());
} catch (IOException ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error parsing geo point '{}'", value);
}
throw new OpenSearchParseException("error parsing geo point", ex);
}
}
Expand All @@ -175,7 +180,11 @@ private long parseLongValue(JsonNode node) {
}
return Numbers.toLong(node.textValue(), true);
} else {
throw new OpenSearchParseException("node must be a number");
if (LOG.isDebugEnabled()) {
LOG.debug("node '{}' must be a number", node);
}
throw new OpenSearchParseException(
String.format("node must be a number, found %s", node.getNodeType()));
}
}

Expand All @@ -189,7 +198,11 @@ private double parseDoubleValue(JsonNode node) {
}
return Double.parseDouble(node.textValue());
} else {
throw new OpenSearchParseException("node must be a number");
if (LOG.isDebugEnabled()) {
LOG.debug("node '{}' must be a number", node);
}
throw new OpenSearchParseException(
String.format("node must be a number, found %s", node.getNodeType()));
}
}

Expand All @@ -200,7 +213,11 @@ private boolean parseBooleanValue(JsonNode node) {
} else if (node.isTextual()) {
return Boolean.parseBoolean(node.textValue());
} else {
throw new OpenSearchParseException("node must be a boolean");
if (LOG.isDebugEnabled()) {
LOG.debug("node '{}' must be a boolean", node);
}
throw new OpenSearchParseException(
String.format("node must be a boolean, found %s", node.getNodeType()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public ExplainResponseNode visitTableScan(
listener.onResponse(openSearchExplain.apply(plan));
} catch (Exception e) {
listener.onFailure(e);
} finally {
plan.close();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab
/** Number of rows returned. */
private Integer queryCount;

/**
* Whether the cursor (including PIT) has been serialized for a subsequent page request. When
* true, {@link #close()} must preserve the PIT because a future request will resume from it.
*/
private boolean cursorSerialized = false;

/** Search response for current batch. */
private Iterator<ExprValue> iterator;

Expand Down Expand Up @@ -110,7 +116,26 @@ private void fetchNextBatch() {
public void close() {
super.close();

client.cleanup(request);
if (request.hasAnotherBatch() && cursorSerialized) {
// PIT has been serialized into a cursor for the next page request.
// Only clean up in-memory state; the PIT must survive for the next request.
client.cleanup(request);
} else {
// No more pages, or query failed/aborted before cursor was serialized.
// Force delete the PIT to prevent leaking.
client.forceCleanup(request);
}
}

/**
* Force cleanup of server-side resources (PIT) regardless of pagination state. Used by {@link
* org.opensearch.sql.planner.physical.CursorCloseOperator} when the client explicitly closes a
* cursor mid-pagination.
*/
@Override
public void forceClose() {
super.close();
client.forceCleanup(request);
}

@Override
Expand Down Expand Up @@ -176,5 +201,8 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.write(reqAsBytes, 0, reqOut.size());

out.writeInt(maxResponseSize);

// Mark that the PIT has been serialized into a cursor, so close() preserves it.
cursorSerialized = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void query_empty_result() {
indexScan.open();
assertFalse(indexScan.hasNext());
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void query_empty_result() {
indexScan.open();
assertFalse(indexScan.hasNext());
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

@Test
Expand All @@ -249,7 +249,7 @@ void query_all_results_with_query() {
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
() -> assertFalse(indexScan.hasNext()));
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

static final OpenSearchRequest.IndexName EMPLOYEES_INDEX =
Expand Down Expand Up @@ -277,7 +277,7 @@ void query_all_results_with_scroll() {
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
() -> assertFalse(indexScan.hasNext()));
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

@Test
Expand All @@ -304,7 +304,7 @@ void query_some_results_with_query() {
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
() -> assertFalse(indexScan.hasNext()));
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

@Test
Expand All @@ -327,7 +327,50 @@ void query_some_results_with_scroll() {
() -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()),
() -> assertFalse(indexScan.hasNext()));
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

/**
* When close() is called mid-pagination without cursor serialization (e.g., query failed or
* aborted), the PIT should be force-deleted to prevent leaking.
*/
@Test
void close_mid_pagination_without_cursor_serialized_should_force_cleanup() {
var request = mock(OpenSearchRequest.class);
when(request.hasAnotherBatch()).thenReturn(true);
var indexScan = new OpenSearchIndexScan(client, request);
indexScan.close();
verify(client).forceCleanup(request);
verify(client, never()).cleanup(any());
}

/**
* When close() is called after cursor has been serialized (normal pagination), the PIT should be
* preserved via cleanup() (in-memory only, no PIT deletion).
*/
@Test
@SneakyThrows
void close_mid_pagination_with_cursor_serialized_should_cleanup() {
var request = mock(OpenSearchRequest.class);
when(request.hasAnotherBatch()).thenReturn(true);
var indexScan = new OpenSearchIndexScan(client, request);

// Simulate successful cursor serialization by calling writeExternal
var out = mock(ObjectOutput.class);
indexScan.writeExternal(out);

indexScan.close();
verify(client).cleanup(request);
verify(client, never()).forceCleanup(any());
}

/** forceClose() should always force-delete the PIT regardless of pagination state. */
@Test
void forceClose_should_always_force_cleanup() {
var request = mock(OpenSearchRequest.class);
var indexScan = new OpenSearchIndexScan(client, request);
indexScan.forceClose();
verify(client).forceCleanup(request);
}

static void mockTwoPageResponse(OpenSearchClient client) {
Expand Down Expand Up @@ -360,7 +403,7 @@ void query_results_limited_by_query_size() {
() -> assertEquals(employee(2, "Smith", "HR"), indexScan.next()),
() -> assertFalse(indexScan.hasNext()));
}
verify(client).cleanup(any());
verify(client).forceCleanup(any());
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ plugins {
id 'jacoco'
id 'opensearch.opensearchplugin'
id 'com.diffplug.spotless' version '6.22.0'
id 'com.gradleup.shadow' version "8.3.6"
id 'com.gradleup.shadow'
}

apply plugin: 'opensearch.pluginzip'
Expand Down
Loading