diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index f5026020739..628325b2efc 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use bytesize::ByteSize; +use futures::TryStreamExt; use http::Uri; use quickwit_proto::search::{GetKvRequest, PutKvRequest, ReportSplitsRequest}; use quickwit_proto::tonic::Request; @@ -151,29 +152,24 @@ impl SearchServiceClient { ) -> crate::Result { match &mut self.client_impl { SearchServiceClientImpl::Grpc(grpc_client) => { + let nb_docs_fetched = request.partial_hits.len(); let tonic_request = Request::new(request); - // let nb_docs_fetched = request.partial_hits.len(); - - // get all in one shot - let tonic_response = grpc_client - .fetch_docs(tonic_request) + let all_hits = grpc_client + .stream_fetch_docs(tonic_request) .await - .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; - Ok(tonic_response.into_inner()) - - // stream in batches (activate once this was deployed once) - // let all_hits = grpc_client - // .stream_fetch_docs(tonic_request) - // .await - // .map_err(|tonic_error| parse_grpc_error(&tonic_error))? - // .into_inner() - // .map_err(|tonic_error| parse_grpc_error(&tonic_error)) - // .try_fold(Vec::with_capacity(nb_docs_fetched), |mut acc, response| async move - // { acc.extend(response.hits); - // Ok(acc) - // }) - // .await?; - // Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) + .map_err(|tonic_error| parse_grpc_error(&tonic_error))? + .into_inner() + // TODO stream item errors are all collapsed into SearchError::Internal + .map_err(|tonic_error| parse_grpc_error(&tonic_error)) + .try_fold( + Vec::with_capacity(nb_docs_fetched), + |mut acc, response| async move { + acc.extend(response.hits); + Ok(acc) + }, + ) + .await?; + Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) } SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await, }