Skip to content

Commit a989a87

Browse files
authored
Merge pull request #160 from 0xB10C/2026-01-fix-reads-on-closed-socket
fix async protocol.read().await not returning on closed sockets
2 parents 8c46943 + 0719bcc commit a989a87

File tree

2 files changed

+95
-2
lines changed

2 files changed

+95
-2
lines changed

protocol/src/futures.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,15 @@ where
406406
bytes_read,
407407
} => {
408408
while *bytes_read < NUM_LENGTH_BYTES {
409-
*bytes_read += self.reader.read(&mut length_bytes[*bytes_read..]).await?;
409+
let len = self.reader.read(&mut length_bytes[*bytes_read..]).await?;
410+
if len == 0 {
411+
return Err(std::io::Error::new(
412+
std::io::ErrorKind::ConnectionAborted,
413+
"read zero bytes",
414+
)
415+
.into());
416+
}
417+
*bytes_read += len;
410418
}
411419

412420
let packet_bytes_len = self.inbound_cipher.decrypt_packet_len(*length_bytes);
@@ -417,7 +425,15 @@ where
417425
bytes_read,
418426
} => {
419427
while *bytes_read < packet_bytes.len() {
420-
*bytes_read += self.reader.read(&mut packet_bytes[*bytes_read..]).await?;
428+
let len = self.reader.read(&mut packet_bytes[*bytes_read..]).await?;
429+
if len == 0 {
430+
return Err(std::io::Error::new(
431+
std::io::ErrorKind::ConnectionAborted,
432+
"read zero bytes",
433+
)
434+
.into());
435+
}
436+
*bytes_read += len;
421437
}
422438

423439
let plaintext_len = InboundCipher::decryption_buffer_len(packet_bytes.len());

protocol/tests/round_trips.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,83 @@ fn hello_world_happy_path() {
154154
assert_eq!(message, decrypted_message[1..].to_vec()); // Skip header byte
155155
}
156156

157+
#[tokio::test]
158+
#[cfg(feature = "tokio")]
159+
async fn pingpong_with_closed_connection_async() {
160+
use bip324::{futures::Protocol, io::Payload};
161+
use bitcoin::consensus;
162+
use p2p::message::{NetworkMessage, V2NetworkMessage};
163+
use tokio::net::TcpListener;
164+
use tokio::net::TcpStream;
165+
166+
// Start a server that responds to exactly one Ping(x) message with a
167+
// Pong(x) message and then stops. This allows testing to read from a closed stream.
168+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
169+
let addr = listener.local_addr().unwrap();
170+
let server = tokio::spawn(async move {
171+
let (stream, _) = listener.accept().await.unwrap();
172+
let (reader, writer) = stream.into_split();
173+
let mut protocol = Protocol::new(
174+
p2p::Magic::REGTEST,
175+
bip324::Role::Responder,
176+
None, // no garbage
177+
None, // no decoys
178+
reader,
179+
writer,
180+
)
181+
.await
182+
.unwrap();
183+
184+
let payload = protocol.read().await.unwrap();
185+
let received_message =
186+
consensus::deserialize::<V2NetworkMessage>(payload.contents()).unwrap();
187+
if let NetworkMessage::Ping(x) = received_message.payload() {
188+
let pong = V2NetworkMessage::new(NetworkMessage::Pong(*x));
189+
let message = consensus::serialize(&pong);
190+
protocol.write(&Payload::genuine(message)).await.unwrap();
191+
println!("Pong sent, stopping server.")
192+
} else {
193+
panic!("Expected Ping, but received: {received_message:?}");
194+
}
195+
});
196+
197+
let stream = TcpStream::connect(addr).await.unwrap();
198+
199+
let (reader, writer) = stream.into_split();
200+
201+
// Initialize high-level async protocol with handshake
202+
println!("Starting async BIP-324 handshake");
203+
let mut protocol = Protocol::new(
204+
p2p::Magic::REGTEST,
205+
bip324::Role::Initiator,
206+
None, // no garbage
207+
None, // no decoys
208+
reader,
209+
writer,
210+
)
211+
.await
212+
.unwrap();
213+
214+
println!("Sending Ping using async Protocol::write()");
215+
let ping = V2NetworkMessage::new(NetworkMessage::Ping(45324));
216+
let message = consensus::serialize(&ping);
217+
protocol.write(&Payload::genuine(message)).await.unwrap();
218+
219+
println!("Reading response using async Protocol::read()");
220+
let payload = protocol.read().await.unwrap();
221+
let response_message = consensus::deserialize::<V2NetworkMessage>(payload.contents()).unwrap();
222+
223+
assert_eq!(NetworkMessage::Pong(45324), *response_message.payload());
224+
225+
println!("Successfully ping-pong message using async Protocol API!");
226+
server.await.unwrap();
227+
228+
println!(
229+
"Trying to read another message from the server, while the connection is already closed."
230+
);
231+
assert!(protocol.read().await.is_err());
232+
}
233+
157234
#[test]
158235
#[cfg(feature = "std")]
159236
fn regtest_handshake() {

0 commit comments

Comments
 (0)