Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions quickwit/quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use bytesize::ByteSize;
use futures::TryStreamExt;
use http::Uri;
use quickwit_proto::search::{GetKvRequest, PutKvRequest, ReportSplitsRequest};
use quickwit_proto::tonic::Request;
Expand Down Expand Up @@ -151,29 +152,24 @@ impl SearchServiceClient {
) -> crate::Result<quickwit_proto::search::FetchDocsResponse> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let nb_docs_fetched = request.partial_hits.len();
Comment thread
rdettai-sk marked this conversation as resolved.
let tonic_request = Request::new(request);
// let nb_docs_fetched = request.partial_hits.len();

// get all in one shot
let tonic_response = grpc_client
.fetch_docs(tonic_request)
let all_hits = grpc_client
.stream_fetch_docs(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
Ok(tonic_response.into_inner())

// stream in batches (activate once this was deployed once)
// let all_hits = grpc_client
// .stream_fetch_docs(tonic_request)
// .await
// .map_err(|tonic_error| parse_grpc_error(&tonic_error))?
// .into_inner()
// .map_err(|tonic_error| parse_grpc_error(&tonic_error))
// .try_fold(Vec::with_capacity(nb_docs_fetched), |mut acc, response| async move
// { acc.extend(response.hits);
// Ok(acc)
// })
// .await?;
// Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits })
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?
Comment thread
rdettai-sk marked this conversation as resolved.
.into_inner()
// TODO stream item errors are all collapsed into SearchError::Internal
.map_err(|tonic_error| parse_grpc_error(&tonic_error))
.try_fold(
Comment on lines +160 to +164
Vec::with_capacity(nb_docs_fetched),
|mut acc, response| async move {
acc.extend(response.hits);
Ok(acc)
},
)
.await?;
Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits })
}
SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await,
}
Expand Down
Loading