From 0635b9a44c5e54f72a692f6dd8c6c8143ca4ea2b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 13 Mar 2026 02:29:11 +0000 Subject: [PATCH 1/2] Fix PIT (Point in Time) resource leaks in v2 query engine (#5221) * Fix PIT (Point in Time) resource leaks in v2 query engine Signed-off-by: Lantao Jin * Fix PIT context leaks when a query without cursor/pagination fails during execution Signed-off-by: Lantao Jin * Avoid to expose sensitive document Signed-off-by: Lantao Jin --------- Signed-off-by: Lantao Jin (cherry picked from commit e11e11ebb095249105a5070785066e26ff00a691) Signed-off-by: github-actions[bot] --- .../planner/physical/CursorCloseOperator.java | 19 +++++++ .../sql/storage/TableScanOperator.java | 9 +++ .../physical/CursorCloseOperatorTest.java | 19 ++++++- .../data/utils/OpenSearchJsonContent.java | 25 +++++++-- .../executor/OpenSearchExecutionEngine.java | 2 + .../storage/scan/OpenSearchIndexScan.java | 30 +++++++++- .../OpenSearchIndexScanPaginationTest.java | 2 +- .../storage/scan/OpenSearchIndexScanTest.java | 55 +++++++++++++++++-- 8 files changed, 146 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java index 688ffa0d8d..054268867f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java @@ -9,6 +9,7 @@ import lombok.RequiredArgsConstructor; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.storage.TableScanOperator; /** * A plan node which blocks issuing a request in {@link #open} and getting results in {@link @@ -51,4 +52,22 @@ public ExecutionEngine.Schema schema() { public void open() { // no-op, no search should be invoked. } + + /** + * Force cleanup of server-side resources. When a cursor is explicitly closed, any underlying + * table scan must release its resources (e.g. PIT) unconditionally, even if pagination is not + * complete. + */ + @Override + public void close() { + forceCloseChildren(input); + } + + private void forceCloseChildren(PhysicalPlan node) { + if (node instanceof TableScanOperator scan) { + scan.forceClose(); + } else { + node.getChild().forEach(this::forceCloseChildren); + } + } } diff --git a/core/src/main/java/org/opensearch/sql/storage/TableScanOperator.java b/core/src/main/java/org/opensearch/sql/storage/TableScanOperator.java index 130516b3ef..6b2dd17314 100644 --- a/core/src/main/java/org/opensearch/sql/storage/TableScanOperator.java +++ b/core/src/main/java/org/opensearch/sql/storage/TableScanOperator.java @@ -26,6 +26,15 @@ public List getChild() { return Collections.emptyList(); } + /** + * Force cleanup of server-side resources (e.g. PIT, scroll) regardless of pagination state. Used + * when the client explicitly closes a cursor mid-pagination. Default implementation delegates to + * {@link #close()}. + */ + public void forceClose() { + close(); + } + /** * Explain the execution plan. * diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java index 5ae30faa30..208137ef11 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; +import org.opensearch.sql.storage.TableScanOperator; @DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) public class CursorCloseOperatorTest { @@ -36,11 +37,23 @@ public void open_is_not_propagated() { } @Test - public void close_is_propagated() { - var child = mock(PhysicalPlan.class); + public void close_calls_forceClose_on_table_scan() { + var child = mock(TableScanOperator.class); var plan = new CursorCloseOperator(child); plan.close(); - verify(child).close(); + verify(child).forceClose(); + verify(child, never()).close(); + } + + @Test + public void close_traverses_tree_to_find_table_scan() { + var scan = mock(TableScanOperator.class); + // Wrap the scan in a regular PhysicalPlan node + var middle = mock(PhysicalPlan.class); + org.mockito.Mockito.when(middle.getChild()).thenReturn(java.util.List.of(scan)); + var plan = new CursorCloseOperator(middle); + plan.close(); + verify(scan).forceClose(); } @Test diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/OpenSearchJsonContent.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/OpenSearchJsonContent.java index d79a466665..2944aae77f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/OpenSearchJsonContent.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/OpenSearchJsonContent.java @@ -13,6 +13,8 @@ import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchParseException; import org.opensearch.common.Numbers; import org.opensearch.common.geo.GeoPoint; @@ -25,7 +27,7 @@ /** The Implementation of Content to represent {@link JsonNode}. */ @RequiredArgsConstructor public class OpenSearchJsonContent implements Content { - + private static final Logger LOG = LogManager.getLogger(); private final JsonNode value; @Override @@ -156,6 +158,9 @@ public Pair geoValue() { GeoUtils.parseGeoPoint(parser, point, true); return Pair.of(point.getLat(), point.getLon()); } catch (IOException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error parsing geo point '{}'", value); + } throw new OpenSearchParseException("error parsing geo point", ex); } } @@ -175,7 +180,11 @@ private long parseLongValue(JsonNode node) { } return Numbers.toLong(node.textValue(), true); } else { - throw new OpenSearchParseException("node must be a number"); + if (LOG.isDebugEnabled()) { + LOG.debug("node '{}' must be a number", node); + } + throw new OpenSearchParseException( + String.format("node must be a number, found %s", node.getNodeType())); } } @@ -189,7 +198,11 @@ private double parseDoubleValue(JsonNode node) { } return Double.parseDouble(node.textValue()); } else { - throw new OpenSearchParseException("node must be a number"); + if (LOG.isDebugEnabled()) { + LOG.debug("node '{}' must be a number", node); + } + throw new OpenSearchParseException( + String.format("node must be a number, found %s", node.getNodeType())); } } @@ -200,7 +213,11 @@ private boolean parseBooleanValue(JsonNode node) { } else if (node.isTextual()) { return Boolean.parseBoolean(node.textValue()); } else { - throw new OpenSearchParseException("node must be a boolean"); + if (LOG.isDebugEnabled()) { + LOG.debug("node '{}' must be a boolean", node); + } + throw new OpenSearchParseException( + String.format("node must be a boolean, found %s", node.getNodeType())); } } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index c591f75805..51619ed069 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -156,6 +156,8 @@ public ExplainResponseNode visitTableScan( listener.onResponse(openSearchExplain.apply(plan)); } catch (Exception e) { listener.onFailure(e); + } finally { + plan.close(); } }); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index 9d3e4001c6..6d79c793c6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -46,6 +46,12 @@ public class OpenSearchIndexScan extends TableScanOperator implements Serializab /** Number of rows returned. */ private Integer queryCount; + /** + * Whether the cursor (including PIT) has been serialized for a subsequent page request. When + * true, {@link #close()} must preserve the PIT because a future request will resume from it. + */ + private boolean cursorSerialized = false; + /** Search response for current batch. */ private Iterator iterator; @@ -110,7 +116,26 @@ private void fetchNextBatch() { public void close() { super.close(); - client.cleanup(request); + if (request.hasAnotherBatch() && cursorSerialized) { + // PIT has been serialized into a cursor for the next page request. + // Only clean up in-memory state; the PIT must survive for the next request. + client.cleanup(request); + } else { + // No more pages, or query failed/aborted before cursor was serialized. + // Force delete the PIT to prevent leaking. + client.forceCleanup(request); + } + } + + /** + * Force cleanup of server-side resources (PIT) regardless of pagination state. Used by {@link + * org.opensearch.sql.planner.physical.CursorCloseOperator} when the client explicitly closes a + * cursor mid-pagination. + */ + @Override + public void forceClose() { + super.close(); + client.forceCleanup(request); } @Override @@ -176,5 +201,8 @@ public void writeExternal(ObjectOutput out) throws IOException { out.write(reqAsBytes, 0, reqOut.size()); out.writeInt(maxResponseSize); + + // Mark that the PIT has been serialized into a cursor, so close() preserves it. + cursorSerialized = true; } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java index 473152a7a4..276e091db5 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java @@ -79,7 +79,7 @@ void query_empty_result() { indexScan.open(); assertFalse(indexScan.hasNext()); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index 3fb6abf7f0..89cd43f800 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -223,7 +223,7 @@ void query_empty_result() { indexScan.open(); assertFalse(indexScan.hasNext()); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); } @Test @@ -249,7 +249,7 @@ void query_all_results_with_query() { () -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()), () -> assertFalse(indexScan.hasNext())); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); } static final OpenSearchRequest.IndexName EMPLOYEES_INDEX = @@ -277,7 +277,7 @@ void query_all_results_with_scroll() { () -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()), () -> assertFalse(indexScan.hasNext())); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); } @Test @@ -304,7 +304,7 @@ void query_some_results_with_query() { () -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()), () -> assertFalse(indexScan.hasNext())); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); } @Test @@ -327,7 +327,50 @@ void query_some_results_with_scroll() { () -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()), () -> assertFalse(indexScan.hasNext())); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); + } + + /** + * When close() is called mid-pagination without cursor serialization (e.g., query failed or + * aborted), the PIT should be force-deleted to prevent leaking. + */ + @Test + void close_mid_pagination_without_cursor_serialized_should_force_cleanup() { + var request = mock(OpenSearchRequest.class); + when(request.hasAnotherBatch()).thenReturn(true); + var indexScan = new OpenSearchIndexScan(client, request); + indexScan.close(); + verify(client).forceCleanup(request); + verify(client, never()).cleanup(any()); + } + + /** + * When close() is called after cursor has been serialized (normal pagination), the PIT should be + * preserved via cleanup() (in-memory only, no PIT deletion). + */ + @Test + @SneakyThrows + void close_mid_pagination_with_cursor_serialized_should_cleanup() { + var request = mock(OpenSearchRequest.class); + when(request.hasAnotherBatch()).thenReturn(true); + var indexScan = new OpenSearchIndexScan(client, request); + + // Simulate successful cursor serialization by calling writeExternal + var out = mock(ObjectOutput.class); + indexScan.writeExternal(out); + + indexScan.close(); + verify(client).cleanup(request); + verify(client, never()).forceCleanup(any()); + } + + /** forceClose() should always force-delete the PIT regardless of pagination state. */ + @Test + void forceClose_should_always_force_cleanup() { + var request = mock(OpenSearchRequest.class); + var indexScan = new OpenSearchIndexScan(client, request); + indexScan.forceClose(); + verify(client).forceCleanup(request); } static void mockTwoPageResponse(OpenSearchClient client) { @@ -360,7 +403,7 @@ void query_results_limited_by_query_size() { () -> assertEquals(employee(2, "Smith", "HR"), indexScan.next()), () -> assertFalse(indexScan.hasNext())); } - verify(client).cleanup(any()); + verify(client).forceCleanup(any()); } @Test From 7076ad4e6ccdf8ed5e90379016f240bd6e13aa67 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Fri, 13 Mar 2026 11:01:06 +0800 Subject: [PATCH 2/2] Resolve compile error Signed-off-by: Lantao Jin --- async-query-core/build.gradle | 2 +- build.gradle | 13 +++++++------ .../sql/planner/physical/CursorCloseOperator.java | 4 ++-- plugin/build.gradle | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/async-query-core/build.gradle b/async-query-core/build.gradle index 892b49a58a..98d7591274 100644 --- a/async-query-core/build.gradle +++ b/async-query-core/build.gradle @@ -9,7 +9,7 @@ plugins { id 'jacoco' id 'antlr' id 'com.diffplug.spotless' version '6.22.0' - id 'com.github.johnrengelman.shadow' + id 'com.gradleup.shadow' } repositories { diff --git a/build.gradle b/build.gradle index 4c32749c7a..8307fc4e5b 100644 --- a/build.gradle +++ b/build.gradle @@ -139,12 +139,13 @@ allprojects { resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.4.4" resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:5.3.4" resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:5.3.4" - resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:2.18.2" - resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:2.18.2" - resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:2.18.2" - resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2" - resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.2" - resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2" + resolutionStrategy.force "com.fasterxml.jackson:jackson-bom:2.18.6" + resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:2.18.6" + resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:2.18.6" + resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:2.18.6" + resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.6" + resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.6" + resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.6" resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5' resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0' resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0' diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java index 054268867f..c6bc1a91fa 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java @@ -64,8 +64,8 @@ public void close() { } private void forceCloseChildren(PhysicalPlan node) { - if (node instanceof TableScanOperator scan) { - scan.forceClose(); + if (node instanceof TableScanOperator) { + ((TableScanOperator) node).forceClose(); } else { node.getChild().forEach(this::forceCloseChildren); } diff --git a/plugin/build.gradle b/plugin/build.gradle index f1b62ceb90..0c3519843b 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -31,7 +31,7 @@ plugins { id 'jacoco' id 'opensearch.opensearchplugin' id 'com.diffplug.spotless' version '6.22.0' - id 'com.gradleup.shadow' version "8.3.6" + id 'com.gradleup.shadow' } apply plugin: 'opensearch.pluginzip'