Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
fmt::Display,
io::{self, Read, Write},
io::{self, BufRead, BufReader, Write},
net::{SocketAddr, TcpListener, TcpStream},
sync::{
mpsc::{self, SendError},
Expand Down Expand Up @@ -95,15 +95,17 @@ impl ConnectionExt for ConnectionConfig {
let mut write_half = WriteTransport::V1(self.network().default_network_magic());
let mut read_half = ReadTransport::V1(self.network().default_network_magic());
write_half.write_message(NetworkMessage::Version(version), &mut tcp_stream)?;
let (mut handshake, messages) = match read_half.read_message(&mut tcp_stream)? {
let tcp_stream_clone = tcp_stream.try_clone()?;
let mut buf_reader = BufReader::new(tcp_stream_clone);
let (mut handshake, messages) = match read_half.read_message(&mut buf_reader)? {
Some(message) => self.start_handshake(unix_time, message, nonce)?,
None => return Err(Error::MissingVersion),
};
for message in messages {
write_half.write_message(message, &mut tcp_stream)?;
}
loop {
if let Some(message) = read_half.read_message(&mut tcp_stream)? {
if let Some(message) = read_half.read_message(&mut buf_reader)? {
match handshake.negotiate(message)? {
Some((completed_handshake, responses)) => {
for response in responses {
Expand All @@ -127,9 +129,8 @@ impl ConnectionExt for ConnectionConfig {
outbound_ping_state: Arc::clone(&outbound_ping),
};
let (tx, rx) = mpsc::channel();
let tcp_stream_clone = tcp_stream.try_clone()?;
let open_writer = OpenWriter {
tcp_stream: tcp_stream_clone,
tcp_stream,
transport: write_half,
receiver: rx,
outbound_ping_state: Arc::clone(&outbound_ping),
Expand All @@ -142,7 +143,7 @@ impl ConnectionExt for ConnectionConfig {
task_handle: write_handle,
};
let reader = ConnectionReader {
tcp_stream,
tcp_stream: buf_reader,
transport: read_half,
their_preferences: Arc::clone(&arc_pref),
timed_messages,
Expand Down Expand Up @@ -277,7 +278,7 @@ impl OpenWriter {
/// Read messages from an open connection.
#[derive(Debug)]
pub struct ConnectionReader {
tcp_stream: TcpStream,
tcp_stream: BufReader<TcpStream>,
transport: ReadTransport,
their_preferences: Arc<Mutex<Preferences>>,
timed_messages: Arc<Mutex<TimedMessages>>,
Expand Down Expand Up @@ -399,7 +400,10 @@ enum ReadTransport {
}

impl ReadTransport {
fn read_message<R: Read>(&mut self, reader: &mut R) -> Result<Option<NetworkMessage>, Error> {
fn read_message<R: BufRead>(
&mut self,
reader: &mut R,
) -> Result<Option<NetworkMessage>, Error> {
match self {
ReadTransport::V1(magic) => {
let mut message_buf = vec![0; 24];
Expand Down