Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pretty-donuts-fetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@vercel/sandbox": patch
---

Handle abort signal and early stream close in runCommand to avoid misleading Zod error
122 changes: 122 additions & 0 deletions packages/vercel-sandbox/src/api-client/api-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { APIClient } from "./api-client.js";
import { APIError, StreamError } from "./api-error.js";
import { createNdjsonStream } from "../../test-utils/mock-response.js";
import { z } from "zod";

describe("APIClient", () => {
describe("getLogs", () => {
Expand Down Expand Up @@ -274,6 +275,127 @@ describe("APIClient", () => {
});
}
});

it("throws abort error (not Zod error) when signal aborts before stream finishes", async () => {
const commandData = {
command: {
id: "cmd_123",
name: "python3",
args: ["script.py"],
cwd: "/",
sandboxId: "sbx_123",
exitCode: null,
startedAt: 1,
},
};

const encoder = new TextEncoder();
const firstChunk = JSON.stringify(commandData) + "\n";
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(encoder.encode(firstChunk));
},
cancel() {},
});

mockFetch.mockResolvedValue(
new Response(stream, {
headers: { "content-type": "application/x-ndjson" },
}),
);

const controller = new AbortController();
const result = await client.runCommand({
sandboxId: "sbx_123",
command: "python3",
args: ["script.py"],
env: {},
sudo: false,
wait: true,
signal: controller.signal,
});

expect(result.command.id).toBe("cmd_123");

controller.abort();

await expect(result.finished).rejects.toThrow();
await expect(result.finished).rejects.not.toBeInstanceOf(z.ZodError);
}, 10000);

it("throws StreamError when stream closes before finished chunk arrives", async () => {
const commandData = {
command: {
id: "cmd_123",
name: "python3",
args: ["script.py"],
cwd: "/",
sandboxId: "sbx_123",
exitCode: null,
startedAt: 1,
},
};

const encoder = new TextEncoder();
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(
encoder.encode(JSON.stringify(commandData) + "\n"),
);
controller.close();
},
});

mockFetch.mockResolvedValue(
new Response(stream, {
headers: { "content-type": "application/x-ndjson" },
}),
);

const result = await client.runCommand({
sandboxId: "sbx_123",
command: "python3",
args: ["script.py"],
env: {},
sudo: false,
wait: true,
});

expect(result.command.id).toBe("cmd_123");

await expect(result.finished).rejects.toThrow(
"Stream ended before command finished",
);
await expect(result.finished).rejects.toBeInstanceOf(StreamError);
});

it("rejects when signal is already aborted before stream starts", async () => {
const stream = new ReadableStream<Uint8Array>({
start() {},
cancel() {},
});

mockFetch.mockResolvedValue(
new Response(stream, {
headers: { "content-type": "application/x-ndjson" },
}),
);

const controller = new AbortController();
controller.abort();

await expect(
client.runCommand({
sandboxId: "sbx_123",
command: "python3",
args: ["script.py"],
env: {},
sudo: false,
wait: true,
signal: controller.signal,
}),
).rejects.toThrow();
});
});

describe("stopSandbox", () => {
Expand Down
63 changes: 57 additions & 6 deletions packages/vercel-sandbox/src/api-client/api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,32 @@ export class APIClient extends BaseClient {
}

const jsonlinesStream = jsonlines.parse();
pipe(response.body, jsonlinesStream).catch((err) => {
console.error("Error piping command stream:", err);
});
pipe(response.body, jsonlinesStream, { signal: params.signal }).catch(
(err) => {
console.error("Error piping command stream:", err);
},
);

const iterator = jsonlinesStream[Symbol.asyncIterator]();
const commandChunk = await iterator.next();
if (commandChunk.done) {
throw new StreamError(
"stream_ended_early",
"Stream ended before command data was received",
params.sandboxId,
);
}
const { command } = CommandResponse.parse(commandChunk.value);

const finished = (async () => {
const finishedChunk = await iterator.next();
if (finishedChunk.done) {
throw new StreamError(
"stream_ended_early",
"Stream ended before command finished",
params.sandboxId,
);
}
const { command } = CommandFinishedResponse.parse(finishedChunk.value);
return command;
})();
Expand Down Expand Up @@ -577,7 +593,7 @@ export class APIClient extends BaseClient {
}

const jsonlinesStream = jsonlines.parse();
pipe(response.body, jsonlinesStream).catch((err) => {
pipe(response.body, jsonlinesStream, { signal }).catch((err) => {
console.error("Error piping logs:", err);
});

Expand Down Expand Up @@ -711,8 +727,38 @@ export class APIClient extends BaseClient {
async function pipe(
readable: ReadableStream<Uint8Array>,
output: NodeJS.WritableStream,
options?: { signal?: AbortSignal },
) {
const reader = readable.getReader();
let aborted = false;

const signal = options?.signal;
const onAbort = () => {
aborted = true;
const reason =
signal?.reason ??
new DOMException("The operation was aborted.", "AbortError");
void reader.cancel(reason).catch(() => {
// ignore cancel errors when aborting
});

if ("destroy" in output && typeof output.destroy === "function") {
output.destroy(reason as Error);
return;
}

output.emit("error", reason);
output.end();
};

if (signal) {
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}

try {
while (true) {
const read = await reader.read();
Expand All @@ -724,9 +770,14 @@ async function pipe(
}
}
} catch (err) {
output.emit("error", err);
if (!aborted) {
output.emit("error", err);
}
} finally {
output.end();
signal?.removeEventListener("abort", onAbort);
if (!aborted) {
output.end();
}
}
}

Expand Down
Loading