From 0414880e970af3ecca73e1abe859593ca532a026 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Sun, 10 May 2026 22:33:08 +0200 Subject: [PATCH 1/2] Enable streaming doc fetch added in PR42 --- quickwit/quickwit-search/src/client.rs | 37 +++++++++++--------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index f5026020739..0dd6d02c875 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use bytesize::ByteSize; +use futures::TryStreamExt; use http::Uri; use quickwit_proto::search::{GetKvRequest, PutKvRequest, ReportSplitsRequest}; use quickwit_proto::tonic::Request; @@ -151,29 +152,23 @@ impl SearchServiceClient { ) -> crate::Result { match &mut self.client_impl { SearchServiceClientImpl::Grpc(grpc_client) => { + let nb_docs_fetched = request.partial_hits.len(); let tonic_request = Request::new(request); - // let nb_docs_fetched = request.partial_hits.len(); - - // get all in one shot - let tonic_response = grpc_client - .fetch_docs(tonic_request) + let all_hits = grpc_client + .stream_fetch_docs(tonic_request) .await - .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; - Ok(tonic_response.into_inner()) - - // stream in batches (activate once this was deployed once) - // let all_hits = grpc_client - // .stream_fetch_docs(tonic_request) - // .await - // .map_err(|tonic_error| parse_grpc_error(&tonic_error))? - // .into_inner() - // .map_err(|tonic_error| parse_grpc_error(&tonic_error)) - // .try_fold(Vec::with_capacity(nb_docs_fetched), |mut acc, response| async move - // { acc.extend(response.hits); - // Ok(acc) - // }) - // .await?; - // Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) + .map_err(|tonic_error| parse_grpc_error(&tonic_error))? + .into_inner() + .map_err(|tonic_error| parse_grpc_error(&tonic_error)) + .try_fold( + Vec::with_capacity(nb_docs_fetched), + |mut acc, response| async move { + acc.extend(response.hits); + Ok(acc) + }, + ) + .await?; + Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) } SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await, } From 5cb4e4da844c3bf48fda42f3df2e5d9f42bc3e5e Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 12 May 2026 12:55:01 +0200 Subject: [PATCH 2/2] Add error improvement TODO --- quickwit/quickwit-search/src/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 0dd6d02c875..628325b2efc 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -159,6 +159,7 @@ impl SearchServiceClient { .await .map_err(|tonic_error| parse_grpc_error(&tonic_error))? .into_inner() + // TODO stream item errors are all collapsed into SearchError::Internal .map_err(|tonic_error| parse_grpc_error(&tonic_error)) .try_fold( Vec::with_capacity(nb_docs_fetched),