Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/daemon/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use protocol::{
parse_legacy_daemon_message,
};

use crate::{config::DaemonConfig, error::DaemonError, systemd};
use crate::{config::DaemonConfig, daemon_stream::DaemonStream, error::DaemonError, systemd};

mod help;
pub(crate) mod tracing_stream;
Expand Down
2 changes: 1 addition & 1 deletion crates/daemon/src/daemon/sections/greeting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub(crate) fn read_trimmed_line<R: BufRead>(reader: &mut R) -> io::Result<Option
}

fn advertise_capabilities(
stream: &mut TcpStream,
stream: &mut DaemonStream,
modules: &[ModuleRuntime],
messages: &LegacyMessageCache,
) -> io::Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions crates/daemon/src/daemon/sections/legacy_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl LegacyMessageCache {

fn write(
&self,
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
message: LegacyDaemonMessage<'_>,
) -> io::Result<()> {
Expand All @@ -52,15 +52,15 @@ impl LegacyMessageCache {

fn write_ok(
&self,
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
) -> io::Result<()> {
write_limited(stream, limiter, &self.ok)
}

fn write_exit(
&self,
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
) -> io::Result<()> {
write_limited(stream, limiter, &self.exit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ enum AuthenticationStatus {
///
/// Returns `Granted` if authentication succeeded, `Denied` otherwise.
fn perform_module_authentication(
reader: &mut BufReader<TcpStream>,
reader: &mut BufReader<DaemonStream>,
limiter: &mut Option<BandwidthLimiter>,
module: &ModuleDefinition,
peer_ip: IpAddr,
Expand Down Expand Up @@ -205,7 +205,7 @@ fn check_secrets_file_permissions(path: &Path) -> io::Result<()> {
///
/// upstream: clientserver.c:762 - `@ERROR: auth failed on module %s\n`
fn send_auth_failed(
stream: &mut TcpStream,
stream: &mut DaemonStream,
module: &ModuleDefinition,
limiter: &mut Option<BandwidthLimiter>,
messages: &LegacyMessageCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn format_module_listing_line(name: &str, comment: &str) -> String {
/// to access. Only modules marked as listable and that permit the peer's IP
/// address are included in the response.
fn respond_with_module_list(
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
modules: &[ModuleRuntime],
motd_lines: &[String],
Expand Down
12 changes: 6 additions & 6 deletions crates/daemon/src/daemon/sections/module_access/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/// with the module request metadata so helper functions receive a single
/// context parameter instead of many individual arguments.
struct ModuleRequestContext<'a> {
reader: &'a mut BufReader<TcpStream>,
reader: &'a mut BufReader<DaemonStream>,
limiter: &'a mut Option<BandwidthLimiter>,
peer_ip: IpAddr,
session_peer_host: Option<&'a str>,
Expand All @@ -38,7 +38,7 @@ impl<'a> ModuleRequestContext<'a> {

/// Sends an error message and exit marker to the client.
fn send_error_and_exit(
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
messages: &LegacyMessageCache,
payload: &str,
Expand All @@ -56,7 +56,7 @@ fn send_error_and_exit(
///
/// upstream: clientserver.c:733 - `@ERROR: access denied to %s from %s (%s)\n`
fn deny_module(
stream: &mut TcpStream,
stream: &mut DaemonStream,
module: &ModuleDefinition,
peer_ip: IpAddr,
host: Option<&str>,
Expand All @@ -81,7 +81,7 @@ fn deny_module(
/// This confirms that the module request was accepted and the client
/// may proceed with sending its arguments.
fn send_daemon_ok(
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
messages: &LegacyMessageCache,
) -> io::Result<()> {
Expand Down Expand Up @@ -186,7 +186,7 @@ fn handle_authentication(
///
/// Sends an error message and logs the event.
fn handle_unknown_module(
stream: &mut TcpStream,
stream: &mut DaemonStream,
limiter: &mut Option<BandwidthLimiter>,
messages: &LegacyMessageCache,
request: &str,
Expand Down Expand Up @@ -241,7 +241,7 @@ fn handle_module_denied(
/// Returns an I/O error if the connection fails, otherwise `Ok(())`.
#[allow(clippy::too_many_arguments)]
fn respond_with_module_request(
reader: &mut BufReader<TcpStream>,
reader: &mut BufReader<DaemonStream>,
limiter: &mut Option<BandwidthLimiter>,
modules: &[ModuleRuntime],
request: &str,
Expand Down
6 changes: 3 additions & 3 deletions crates/daemon/src/daemon/sections/module_access/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ fn setup_transfer_streams(
let stream = ctx.reader.get_mut();
stream.set_nodelay(true)?;

let read_stream = match stream.try_clone() {
let read_stream = match stream.tcp_stream().try_clone() {
Ok(s) => s,
Err(err) => {
let payload = format!("@ERROR: failed to clone stream: {err}");
Expand All @@ -238,7 +238,7 @@ fn setup_transfer_streams(
}
};

let write_stream = match stream.try_clone() {
let write_stream = match stream.tcp_stream().try_clone() {
Ok(s) => s,
Err(err) => {
return Err(io::Error::other(format!(
Expand All @@ -252,7 +252,7 @@ fn setup_transfer_streams(

/// Builds the handshake result for the transfer.
fn build_handshake_result(
reader: &BufReader<TcpStream>,
reader: &BufReader<DaemonStream>,
negotiated_protocol: Option<ProtocolVersion>,
client_args: Vec<String>,
module: &ModuleRuntime,
Expand Down
2 changes: 1 addition & 1 deletion crates/daemon/src/daemon/sections/module_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn canonical_option(text: &str) -> String {
token.to_ascii_lowercase()
}

fn apply_module_timeout(stream: &TcpStream, module: &ModuleDefinition) -> io::Result<()> {
fn apply_module_timeout(stream: &DaemonStream, module: &ModuleDefinition) -> io::Result<()> {
if let Some(timeout) = module.timeout {
let duration = Duration::from_secs(timeout.get());
stream.set_read_timeout(Some(duration))?;
Expand Down
5 changes: 2 additions & 3 deletions crates/daemon/src/daemon/sections/proxy_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,8 @@ fn parse_v1_header<R: Read>(prefix: &[u8; 12], reader: &mut R) -> io::Result<Opt
///
/// upstream: clientserver.c:1298 - `read_proxy_protocol_header()` is called
/// before any rsync protocol data when `proxy protocol = true` in the config.
pub(crate) fn parse_proxy_header(stream: &TcpStream) -> io::Result<Option<SocketAddr>> {
// Borrow as Read via reference - TcpStream implements Read for &TcpStream.
let mut reader = BufReader::new(stream);
pub(crate) fn parse_proxy_header(stream: &mut DaemonStream) -> io::Result<Option<SocketAddr>> {
let mut reader = BufReader::new(&mut *stream);
parse_proxy_header_from(&mut reader)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ fn serve_connections(
bandwidth_burst,
reverse_lookup,
proxy_protocol,
#[cfg(feature = "daemon-tls")]
tls_acceptor: None,
};

if listeners.len() == 1 {
Expand Down
57 changes: 50 additions & 7 deletions crates/daemon/src/daemon/sections/server_runtime/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ struct AcceptLoopState<'a> {
bandwidth_burst: Option<NonZeroU64>,
reverse_lookup: bool,
proxy_protocol: bool,
/// TLS acceptor for wrapping accepted TCP connections when the daemon is
/// configured with certificate material. `None` means plain TCP only.
#[cfg(feature = "daemon-tls")]
tls_acceptor: Option<crate::tls::TlsAcceptor>,
}

/// Checks signal flags and performs maintenance tasks between accept iterations.
Expand Down Expand Up @@ -118,7 +122,7 @@ fn check_signals_and_maintain(
/// `@ERROR: max connections (%d) reached -- try again later\n` to the
/// client via `io_printf(f_out, ...)`.
fn refuse_if_at_capacity(
stream: &mut TcpStream,
stream: &mut DaemonStream,
peer_addr: SocketAddr,
state: &AcceptLoopState<'_>,
) -> bool {
Expand Down Expand Up @@ -183,7 +187,7 @@ pub(crate) fn log_max_connections_rejection(
/// upstream: clientserver.c - fork per connection; we use threads with
/// `catch_unwind` for equivalent crash isolation.
fn spawn_connection_worker(
stream: TcpStream,
stream: DaemonStream,
raw_peer_addr: SocketAddr,
state: &AcceptLoopState<'_>,
) -> thread::JoinHandle<WorkerResult> {
Expand Down Expand Up @@ -250,15 +254,15 @@ fn spawn_connection_worker(

/// Applies socket options to an accepted stream and logs any failure.
fn apply_client_options(
stream: &TcpStream,
stream: &DaemonStream,
client_socket_options: &[SocketOption],
log_sink: Option<&SharedLogSink>,
) {
// upstream: clientserver.c - set_socket_options() is called
// on the accepted client fd before the session handler runs.
if !client_socket_options.is_empty() {
if let Err(error) =
apply_socket_options_to_stream(stream, client_socket_options)
apply_socket_options_to_stream(stream.tcp_stream(), client_socket_options)
{
if let Some(log) = log_sink {
let text = format!(
Expand All @@ -271,6 +275,37 @@ fn apply_client_options(
}
}

/// Wraps an accepted `TcpStream` into a [`DaemonStream`].
///
/// When the `daemon-tls` feature is enabled and a TLS acceptor is configured,
/// the stream is wrapped through `tls::wrap_stream()` to perform the TLS
/// handshake. If the handshake fails, the error is logged and `None` is
/// returned so the accept loop can skip the connection.
///
/// When TLS is not configured (or the feature is disabled), the stream is
/// wrapped as `DaemonStream::Plain`.
fn wrap_accepted_stream(
tcp_stream: TcpStream,
#[allow(unused_variables)] state: &AcceptLoopState<'_>,
) -> Option<DaemonStream> {
#[cfg(feature = "daemon-tls")]
if let Some(ref acceptor) = state.tls_acceptor {
return match crate::tls::wrap_stream(acceptor, tcp_stream) {
Ok(tls_stream) => Some(DaemonStream::Tls(Box::new(tls_stream))),
Err(error) => {
if let Some(log) = state.log_sink.as_ref() {
let text = format!("TLS handshake failed: {error}");
let message = rsync_warning!(text).with_role(Role::Daemon);
log_message(log, &message);
}
None
}
};
}

Some(DaemonStream::plain(tcp_stream))
}

/// Updates the systemd connection status after a new connection is accepted.
fn update_connection_status_after_accept(state: &mut AcceptLoopState<'_>) {
let current_active = state.workers.len();
Expand Down Expand Up @@ -307,8 +342,8 @@ fn run_single_listener_loop(
}

match listener.accept() {
Ok((mut stream, raw_peer_addr)) => {
if let Err(error) = stream.set_nonblocking(false) {
Ok((tcp_stream, raw_peer_addr)) => {
if let Err(error) = tcp_stream.set_nonblocking(false) {
if let Some(log) = state.log_sink.as_ref() {
let text = format!(
"failed to set accepted socket to blocking: {error}"
Expand All @@ -319,6 +354,10 @@ fn run_single_listener_loop(
continue;
}

let Some(mut stream) = wrap_accepted_stream(tcp_stream, state) else {
continue;
};

apply_client_options(&stream, &state.client_socket_options, state.log_sink.as_ref());

if refuse_if_at_capacity(&mut stream, raw_peer_addr, state) {
Expand Down Expand Up @@ -434,7 +473,11 @@ fn run_dual_stack_loop(

// Use recv_timeout to allow periodic worker reaping and signal checks
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(Ok((mut stream, raw_peer_addr))) => {
Ok(Ok((tcp_stream, raw_peer_addr))) => {
let Some(mut stream) = wrap_accepted_stream(tcp_stream, state) else {
continue;
};

apply_client_options(&stream, &state.client_socket_options, state.log_sink.as_ref());

if refuse_if_at_capacity(&mut stream, raw_peer_addr, state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fn bind_with_backlog(addr: SocketAddr, backlog: i32) -> io::Result<TcpListener>
}

/// Configures read/write timeouts on an accepted client stream.
fn configure_stream(stream: &TcpStream) -> io::Result<()> {
fn configure_stream(stream: &DaemonStream) -> io::Result<()> {
stream.set_read_timeout(Some(SOCKET_TIMEOUT))?;
stream.set_write_timeout(Some(SOCKET_TIMEOUT))
}
14 changes: 10 additions & 4 deletions crates/daemon/src/daemon/sections/server_runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ fn test_accept_loop_state<'a>(
bandwidth_burst: None,
reverse_lookup: false,
proxy_protocol: false,
#[cfg(feature = "daemon-tls")]
tls_acceptor: None,
}
}

Expand Down Expand Up @@ -769,8 +771,9 @@ fn refuse_if_at_capacity_admits_when_no_cap_configured() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let local = listener.local_addr().expect("local addr");
let client_handle = thread::spawn(move || TcpStream::connect(local).expect("connect"));
let (mut server_stream, peer) = listener.accept().expect("accept");
let (server_stream, peer) = listener.accept().expect("accept");
let _client = client_handle.join().expect("client connect");
let mut server_stream = DaemonStream::plain(server_stream);

let flags = no_op_signal_flags();
let config_path: Option<PathBuf> = None;
Expand Down Expand Up @@ -805,8 +808,9 @@ fn accept_loop_refuses_when_at_capacity() {
let local = listener.local_addr().expect("local addr");
let client_handle =
thread::spawn(move || TcpStream::connect(local).expect("connect third client"));
let (mut server_stream, peer) = listener.accept().expect("accept third client");
let (server_stream, peer) = listener.accept().expect("accept third client");
let mut client_stream = client_handle.join().expect("client connect");
let mut server_stream = DaemonStream::plain(server_stream);

let flags = no_op_signal_flags();
let config_path: Option<PathBuf> = None;
Expand Down Expand Up @@ -855,8 +859,9 @@ fn refuse_if_at_capacity_emits_structured_warning() {
let local = listener.local_addr().expect("local addr");
let client_handle =
thread::spawn(move || TcpStream::connect(local).expect("connect refused client"));
let (mut server_stream, peer) = listener.accept().expect("accept refused client");
let (server_stream, peer) = listener.accept().expect("accept refused client");
let _client = client_handle.join().expect("client connect");
let mut server_stream = DaemonStream::plain(server_stream);

let log_dir = tempfile::tempdir().expect("log dir");
let log_path = log_dir.path().join("daemon.log");
Expand Down Expand Up @@ -921,8 +926,9 @@ fn accept_loop_recovers_after_disconnect() {
let local = listener.local_addr().expect("local addr");
let client_handle =
thread::spawn(move || TcpStream::connect(local).expect("connect after drain"));
let (mut server_stream, peer) = listener.accept().expect("accept after drain");
let (server_stream, peer) = listener.accept().expect("accept after drain");
let _client = client_handle.join().expect("client connect");
let mut server_stream = DaemonStream::plain(server_stream);

let flags = no_op_signal_flags();
let config_path: Option<PathBuf> = None;
Expand Down
Loading
Loading