Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 257 additions & 12 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use quickwit_index_management::IndexService;
use quickwit_metastore::*;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::search::{
CountHits, ListFieldsResponse, PartialHit, ScrollRequest, SearchResponse, SortByValue,
SortDatetimeFormat,
CountHits, ListFieldsResponse, PartialHit, ScrollRequest, SearchRequest, SearchResponse,
SortByValue, SortDatetimeFormat,
};
use quickwit_proto::types::IndexUid;
use quickwit_query::BooleanOperand;
Expand Down Expand Up @@ -365,10 +365,14 @@ fn build_request_for_es_api(
.track_total_hits
.or(search_body.track_total_hits)
{
None => CountHits::Underestimate,
Some(TrackTotalHits::Track(false)) => CountHits::Underestimate,
Some(TrackTotalHits::Count(count)) if count <= max_hits as i64 => CountHits::Underestimate,
Some(TrackTotalHits::Track(true) | TrackTotalHits::Count(_)) => CountHits::CountAll,
// A query without aggregation or a size/max_hits set to 0 is likely used for count purpose.
// Setting CountHits::Underestimate would return 0, which is not expected by users.
// CountHits::CountAll is a better default in this case.
None if max_hits == 0 && aggregation_request.is_none() => CountHits::CountAll,
None => CountHits::Underestimate,
}
.into();

Expand Down Expand Up @@ -526,9 +530,11 @@ async fn es_compat_index_search(
let allow_partial_search_results = search_params.allow_partial_search_results();
let (search_request, append_shard_doc) =
build_request_for_es_api(index_id_patterns, search_params, search_body, user_agent)?;
let search_response: SearchResponse = search_service.root_search(search_request).await?;
let search_response: SearchResponse =
search_service.root_search(search_request.clone()).await?;
let elapsed = start_instant.elapsed();
let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response(
Some(&search_request),
search_response,
Comment on lines 531 to 538
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this comment.
I assumed that cloning the request struct was OK as we don't do it thousand of time per request

append_shard_doc,
_source_excludes,
Expand Down Expand Up @@ -889,11 +895,14 @@ async fn es_compat_index_multi_search(
let _source_includes = multi_search_params._source_includes.clone();
async move {
let start_instant = Instant::now();
let search_response: SearchResponse =
search_service.clone().root_search(search_request).await?;
let search_response: SearchResponse = search_service
.clone()
.root_search(search_request.clone())
.await?;
let elapsed = start_instant.elapsed();
let mut search_response_rest: ElasticsearchResponse =
convert_to_es_search_response(
Some(&search_request),
search_response,
append_shard_doc,
_source_excludes,
Expand Down Expand Up @@ -947,8 +956,14 @@ async fn es_scroll(
// However, passing that parameter is cumbersome, so we cut some corner and forbid the
// use of scroll requests in combination with allow_partial_results set to false.
let allow_failed_splits = true;
let mut search_response_rest: ElasticsearchResponse =
convert_to_es_search_response(search_response, false, None, None, allow_failed_splits)?;
let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response(
None,
search_response,
false,
None,
None,
allow_failed_splits,
)?;
search_response_rest.took = start_instant.elapsed().as_millis() as u32;
Ok(search_response_rest)
}
Expand Down Expand Up @@ -1010,6 +1025,7 @@ fn convert_to_es_stats_response(

#[allow(clippy::result_large_err)]
fn convert_to_es_search_response(
search_request_opt: Option<&SearchRequest>,
resp: SearchResponse,
append_shard_doc: bool,
_source_excludes: Option<Vec<String>>,
Expand Down Expand Up @@ -1043,12 +1059,28 @@ fn convert_to_es_search_response(
let num_failed_splits = resp.failed_splits.len() as u32;
let num_successful_splits = resp.num_successful_splits as u32;
let num_total_splits = num_successful_splits + num_failed_splits;
let relation = if num_failed_splits > 0 {
TotalHitsRelation::GreaterThanOrEqualTo
} else {
search_request_opt
.and_then(|req| {
CountHits::try_from(req.count_hits)
.ok()
.zip(Some(req.max_hits))
})
.map(|(count_hits, max_hits)| match count_hits {
CountHits::Underestimate if resp.num_hits < max_hits => TotalHitsRelation::Equal,
CountHits::Underestimate => TotalHitsRelation::GreaterThanOrEqualTo,
CountHits::CountAll => TotalHitsRelation::Equal,
})
.unwrap_or(TotalHitsRelation::Equal)
};
Ok(ElasticsearchResponse {
timed_out: false,
hits: HitsMetadata {
total: Some(TotalHits {
value: resp.num_hits,
relation: TotalHitsRelation::Equal,
relation,
}),
max_score: None,
hits,
Expand Down Expand Up @@ -1271,7 +1303,8 @@ mod tests {
failed_splits: vec![split_error.clone()],
..Default::default()
};
convert_to_es_search_response(search_response, false, None, None, false).unwrap_err();
convert_to_es_search_response(None, search_response, false, None, None, false)
.unwrap_err();
}
{
let search_response = SearchResponse {
Expand All @@ -1282,8 +1315,13 @@ mod tests {
// if we allow partial search results, this should not fail, but we report the presence
// of failed splits in the fail shard response.
let es_search_resp =
convert_to_es_search_response(search_response, false, None, None, true).unwrap();
convert_to_es_search_response(None, search_response, false, None, None, true)
.unwrap();
assert_eq!(es_search_resp.shards.failed, 1);
assert_eq!(
es_search_resp.hits.total.unwrap().relation,
TotalHitsRelation::GreaterThanOrEqualTo
);
}
{
let search_response = SearchResponse {
Expand All @@ -1292,13 +1330,15 @@ mod tests {
};
// Event if we allow partial search results, with a fail and no success, we have a
// failure.
convert_to_es_search_response(search_response, false, None, None, true).unwrap_err();
convert_to_es_search_response(None, search_response, false, None, None, true)
.unwrap_err();
}
{
// Not having any splits (no failure + no success) is not considered a failure.
for allow_partial in [true, false] {
let search_response = SearchResponse::default();
let es_search_resp = convert_to_es_search_response(
None,
search_response,
false,
None,
Expand All @@ -1310,4 +1350,209 @@ mod tests {
}
}
}

#[test]
fn test_build_request_size_zero_no_aggs_no_track_uses_count_all() {
// size=0, no aggregations, no track_total_hits -> CountAll (pure count intent)
let params = SearchQueryParams {
size: Some(0),
..Default::default()
};
let (req, _) = build_request_for_es_api(
vec!["my-index".to_string()],
params,
SearchBody::default(),
None,
)
.unwrap();
assert_eq!(req.count_hits, CountHits::CountAll as i32);
}

#[test]
fn test_build_request_size_zero_with_aggs_no_track_uses_underestimate() {
// size=0, WITH aggregations, no track_total_hits -> Underestimate
// (hit count is irrelevant when aggregations are the goal)
let params = SearchQueryParams {
size: Some(0),
..Default::default()
};
let mut body = SearchBody::default();
body.aggs.insert(
"my_agg".to_string(),
serde_json::json!({"terms": {"field": "status"}}),
);
let (req, _) =
build_request_for_es_api(vec!["my-index".to_string()], params, body, None).unwrap();
assert_eq!(req.count_hits, CountHits::Underestimate as i32);
}

#[test]
fn test_build_request_nonzero_size_no_track_uses_underestimate() {
// Default size (>0), no track_total_hits -> Underestimate
let params = SearchQueryParams::default();
let (req, _) = build_request_for_es_api(
vec!["my-index".to_string()],
params,
SearchBody::default(),
None,
)
.unwrap();
assert_eq!(req.count_hits, CountHits::Underestimate as i32);
assert_eq!(req.max_hits, 10); // default size
}

#[test]
fn test_build_request_track_true_uses_count_all() {
let params = SearchQueryParams {
track_total_hits: Some(TrackTotalHits::Track(true)),
..Default::default()
};
let (req, _) = build_request_for_es_api(
vec!["my-index".to_string()],
params,
SearchBody::default(),
None,
)
.unwrap();
assert_eq!(req.count_hits, CountHits::CountAll as i32);
}

#[test]
fn test_build_request_track_false_uses_underestimate() {
let params = SearchQueryParams {
track_total_hits: Some(TrackTotalHits::Track(false)),
..Default::default()
};
let (req, _) = build_request_for_es_api(
vec!["my-index".to_string()],
params,
SearchBody::default(),
None,
)
.unwrap();
assert_eq!(req.count_hits, CountHits::Underestimate as i32);
}

fn make_search_request_for_relation(count_hits: CountHits, max_hits: u64) -> SearchRequest {
SearchRequest {
index_id_patterns: vec!["my-index".to_string()],
query_ast: serde_json::to_string(&QueryAst::MatchAll).unwrap(),
max_hits,
count_hits: count_hits as i32,
..Default::default()
}
}

#[test]
fn test_relation_underestimate_fewer_hits_than_max_is_equal() {
// num_hits < max_hits -> all docs were found -> Equal
let req = make_search_request_for_relation(CountHits::Underestimate, 10);
let resp = SearchResponse {
num_hits: 5,
num_successful_splits: 1,
..Default::default()
};
let es_resp =
convert_to_es_search_response(Some(&req), resp, false, None, None, false).unwrap();
assert_eq!(
es_resp.hits.total.unwrap().relation,
TotalHitsRelation::Equal
);
assert_eq!(es_resp.hits.total.unwrap().value, 5);
}

#[test]
fn test_relation_underestimate_hits_equal_max_is_gte() {
// num_hits == max_hits -> there may be more -> GreaterThanOrEqualTo
let req = make_search_request_for_relation(CountHits::Underestimate, 10);
let resp = SearchResponse {
num_hits: 10,
num_successful_splits: 1,
..Default::default()
};
let es_resp =
convert_to_es_search_response(Some(&req), resp, false, None, None, false).unwrap();
assert_eq!(
es_resp.hits.total.unwrap().relation,
TotalHitsRelation::GreaterThanOrEqualTo
);
}

#[test]
fn test_relation_underestimate_hits_exceed_max_is_gte() {
// num_hits > max_hits -> definitely more -> GreaterThanOrEqualTo
let req = make_search_request_for_relation(CountHits::Underestimate, 10);
let resp = SearchResponse {
num_hits: 42,
num_successful_splits: 1,
..Default::default()
};
let es_resp =
convert_to_es_search_response(Some(&req), resp, false, None, None, false).unwrap();
assert_eq!(
es_resp.hits.total.unwrap().relation,
TotalHitsRelation::GreaterThanOrEqualTo
);
}

#[test]
fn test_relation_count_all_is_always_equal() {
// CountAll means exact count -> always Equal regardless of hit count vs max_hits
for num_hits in [0u64, 5, 10, 100] {
let req = make_search_request_for_relation(CountHits::CountAll, 10);
let resp = SearchResponse {
num_hits,
num_successful_splits: 1,
..Default::default()
};
let es_resp =
convert_to_es_search_response(Some(&req), resp, false, None, None, false).unwrap();
assert_eq!(
es_resp.hits.total.unwrap().relation,
TotalHitsRelation::Equal,
"expected Equal for num_hits={num_hits}"
);
}
}

#[test]
fn test_relation_no_request_defaults_to_equal() {
// No SearchRequest (e.g. scroll) -> Equal
let resp = SearchResponse {
num_hits: 99,
num_successful_splits: 1,
..Default::default()
};
let es_resp = convert_to_es_search_response(None, resp, false, None, None, false).unwrap();
assert_eq!(
es_resp.hits.total.unwrap().relation,
TotalHitsRelation::Equal
);
}

#[test]
fn test_relation_failed_splits_overrides_to_gte() {
// Any failed split means the hit count is incomplete -> GreaterThanOrEqualTo,
// regardless of count_hits mode.
for count_hits in [CountHits::CountAll, CountHits::Underestimate] {
let req = make_search_request_for_relation(count_hits, 10);
let resp = SearchResponse {
num_hits: 3,
num_successful_splits: 1,
failed_splits: vec![SplitSearchError {
error: "some-error".to_string(),
split_id: "some-split-id".to_string(),
retryable_error: true,
}],
..Default::default()
};
let es_resp =
convert_to_es_search_response(Some(&req), resp, false, None, None, true).unwrap();
assert_eq!(
es_resp.hits.total.unwrap().relation,
TotalHitsRelation::GreaterThanOrEqualTo,
"expected GreaterThanOrEqualTo for count_hits={count_hits:?}"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ expected:
hits:
total:
value: 100
relation: "eq"
relation: "gte"
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ expected:
hits:
total:
value: 60
relation: "eq"
relation: "gte"
hits:
$expect: "len(val) == 10"
---
Expand All @@ -16,7 +16,7 @@ expected:
hits:
total:
value: 60
relation: "eq"
relation: "gte"
hits:
$expect: "len(val) == 3"
---
Expand All @@ -34,7 +34,7 @@ expected:
hits:
total:
value: 60
relation: "eq"
relation: "gte"
hits:
$expect: "len(val) == 3"
---
Expand Down
Loading
Loading