Skip to content

Commit ed6112a

Browse files
committed
fix: emit tool-input-end eagerly for sequential tool calls (#488)
Previously, tool-input-end and tool-call events were batched in flush() at stream close, making all tool calls appear simultaneously "in progress." Now each tool call is closed as soon as the next one starts or a null finalization chunk arrives. Closes #488 Made-with: Cursor
1 parent 68aaf47 commit ed6112a

3 files changed

Lines changed: 915 additions & 19 deletions

File tree

.changeset/eager-tool-input-end.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"workers-ai-provider": patch
3+
---
4+
5+
Emit `tool-input-end` and `tool-call` events eagerly when streaming multiple tool calls, instead of deferring all of them to stream close. Previously, all tool calls appeared "in progress" simultaneously because `tool-input-end` was only emitted in `flush()`. Now each tool call is closed as soon as the next one starts or a null finalization chunk is received, matching the behavior of other AI SDK providers.

packages/workers-ai-provider/src/streaming.ts

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,11 @@ export function getMappedStream(
9393
// Track tool call streaming state per index.
9494
// When we see the first chunk for a tool call index, we emit tool-input-start.
9595
// Subsequent argument deltas emit tool-input-delta.
96-
// All open tool calls are closed with tool-input-end in flush().
96+
// tool-input-end is emitted eagerly when a new tool index starts or a null
97+
// finalization chunk arrives; any remaining open calls are closed in flush().
9798
const activeToolCalls = new Map<number, { id: string; toolName: string; args: string }>();
99+
const closedToolCalls = new Set<number>();
100+
let lastActiveToolIndex: number | null = null;
98101

99102
// Step 1: Decode bytes into SSE lines
100103
const sseStream = rawStream.pipeThrough(new SSEDecoder());
@@ -224,18 +227,10 @@ export function getMappedStream(
224227
},
225228

226229
flush(controller) {
227-
// Close all open tool call inputs and emit complete tool-call events
228-
for (const [, tc] of activeToolCalls) {
229-
controller.enqueue({ type: "tool-input-end", id: tc.id });
230-
// Emit the complete tool-call event — the AI SDK expects both
231-
// incremental tool-input-* events AND a final tool-call event,
232-
// matching how @ai-sdk/openai-compatible works.
233-
controller.enqueue({
234-
type: "tool-call",
235-
toolCallId: tc.id,
236-
toolName: tc.toolName,
237-
input: tc.args,
238-
});
230+
// Close any tool calls that weren't already closed during streaming
231+
for (const [idx] of activeToolCalls) {
232+
if (closedToolCalls.has(idx)) continue;
233+
closeToolCall(idx, controller);
239234
}
240235

241236
// Close open text/reasoning blocks
@@ -264,24 +259,51 @@ export function getMappedStream(
264259
}),
265260
);
266261

262+
/**
263+
* Emit tool-input-end + tool-call for a tool call that is complete.
264+
*/
265+
function closeToolCall(
266+
index: number,
267+
controller: TransformStreamDefaultController<LanguageModelV3StreamPart>,
268+
) {
269+
const tc = activeToolCalls.get(index);
270+
if (!tc || closedToolCalls.has(index)) return;
271+
closedToolCalls.add(index);
272+
controller.enqueue({ type: "tool-input-end", id: tc.id });
273+
controller.enqueue({
274+
type: "tool-call",
275+
toolCallId: tc.id,
276+
toolName: tc.toolName,
277+
input: tc.args,
278+
});
279+
}
280+
267281
/**
268282
* Emit incremental tool call events from streaming chunks.
269283
*
270284
* Workers AI streams tool calls as:
271285
* Chunk A: { id, type, index, function: { name } } — start
272286
* Chunk B: { index, function: { arguments: "partial..." } } — args delta
273287
* Chunk C: { index, function: { arguments: "rest..." } } — args delta
274-
* Chunk D: { id: null, type: null, function: { name: null } } — finalize (skip)
288+
* Chunk D: { id: null, type: null, function: { name: null } } — finalize
275289
*
276290
* We emit tool-input-start on first sight, tool-input-delta for each
277-
* argument chunk, and tool-input-end in flush().
291+
* argument chunk, and tool-input-end eagerly — either when a new tool
292+
* index starts (closing the previous one) or on a null finalization
293+
* chunk. Any remaining open calls are closed in flush().
278294
*/
279295
function emitToolCallDeltas(
280296
toolCalls: Record<string, unknown>[],
281297
controller: TransformStreamDefaultController<LanguageModelV3StreamPart>,
282298
) {
283299
for (const tc of toolCalls) {
284-
if (isNullFinalizationChunk(tc)) continue;
300+
if (isNullFinalizationChunk(tc)) {
301+
// Null finalization sentinel — close the last active tool call
302+
if (lastActiveToolIndex != null) {
303+
closeToolCall(lastActiveToolIndex, controller);
304+
}
305+
continue;
306+
}
285307

286308
const tcIndex = (tc.index as number) ?? 0;
287309
const fn = tc.function as Record<string, unknown> | undefined;
@@ -290,18 +312,22 @@ export function getMappedStream(
290312
const tcId = tc.id as string | null;
291313

292314
if (!activeToolCalls.has(tcIndex)) {
293-
// First chunk for this tool call — emit tool-input-start
315+
// A new tool call is starting — close the previous one first
316+
if (lastActiveToolIndex != null && lastActiveToolIndex !== tcIndex) {
317+
closeToolCall(lastActiveToolIndex, controller);
318+
}
319+
294320
const id = tcId || generateId();
295321
const toolName = tcName || "";
296322
activeToolCalls.set(tcIndex, { id, toolName, args: "" });
323+
lastActiveToolIndex = tcIndex;
297324

298325
controller.enqueue({
299326
type: "tool-input-start",
300327
id,
301328
toolName,
302329
});
303330

304-
// If arguments arrived in the same chunk as the start, emit them
305331
if (tcArgs != null && tcArgs !== "") {
306332
const delta = typeof tcArgs === "string" ? tcArgs : JSON.stringify(tcArgs);
307333
activeToolCalls.get(tcIndex)!.args += delta;
@@ -312,8 +338,8 @@ export function getMappedStream(
312338
});
313339
}
314340
} else {
315-
// Subsequent chunks — emit argument deltas
316341
const active = activeToolCalls.get(tcIndex)!;
342+
lastActiveToolIndex = tcIndex;
317343
if (tcArgs != null && tcArgs !== "") {
318344
const delta = typeof tcArgs === "string" ? tcArgs : JSON.stringify(tcArgs);
319345
active.args += delta;

0 commit comments

Comments
 (0)