Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils.auth import Secret
from haystack.utils.misc import _normalize_metadata_field_name
from opensearchpy import AsyncHttpConnection, AsyncOpenSearch, OpenSearch
from opensearchpy import AsyncHttpConnection, AsyncOpenSearch, OpenSearch, TransportError
from opensearchpy.helpers import async_bulk, bulk

from haystack_integrations.document_stores.opensearch.auth import AsyncAWSAuth, AWSAuth
Expand Down Expand Up @@ -979,7 +979,27 @@ def _bm25_retrieval(
all_terms_must_match=all_terms_must_match,
custom_query=custom_query,
)
documents = self._search_documents(search_params)
try:
documents = self._search_documents(search_params)
except TransportError as e:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the issue/discussion I see a RequestError instead

Is this the correct exception to catch?

Unhandled exception
RequestError: RequestError(400, 'search_phase_execution_exception', 'too_many_clauses: maxClauseCount is set to 1024')

/home/haystackd/.local/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py, line 70, _run_component

/home/haystackd/.local/lib/python3.12/site-packages/haystack_integrations/components/retrievers/opensearch/bm25_retriever.py, line 269, run

/home/haystackd/.local/lib/python3.12/site-packages/haystack_integrations/components/retrievers/opensearch/bm25_retriever.py, line 266, run

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue was created when our integration still used version 2 of opensearch. My guess would be that the exception changed from version 2 to version 3. TransportError is what I get on my end with the latest OpenSearch version.

In any case, RequestError is a subclass of TransportError in opensearch-py (see the exception hierarchy). So catching TransportError also catches RequestError.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perfect! thanks for the clarification 👍🏽

if "too_many_clauses" in f"{e.info} {e.error}" and fuzziness not in (0, "0") and custom_query is None:
logger.warning(
"BM25 query with fuzziness='{fuzziness}' exceeded OpenSearch's clause limit. "
"Retrying with fuzziness=0 (exact matching). Consider reducing query length or "
"setting fuzziness=0 explicitly if this occurs frequently.",
fuzziness=fuzziness,
)
search_params = self._prepare_bm25_search_request(
query=query,
filters=filters,
fuzziness=0,
top_k=top_k,
all_terms_must_match=all_terms_must_match,
custom_query=custom_query,
)
documents = self._search_documents(search_params)
else:
raise
OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
return documents

Expand Down Expand Up @@ -1019,7 +1039,27 @@ async def _bm25_retrieval_async(
all_terms_must_match=all_terms_must_match,
custom_query=custom_query,
)
documents = await self._search_documents_async(search_params)
try:
documents = await self._search_documents_async(search_params)
except TransportError as e:
if "too_many_clauses" in f"{e.info} {e.error}" and fuzziness not in (0, "0") and custom_query is None:
logger.warning(
"BM25 query with fuzziness='{fuzziness}' exceeded OpenSearch's clause limit. "
"Retrying with fuzziness=0 (exact matching). Consider reducing query length or "
"setting fuzziness=0 explicitly if this occurs frequently.",
fuzziness=fuzziness,
)
search_params = self._prepare_bm25_search_request(
query=query,
filters=filters,
fuzziness=0,
top_k=top_k,
all_terms_must_match=all_terms_must_match,
custom_query=custom_query,
)
documents = await self._search_documents_async(search_params)
else:
raise
OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
return documents

Expand Down
180 changes: 178 additions & 2 deletions integrations/opensearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: Apache-2.0

import random
from unittest.mock import patch
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from haystack.dataclasses.document import Document
Expand All @@ -17,7 +17,7 @@
GetMetadataFieldsInfoTest,
GetMetadataFieldUniqueValuesTest,
)
from opensearchpy.exceptions import RequestError
from opensearchpy.exceptions import RequestError, TransportError

from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES
Expand Down Expand Up @@ -224,6 +224,155 @@ def test_routing_in_delete(mock_bulk, _mock_opensearch_client):
assert "_routing" not in actions[2]


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_bm25_retrieval_retries_with_fuzziness_zero_on_too_many_clauses(_mock_opensearch_client, caplog):
    """A too_many_clauses TransportError triggers exactly one retry with fuzziness=0."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._client = MagicMock()

    clause_overflow = TransportError(
        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
    )
    # First call blows the clause limit; the second (the retry) succeeds with no hits.
    document_store._client.search.side_effect = [clause_overflow, {"hits": {"hits": []}}]

    docs = document_store._bm25_retrieval("a very long query", fuzziness="AUTO")

    assert docs == []
    assert document_store._client.search.call_count == 2
    # The retry must have downgraded fuzziness to exact matching.
    retry_body = document_store._client.search.call_args_list[1].kwargs["body"]
    assert retry_body["query"]["bool"]["must"][0]["multi_match"]["fuzziness"] == 0
    assert "Retrying with fuzziness=0" in caplog.text


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_bm25_retrieval_no_retry_when_fuzziness_already_zero(_mock_opensearch_client):
    """With fuzziness already 0 there is nothing to downgrade, so the error propagates."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._client = MagicMock()

    document_store._client.search.side_effect = TransportError(
        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
    )

    with pytest.raises(TransportError):
        document_store._bm25_retrieval("a very long query", fuzziness=0)

    # Exactly one attempt: no retry was made.
    assert document_store._client.search.call_count == 1


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_bm25_retrieval_no_retry_with_custom_query(_mock_opensearch_client):
    """A user-supplied custom_query is never rewritten, so the error propagates unchanged."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._client = MagicMock()

    document_store._client.search.side_effect = TransportError(
        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
    )

    with pytest.raises(TransportError):
        document_store._bm25_retrieval(
            "a very long query", fuzziness="AUTO", custom_query={"query": {"match": {"content": "$query"}}}
        )

    # Exactly one attempt: no retry was made.
    assert document_store._client.search.call_count == 1


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_bm25_retrieval_reraises_other_transport_errors(_mock_opensearch_client):
    """TransportErrors unrelated to clause overflow are re-raised untouched."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._client = MagicMock()

    document_store._client.search.side_effect = TransportError(
        500, "parsing_exception", {"error": {"reason": "some other error"}}
    )

    with pytest.raises(TransportError):
        document_store._bm25_retrieval("some query", fuzziness="AUTO")

    # Exactly one attempt: no retry was made for a non-clause error.
    assert document_store._client.search.call_count == 1


@pytest.mark.asyncio
@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
async def test_bm25_retrieval_async_retries_with_fuzziness_zero_on_too_many_clauses(
    _mock_opensearch_client, _mock_async_client, caplog
):
    """Async variant: a too_many_clauses TransportError triggers one retry with fuzziness=0."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._async_client = AsyncMock()

    clause_overflow = TransportError(
        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
    )
    # First call blows the clause limit; the second (the retry) succeeds with no hits.
    document_store._async_client.search.side_effect = [clause_overflow, {"hits": {"hits": []}}]

    docs = await document_store._bm25_retrieval_async("a very long query", fuzziness="AUTO")

    assert docs == []
    assert document_store._async_client.search.call_count == 2
    # The retry must have downgraded fuzziness to exact matching.
    retry_body = document_store._async_client.search.call_args_list[1].kwargs["body"]
    assert retry_body["query"]["bool"]["must"][0]["multi_match"]["fuzziness"] == 0
    assert "Retrying with fuzziness=0" in caplog.text


@pytest.mark.asyncio
@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
async def test_bm25_retrieval_async_no_retry_when_fuzziness_already_zero(_mock_opensearch_client, _mock_async_client):
    """Async variant: with fuzziness already 0 the error propagates without a retry."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._async_client = AsyncMock()

    document_store._async_client.search.side_effect = TransportError(
        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
    )

    with pytest.raises(TransportError):
        await document_store._bm25_retrieval_async("a very long query", fuzziness=0)

    # Exactly one attempt: no retry was made.
    assert document_store._async_client.search.call_count == 1


@pytest.mark.asyncio
@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
async def test_bm25_retrieval_async_no_retry_with_custom_query(_mock_opensearch_client, _mock_async_client):
    """Async variant: a user-supplied custom_query is never rewritten, so the error propagates."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._async_client = AsyncMock()

    document_store._async_client.search.side_effect = TransportError(
        500, "search_phase_execution_exception", "too_many_clauses: maxClauseCount is set to 1024"
    )

    with pytest.raises(TransportError):
        await document_store._bm25_retrieval_async(
            "a very long query", fuzziness="AUTO", custom_query={"query": {"match": {"content": "$query"}}}
        )

    # Exactly one attempt: no retry was made.
    assert document_store._async_client.search.call_count == 1


@pytest.mark.asyncio
@patch("haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch")
@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
async def test_bm25_retrieval_async_reraises_other_transport_errors(_mock_opensearch_client, _mock_async_client):
    """Async variant: TransportErrors unrelated to clause overflow are re-raised untouched."""
    document_store = OpenSearchDocumentStore(hosts="testhost")
    document_store._async_client = AsyncMock()

    document_store._async_client.search.side_effect = TransportError(
        500, "parsing_exception", {"error": {"reason": "some other error"}}
    )

    with pytest.raises(TransportError):
        await document_store._bm25_retrieval_async("some query", fuzziness="AUTO")

    # Exactly one attempt: no retry was made for a non-clause error.
    assert document_store._async_client.search.call_count == 1


@pytest.mark.integration
class TestDocumentStore(
CountDocumentsByFilterTest,
Expand Down Expand Up @@ -333,6 +482,33 @@ def test_bm25_retrieval_with_fuzziness(
assert "functional" in res[1].content
assert "functional" in res[2].content

def test_bm25_retrieval_with_fuzziness_overflow(self, document_store: OpenSearchDocumentStore, caplog):
    """
    Test that a long query with fuzziness="AUTO" that exceeds OpenSearch's maxClauseCount
    is automatically retried with fuzziness=0 instead of raising an error.

    Integration test: runs against a live OpenSearch instance (via the document_store fixture).
    """
    # Build an index vocabulary of similar 5-character words. With fuzziness="AUTO",
    # 5-char words get edit distance 1, so each query term fuzzy-matches many similar
    # indexed terms, causing clause expansion beyond the default maxClauseCount (1024).
    # With fuzziness=0, each term produces exactly 1 clause, staying well under the limit.
    words = [f"foo{chr(97 + i)}{chr(97 + j)}" for i in range(20) for j in range(26)]  # 520 words

    # Spread the vocabulary across 10 documents of 52 words each.
    chunk_size = 52
    docs = [
        Document(content=" ".join(words[i : i + chunk_size]), id=str(idx))
        for idx, i in enumerate(range(0, len(words), chunk_size))
    ]
    document_store.write_documents(docs)

    # Query with a subset of words. With fuzziness="AUTO", each 5-char term expands
    # to match ~45 similar indexed terms, pushing total clauses well above 1024.
    long_query = " ".join(words[:100])

    # This should not raise: the too_many_clauses error is caught and retried with fuzziness=0
    res = document_store._bm25_retrieval(long_query, top_k=3, fuzziness="AUTO")
    assert isinstance(res, list)
    # The fallback path emits a warning; verify it was logged.
    assert "Retrying with fuzziness=0" in caplog.text

def test_bm25_retrieval_with_filters(self, document_store: OpenSearchDocumentStore, test_documents: list[Document]):
document_store.write_documents(test_documents)
res = document_store._bm25_retrieval(
Expand Down
48 changes: 48 additions & 0 deletions integrations/opensearch/tests/test_document_store_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0


import pytest
from haystack.dataclasses import Document
from haystack.document_stores.errors import DocumentStoreError
Expand Down Expand Up @@ -60,6 +61,53 @@ async def test_bm25_retrieval_all_terms_must_match_false(
assert len(res) == 5
assert all("functional" in doc.content for doc in res)

@pytest.mark.asyncio
async def test_bm25_retrieval_with_fuzziness(
    self, document_store: OpenSearchDocumentStore, test_documents: list[Document]
):
    """Async BM25 retrieval: fuzziness=1 tolerates a one-character typo that fuzziness=0 misses."""
    document_store.write_documents(test_documents)

    # Intentional misspelling of "functional".
    query_with_typo = "functinal"
    # Query without fuzziness to search for the exact match
    res = await document_store._bm25_retrieval_async(query_with_typo, top_k=3, fuzziness="0")
    # Nothing is found as the query contains a typo
    assert res == []

    # Query with fuzziness with the same query
    res = await document_store._bm25_retrieval_async(query_with_typo, top_k=3, fuzziness="1")
    # Edit distance 1 matches "functional" in the indexed documents.
    assert len(res) == 3
    assert "functional" in res[0].content
    assert "functional" in res[1].content
    assert "functional" in res[2].content

@pytest.mark.asyncio
async def test_bm25_retrieval_with_fuzziness_overflow(self, document_store: OpenSearchDocumentStore, caplog):
    """
    Test that a long query with fuzziness="AUTO" that exceeds OpenSearch's maxClauseCount
    is automatically retried with fuzziness=0 instead of raising an error.

    Async counterpart of the sync overflow test; runs against a live OpenSearch instance.
    """
    # Build an index vocabulary of similar 5-character words. With fuzziness="AUTO",
    # 5-char words get edit distance 1, so each query term fuzzy-matches many similar
    # indexed terms, causing clause expansion beyond the default maxClauseCount (1024).
    # With fuzziness=0, each term produces exactly 1 clause, staying well under the limit.
    words = [f"foo{chr(97 + i)}{chr(97 + j)}" for i in range(20) for j in range(26)]  # 520 words

    # Spread the vocabulary across 10 documents of 52 words each.
    chunk_size = 52
    docs = [
        Document(content=" ".join(words[i : i + chunk_size]), id=str(idx))
        for idx, i in enumerate(range(0, len(words), chunk_size))
    ]
    document_store.write_documents(docs)

    # Query with a subset of words. With fuzziness="AUTO", each 5-char term expands
    # to match ~45 similar indexed terms, pushing total clauses well above 1024.
    long_query = " ".join(words[:100])

    # This should not raise: the too_many_clauses error is caught and retried with fuzziness=0
    res = await document_store._bm25_retrieval_async(long_query, top_k=3, fuzziness="AUTO")
    assert isinstance(res, list)
    # The fallback path emits a warning; verify it was logged.
    assert "Retrying with fuzziness=0" in caplog.text

@pytest.mark.asyncio
async def test_bm25_retrieval_with_filters(
self, document_store: OpenSearchDocumentStore, test_documents: list[Document]
Expand Down
Loading