From 36bd9fc5d24bd521bc8da2f4fdd3ac293b84af79 Mon Sep 17 00:00:00 2001 From: AI-Reviewer-QS Date: Fri, 20 Feb 2026 09:33:17 +0800 Subject: [PATCH] fix: prevent pipe deadlock in shell command execution Drain stdout and stderr concurrently with child.wait() using tokio::join to prevent deadlocks when command output exceeds the OS pipe buffer (64KB on Linux, 16KB on macOS). Use AsyncReadExt::take() for memory-bounded reads and tokio::io::copy to sink for draining excess output. Add regression test that generates 128KB of output to verify the fix prevents deadlocks. --- src/tools/builtin/shell.rs | 76 +++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 17 deletions(-) diff --git a/src/tools/builtin/shell.rs b/src/tools/builtin/shell.rs index 26d6e03492..320273f685 100644 --- a/src/tools/builtin/shell.rs +++ b/src/tools/builtin/shell.rs @@ -507,25 +507,47 @@ impl ShellTool { .spawn() .map_err(|e| ToolError::ExecutionFailed(format!("Failed to spawn command: {}", e)))?; - // Wait with timeout + // Drain stdout/stderr concurrently with wait() to prevent deadlocks. + // If we call wait() without draining the pipes and the child's output + // exceeds the OS pipe buffer (64KB Linux, 16KB macOS), the child blocks + // on write and wait() never returns. + let stdout_handle = child.stdout.take(); + let stderr_handle = child.stderr.take(); + let result = tokio::time::timeout(timeout, async { - let status = child.wait().await?; - - // Read stdout - let mut stdout = String::new(); - if let Some(mut out) = child.stdout.take() { - let mut buf = vec![0u8; MAX_OUTPUT_SIZE]; - let n = out.read(&mut buf).await.unwrap_or(0); - stdout = String::from_utf8_lossy(&buf[..n]).to_string(); - } + let stdout_fut = async { + if let Some(mut out) = stdout_handle { + let mut buf = Vec::new(); + (&mut out) + .take(MAX_OUTPUT_SIZE as u64) + .read_to_end(&mut buf) + .await + .ok(); + // Drain any remaining output so the child does not block + tokio::io::copy(&mut out, &mut tokio::io::sink()).await.ok(); + String::from_utf8_lossy(&buf).to_string() + } else { + String::new() + } + }; - // Read stderr - let mut stderr = String::new(); - if let Some(mut err) = child.stderr.take() { - let mut buf = vec![0u8; MAX_OUTPUT_SIZE]; - let n = err.read(&mut buf).await.unwrap_or(0); - stderr = String::from_utf8_lossy(&buf[..n]).to_string(); - } + let stderr_fut = async { + if let Some(mut err) = stderr_handle { + let mut buf = Vec::new(); + (&mut err) + .take(MAX_OUTPUT_SIZE as u64) + .read_to_end(&mut buf) + .await + .ok(); + tokio::io::copy(&mut err, &mut tokio::io::sink()).await.ok(); + String::from_utf8_lossy(&buf).to_string() + } else { + String::new() + } + }; + + let (stdout, stderr, wait_result) = tokio::join!(stdout_fut, stderr_fut, child.wait()); + let status = wait_result?; // Combine output let output = if stderr.is_empty() { @@ -1184,6 +1206,26 @@ mod tests { ); } + #[tokio::test] + async fn test_large_output_command() { + let tool = ShellTool::new().with_timeout(Duration::from_secs(10)); + let ctx = JobContext::default(); + + // Generate output larger than OS pipe buffer (64KB on Linux, 16KB on macOS). + // Without draining pipes before wait(), this would deadlock. + let result = tool + .execute( + serde_json::json!({"command": "python3 -c \"print('A' * 131072)\""}), + &ctx, + ) + .await + .unwrap(); + + let output = result.result.get("output").unwrap().as_str().unwrap(); + assert_eq!(output.len(), MAX_OUTPUT_SIZE); + assert_eq!(result.result.get("exit_code").unwrap().as_i64().unwrap(), 0); + } + #[tokio::test] async fn test_netcat_blocked_at_execution() { let tool = ShellTool::new();