
[Go] Streaming breaks structured JSON output parsing #3569

@stringl1l1l1l

Describe the bug

When using a Firebase Genkit prompt defined with the ai.WithOutputType option for structured output and executing it with the ai.WithStreaming option, parsing fails with "unexpected end of JSON input" errors. This happens because the streaming implementations in the compat_oai and googlegenai plugins fragment the response into multiple ai.Part objects instead of accumulating the complete text content, making it impossible to parse structured JSON outputs.

Error Details

model failed to generate output matching expected schema: data is not valid JSON: unexpected end of JSON input.

The streaming works correctly for displaying tokens, but resp.Output(&structuredResponse) fails because the response content is fragmented across multiple Parts.

Root Cause

Take compat_oai as an example: in go/plugins/compat_oai/generate.go, the generateStream function appends each streaming chunk as a separate ai.Part:

// Problematic code that creates multiple Parts
fullResponse.Message.Content = append(fullResponse.Message.Content, modelChunk.Content...)

When parsing structured output with resp.Output(&response), the JSON parser expects complete content but receives fragmented Parts, resulting in parsing failures.

To Reproduce

Create a minimal example using the googlegenai provider (the same applies to compat_oai):

package main

import (
	"context"
	"fmt"

	"github.com/firebase/genkit/go/ai"
	"github.com/firebase/genkit/go/genkit"
	"github.com/firebase/genkit/go/plugins/googlegenai"
)

type UserInput struct {
	Input string `json:"input"`
}
type StructuredOutput struct {
	Content string `json:"content"`
}

func main() {
	ctx := context.Background()
	g := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{
		APIKey: "YOUR_API_KEY",
	}))

	prompt := genkit.DefinePrompt(g, "test",
		ai.WithSystem("You are a helpful assistant that always responds in JSON format."),
		ai.WithInputType(UserInput{}),
		ai.WithOutputType(StructuredOutput{}),
		ai.WithPrompt("input: {{input}}"),
		ai.WithModelName("googleai/gemini-2.5-pro"),
	)

	// This fails with streaming enabled
	resp, err := prompt.Execute(ctx,
		ai.WithInput(UserInput{Input: "What's the meaning of life?"}),
		ai.WithStreaming(func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
			fmt.Print(chunk.Text())
			return nil
		}),
	)

	if err != nil {
		panic(err)
	}
	var output StructuredOutput
	err = resp.Output(&output) // This will fail: "unexpected end of JSON input". 
	if err != nil {
		panic(err)
	}

	fmt.Printf("Parsed output: %+v\n", output)
}

Expected behavior

The structured output parsing should work correctly even with streaming enabled. The streaming should display tokens in real-time while still maintaining the ability to parse the complete response into structured formats.

Runtime Information

  • OS: macOS
  • Go Version: 1.25.0
  • Genkit Version: v1.0.2
  • Plugin: compat_oai (OpenAI-compatible providers)

Solution

This patch fixes the issue by accumulating streaming content into a single consolidated text Part instead of creating multiple fragmented Parts:

diff --git a/go/plugins/compat_oai/generate.go b/go/plugins/compat_oai/generate.go
index 5146e6ee..051fcb21 100644
--- a/go/plugins/compat_oai/generate.go
+++ b/go/plugins/compat_oai/generate.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strings"
 
 	"github.com/firebase/genkit/go/ai"
 	"github.com/openai/openai-go"
@@ -268,6 +269,8 @@ func (g *ModelGenerator) generateStream(ctx context.Context, handleChunk func(co
 	var currentToolCall *ai.ToolRequest
 	var currentArguments string
 
+	var accumulatedText strings.Builder
+
 	for stream.Next() {
 		chunk := stream.Current()
 		if len(chunk.Choices) > 0 {
@@ -320,7 +323,7 @@ func (g *ModelGenerator) generateStream(ctx context.Context, handleChunk func(co
 				return nil, fmt.Errorf("callback error: %w", err)
 			}
 
-			fullResponse.Message.Content = append(fullResponse.Message.Content, modelChunk.Content...)
+			accumulatedText.WriteString(content)
 
 			// Update Usage
 			fullResponse.Usage.InputTokens += int(chunk.Usage.PromptTokens)
@@ -333,6 +336,9 @@ func (g *ModelGenerator) generateStream(ctx context.Context, handleChunk func(co
 		return nil, fmt.Errorf("stream error: %w", err)
 	}
 
+	fullResponse.Message.Content = []*ai.Part{
+		ai.NewTextPart(accumulatedText.String()),
+	}
 	return &fullResponse, nil
 }
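Stripped of the plugin internals, the accumulation pattern the patch introduces reduces to a strings.Builder loop over streamed deltas. A minimal sketch (the chunk channel stands in for the OpenAI stream; accumulate is a hypothetical helper, not part of the patch):

```go
package main

import (
	"fmt"
	"strings"
)

// accumulate drains a stream of text deltas into one string using
// strings.Builder, the same pattern the patch applies in generateStream
// before wrapping the result in a single ai.NewTextPart.
func accumulate(chunks <-chan string) string {
	var b strings.Builder
	for c := range chunks {
		b.WriteString(c) // append each delta instead of creating a new Part
	}
	return b.String()
}

func main() {
	ch := make(chan string, 3)
	for _, c := range []string{`{"content":`, `"42`, `"}`} {
		ch <- c
	}
	close(ch)
	fmt.Println(accumulate(ch)) // {"content":"42"}
}
```

strings.Builder avoids the quadratic copying that repeated string concatenation would incur on long responses, which is presumably why the patch prefers it over `text += content`.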

Additional Context

This issue affects any use case that combines:

  1. Streaming output (ai.WithStreaming)
  2. Structured response parsing (resp.Output(&struct))
  3. OpenAI-compatible providers via compat_oai plugin

If more changes are needed for the googlegenai plugin, I'll submit a PR covering both. I'd also like to add tests to make sure no new bugs are introduced.

Metadata

Labels: bug (Something isn't working), go
Status: Done