diff --git a/src/body/body.rs b/src/body/body.rs index 9dc1a034f9..14c62573ac 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -733,6 +733,55 @@ mod tests { assert!(rx.data().await.is_none()); } + use pin_project_lite::pin_project; + + pin_project! { + #[derive(Debug)] + struct Inner { + #[pin] + body: Body, + } + } + + use crate::common::buf::BufList; + use bytes::Bytes; + + impl Inner { + fn new(body: Body) -> Self { + Self { body } + } + + async fn collect(self) -> std::result::Result, crate::Error> { + let mut output = BufList::new(); + let body = self.body; + futures_util::pin_mut!(body); + while let Some(buf) = body.data().await { + output.push(buf?); + } + Ok(output) + } + } + + use bytes::Buf; + + #[tokio::test] + async fn read_from_channel_body() { + let (mut sender, body) = Body::channel(); + let byte_stream = Inner::new(body); + tokio::spawn(async move { + sender.send_data(Bytes::from("data 1")).await.unwrap(); + sender.send_data(Bytes::from("data 2")).await.unwrap(); + sender.send_data(Bytes::from("data 3")).await.unwrap(); + }); + + let mut aggregated_bytes: BufList = byte_stream.collect().await.expect("no errors"); + + assert_eq!( + aggregated_bytes.copy_to_bytes(aggregated_bytes.remaining()), + Bytes::from("data 1data 2data 3") + ); + } + #[test] fn channel_ready() { let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);