AWSNovaSonicLLMService Fails to Recover from "Invalid event bytes" due to Premature push_error #3388

@msmsriram

Description

pipecat version

0.0.98

Python version

3.13.3

Operating System

Windows

Issue description

When using AWSNovaSonicLLMService, requesting long responses (e.g., max_tokens=1024) frequently causes a smithy_aws_event_stream.events.InvalidEventBytesError from the underlying AWS SDK.

While the service has built-in auto-recovery logic via _wants_connection and reset_conversation(), it currently fails to recover the pipeline because it calls push_error() before attempting recovery. This causes the pipeline to propagate the ErrorFrame and terminate/cancel operations before the recovery can successfully re-establish the connection.

Reproduction steps

  1. Initialize AWSNovaSonicLLMService with a high max_tokens value (e.g., 1024).
  2. Prompt the model with a request that generates a long response.
  3. Observe the pipeline crash with InvalidEventBytesError.
import asyncio
import os
import logging
from dotenv import load_dotenv

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.processors.aggregators.llm_context import LLMContext

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()

async def main():
    if not os.getenv("AWS_ACCESS_KEY_ID") or not os.getenv("AWS_SECRET_ACCESS_KEY"):
        logger.error("AWS credentials missing.")
        return

    # Trigger issue with high max_tokens
    llm = AWSNovaSonicLLMService(
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region=os.getenv("AWS_REGION", "us-east-1"),
        voice_id="tiffany",
        max_tokens=1024, 
    )

    context = LLMContext(
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user", 
                "content": "Tell me a very long story about the history of the universe. Go into extreme detail."
            },
        ]
    )

    pipeline = Pipeline([llm])
    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=False))
    runner = PipelineRunner()

    logger.info("🚀 Starting reproduction script...")
    try:
        await task.queue_frames([LLMMessagesFrame(context.messages)])
        await runner.run(task)
    except Exception as e:
        logger.error(f"❌ Pipeline crashed: {e}")

if __name__ == "__main__":
    asyncio.run(main())

Root Cause Analysis

In pipecat/services/aws/nova_sonic/llm.py, the _receive_task_handler catches exceptions from the stream:

# Current Implementation
except Exception as e:
    if self._disconnecting:
        return
    # ISSUE: This pushes an ErrorFrame which signals pipeline failure/cancellation
    await self.push_error(error_msg=f"Error processing responses: {e}", exception=e)
    
    # This recovery logic runs, but it's often too late because the pipeline is already halting
    if self._wants_connection:
        await self.reset_conversation()

push_error() sends an ErrorFrame downstream. In many pipeline configurations, receiving an ErrorFrame causes the runner/task to initiate cancellation or error handling, which disrupts the seamless recovery intended by reset_conversation().
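The race can be modeled in isolation: if the failure signal goes out first and the recovery coroutine yields control (as any real reconnection I/O would), the runner can cancel the handler before recovery completes. The sketch below is a minimal, hypothetical model of that ordering; none of these names are pipecat APIs.

```python
import asyncio

# Hypothetical model of the race: a "pipeline" task that cancels in-flight
# work when it sees an error signal, versus a handler that pushes the error
# first and only then attempts recovery.

async def handler(error_event: asyncio.Event, recovered: list) -> None:
    error_event.set()          # analogous to push_error(): signal failure first
    await asyncio.sleep(0)     # yield control, as real reconnection I/O would
    recovered.append(True)     # reset_conversation() equivalent; never reached

async def pipeline(error_event: asyncio.Event, handler_task: asyncio.Task) -> None:
    await error_event.wait()   # runner observes the ErrorFrame...
    handler_task.cancel()      # ...and cancels the handler mid-recovery

async def main() -> bool:
    error_event = asyncio.Event()
    recovered: list = []
    h = asyncio.create_task(handler(error_event, recovered))
    p = asyncio.create_task(pipeline(error_event, h))
    await asyncio.gather(h, p, return_exceptions=True)
    return bool(recovered)     # False: recovery was cancelled before it ran

print(asyncio.run(main()))     # prints: False
```

Swapping the order (attempt recovery before signaling the error, or skip the signal for recoverable errors) removes this window, which is what the change proposed below does.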

Expected behavior

Please consider modifying the error handling logic to attempt recovery before or instead of pushing a fatal error frame, specifically for known recoverable errors like "Invalid event bytes".

# Proposed Change
except Exception as e:
    if self._disconnecting:
        return
    
    # Check for known stream errors that we can recover from
    error_str = str(e)
    if "Invalid event bytes" in error_str and self._wants_connection:
        logger.warning(f"Stream error detected ({error_str}), attempting auto-recovery...")
        # Skip push_error to avoid killing the pipeline
        await self.reset_conversation()
    else:
        # For other errors, push error as normal
        await self.push_error(error_msg=f"Error processing responses: {e}", exception=e)
        if self._wants_connection:
            await self.reset_conversation()

This would allow the built-in reset_conversation() logic (which correctly preserves context and reconnects) to work seamlessly without the pipeline terminating prematurely.
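If maintainers are concerned that silently skipping push_error() could loop forever on a persistently broken stream, the recovery path could be bounded. The following is a hypothetical sketch, not pipecat code: `reset_conversation` here is a stand-in coroutine, and the retry/backoff parameters are illustrative.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def recover_with_retries(reset_conversation, max_attempts: int = 3,
                               base_delay: float = 0.5) -> bool:
    """Try reset_conversation up to max_attempts times with exponential backoff.

    Returns True on successful recovery; False if every attempt failed,
    at which point the caller could still fall back to push_error().
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await reset_conversation()
            return True
        except Exception as e:
            logger.warning("Recovery attempt %d/%d failed: %s",
                           attempt, max_attempts, e)
            # Back off before retrying: 0.5s, 1s, 2s, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False
```

This keeps the happy path (transient "Invalid event bytes" errors) fully transparent to the pipeline while still surfacing a fatal error if reconnection repeatedly fails.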

Actual behavior

The full response is never completed: the stream fails mid-response with InvalidEventBytesError, push_error() halts the pipeline, and the subsequent reset_conversation() cannot restore it.

Logs

2026-01-09 11:24:21.605 | DEBUG    | pipecat.services.aws.nova_sonic.llm:_report_assistant_response_started:1065 - Assistant response started
INFO:bot:🟢 LLM response starting...
INFO:bot:🗣️ Agent started speaking
2026-01-09 11:24:21.609 | DEBUG    | pipecat.transports.base_output:_bot_started_speaking:604 - Bot started speaking
2026-01-09 11:24:40.415 | ERROR    | pipecat.processors.frame_processor:push_error_frame:695 - AWSNovaSonicLLMService#0 exception (C:\Users\\Desktop\nova2sonic\nova\Lib\site-packages\smithy_aws_event_stream\events.py:325): Error processing responses: Invalid event bytes.
2026-01-09 11:24:40.416 | DEBUG    | pipecat.services.aws.nova_sonic.llm:reset_conversation:347 - Resetting conversation
2026-01-09 11:24:40.417 | DEBUG    | pipecat.services.aws.nova_sonic.llm:_report_assistant_response_ended:1096 - Assistant response ended
