diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py
index fe4cb177bb..b9c092db89 100644
--- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py
+++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py
@@ -14,7 +14,7 @@
 from haystack.document_stores.types import DuplicatePolicy
 from haystack.utils.auth import Secret
 from haystack.utils.misc import _normalize_metadata_field_name
-from opensearchpy import AsyncHttpConnection, AsyncOpenSearch, OpenSearch
+from opensearchpy import AsyncHttpConnection, AsyncOpenSearch, OpenSearch, TransportError
 from opensearchpy.helpers import async_bulk, bulk
 
 from haystack_integrations.document_stores.opensearch.auth import AsyncAWSAuth, AWSAuth
@@ -979,7 +979,27 @@ def _bm25_retrieval(
             all_terms_must_match=all_terms_must_match,
             custom_query=custom_query,
         )
-        documents = self._search_documents(search_params)
+        try:
+            documents = self._search_documents(search_params)
+        except TransportError as e:
+            if "too_many_clauses" in f"{e.info} {e.error}" and fuzziness not in (0, "0") and custom_query is None:
+                logger.warning(
+                    "BM25 query with fuzziness='{fuzziness}' exceeded OpenSearch's clause limit. "
+                    "Retrying with fuzziness=0 (exact matching). Consider reducing query length or "
+                    "setting fuzziness=0 explicitly if this occurs frequently.",
+                    fuzziness=fuzziness,
+                )
+                search_params = self._prepare_bm25_search_request(
+                    query=query,
+                    filters=filters,
+                    fuzziness=0,
+                    top_k=top_k,
+                    all_terms_must_match=all_terms_must_match,
+                    custom_query=custom_query,
+                )
+                documents = self._search_documents(search_params)
+            else:
+                raise
         OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
         return documents
 
@@ -1019,7 +1039,27 @@ async def _bm25_retrieval_async(
             all_terms_must_match=all_terms_must_match,
             custom_query=custom_query,
         )
-        documents = await self._search_documents_async(search_params)
+        try:
+            documents = await self._search_documents_async(search_params)
+        except TransportError as e:
+            if "too_many_clauses" in f"{e.info} {e.error}" and fuzziness not in (0, "0") and custom_query is None:
+                logger.warning(
+                    "BM25 query with fuzziness='{fuzziness}' exceeded OpenSearch's clause limit. "
+                    "Retrying with fuzziness=0 (exact matching). Consider reducing query length or "
+                    "setting fuzziness=0 explicitly if this occurs frequently.",
+                    fuzziness=fuzziness,
+                )
+                search_params = self._prepare_bm25_search_request(
+                    query=query,
+                    filters=filters,
+                    fuzziness=0,
+                    top_k=top_k,
+                    all_terms_must_match=all_terms_must_match,
+                    custom_query=custom_query,
+                )
+                documents = await self._search_documents_async(search_params)
+            else:
+                raise
         OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
         return documents
 
diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py
index 37a83884ac..28a8f277f9 100644
--- a/integrations/opensearch/tests/test_document_store.py
+++ b/integrations/opensearch/tests/test_document_store.py
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import random
-from unittest.mock import patch
+from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 from haystack.dataclasses.document import Document
@@ -17,7 +17,7 @@
     GetMetadataFieldsInfoTest,
     GetMetadataFieldUniqueValuesTest,
 )
-from opensearchpy.exceptions import RequestError
+from opensearchpy.exceptions import RequestError, TransportError
 
 from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
 from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES
@@ -224,6 +224,155 @@ def test_routing_in_delete(mock_bulk, _mock_opensearch_client):
     assert "_routing" not in actions[2]
 
 
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+def test_bm25_retrieval_retries_with_fuzziness_zero_on_too_many_clauses(_mock_opensearch_client, caplog):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._client = MagicMock()
+
+    too_many_clauses_error = TransportError(
+        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
+    )
+    store._client.search.side_effect = [
+        too_many_clauses_error,
+        {"hits": {"hits": []}},
+    ]
+
+    results = store._bm25_retrieval("a very long query", fuzziness="AUTO")
+
+    assert results == []
+    assert store._client.search.call_count == 2
+    # Verify the retry used fuzziness=0
+    second_call_body = store._client.search.call_args_list[1].kwargs["body"]
+    assert second_call_body["query"]["bool"]["must"][0]["multi_match"]["fuzziness"] == 0
+    assert "Retrying with fuzziness=0" in caplog.text
+
+
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+def test_bm25_retrieval_no_retry_when_fuzziness_already_zero(_mock_opensearch_client):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._client = MagicMock()
+
+    too_many_clauses_error = TransportError(
+        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
+    )
+    store._client.search.side_effect = too_many_clauses_error
+
+    with pytest.raises(TransportError):
+        store._bm25_retrieval("a very long query", fuzziness=0)
+
+    assert store._client.search.call_count == 1
+
+
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+def test_bm25_retrieval_no_retry_with_custom_query(_mock_opensearch_client):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._client = MagicMock()
+
+    too_many_clauses_error = TransportError(
+        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
+    )
+    store._client.search.side_effect = too_many_clauses_error
+
+    custom_query = {"query": {"match": {"content": "$query"}}}
+    with pytest.raises(TransportError):
+        store._bm25_retrieval("a very long query", fuzziness="AUTO", custom_query=custom_query)
+
+    assert store._client.search.call_count == 1
+
+
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+def test_bm25_retrieval_reraises_other_transport_errors(_mock_opensearch_client):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._client = MagicMock()
+
+    other_error = TransportError(500, "parsing_exception", {"error": {"reason": "some other error"}})
+    store._client.search.side_effect = other_error
+
+    with pytest.raises(TransportError):
+        store._bm25_retrieval("some query", fuzziness="AUTO")
+
+    assert store._client.search.call_count == 1
+
+
+@pytest.mark.asyncio
+@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+async def test_bm25_retrieval_async_retries_with_fuzziness_zero_on_too_many_clauses(
+    _mock_opensearch_client, _mock_async_client, caplog
+):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._async_client = AsyncMock()
+
+    too_many_clauses_error = TransportError(
+        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
+    )
+    store._async_client.search.side_effect = [
+        too_many_clauses_error,
+        {"hits": {"hits": []}},
+    ]
+
+    results = await store._bm25_retrieval_async("a very long query", fuzziness="AUTO")
+
+    assert results == []
+    assert store._async_client.search.call_count == 2
+    second_call_body = store._async_client.search.call_args_list[1].kwargs["body"]
+    assert second_call_body["query"]["bool"]["must"][0]["multi_match"]["fuzziness"] == 0
+    assert "Retrying with fuzziness=0" in caplog.text
+
+
+@pytest.mark.asyncio
+@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+async def test_bm25_retrieval_async_no_retry_when_fuzziness_already_zero(_mock_opensearch_client, _mock_async_client):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._async_client = AsyncMock()
+
+    too_many_clauses_error = TransportError(
+        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
+    )
+    store._async_client.search.side_effect = too_many_clauses_error
+
+    with pytest.raises(TransportError):
+        await store._bm25_retrieval_async("a very long query", fuzziness=0)
+
+    assert store._async_client.search.call_count == 1
+
+
+@pytest.mark.asyncio
+@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+async def test_bm25_retrieval_async_no_retry_with_custom_query(_mock_opensearch_client, _mock_async_client):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._async_client = AsyncMock()
+
+    too_many_clauses_error = TransportError(
+        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
+    )
+    store._async_client.search.side_effect = too_many_clauses_error
+
+    custom_query = {"query": {"match": {"content": "$query"}}}
+    with pytest.raises(TransportError):
+        await store._bm25_retrieval_async("a very long query", fuzziness="AUTO", custom_query=custom_query)
+
+    assert store._async_client.search.call_count == 1
+
+
+@pytest.mark.asyncio
+@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
+@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
+async def test_bm25_retrieval_async_reraises_other_transport_errors(_mock_opensearch_client, _mock_async_client):
+    store = OpenSearchDocumentStore(hosts="testhost")
+    store._async_client = AsyncMock()
+
+    other_error = TransportError(500, "parsing_exception", {"error": {"reason": "some other error"}})
+    store._async_client.search.side_effect = other_error
+
+    with pytest.raises(TransportError):
+        await store._bm25_retrieval_async("some query", fuzziness="AUTO")
+
+    assert store._async_client.search.call_count == 1
+
+
 @pytest.mark.integration
 class TestDocumentStore(
     CountDocumentsByFilterTest,
@@ -333,6 +482,33 @@ def test_bm25_retrieval_with_fuzziness(
         assert "functional" in res[1].content
         assert "functional" in res[2].content
 
+    def test_bm25_retrieval_with_fuzziness_overflow(self, document_store: OpenSearchDocumentStore, caplog):
+        """
+        Test that a long query with fuzziness="AUTO" that exceeds OpenSearch's maxClauseCount
+        is automatically retried with fuzziness=0 instead of raising an error.
+        """
+        # Build an index vocabulary of similar 5-character words. With fuzziness="AUTO",
+        # 5-char words get edit distance 1, so each query term fuzzy-matches many similar
+        # indexed terms, causing clause expansion beyond the default maxClauseCount (1024).
+        # With fuzziness=0, each term produces exactly 1 clause, staying well under the limit.
+        words = [f"foo{chr(97 + i)}{chr(97 + j)}" for i in range(20) for j in range(26)]  # 520 words
+
+        chunk_size = 52
+        docs = [
+            Document(content=" ".join(words[i : i + chunk_size]), id=str(idx))
+            for idx, i in enumerate(range(0, len(words), chunk_size))
+        ]
+        document_store.write_documents(docs)
+
+        # Query with a subset of words. With fuzziness="AUTO", each 5-char term expands
+        # to match ~45 similar indexed terms, pushing total clauses well above 1024.
+        long_query = " ".join(words[:100])
+
+        # This should not raise: the too_many_clauses error is caught and retried with fuzziness=0
+        res = document_store._bm25_retrieval(long_query, top_k=3, fuzziness="AUTO")
+        assert isinstance(res, list)
+        assert "Retrying with fuzziness=0" in caplog.text
+
     def test_bm25_retrieval_with_filters(self, document_store: OpenSearchDocumentStore, test_documents: list[Document]):
         document_store.write_documents(test_documents)
         res = document_store._bm25_retrieval(
diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py
index c72927d9b3..c3df531a63 100644
--- a/integrations/opensearch/tests/test_document_store_async.py
+++ b/integrations/opensearch/tests/test_document_store_async.py
@@ -2,6 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+
 import pytest
 from haystack.dataclasses import Document
 from haystack.document_stores.errors import DocumentStoreError
@@ -60,6 +61,53 @@ async def test_bm25_retrieval_all_terms_must_match_false(
         assert len(res) == 5
         assert all("functional" in doc.content for doc in res)
 
+    @pytest.mark.asyncio
+    async def test_bm25_retrieval_with_fuzziness(
+        self, document_store: OpenSearchDocumentStore, test_documents: list[Document]
+    ):
+        document_store.write_documents(test_documents)
+
+        query_with_typo = "functinal"
+        # Query without fuzziness to search for the exact match
+        res = await document_store._bm25_retrieval_async(query_with_typo, top_k=3, fuzziness="0")
+        # Nothing is found as the query contains a typo
+        assert res == []
+
+        # Query with fuzziness with the same query
+        res = await document_store._bm25_retrieval_async(query_with_typo, top_k=3, fuzziness="1")
+        assert len(res) == 3
+        assert "functional" in res[0].content
+        assert "functional" in res[1].content
+        assert "functional" in res[2].content
+
+    @pytest.mark.asyncio
+    async def test_bm25_retrieval_with_fuzziness_overflow(self, document_store: OpenSearchDocumentStore, caplog):
+        """
+        Test that a long query with fuzziness="AUTO" that exceeds OpenSearch's maxClauseCount
+        is automatically retried with fuzziness=0 instead of raising an error.
+        """
+        # Build an index vocabulary of similar 5-character words. With fuzziness="AUTO",
+        # 5-char words get edit distance 1, so each query term fuzzy-matches many similar
+        # indexed terms, causing clause expansion beyond the default maxClauseCount (1024).
+        # With fuzziness=0, each term produces exactly 1 clause, staying well under the limit.
+        words = [f"foo{chr(97 + i)}{chr(97 + j)}" for i in range(20) for j in range(26)]  # 520 words
+
+        chunk_size = 52
+        docs = [
+            Document(content=" ".join(words[i : i + chunk_size]), id=str(idx))
+            for idx, i in enumerate(range(0, len(words), chunk_size))
+        ]
+        document_store.write_documents(docs)
+
+        # Query with a subset of words. With fuzziness="AUTO", each 5-char term expands
+        # to match ~45 similar indexed terms, pushing total clauses well above 1024.
+        long_query = " ".join(words[:100])
+
+        # This should not raise: the too_many_clauses error is caught and retried with fuzziness=0
+        res = await document_store._bm25_retrieval_async(long_query, top_k=3, fuzziness="AUTO")
+        assert isinstance(res, list)
+        assert "Retrying with fuzziness=0" in caplog.text
+
     @pytest.mark.asyncio
     async def test_bm25_retrieval_with_filters(
         self, document_store: OpenSearchDocumentStore, test_documents: list[Document]