Skip to content

[BUG] In streaming mode, OpenAiApi cannot handle FinishReason.LENGTH when there are too many tool calls or the parameters are large enough to exceed the token limit. #5379

@chickenlj

Description

@chickenlj

In streaming mode, sometimes when I call chatmodel.stream(...), I encounter an issue where the ToolCall arguments are invalid. There is no problem with the non-streaming chatmodel.call(...) mode; this only happens in streaming mode.

In the case where AssistantMessage returns a ToolCall list, the model may sometimes return FinishReason.LENGTH while it is still emitting a ToolCall (meaning the output tokens exceeded the limit). The OpenAiApi implementation did not anticipate this case and is not compatible with it.

Below are some modifications I have made on OpenAiApi that can basically make this situation work.

If necessary, I can further polish the changes and submit a pull request.

OpenAiApi.java

	/**
	 * Creates a streaming chat response for the given chat conversation.
	 * <p>Chunks belonging to the same streaming tool call are grouped into a window
	 * and merged into a single chunk. If the stream is truncated with
	 * {@code FinishReason.LENGTH} while tool calls are being emitted, the trailing
	 * tool call with incomplete JSON arguments is dropped so the remaining,
	 * complete tool calls can still be executed.
	 * @param chatRequest The chat completion request. Must have the stream property set
	 * to true.
	 * @param additionalHttpHeader Optional, additional HTTP headers to be added to the
	 * request. May be {@code null}.
	 * @return Returns a {@link Flux} stream from chat completion chunks.
	 */
	public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest,
			MultiValueMap<String, String> additionalHttpHeader) {

		Assert.notNull(chatRequest, REQUEST_BODY_NULL_MESSAGE);
		Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");

		// Tracks whether the chunks currently flowing through the pipeline are part
		// of a (multi-chunk) streaming tool call, so they can be windowed together.
		AtomicBoolean isInsideTool = new AtomicBoolean(false);

		// @formatter:off
		return this.webClient.post()
				.uri(this.completionsPath)
				.headers(headers -> {
					// The additional headers are documented as optional; guard against null.
					if (additionalHttpHeader != null) {
						headers.addAll(additionalHttpHeader);
					}
					addDefaultHeadersIfMissing(headers);
				}) // @formatter:on
				.bodyValue(chatRequest)
				.retrieve()
				.bodyToFlux(String.class)
				// cancels the flux stream after the "[DONE]" is received.
				.takeUntil(SSE_DONE_PREDICATE)
				// filters out the "[DONE]" message.
				.filter(SSE_DONE_PREDICATE.negate())
				.map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
				// Detect if the chunk is part of a streaming function call.
				.map(chunk -> {
					if (this.chunkMerger.isStreamingToolFunctionCall(chunk)) {
						isInsideTool.set(true);
					}
					return chunk;
				})
				// Group all chunks belonging to the same function call.
				// Flux<ChatCompletionChunk> -> Flux<Flux<ChatCompletionChunk>>
				.windowUntil(chunk -> {
					if (isInsideTool.get() && this.chunkMerger.isStreamingToolFunctionCallFinish(chunk)) {
						isInsideTool.set(false);
						return true;
					}
					return !isInsideTool.get();
				})
				// Merging the window chunks into a single chunk.
				// Reduce the inner Flux<ChatCompletionChunk> window into a single
				// Mono<ChatCompletionChunk>,
				// Flux<Flux<ChatCompletionChunk>> -> Flux<Mono<ChatCompletionChunk>>
				.concatMapIterable(window -> {
					Mono<ChatCompletionChunk> monoChunk = window
							// Skip null or empty chunks; use the record accessor, not field access.
							.filter(chunk -> chunk != null && !ObjectUtils.isEmpty(chunk.choices()))
							.reduce(new ChatCompletionChunk(null, null, null, null, null, null, null, null),
									(previous, current) -> this.chunkMerger.merge(previous, current))
							.flatMap(mergedChunk -> {
								if (mergedChunk.choices() != null && !mergedChunk.choices().isEmpty()) {
									var choice = mergedChunk.choices().get(0);

									// The model ran out of tokens mid tool call: the last tool
									// call's arguments may be truncated, invalid JSON.
									if (choice.finishReason() == ChatCompletionFinishReason.LENGTH
											&& choice.delta() != null
											&& !CollectionUtils.isEmpty(choice.delta().toolCalls())) {

										// Drop the last, invalid ToolCall (if any).
										ChatCompletionChunk cleanedChunk =
												this.chunkMerger.removeInvalidLastToolCall(mergedChunk);

										if (cleanedChunk == null) {
											// No valid tool call remains; discard the chunk.
											return Mono.empty();
										}

										return Mono.just(cleanedChunk);
									}
								}
								return Mono.just(mergedChunk);
							});
					return List.of(monoChunk);
				})
				// Flux<Mono<ChatCompletionChunk>> -> Flux<ChatCompletionChunk>
				.flatMap(mono -> mono);
	}

OpenAiStreamFunctionCallingHelper.java

/**
 * Helper used by the streaming OpenAI API client to merge streamed
 * {@code ChatCompletionChunk}s that belong to the same tool (function) call,
 * and to sanitize tool calls that were truncated by {@code FinishReason.LENGTH}.
 */
public class OpenAiStreamFunctionCallingHelper {

	// Shared, thread-safe JSON parser used only to validate tool-call argument strings.
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	/**
	 * Merge the previous and current ChatCompletionChunk into a single one.
	 * Scalar fields prefer the current chunk's value and fall back to the previous one.
	 * @param previous the previous ChatCompletionChunk
	 * @param current the current ChatCompletionChunk
	 * @return the merged ChatCompletionChunk
	 */
	public ChatCompletionChunk merge(ChatCompletionChunk previous, ChatCompletionChunk current) {

		if (previous == null) {
			return current;
		}

		if (current == null) {
			return previous;
		}

		String id = (current.id() != null ? current.id() : previous.id());
		Long created = (current.created() != null ? current.created() : previous.created());
		String model = (current.model() != null ? current.model() : previous.model());
		String serviceTier = (current.serviceTier() != null ? current.serviceTier() : previous.serviceTier());
		String systemFingerprint = (current.systemFingerprint() != null ? current.systemFingerprint()
				: previous.systemFingerprint());
		String object = (current.object() != null ? current.object() : previous.object());
		Usage usage = (current.usage() != null ? current.usage() : previous.usage());

		// Only the first choice of each chunk is merged (streaming responses carry one choice).
		ChunkChoice previousChoice0 = (CollectionUtils.isEmpty(previous.choices()) ? null : previous.choices().get(0));
		ChunkChoice currentChoice0 = (CollectionUtils.isEmpty(current.choices()) ? null : current.choices().get(0));

		ChunkChoice choice = merge(previousChoice0, currentChoice0);
		List<ChunkChoice> chunkChoices = (choice == null ? List.of() : List.of(choice));
		return new ChatCompletionChunk(id, chunkChoices, created, model, serviceTier, systemFingerprint, object, usage);
	}

	/**
	 * Merge two chunk choices, preferring the current choice's non-null fields.
	 */
	private ChunkChoice merge(ChunkChoice previous, ChunkChoice current) {
		if (previous == null) {
			return current;
		}

		if (current == null) {
			return previous;
		}

		ChatCompletionFinishReason finishReason = (current.finishReason() != null ? current.finishReason()
				: previous.finishReason());
		Integer index = (current.index() != null ? current.index() : previous.index());

		ChatCompletionMessage message = merge(previous.delta(), current.delta());

		LogProbs logprobs = (current.logprobs() != null ? current.logprobs() : previous.logprobs());
		return new ChunkChoice(finishReason, index, message, logprobs);
	}

	/**
	 * Merge two delta messages. Tool calls are accumulated across chunks: chunks with
	 * a tool-call id start a new tool call, chunks without an id continue the previous
	 * one (their argument fragments are concatenated).
	 */
	private ChatCompletionMessage merge(ChatCompletionMessage previous, ChatCompletionMessage current) {
		if (current == null) {
			return previous;
		}
		if (previous == null) {
			// Guard against a null previous delta; the field accesses below would NPE.
			return current;
		}

		// When both contents are null the merged content is "" (not null), matching
		// the original concatenation-based behavior.
		String content = (current.content() != null ? current.content()
				: (previous.content() != null ? previous.content() : ""));
		String reasoningContent = (current.reasoningContent() != null ? current.reasoningContent()
				: (previous.reasoningContent() != null ? previous.reasoningContent() : ""));
		Role role = (current.role() != null ? current.role() : previous.role());
		role = (role != null ? role : Role.ASSISTANT); // default to ASSISTANT (if null)
		String name = (current.name() != null ? current.name() : previous.name());
		String toolCallId = (current.toolCallId() != null ? current.toolCallId() : previous.toolCallId());
		String refusal = (current.refusal() != null ? current.refusal() : previous.refusal());
		ChatCompletionMessage.AudioOutput audioOutput = (current.audioOutput() != null ? current.audioOutput()
				: previous.audioOutput());
		List<ChatCompletionMessage.Annotation> annotations = (current.annotations() != null ? current.annotations()
				: previous.annotations());

		List<ToolCall> toolCalls = new ArrayList<>();
		// The last previously-seen tool call is the only one a new fragment can extend.
		ToolCall lastPreviousToolCall = null;
		if (previous.toolCalls() != null && !previous.toolCalls().isEmpty()) {
			lastPreviousToolCall = previous.toolCalls().get(previous.toolCalls().size() - 1);
			if (previous.toolCalls().size() > 1) {
				toolCalls.addAll(previous.toolCalls().subList(0, previous.toolCalls().size() - 1));
			}
		}
		if (current.toolCalls() != null && !current.toolCalls().isEmpty()) {
			if (current.toolCalls().size() > 1) {
				throw new IllegalStateException("Currently only one tool call is supported per message!");
			}
			var currentToolCall = current.toolCalls().iterator().next();
			if (StringUtils.hasText(currentToolCall.id())) {
				// A new id starts a new tool call; the previous one is now complete.
				if (lastPreviousToolCall != null) {
					toolCalls.add(lastPreviousToolCall);
				}
				toolCalls.add(currentToolCall);
			}
			else {
				// No id: this fragment continues the previous tool call.
				toolCalls.add(merge(lastPreviousToolCall, currentToolCall));
			}
		}
		else {
			if (lastPreviousToolCall != null) {
				toolCalls.add(lastPreviousToolCall);
			}
		}
		return new ChatCompletionMessage(content, role, name, toolCallId, toolCalls, refusal, audioOutput, annotations,
				reasoningContent);
	}

	/**
	 * Merge two fragments of the same tool call, preferring the current non-blank id/type.
	 */
	private ToolCall merge(ToolCall previous, ToolCall current) {
		if (previous == null) {
			return current;
		}
		String id = (StringUtils.hasText(current.id()) ? current.id() : previous.id());
		String type = (current.type() != null ? current.type() : previous.type());
		ChatCompletionFunction function = merge(previous.function(), current.function());
		return new ToolCall(id, type, function);
	}

	/**
	 * Merge two function fragments: the name prefers the current non-blank value and
	 * the streamed argument fragments are concatenated in order.
	 */
	private ChatCompletionFunction merge(ChatCompletionFunction previous, ChatCompletionFunction current) {
		if (previous == null) {
			return current;
		}
		String name = (StringUtils.hasText(current.name()) ? current.name() : previous.name());
		StringBuilder arguments = new StringBuilder();
		if (previous.arguments() != null) {
			arguments.append(previous.arguments());
		}
		if (current.arguments() != null) {
			arguments.append(current.arguments());
		}
		return new ChatCompletionFunction(name, arguments.toString());
	}

	/**
	 * @param chatCompletion the ChatCompletionChunk to check
	 * @return true if the ChatCompletionChunk is a streaming tool function call.
	 */
	public boolean isStreamingToolFunctionCall(ChatCompletionChunk chatCompletion) {

		if (chatCompletion == null || CollectionUtils.isEmpty(chatCompletion.choices())) {
			return false;
		}

		var choice = chatCompletion.choices().get(0);
		if (choice == null || choice.delta() == null) {
			return false;
		}
		return !CollectionUtils.isEmpty(choice.delta().toolCalls());
	}

	/**
	 * @param chatCompletion the ChatCompletionChunk to check
	 * @return true if the ChatCompletionChunk is a streaming tool function call and it is
	 * the last one.
	 */
	public boolean isStreamingToolFunctionCallFinish(ChatCompletionChunk chatCompletion) {

		if (chatCompletion == null || CollectionUtils.isEmpty(chatCompletion.choices())) {
			return false;
		}

		var choice = chatCompletion.choices().get(0);
		if (choice == null || choice.delta() == null) {
			return false;
		}
		return choice.finishReason() == ChatCompletionFinishReason.TOOL_CALLS;
	}

	/**
	 * Convert the ChatCompletionChunk into a ChatCompletion. The Usage is set to null.
	 * @param chunk the ChatCompletionChunk to convert
	 * @return the ChatCompletion
	 */
	public ChatCompletion chunkToChatCompletion(ChatCompletionChunk chunk) {
		List<Choice> choices = chunk.choices()
				.stream()
				.map(chunkChoice -> new Choice(chunkChoice.finishReason(), chunkChoice.index(), chunkChoice.delta(),
						chunkChoice.logprobs()))
				.toList();

		return new OpenAiApi.ChatCompletion(chunk.id(), choices, chunk.created(), chunk.model(), chunk.serviceTier(),
				chunk.systemFingerprint(), "chat.completion", null);
	}

	/**
	 * When a streamed response is truncated by {@code FinishReason.LENGTH}, the last
	 * tool call may carry incomplete (non-parseable) JSON arguments. This removes that
	 * trailing invalid tool call so the remaining, complete tool calls can still be
	 * executed, and rewrites the finish reason to {@code TOOL_CALLS}.
	 * @param chunk the merged ChatCompletionChunk to sanitize
	 * @return the chunk unchanged when the last tool call is valid, a rebuilt chunk
	 * without the invalid trailing tool call, or {@code null} when no valid tool call
	 * remains (callers should then discard the whole chunk).
	 */
	public ChatCompletionChunk removeInvalidLastToolCall(ChatCompletionChunk chunk) {
		if (chunk == null || CollectionUtils.isEmpty(chunk.choices())) {
			return chunk;
		}

		var choice = chunk.choices().get(0);
		if (choice == null || choice.delta() == null
				|| CollectionUtils.isEmpty(choice.delta().toolCalls())) {
			return chunk;
		}

		List<ToolCall> toolCalls = new ArrayList<>(choice.delta().toolCalls());

		// Inspect the last tool call: truncation can only affect the tail of the stream.
		ToolCall lastToolCall = toolCalls.get(toolCalls.size() - 1);
		if (lastToolCall.function() == null || lastToolCall.function().arguments() == null
				|| isValidJson(lastToolCall.function().arguments())) {
			return chunk;
		}

		// Remove the trailing tool call whose arguments were cut off mid-stream.
		toolCalls.remove(toolCalls.size() - 1);

		if (toolCalls.isEmpty()) {
			// Nothing usable remains; signal the caller to drop the whole chunk.
			return null;
		}

		// Rebuild the chunk with the remaining, valid tool calls and report
		// TOOL_CALLS so downstream handling treats the tool calls as complete.
		ChatCompletionMessage newDelta = new ChatCompletionMessage(
				choice.delta().content(),
				choice.delta().role(),
				choice.delta().name(),
				choice.delta().toolCallId(),
				toolCalls,
				choice.delta().refusal(),
				choice.delta().audioOutput(),
				choice.delta().annotations(),
				choice.delta().reasoningContent()
		);

		ChunkChoice newChoice = new ChunkChoice(
				ChatCompletionFinishReason.TOOL_CALLS,
				choice.index(),
				newDelta,
				choice.logprobs()
		);

		return new ChatCompletionChunk(
				chunk.id(),
				List.of(newChoice),
				chunk.created(),
				chunk.model(),
				chunk.serviceTier(),
				chunk.systemFingerprint(),
				chunk.object(),
				chunk.usage()
		);
	}

	/**
	 * @param json the candidate JSON text
	 * @return true when the text parses as JSON; blank or null input is invalid.
	 */
	private boolean isValidJson(String json) {
		if (json == null || json.trim().isEmpty()) {
			return false;
		}
		try {
			OBJECT_MAPPER.readTree(json);
			return true;
		}
		catch (Exception e) {
			// Any parse failure means the arguments were truncated or malformed.
			return false;
		}
	}

}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions