From 728d8af6b7a15cd9adc2927cb778aa44d73da4ec Mon Sep 17 00:00:00 2001 From: Stepan Koltsov Date: Wed, 25 Mar 2026 06:49:18 +0000 Subject: [PATCH] Simplify Future for RequestDispatch --- tarpc/src/client.rs | 32 +++++--------------------------- tarpc/src/lib.rs | 40 +--------------------------------------- 2 files changed, 6 insertions(+), 66 deletions(-) diff --git a/tarpc/src/client.rs b/tarpc/src/client.rs index a9ea95495..65c818118 100644 --- a/tarpc/src/client.rs +++ b/tarpc/src/client.rs @@ -19,7 +19,6 @@ use futures::{prelude::*, ready, stream::Fuse, task::*}; use in_flight_requests::InFlightRequests; use pin_project::pin_project; use std::{ - any::Any, convert::TryFrom, fmt, pin::Pin, @@ -269,7 +268,6 @@ where transport: transport.fuse(), in_flight_requests: InFlightRequests::default(), pending_requests, - terminal_error: None, }, } } @@ -291,11 +289,6 @@ pub struct RequestDispatch { in_flight_requests: InFlightRequests>, /// Configures limits to prevent unlimited resource usage. config: Config, - /// Produces errors that can be sent in response to any unprocessed requests at the time - /// RequestDispatch is dropped. Correctness note: this field should only be populated by - /// RequestDispatch::poll, which relies on downcasting the Any to a concrete error type - /// determined within the poll function. - terminal_error: Option>, } impl RequestDispatch @@ -353,12 +346,6 @@ where self.as_mut().project().pending_requests } - fn terminal_error_mut<'a>( - self: &'a mut Pin<&mut Self>, - ) -> &'a mut Option> { - self.as_mut().project().terminal_error - } - fn pump_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -659,20 +646,13 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - loop { - if let Some(e) = self.terminal_error_mut() { + let result = ready!(self.run(cx)); + match result { + Ok(()) => Poll::Ready(Ok(())), + Err(e) => { tracing::debug!("RpcError::Channel"); - let e: ChannelError = e - .clone() - .downcast() - .expect("Invariant: ChannelError must store a C::Error"); ready!(self.shut_down_with_terminal_error(cx, e.clone().upcast_error())); - return Poll::Ready(Err(e)); - } - let result = ready!(self.run(cx)); - match result { - Ok(()) => return Poll::Ready(Ok(())), - Err(e) => *self.terminal_error_mut() = Some(e.upcast_any()), + Poll::Ready(Err(e)) } } } @@ -986,7 +966,6 @@ mod tests { canceled_requests, in_flight_requests: InFlightRequests::default(), config: Config::default(), - terminal_error: None, }); let channel = Channel { to_dispatch, @@ -1082,7 +1061,6 @@ mod tests { canceled_requests, in_flight_requests: InFlightRequests::default(), config: Config::default(), - terminal_error: None, }; let channel = Channel { diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 7e1944305..bb18ce378 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -250,7 +250,7 @@ pub(crate) mod util; pub use crate::transport::sealed::Transport; -use std::{any::Any, error::Error, io, sync::Arc, time::Instant}; +use std::{error::Error, io, sync::Arc, time::Instant}; /// A message from a client to a server. #[derive(Debug)] @@ -445,44 +445,6 @@ where } } -impl ChannelError -where - E: Send + Sync + 'static, -{ - /// Converts the ChannelError's source error type to a dyn Any. This is useful in type-erased - /// contexts, for example, storing a ChannelError in a non-generic type like - /// [`client::RpcError`]. - fn upcast_any(self) -> ChannelError { - use ChannelError::*; - match self { - Read(e) => Read(e), - Ready(e) => Ready(e), - Write(e) => Write(e), - Flush(e) => Flush(e), - Close(e) => Close(e), - } - } -} - -impl ChannelError { - /// Converts the ChannelError's source error type to a concrete type. This is useful in - /// type-erased contexts, for example, storing a ChannelError in a non-generic type like - /// [`Client::RpcError`]. - fn downcast(self) -> Result, Self> - where - E: Any + Send + Sync, - { - use ChannelError::*; - match self { - Read(e) => e.downcast::().map(Read).map_err(Read), - Ready(e) => e.downcast::().map(Ready).map_err(Ready), - Write(e) => e.downcast::().map(Write).map_err(Write), - Flush(e) => e.downcast::().map(Flush).map_err(Flush), - Close(e) => e.downcast::().map(Close).map_err(Close), - } - } -} - impl ServerError { /// Returns a new server error with `kind` and `detail`. pub fn new(kind: io::ErrorKind, detail: String) -> ServerError {