Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 59 additions & 17 deletions src/tools/builtin/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,25 +507,47 @@ impl ShellTool {
.spawn()
.map_err(|e| ToolError::ExecutionFailed(format!("Failed to spawn command: {}", e)))?;

// Wait with timeout
// Drain stdout/stderr concurrently with wait() to prevent deadlocks.
// If we call wait() without draining the pipes and the child's output
// exceeds the OS pipe buffer (64KB Linux, 16KB macOS), the child blocks
// on write and wait() never returns.
let stdout_handle = child.stdout.take();
let stderr_handle = child.stderr.take();

let result = tokio::time::timeout(timeout, async {
let status = child.wait().await?;

// Read stdout
let mut stdout = String::new();
if let Some(mut out) = child.stdout.take() {
let mut buf = vec![0u8; MAX_OUTPUT_SIZE];
let n = out.read(&mut buf).await.unwrap_or(0);
stdout = String::from_utf8_lossy(&buf[..n]).to_string();
}
let stdout_fut = async {
if let Some(mut out) = stdout_handle {
let mut buf = Vec::new();
(&mut out)
.take(MAX_OUTPUT_SIZE as u64)
.read_to_end(&mut buf)
.await
.ok();
// Drain any remaining output so the child does not block
tokio::io::copy(&mut out, &mut tokio::io::sink()).await.ok();
String::from_utf8_lossy(&buf).to_string()
} else {
String::new()
}
};

// Read stderr
let mut stderr = String::new();
if let Some(mut err) = child.stderr.take() {
let mut buf = vec![0u8; MAX_OUTPUT_SIZE];
let n = err.read(&mut buf).await.unwrap_or(0);
stderr = String::from_utf8_lossy(&buf[..n]).to_string();
}
let stderr_fut = async {
if let Some(mut err) = stderr_handle {
let mut buf = Vec::new();
(&mut err)
.take(MAX_OUTPUT_SIZE as u64)
.read_to_end(&mut buf)
.await
.ok();
tokio::io::copy(&mut err, &mut tokio::io::sink()).await.ok();
String::from_utf8_lossy(&buf).to_string()
} else {
String::new()
}
};

let (stdout, stderr, wait_result) = tokio::join!(stdout_fut, stderr_fut, child.wait());
let status = wait_result?;

// Combine output
let output = if stderr.is_empty() {
Expand Down Expand Up @@ -1184,6 +1206,26 @@ mod tests {
);
}

#[tokio::test]
async fn test_large_output_command() {
let tool = ShellTool::new().with_timeout(Duration::from_secs(10));
let ctx = JobContext::default();

// Generate output larger than OS pipe buffer (64KB on Linux, 16KB on macOS).
// Without draining pipes before wait(), this would deadlock.
let result = tool
.execute(
serde_json::json!({"command": "python3 -c \"print('A' * 131072)\""}),
&ctx,
)
.await
.unwrap();

let output = result.result.get("output").unwrap().as_str().unwrap();
assert_eq!(output.len(), MAX_OUTPUT_SIZE);
assert_eq!(result.result.get("exit_code").unwrap().as_i64().unwrap(), 0);
}

#[tokio::test]
async fn test_netcat_blocked_at_execution() {
let tool = ShellTool::new();
Expand Down
Loading