diff --git a/.changeset/pretty-donuts-fetch.md b/.changeset/pretty-donuts-fetch.md new file mode 100644 index 00000000..4daa50fa --- /dev/null +++ b/.changeset/pretty-donuts-fetch.md @@ -0,0 +1,5 @@ +--- +"@vercel/sandbox": patch +--- + +Handle abort signal and early stream close in runCommand to avoid misleading Zod error diff --git a/packages/vercel-sandbox/src/api-client/api-client.test.ts b/packages/vercel-sandbox/src/api-client/api-client.test.ts index ff227f42..5d08d9ec 100644 --- a/packages/vercel-sandbox/src/api-client/api-client.test.ts +++ b/packages/vercel-sandbox/src/api-client/api-client.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { APIClient } from "./api-client.js"; import { APIError, StreamError } from "./api-error.js"; import { createNdjsonStream } from "../../test-utils/mock-response.js"; +import { z } from "zod"; describe("APIClient", () => { describe("getLogs", () => { @@ -274,6 +275,127 @@ describe("APIClient", () => { }); } }); + + it("throws abort error (not Zod error) when signal aborts before stream finishes", async () => { + const commandData = { + command: { + id: "cmd_123", + name: "python3", + args: ["script.py"], + cwd: "/", + sandboxId: "sbx_123", + exitCode: null, + startedAt: 1, + }, + }; + + const encoder = new TextEncoder(); + const firstChunk = JSON.stringify(commandData) + "\n"; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(firstChunk)); + }, + cancel() {}, + }); + + mockFetch.mockResolvedValue( + new Response(stream, { + headers: { "content-type": "application/x-ndjson" }, + }), + ); + + const controller = new AbortController(); + const result = await client.runCommand({ + sandboxId: "sbx_123", + command: "python3", + args: ["script.py"], + env: {}, + sudo: false, + wait: true, + signal: controller.signal, + }); + + expect(result.command.id).toBe("cmd_123"); + + controller.abort(); + + await expect(result.finished).rejects.toThrow(); + await expect(result.finished).rejects.not.toBeInstanceOf(z.ZodError); + }, 10000); + + it("throws StreamError when stream closes before finished chunk arrives", async () => { + const commandData = { + command: { + id: "cmd_123", + name: "python3", + args: ["script.py"], + cwd: "/", + sandboxId: "sbx_123", + exitCode: null, + startedAt: 1, + }, + }; + + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode(JSON.stringify(commandData) + "\n"), + ); + controller.close(); + }, + }); + + mockFetch.mockResolvedValue( + new Response(stream, { + headers: { "content-type": "application/x-ndjson" }, + }), + ); + + const result = await client.runCommand({ + sandboxId: "sbx_123", + command: "python3", + args: ["script.py"], + env: {}, + sudo: false, + wait: true, + }); + + expect(result.command.id).toBe("cmd_123"); + + await expect(result.finished).rejects.toThrow( + "Stream ended before command finished", + ); + await expect(result.finished).rejects.toBeInstanceOf(StreamError); + }); + + it("rejects when signal is already aborted before stream starts", async () => { + const stream = new ReadableStream({ + start() {}, + cancel() {}, + }); + + mockFetch.mockResolvedValue( + new Response(stream, { + headers: { "content-type": "application/x-ndjson" }, + }), + ); + + const controller = new AbortController(); + controller.abort(); + + await expect( + client.runCommand({ + sandboxId: "sbx_123", + command: "python3", + args: ["script.py"], + env: {}, + sudo: false, + wait: true, + signal: controller.signal, + }), + ).rejects.toThrow(); + }); }); describe("stopSandbox", () => { diff --git a/packages/vercel-sandbox/src/api-client/api-client.ts b/packages/vercel-sandbox/src/api-client/api-client.ts index 1f226a3e..d1b0a4e5 100644 --- a/packages/vercel-sandbox/src/api-client/api-client.ts +++ b/packages/vercel-sandbox/src/api-client/api-client.ts @@ -263,16 +263,32 @@ export class APIClient extends BaseClient { } const jsonlinesStream = jsonlines.parse(); - pipe(response.body, jsonlinesStream).catch((err) => { - console.error("Error piping command stream:", err); - }); + pipe(response.body, jsonlinesStream, { signal: params.signal }).catch( + (err) => { + console.error("Error piping command stream:", err); + }, + ); const iterator = jsonlinesStream[Symbol.asyncIterator](); const commandChunk = await iterator.next(); + if (commandChunk.done) { + throw new StreamError( + "stream_ended_early", + "Stream ended before command data was received", + params.sandboxId, + ); + } const { command } = CommandResponse.parse(commandChunk.value); const finished = (async () => { const finishedChunk = await iterator.next(); + if (finishedChunk.done) { + throw new StreamError( + "stream_ended_early", + "Stream ended before command finished", + params.sandboxId, + ); + } const { command } = CommandFinishedResponse.parse(finishedChunk.value); return command; })(); @@ -577,7 +593,7 @@ export class APIClient extends BaseClient { } const jsonlinesStream = jsonlines.parse(); - pipe(response.body, jsonlinesStream).catch((err) => { + pipe(response.body, jsonlinesStream, { signal }).catch((err) => { console.error("Error piping logs:", err); }); @@ -711,8 +727,38 @@ export class APIClient extends BaseClient { async function pipe( readable: ReadableStream, output: NodeJS.WritableStream, + options?: { signal?: AbortSignal }, ) { const reader = readable.getReader(); + let aborted = false; + + const signal = options?.signal; + const onAbort = () => { + aborted = true; + const reason = + signal?.reason ?? + new DOMException("The operation was aborted.", "AbortError"); + void reader.cancel(reason).catch(() => { + // ignore cancel errors when aborting + }); + + if ("destroy" in output && typeof output.destroy === "function") { + output.destroy(reason as Error); + return; + } + + output.emit("error", reason); + output.end(); + }; + + if (signal) { + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + } + } + try { while (true) { const read = await reader.read(); @@ -724,9 +770,14 @@ async function pipe( } } } catch (err) { - output.emit("error", err); + if (!aborted) { + output.emit("error", err); + } } finally { - output.end(); + signal?.removeEventListener("abort", onAbort); + if (!aborted) { + output.end(); + } } }