Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions ohkami/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Response {
pub(crate) async fn send(
self,
conn: &mut (impl AsyncWrite + Unpin)
) -> Upgrade {
) -> std::io::Result<Upgrade> {
match self.content {
Content::None => {
let mut buf = Vec::<u8>::with_capacity(
Expand All @@ -322,10 +322,10 @@ impl Response {
crate::push_unchecked!(buf <- self.status.line());
self.headers.write_unchecked_to(&mut buf);
}
conn.write_all(&buf).await.expect("Failed to send response");
conn.flush().await.expect("Failed to flush connection");
conn.write_all(&buf).await?;
conn.flush().await?;

Upgrade::None
Ok(Upgrade::None)
}

Content::Payload(bytes) => {
Expand All @@ -338,10 +338,10 @@ impl Response {
self.headers.write_unchecked_to(&mut buf);
crate::push_unchecked!(buf <- bytes);
}
conn.write_all(&buf).await.expect("Failed to send response");
conn.flush().await.expect("Failed to flush connection");
conn.write_all(&buf).await?;
conn.flush().await?;

Upgrade::None
Ok(Upgrade::None)
}

#[cfg(feature="sse")]
Expand All @@ -353,8 +353,8 @@ impl Response {
crate::push_unchecked!(buf <- self.status.line());
self.headers.write_unchecked_to(&mut buf);
}
conn.write_all(&buf).await.expect("Failed to send response");
conn.flush().await.expect("Failed to flush connection");
conn.write_all(&buf).await?;
conn.flush().await?;

while let Some(chunk) = stream.next().await {
let mut message = Vec::with_capacity(
Expand All @@ -378,13 +378,13 @@ impl Response {
#[cfg(feature="DEBUG")]
println!("\n[sending chunk]\n{}", chunk.escape_ascii());

conn.write_all(&chunk).await.expect("Failed to send response");
conn.flush().await.expect("Failed to flush connection");
conn.write_all(&chunk).await?;
conn.flush().await?;
}
conn.write_all(b"0\r\n\r\n").await.expect("Failed to send response");
conn.flush().await.expect("Failed to flush connection");
conn.write_all(b"0\r\n\r\n").await?;
conn.flush().await?;

Upgrade::None
Ok(Upgrade::None)
}

#[cfg(all(feature="ws", feature="__rt_native__"))]
Expand All @@ -396,10 +396,10 @@ impl Response {
crate::push_unchecked!(buf <- self.status.line());
self.headers.write_unchecked_to(&mut buf);
}
conn.write_all(&buf).await.expect("Failed to send response");
conn.flush().await.expect("Failed to flush connection");
conn.write_all(&buf).await?;
conn.flush().await?;

Upgrade::WebSocket(ws)
Ok(Upgrade::WebSocket(ws))
}
}
}
Expand Down
30 changes: 26 additions & 4 deletions ohkami/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl<C: Connection> Session<C> {
}

pub(crate) async fn manage(mut self) {
#[cold] #[inline(never)]
#[cold]
#[inline(never)]
fn panicking(panic: Box<dyn Any + Send>) -> Response {
if let Some(msg) = panic.downcast_ref::<String>() {
crate::WARNING!("[Panicked]: {msg}");
Expand All @@ -52,6 +53,17 @@ impl<C: Connection> Session<C> {
crate::Response::InternalServerError()
}

#[cold]
#[inline(never)]
fn handle_send_failure(error: std::io::Error) {
use std::io::ErrorKind::*;
if matches!(error.kind(), BrokenPipe | ConnectionReset | ConnectionAborted) {
crate::WARNING!("Client disconnected before response could be sent: {error}");
} else {
crate::ERROR!("Failed to send response to the client: {error}");
}
}

let mut req = Request::init(self.ip);
let mut req = unsafe { Pin::new_unchecked(&mut req) };
let upgrade = loop {
Expand Down Expand Up @@ -83,7 +95,13 @@ impl<C: Connection> Session<C> {
Ok(future) => future.await,
Err(panic) => panicking(panic),
};
let upgrade = res.send(&mut self.connection).await;
let upgrade = match res.send(&mut self.connection).await {
Ok(upgrade) => upgrade,
Err(e) => {
handle_send_failure(e);
break Upgrade::None;
}
};

if !upgrade.is_none() {
break upgrade;
Expand All @@ -94,7 +112,11 @@ impl<C: Connection> Session<C> {
},
Ok(None) => break Upgrade::None,
Err(res) => {
res.send(&mut self.connection).await;
if let Err(e) = res.send(&mut self.connection).await {
handle_send_failure(e);
break Upgrade::None;
}
// here response was sent, so assuming just request was malformed and we can continue
continue;
},
}
Expand Down Expand Up @@ -134,4 +156,4 @@ impl<C: Connection> Session<C> {
},
}
}
}
}
12 changes: 12 additions & 0 deletions samples/issue_459/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "issue_459"
version = "0.1.0"
edition = "2021"

[dependencies]
# set `default-features = false` to assure "DEBUG" feature be off even when DEBUGing `../ohkami`
ohkami = { path = "../../ohkami", default-features = false, features = ["rt_tokio"] }
tokio = { version = "1", features = ["full"] }

[features]
DEBUG = ["ohkami/DEBUG"]
11 changes: 11 additions & 0 deletions samples/issue_459/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# A minimal sample to reproduce issue #459

To test [issue #459](https://github.com/ohkami-rs/ohkami/issues/459), run `cargo run`, and in another terminal:

```sh
timeout -sKILL 0.01 curl localhost:5000
```

This makes server panic, and may lead to `process didn't exit successfully`, as for v0.23.3.

v0.23.4 fixes the behavior to safely print warnings.
15 changes: 15 additions & 0 deletions samples/issue_459/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use ohkami::prelude::*;

async fn large_response() -> String {
(1..=100000)
.map(|i| format!("This is line #{i}\n"))
.collect::<String>()
}

#[tokio::main]
async fn main() {
Ohkami::new((
// route("/").get(large_response),
"/".GET(large_response),
)).howl("localhost:3000").await
}