Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions protocol/src/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,15 @@ where
bytes_read,
} => {
while *bytes_read < NUM_LENGTH_BYTES {
*bytes_read += self.reader.read(&mut length_bytes[*bytes_read..]).await?;
let len = self.reader.read(&mut length_bytes[*bytes_read..]).await?;
if len == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with ConnectionAborted here. ConnectionReset is an alternative. Happy to be convinced to use ConnectionReset if someone has strong feelings about it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion here, ConnectionAborted is fine with me.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use UnexpectedEof? It looks like that is what tokio bubbles up in similar scenarios.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this too, but found the Rust stdlib docs to not to be a perfect fit for our case? https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.UnexpectedEof

An error returned when an operation could not be completed because an “end of file” was reached prematurely.
This typically means that an operation could only succeed if it read a particular number of bytes but only a smaller number of bytes could be read.

(but also don't really care too much personally)

"read zero bytes",
)
.into());
}
*bytes_read += len;
}

let packet_bytes_len = self.inbound_cipher.decrypt_packet_len(*length_bytes);
Expand All @@ -417,7 +425,15 @@ where
bytes_read,
} => {
while *bytes_read < packet_bytes.len() {
*bytes_read += self.reader.read(&mut packet_bytes[*bytes_read..]).await?;
let len = self.reader.read(&mut packet_bytes[*bytes_read..]).await?;
if len == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
"read zero bytes",
)
.into());
}
*bytes_read += len;
}

let plaintext_len = InboundCipher::decryption_buffer_len(packet_bytes.len());
Expand Down
77 changes: 77 additions & 0 deletions protocol/tests/round_trips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,83 @@ fn hello_world_happy_path() {
assert_eq!(message, decrypted_message[1..].to_vec()); // Skip header byte
}

#[tokio::test]
#[cfg(feature = "tokio")]
async fn pingpong_with_closed_connection_async() {
use bip324::{futures::Protocol, io::Payload};
use bitcoin::consensus;
use p2p::message::{NetworkMessage, V2NetworkMessage};
use tokio::net::TcpListener;
use tokio::net::TcpStream;

// Start a server that responds to exactly one Ping(x) message with a
// Pong(x) message and then stops. This allows testing to read from a closed stream.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let (reader, writer) = stream.into_split();
let mut protocol = Protocol::new(
p2p::Magic::REGTEST,
bip324::Role::Responder,
None, // no garbage
None, // no decoys
reader,
writer,
)
.await
.unwrap();

let payload = protocol.read().await.unwrap();
let received_message =
consensus::deserialize::<V2NetworkMessage>(payload.contents()).unwrap();
if let NetworkMessage::Ping(x) = received_message.payload() {
let pong = V2NetworkMessage::new(NetworkMessage::Pong(*x));
let message = consensus::serialize(&pong);
protocol.write(&Payload::genuine(message)).await.unwrap();
println!("Pong sent, stopping server.")
} else {
panic!("Expected Ping, but received: {received_message:?}");
}
});

let stream = TcpStream::connect(addr).await.unwrap();

let (reader, writer) = stream.into_split();

// Initialize high-level async protocol with handshake
println!("Starting async BIP-324 handshake");
let mut protocol = Protocol::new(
p2p::Magic::REGTEST,
bip324::Role::Initiator,
None, // no garbage
None, // no decoys
reader,
writer,
)
.await
.unwrap();

println!("Sending Ping using async Protocol::write()");
let ping = V2NetworkMessage::new(NetworkMessage::Ping(45324));
let message = consensus::serialize(&ping);
protocol.write(&Payload::genuine(message)).await.unwrap();

println!("Reading response using async Protocol::read()");
let payload = protocol.read().await.unwrap();
let response_message = consensus::deserialize::<V2NetworkMessage>(payload.contents()).unwrap();

assert_eq!(NetworkMessage::Pong(45324), *response_message.payload());

println!("Successfully ping-pong message using async Protocol API!");
server.await.unwrap();

println!(
"Trying to read another message from the server, while the connection is already closed."
);
assert!(protocol.read().await.is_err());
Copy link
Contributor Author

@0xB10C 0xB10C Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm the test still passed when async writing to the closed protocol. I think that should return an error too? At least it doesn't hang..

index 5b96eca..e7cb5d1 100644
--- a/protocol/tests/round_trips.rs
+++ b/protocol/tests/round_trips.rs
@@ -229,6 +229,10 @@ async fn pingpong_with_closed_connection_async() {
         "Trying to read another message from the server, while the connection is already closed."
     );
     assert!(protocol.read().await.is_err());
+
+    let ping = V2NetworkMessage::new(NetworkMessage::Ping(45324));
+    let message = consensus::serialize(&ping);
+    protocol.write(&Payload::genuine(message)).await.unwrap();
 }

 #[test]

Probably a good followup once this is merged.

}

#[test]
#[cfg(feature = "std")]
fn regtest_handshake() {
Expand Down