diff --git a/changelog/4140.added.md b/changelog/4140.added.md new file mode 100644 index 0000000000..dc5a2201c8 --- /dev/null +++ b/changelog/4140.added.md @@ -0,0 +1 @@ +- Added Inworld Realtime LLM service with WebSocket-based cascade STT/LLM/TTS, semantic VAD, function calling, and Router support. diff --git a/examples/realtime/realtime-inworld.py b/examples/realtime/realtime-inworld.py new file mode 100644 index 0000000000..67bed6df90 --- /dev/null +++ b/examples/realtime/realtime-inworld.py @@ -0,0 +1,162 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +""" +Inworld Realtime Example + +This example demonstrates using Inworld's Realtime API for real-time voice +conversations. The Inworld Realtime API is OpenAI-compatible and operates +as a cascade STT/LLM/TTS pipeline under the hood, with built-in semantic +voice activity detection for turn management. + +Features: +- Real-time audio streaming with low latency +- Built-in semantic VAD (voice activity detection) +- Streaming user transcription +- Text and audio input + +Requirements: + - INWORLD_API_KEY environment variable set + - pip install pipecat-ai[inworld] + +Usage: + python realtime-inworld.py --transport webrtc + python realtime-inworld.py --transport daily +""" + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.frames.frames import LLMRunFrame +from pipecat.observers.loggers.transcription_log_observer import ( + TranscriptionLogObserver, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + UserTurnStoppedMessage, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +# --- Transport Configuration --- + +# No local VAD needed — Inworld's server-side semantic VAD handles turn detection. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info("Starting Inworld Realtime bot") + + # Create the Inworld Realtime LLM service. + # Common params (llm_model, voice, tts_model, stt_model) are top-level. + # For full control, use settings=InworldRealtimeLLMService.Settings(session_properties=...) + # + # llm_model can be any supported model or an Inworld Router. + # See: https://docs.inworld.ai/router/introduction + llm = InworldRealtimeLLMService( + api_key=os.getenv("INWORLD_API_KEY"), + llm_model="xai/grok-4-1-fast-non-reasoning", + voice="Sarah", + settings=InworldRealtimeLLMService.Settings( + system_instruction="""You are a helpful and friendly AI assistant powered by Inworld. + +Your voice and personality should be warm and engaging. 
Keep your responses +concise and conversational since this is a voice interaction. + +Always be helpful and proactive in offering assistance.""", + ), + ) + + # Create context with initial message + context = LLMContext( + [{"role": "developer", "content": "Say hello and introduce yourself!"}], + ) + + user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + + # Build the pipeline + pipeline = Pipeline( + [ + transport.input(), + user_aggregator, + llm, # Inworld Realtime (handles STT + LLM + TTS) + transport.output(), + assistant_aggregator, + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + observers=[TranscriptionLogObserver()], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + logger.info(f"Transcript: {timestamp}user: {message.content}") + + @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + logger.info(f"Transcript: {timestamp}assistant: {message.content}") + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/pyproject.toml b/pyproject.toml index b2fef1d7bf..b56881dd62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,7 +77,7 @@ groq = [ "groq>=0.23.0,<2" ] gstreamer = [ "pygobject~=3.50.0" ] heygen = [ "livekit>=1.0.13,<2", "pipecat-ai[websockets-base]" ] hume = [ "hume>=0.11.2,<1" ] -inworld = [] +inworld = [ "pipecat-ai[websockets-base]" ] koala = [ "pvkoala~=2.0.3" ] kokoro = [ "kokoro-onnx>=0.5.0,<1", "requests>=2.32.5,<3" ] langchain = [ "langchain>=1.2.13,<2", "langchain-community>=0.4.1,<1", "langchain-openai>=1.1.12,<2" ] diff --git a/src/pipecat/adapters/services/inworld_realtime_adapter.py b/src/pipecat/adapters/services/inworld_realtime_adapter.py new file mode 100644 index 0000000000..3504c86c56 --- /dev/null +++ b/src/pipecat/adapters/services/inworld_realtime_adapter.py @@ -0,0 +1,255 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Inworld Realtime LLM adapter for Pipecat. + +Converts Pipecat's tool schemas and context into the format required by +Inworld's Realtime API. 
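+
+A minimal usage sketch (assuming an already-populated ``LLMContext``; the
+service normally drives the adapter internally)::
+
+    adapter = InworldRealtimeLLMAdapter()
+    params = adapter.get_llm_invocation_params(context)
+    # params["system_instruction"], params["messages"], params["tools"]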
+""" + +import copy +import json +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, TypedDict + +from loguru import logger + +from pipecat.adapters.base_llm_adapter import BaseLLMAdapter +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage +from pipecat.services.inworld.realtime import events + + +class InworldRealtimeLLMInvocationParams(TypedDict): + """Context-based parameters for invoking Inworld Realtime API. + + Attributes: + system_instruction: System prompt/instructions for the session. + messages: List of conversation items formatted for Inworld Realtime. + tools: List of tool definitions. + """ + + system_instruction: Optional[str] + messages: List[events.ConversationItem] + tools: List[Dict[str, Any]] + + +class InworldRealtimeLLMAdapter(BaseLLMAdapter): + """LLM adapter for Inworld Realtime API. + + Converts Pipecat's universal context and tool schemas into the specific + format required by Inworld's Realtime API. + """ + + @property + def id_for_llm_specific_messages(self) -> str: + """Get the identifier used in LLMSpecificMessage instances for Inworld Realtime.""" + return "inworld-realtime" + + def get_llm_invocation_params( + self, context: LLMContext, *, system_instruction: Optional[str] = None + ) -> InworldRealtimeLLMInvocationParams: + """Get Inworld Realtime-specific LLM invocation parameters from a universal LLM context. + + Args: + context: The LLM context containing messages, tools, etc. + system_instruction: Optional system instruction from service settings. + + Returns: + Dictionary of parameters for invoking Inworld's Realtime API. + """ + messages = self._from_universal_context_messages(self.get_messages(context)) + effective_system = self._resolve_system_instruction( + messages.system_instruction, + system_instruction, + discard_context_system=True, + ) + return { + "system_instruction": effective_system, + "messages": messages.messages, + "tools": self.from_standard_tools(context.tools) or [], + } + + def get_messages_for_logging(self, context) -> List[Dict[str, Any]]: + """Get messages from context in a format safe for logging. + + Removes or truncates sensitive data like audio content. + + Args: + context: The LLM context containing messages. + + Returns: + List of messages with sensitive data redacted. + """ + msgs = [] + for message in self.get_messages(context): + msg = copy.deepcopy(message) + if "content" in msg: + if isinstance(msg["content"], list): + for item in msg["content"]: + if item.get("type") == "input_audio": + item["audio"] = "..." + if item.get("type") == "audio": + item["audio"] = "..." + msgs.append(msg) + return msgs + + @dataclass + class ConvertedMessages: + """Container for Inworld-formatted messages converted from universal context.""" + + messages: List[events.ConversationItem] + system_instruction: Optional[str] = None + + def _from_universal_context_messages( + self, universal_context_messages: List[LLMContextMessage] + ) -> ConvertedMessages: + """Convert universal context messages to Inworld Realtime format. + + Similar to OpenAI Realtime, we pack conversation history into a single + user message since the realtime API doesn't support loading long histories. + + Args: + universal_context_messages: List of messages in universal format. + + Returns: + ConvertedMessages with Inworld-formatted messages and system instruction. 
+ """ + if not universal_context_messages: + return self.ConvertedMessages(messages=[]) + + messages = copy.deepcopy(universal_context_messages) + system_instruction = None + + # Extract system message as session instructions + if messages[0].get("role") == "system": + system = messages.pop(0) + content = system.get("content") + if isinstance(content, str): + system_instruction = content + elif isinstance(content, list): + system_instruction = content[0].get("text") + if not messages: + return self.ConvertedMessages(messages=[], system_instruction=system_instruction) + + # Convert any remaining "system"/"developer" messages to "user" + for msg in messages: + if msg.get("role") in ("system", "developer"): + msg["role"] = "user" + + # Single user message can be sent normally + if len(messages) == 1 and messages[0].get("role") == "user": + return self.ConvertedMessages( + messages=[self._from_universal_context_message(messages[0])], + system_instruction=system_instruction, + ) + + # Pack multiple messages into a single user message + intro_text = """ + This is a previously saved conversation. Please treat this conversation history as a + starting point for the current conversation.""" + + trailing_text = """ + This is the end of the previously saved conversation. Please continue the conversation + from here. If the last message is a user instruction or question, act on that instruction + or answer the question. If the last message is an assistant response, simply say that you + are ready to continue the conversation.""" + + return self.ConvertedMessages( + messages=[ + events.ConversationItem( + role="user", + type="message", + content=[ + events.ItemContent( + type="input_text", + text="\n\n".join( + [ + intro_text, + json.dumps(messages, indent=2), + trailing_text, + ] + ), + ) + ], + ) + ], + system_instruction=system_instruction, + ) + + def _from_universal_context_message( + self, message: LLMContextMessage + ) -> events.ConversationItem: + """Convert a single universal context message to Inworld format. + + Args: + message: Message in universal format. + + Returns: + ConversationItem formatted for Inworld Realtime API. + """ + if message.get("role") == "user": + content = message.get("content") + if isinstance(content, list): + text_content = "" + for c in content: + if c.get("type") == "text": + text_content += " " + c.get("text") + else: + logger.error( + f"Unhandled content type in context message: {c.get('type')} - {message}" + ) + content = text_content.strip() + return events.ConversationItem( + role="user", + type="message", + content=[events.ItemContent(type="input_text", text=content)], + ) + + if message.get("role") == "assistant" and message.get("tool_calls"): + tc = message.get("tool_calls")[0] + return events.ConversationItem( + type="function_call", + call_id=tc["id"], + name=tc["function"]["name"], + arguments=tc["function"]["arguments"], + ) + + logger.error(f"Unhandled message type in _from_universal_context_message: {message}") + + @staticmethod + def _to_inworld_function_format(function: FunctionSchema) -> Dict[str, Any]: + """Convert a function schema to Inworld Realtime function format. + + Args: + function: The function schema to convert. + + Returns: + Dictionary in Inworld Realtime function format. 
+ """ + return { + "type": "function", + "name": function.name, + "description": function.description, + "parameters": { + "type": "object", + "properties": function.properties, + "required": function.required, + }, + } + + def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Dict[str, Any]]: + """Convert tool schemas to Inworld Realtime format. + + Args: + tools_schema: The tools schema containing functions to convert. + + Returns: + List of tool definitions in Inworld Realtime format. + """ + functions_schema = tools_schema.standard_tools + return [self._to_inworld_function_format(func) for func in functions_schema] diff --git a/src/pipecat/services/inworld/realtime/__init__.py b/src/pipecat/services/inworld/realtime/__init__.py new file mode 100644 index 0000000000..c4d243b976 --- /dev/null +++ b/src/pipecat/services/inworld/realtime/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# diff --git a/src/pipecat/services/inworld/realtime/events.py b/src/pipecat/services/inworld/realtime/events.py new file mode 100644 index 0000000000..55f7f28bd8 --- /dev/null +++ b/src/pipecat/services/inworld/realtime/events.py @@ -0,0 +1,868 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Event models and data structures for Inworld Realtime API communication. + +Based on Inworld's Realtime API documentation: +https://docs.inworld.ai/api-reference/realtimeAPI/realtime/realtime-websocket +""" + +import json +import uuid +from typing import Any, Dict, List, Literal, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field + +from pipecat.adapters.schemas.tools_schema import ToolsSchema + +# +# Audio format configuration +# + +# Inworld supports configurable sample rates for PCM audio +SUPPORTED_SAMPLE_RATES = Literal[8000, 16000, 24000, 32000, 44100, 48000] + + +class AudioFormat(BaseModel): + """Base class for audio format configuration.""" + + type: str + + +class PCMAudioFormat(AudioFormat): + """PCM audio format configuration with configurable sample rate. + + Parameters: + type: Audio format type, always "audio/pcm". + rate: Sample rate in Hz. Defaults to 24000. + """ + + type: Literal["audio/pcm"] = "audio/pcm" + rate: SUPPORTED_SAMPLE_RATES = 24000 + + +class PCMUAudioFormat(AudioFormat): + """PCMU (G.711 mu-law) audio format configuration. + + Fixed at 8000 Hz sample rate. + + Parameters: + type: Audio format type, always "audio/pcmu". + """ + + type: Literal["audio/pcmu"] = "audio/pcmu" + + +class PCMAAudioFormat(AudioFormat): + """PCMA (G.711 A-law) audio format configuration. + + Fixed at 8000 Hz sample rate. + + Parameters: + type: Audio format type, always "audio/pcma". + """ + + type: Literal["audio/pcma"] = "audio/pcma" + + +# +# Turn detection configuration (lives inside audio.input) +# + + +class TurnDetection(BaseModel): + """Server-side voice activity detection configuration. + + Parameters: + type: Detection type. "server_vad" for standard VAD, "semantic_vad" + for semantic-based detection. + eagerness: How eagerly to detect end of turn. Options: "low", "medium", "high". + create_response: Whether to automatically create a response on turn end. + interrupt_response: Whether user speech interrupts the current response. 
+ """ + + type: Optional[Literal["server_vad", "semantic_vad"]] = "semantic_vad" + eagerness: Optional[str] = None + create_response: Optional[bool] = None + interrupt_response: Optional[bool] = None + + +class InputTranscription(BaseModel): + """Configuration for input audio transcription. + + Parameters: + model: The STT model to use for transcription. + """ + + model: Optional[str] = None + + +# +# Audio configuration +# + + +class AudioInput(BaseModel): + """Audio input configuration. + + Parameters: + format: The format configuration for input audio. + transcription: Configuration for input audio transcription. + turn_detection: Configuration for turn detection. + """ + + format: Optional[Union[PCMAudioFormat, PCMUAudioFormat, PCMAAudioFormat]] = None + transcription: Optional[InputTranscription] = None + turn_detection: Optional[TurnDetection] = None + + +class AudioOutput(BaseModel): + """Audio output configuration. + + Parameters: + format: The format configuration for output audio. + model: The TTS model to use (e.g. "inworld-tts-1.5-max"). + voice: The voice ID to use (e.g. "Sarah", "Clive"). + """ + + format: Optional[Union[PCMAudioFormat, PCMUAudioFormat, PCMAAudioFormat]] = None + model: Optional[str] = None + voice: Optional[str] = None + + +class AudioConfiguration(BaseModel): + """Audio configuration for input and output. + + Parameters: + input: Configuration for input audio. + output: Configuration for output audio. + """ + + input: Optional[AudioInput] = None + output: Optional[AudioOutput] = None + + +# +# Tool definitions +# + + +class FunctionTool(BaseModel): + """Custom function tool configuration. + + Parameters: + type: Tool type, always "function". + name: Name of the function. + description: Description of what the function does. + parameters: JSON schema for function parameters. + """ + + type: Literal["function"] = "function" + name: str + description: str + parameters: Dict[str, Any] + + +# Union type for Inworld tools +InworldTool = Union[FunctionTool, Dict[str, Any]] + + +# +# Session properties +# + + +class SessionProperties(BaseModel): + """Configuration properties for an Inworld Realtime session. + + Parameters: + type: Session type, always "realtime". + model: The LLM model to use (e.g. "openai/gpt-4.1-nano"). + instructions: System instructions for the assistant. + output_modalities: Output modalities (e.g. ["audio", "text"]). + audio: Audio configuration including input (transcription, turn detection) + and output (TTS model, voice). + tools: Available tools for the assistant. + """ + + # Needed to support ToolSchema in tools field. + model_config = ConfigDict(arbitrary_types_allowed=True) + + type: Optional[str] = "realtime" + model: Optional[str] = None + instructions: Optional[str] = None + temperature: Optional[float] = None + output_modalities: Optional[List[str]] = None + audio: Optional[AudioConfiguration] = None + # Tools can be ToolsSchema when provided by user, or list of dicts for API + tools: Optional[ToolsSchema | List[InworldTool]] = None + provider_data: Optional[Dict[str, Any]] = None + + +# +# Conversation items +# + + +class ItemContent(BaseModel): + """Content within a conversation item. + + Parameters: + type: Content type (input_text, input_audio, text, audio). + text: Text content for text-based items. + audio: Base64-encoded audio data for audio items. + transcript: Transcribed text for audio items. 
+ """ + + type: Literal["text", "audio", "input_text", "input_audio", "output_text", "output_audio"] + text: Optional[str] = None + audio: Optional[str] = None # base64-encoded audio + transcript: Optional[str] = None + + +class ConversationItem(BaseModel): + """A conversation item in the realtime session. + + Parameters: + id: Unique identifier for the item, auto-generated if not provided. + object: Object type identifier for the realtime API. + type: Item type (message, function_call, or function_call_output). + status: Current status of the item. + role: Speaker role for message items (user, assistant, or system). + content: Content list for message items. + call_id: Function call identifier for function_call items. + name: Function name for function_call items. + arguments: Function arguments as JSON string for function_call items. + output: Function output as JSON string for function_call_output items. + """ + + id: str = Field(default_factory=lambda: str(uuid.uuid4().hex)) + object: Optional[Literal["realtime.item"]] = None + type: Literal["message", "function_call", "function_call_output"] + status: Optional[Literal["completed", "in_progress", "incomplete"]] = None + role: Optional[Literal["user", "assistant", "system", "tool"]] = None + content: Optional[List[ItemContent]] = None + call_id: Optional[str] = None + name: Optional[str] = None + arguments: Optional[str] = None + output: Optional[str] = None + + +class RealtimeConversation(BaseModel): + """A realtime conversation session. + + Parameters: + id: Unique identifier for the conversation. + object: Object type identifier, always "realtime.conversation". + """ + + id: str + object: Literal["realtime.conversation"] + + +class ResponseProperties(BaseModel): + """Properties for configuring assistant responses. + + Parameters: + modalities: Output modalities for the response (text, audio, or both). + """ + + modalities: Optional[List[Literal["text", "audio"]]] = ["text", "audio"] + + +# +# Error class +# + + +class RealtimeError(BaseModel): + """Error information from the realtime API. + + Parameters: + type: Error type identifier. + code: Specific error code. + message: Human-readable error message. + param: Parameter name that caused the error, if applicable. + event_id: Event ID associated with the error, if applicable. + """ + + type: Optional[str] = None + code: Optional[str] = "" + message: str + param: Optional[str] = None + event_id: Optional[str] = None + + +# +# Client Events (sent to Inworld) +# + + +class ClientEvent(BaseModel): + """Base class for client events sent to the realtime API. + + Parameters: + event_id: Unique identifier for the event, auto-generated if not provided. + """ + + event_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + + +class SessionUpdateEvent(ClientEvent): + """Event to update session properties. + + Parameters: + type: Event type, always "session.update". + session: Updated session properties. + """ + + type: Literal["session.update"] = "session.update" + session: SessionProperties + + +class InputAudioBufferAppendEvent(ClientEvent): + """Event to append audio data to the input buffer. + + Parameters: + type: Event type, always "input_audio_buffer.append". + audio: Base64-encoded audio data to append. + """ + + type: Literal["input_audio_buffer.append"] = "input_audio_buffer.append" + audio: str # base64-encoded audio + + +class InputAudioBufferCommitEvent(ClientEvent): + """Event to commit the current input audio buffer. + + Used when turn_detection is null (manual mode). 
+ + Parameters: + type: Event type, always "input_audio_buffer.commit". + """ + + type: Literal["input_audio_buffer.commit"] = "input_audio_buffer.commit" + + +class InputAudioBufferClearEvent(ClientEvent): + """Event to clear the input audio buffer. + + Parameters: + type: Event type, always "input_audio_buffer.clear". + """ + + type: Literal["input_audio_buffer.clear"] = "input_audio_buffer.clear" + + +class ConversationItemCreateEvent(ClientEvent): + """Event to create a new conversation item. + + Parameters: + type: Event type, always "conversation.item.create". + previous_item_id: ID of the item to insert after, if any. + item: The conversation item to create. + """ + + type: Literal["conversation.item.create"] = "conversation.item.create" + previous_item_id: Optional[str] = None + item: ConversationItem + + +class ResponseCreateEvent(ClientEvent): + """Event to create a new assistant response. + + Parameters: + type: Event type, always "response.create". + response: Optional response configuration properties. + """ + + type: Literal["response.create"] = "response.create" + response: Optional[ResponseProperties] = None + + +class ResponseCancelEvent(ClientEvent): + """Event to cancel the current assistant response. + + Parameters: + type: Event type, always "response.cancel". + """ + + type: Literal["response.cancel"] = "response.cancel" + + +# +# Server Events (received from Inworld) +# + + +class ServerEvent(BaseModel): + """Base class for server events received from the realtime API. + + Parameters: + event_id: Unique identifier for the event. + type: Type of the server event. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + event_id: str + type: str + + +class SessionCreatedEvent(ServerEvent): + """Event indicating a session has been created. + + This is the first event received after connecting. + + Parameters: + type: Event type, always "session.created". + session: The initial session properties. + """ + + type: Literal["session.created"] + session: Optional[SessionProperties] = None + + +class SessionUpdatedEvent(ServerEvent): + """Event indicating a session has been updated. + + Parameters: + type: Event type, always "session.updated". + session: The updated session properties. + """ + + type: Literal["session.updated"] + session: SessionProperties + + +class ConversationCreated(ServerEvent): + """Event indicating a conversation has been created. + + This is the first message received after connecting. + + Parameters: + type: Event type, always "conversation.created". + conversation: The created conversation. + """ + + type: Literal["conversation.created"] + conversation: RealtimeConversation + + +class ConversationItemAdded(ServerEvent): + """Event indicating a conversation item has been added. + + Parameters: + type: Event type, always "conversation.item.added". + previous_item_id: ID of the previous item, if any. + item: The added conversation item. + """ + + type: Literal["conversation.item.added"] + previous_item_id: Optional[str] = None + item: ConversationItem + + +class ConversationItemInputAudioTranscriptionCompleted(ServerEvent): + """Event indicating input audio transcription is complete. + + Parameters: + type: Event type, always "conversation.item.input_audio_transcription.completed". + item_id: ID of the conversation item that was transcribed. + transcript: Complete transcription text. 
+ """ + + type: Literal["conversation.item.input_audio_transcription.completed"] + item_id: str + transcript: str + + +class ConversationItemInputAudioTranscriptionDelta(ServerEvent): + """Event containing incremental input audio transcription. + + Parameters: + type: Event type, always "conversation.item.input_audio_transcription.delta". + item_id: ID of the conversation item being transcribed. + content_index: Index of the content part. + delta: Incremental transcription text. + """ + + type: Literal["conversation.item.input_audio_transcription.delta"] + item_id: str + content_index: Optional[int] = None + delta: str + + +class InputAudioBufferSpeechStarted(ServerEvent): + """Event indicating speech has started in the input audio buffer. + + Only sent when turn_detection is "server_vad". + + Parameters: + type: Event type, always "input_audio_buffer.speech_started". + item_id: ID of the associated conversation item. + """ + + type: Literal["input_audio_buffer.speech_started"] + item_id: str + + +class InputAudioBufferSpeechStopped(ServerEvent): + """Event indicating speech has stopped in the input audio buffer. + + Only sent when turn_detection is "server_vad". + + Parameters: + type: Event type, always "input_audio_buffer.speech_stopped". + item_id: ID of the associated conversation item. + """ + + type: Literal["input_audio_buffer.speech_stopped"] + item_id: str + + +class InputAudioBufferCommitted(ServerEvent): + """Event indicating the input audio buffer has been committed. + + Parameters: + type: Event type, always "input_audio_buffer.committed". + previous_item_id: ID of the previous item, if any. + item_id: ID of the committed conversation item. + """ + + type: Literal["input_audio_buffer.committed"] + previous_item_id: Optional[str] = None + item_id: str + + +class InputAudioBufferCleared(ServerEvent): + """Event indicating the input audio buffer has been cleared. + + Parameters: + type: Event type, always "input_audio_buffer.cleared". + """ + + type: Literal["input_audio_buffer.cleared"] + + +class ResponseCreated(ServerEvent): + """Event indicating an assistant response has been created. + + Parameters: + type: Event type, always "response.created". + response: The created response object. + """ + + type: Literal["response.created"] + response: "Response" + + +class ResponseOutputItemAdded(ServerEvent): + """Event indicating an output item has been added to a response. + + Parameters: + type: Event type, always "response.output_item.added". + response_id: ID of the response. + output_index: Index of the output item. + item: The added conversation item. + """ + + type: Literal["response.output_item.added"] + response_id: str + output_index: int + item: ConversationItem + + +class ResponseAudioTranscriptDelta(ServerEvent): + """Event containing incremental audio transcript from a response. + + Parameters: + type: Event type, always "response.output_audio_transcript.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + delta: Incremental transcript text. + """ + + type: Literal["response.output_audio_transcript.delta"] + response_id: str + item_id: str + delta: str + + +class ResponseAudioTranscriptDone(ServerEvent): + """Event indicating audio transcript is complete. + + Parameters: + type: Event type, always "response.output_audio_transcript.done". + response_id: ID of the response. + item_id: ID of the conversation item. 
+ """ + + type: Literal["response.output_audio_transcript.done"] + response_id: str + item_id: str + + +class ResponseAudioDelta(ServerEvent): + """Event containing incremental audio data from a response. + + Parameters: + type: Event type, always "response.output_audio.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + output_index: Index of the output item. + content_index: Index of the content part. + delta: Base64-encoded incremental audio data. + """ + + type: Literal["response.output_audio.delta"] + response_id: str + item_id: str + output_index: int + content_index: int + delta: str # base64-encoded audio + + +class ResponseAudioDone(ServerEvent): + """Event indicating audio content is complete. + + Parameters: + type: Event type, always "response.output_audio.done". + response_id: ID of the response. + item_id: ID of the conversation item. + """ + + type: Literal["response.output_audio.done"] + response_id: str + item_id: str + + +class ResponseFunctionCallArgumentsDelta(ServerEvent): + """Event containing incremental function call arguments. + + Parameters: + type: Event type, always "response.function_call_arguments.delta". + response_id: ID of the response. + item_id: ID of the conversation item. + call_id: ID of the function call. + delta: Incremental function arguments as JSON. + previous_item_id: ID of the previous item, if any. + """ + + type: Literal["response.function_call_arguments.delta"] + response_id: Optional[str] = None + item_id: Optional[str] = None + call_id: str + delta: str + previous_item_id: Optional[str] = None + + +class ResponseFunctionCallArgumentsDone(ServerEvent): + """Event indicating function call arguments are complete. + + Parameters: + type: Event type, always "response.function_call_arguments.done". + call_id: ID of the function call. + name: Name of the function being called. Optional — Inworld may omit + this; the name can be resolved from the tracked function call item. + arguments: Complete function arguments as JSON string. + """ + + type: Literal["response.function_call_arguments.done"] + call_id: str + name: Optional[str] = None + arguments: str + + +class Usage(BaseModel): + """Token usage statistics for a response. + + Parameters: + total_tokens: Total number of tokens used. + input_tokens: Number of input tokens used. + output_tokens: Number of output tokens used. + """ + + total_tokens: Optional[int] = None + input_tokens: Optional[int] = None + output_tokens: Optional[int] = None + + +class Response(BaseModel): + """A complete assistant response. + + Parameters: + id: Unique identifier for the response. + object: Object type, always "realtime.response". + status: Current status of the response. + output: List of conversation items in the response. + usage: Token usage statistics for the response. + """ + + id: str + object: Literal["realtime.response"] + status: Literal["completed", "in_progress", "incomplete", "cancelled", "failed"] + status_details: Optional[Any] = None + output: List[ConversationItem] + usage: Optional[Usage] = None + + +class ResponseDone(ServerEvent): + """Event indicating an assistant response is complete. + + Parameters: + type: Event type, always "response.done". + response: The completed response object. + usage: Token usage (also available at top level). + """ + + type: Literal["response.done"] + response: Response + usage: Optional[Usage] = None + + +class ResponseOutputItemDone(ServerEvent): + """Event indicating an output item is complete. 
+ + Parameters: + type: Event type, always "response.output_item.done". + response_id: ID of the response. + output_index: Index of the output item. + item: The completed conversation item. + """ + + type: Literal["response.output_item.done"] + response_id: str + output_index: int + item: ConversationItem + + +class ContentPart(BaseModel): + """A content part within a response. + + Parameters: + type: Type of the content part (audio, text). + transcript: Transcript text if applicable. + """ + + type: str + transcript: Optional[str] = None + + +class ResponseContentPartAdded(ServerEvent): + """Event indicating a content part has been added to a response. + + Parameters: + type: Event type, always "response.content_part.added". + response_id: ID of the response. + item_id: ID of the conversation item. + content_index: Index of the content part. + output_index: Index of the output item. + part: The added content part. + """ + + type: Literal["response.content_part.added"] + response_id: str + item_id: str + content_index: int + output_index: int + part: ContentPart + + +class ResponseContentPartDone(ServerEvent): + """Event indicating a content part is complete. + + Parameters: + type: Event type, always "response.content_part.done". + response_id: ID of the response. + item_id: ID of the conversation item. + content_index: Index of the content part. + output_index: Index of the output item. + """ + + type: Literal["response.content_part.done"] + response_id: str + item_id: str + content_index: int + output_index: int + + +class PingEvent(ServerEvent): + """Keep-alive ping event from the server. + + Parameters: + type: Event type, always "ping". + timestamp: Server timestamp in milliseconds. + """ + + type: Literal["ping"] + timestamp: int + + +class ErrorEvent(ServerEvent): + """Event indicating an error occurred. + + Parameters: + type: Event type, always "error". + error: Error details. + """ + + type: Literal["error"] + error: RealtimeError + + +# +# Event parsing +# + +_server_event_types = { + "error": ErrorEvent, + "ping": PingEvent, + "session.created": SessionCreatedEvent, + "session.updated": SessionUpdatedEvent, + "conversation.created": ConversationCreated, + "conversation.item.added": ConversationItemAdded, + "conversation.item.input_audio_transcription.completed": ConversationItemInputAudioTranscriptionCompleted, + "conversation.item.input_audio_transcription.delta": ConversationItemInputAudioTranscriptionDelta, + "input_audio_buffer.speech_started": InputAudioBufferSpeechStarted, + "input_audio_buffer.speech_stopped": InputAudioBufferSpeechStopped, + "input_audio_buffer.committed": InputAudioBufferCommitted, + "input_audio_buffer.cleared": InputAudioBufferCleared, + "response.created": ResponseCreated, + "response.output_item.added": ResponseOutputItemAdded, + "response.output_item.done": ResponseOutputItemDone, + "response.content_part.added": ResponseContentPartAdded, + "response.content_part.done": ResponseContentPartDone, + "response.output_audio_transcript.delta": ResponseAudioTranscriptDelta, + "response.output_audio_transcript.done": ResponseAudioTranscriptDone, + "response.output_audio.delta": ResponseAudioDelta, + "response.output_audio.done": ResponseAudioDone, + "response.function_call_arguments.delta": ResponseFunctionCallArgumentsDelta, + "response.function_call_arguments.done": ResponseFunctionCallArgumentsDone, + "response.done": ResponseDone, +} + + +def parse_server_event(data: str): + """Parse a server event from JSON string. 
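+
+    Example::
+
+        evt = parse_server_event('{"type": "ping", "event_id": "e1", "timestamp": 0}')
+        assert isinstance(evt, PingEvent)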
+ + Args: + data: JSON string containing the server event. + + Returns: + Parsed server event object of the appropriate type. + + Raises: + Exception: If the event type is unimplemented or parsing fails. + """ + try: + event = json.loads(data) + event_type = event["type"] + if event_type not in _server_event_types: + raise Exception(f"Unimplemented server event type: {event_type}") + return _server_event_types[event_type].model_validate(event) + except Exception as e: + raise Exception(f"{e} \n\n{data}") diff --git a/src/pipecat/services/inworld/realtime/llm.py b/src/pipecat/services/inworld/realtime/llm.py new file mode 100644 index 0000000000..79ee186ca5 --- /dev/null +++ b/src/pipecat/services/inworld/realtime/llm.py @@ -0,0 +1,1039 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Inworld Realtime LLM service implementation with WebSocket support. + +Based on Inworld's Realtime API documentation: +https://docs.inworld.ai/api-reference/realtimeAPI/realtime/realtime-websocket +""" + +import base64 +import json +import time +import urllib.parse +from dataclasses import dataclass, field +from dataclasses import fields as dataclass_fields +from typing import Any, Dict, Literal, Mapping, Optional, Type + +from loguru import logger + +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.adapters.services.inworld_realtime_adapter import InworldRealtimeLLMAdapter +from pipecat.frames.frames import ( + AggregationType, + BotStoppedSpeakingFrame, + CancelFrame, + EndFrame, + Frame, + InputAudioRawFrame, + InterimTranscriptionFrame, + InterruptionFrame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMMessagesAppendFrame, + LLMSetToolsFrame, + LLMTextFrame, + StartFrame, + TranscriptionFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, + TTSTextFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.metrics.metrics import LLMTokenUsage +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.llm_service import FunctionCallFromLLM, LLMService +from pipecat.services.settings import ( + NOT_GIVEN, + LLMSettings, + _NotGiven, + is_given, +) +from pipecat.utils.time import time_now_iso8601 + +from . import events + +try: + from websockets.asyncio.client import connect as websocket_connect +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Inworld Realtime, you need to `pip install pipecat-ai[inworld]`.") + raise Exception(f"Missing module: {e}") + + +@dataclass +class CurrentAudioResponse: + """Tracks the current audio response from the assistant. + + Parameters: + item_id: Unique identifier for the audio response item. + content_index: Index of the audio content within the item. + start_time_ms: Timestamp when the audio response started in milliseconds. + total_size: Total size of audio data received in bytes. Defaults to 0. + """ + + item_id: str + content_index: int + start_time_ms: int + total_size: int = 0 + + +@dataclass +class InworldRealtimeLLMSettings(LLMSettings): + """Settings for InworldRealtimeLLMService. + + Parameters: + session_properties: Inworld Realtime session properties (audio config, + tools, etc.). ``model`` and ``instructions`` are synced + bidirectionally with the top-level ``model`` and + ``system_instruction`` fields. 
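+
+    For example, ``from_mapping`` routes ``SessionProperties`` keys into
+    ``session_properties`` (a sketch; exact routing depends on the base
+    ``LLMSettings`` field set)::
+
+        delta = InworldRealtimeLLMService.Settings.from_mapping(
+            {"model": "openai/gpt-4.1-nano", "output_modalities": ["text"]}
+        )
+        # delta.model == "openai/gpt-4.1-nano"  (always top-level)
+        # delta.session_properties.output_modalities == ["text"]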
+ """ + + session_properties: events.SessionProperties | _NotGiven = field( + default_factory=lambda: NOT_GIVEN + ) + + # -- Bidirectional sync helpers ------------------------------------------ + + @staticmethod + def _sync_top_level_to_sp(settings: "InworldRealtimeLLMService.Settings"): + """Push top-level ``model``/``system_instruction``/``temperature`` into SP.""" + if not is_given(settings.session_properties): + return + sp = settings.session_properties + if is_given(settings.model) and settings.model is not None: + sp.model = settings.model + if is_given(settings.system_instruction): + sp.instructions = settings.system_instruction + if is_given(settings.temperature) and settings.temperature is not None: + sp.temperature = settings.temperature + + # -- apply_update override ----------------------------------------------- + + def apply_update(self, delta: "InworldRealtimeLLMService.Settings") -> Dict[str, Any]: + """Merge a delta, keeping ``model``/``system_instruction`` in sync with SP. + + When the delta contains ``session_properties``, it **replaces** the + stored SP wholesale (matching legacy behaviour). Top-level field + values always take precedence over conflicting SP values. + """ + changed = super().apply_update(delta) + + if "session_properties" in changed and is_given(self.session_properties): + sp = self.session_properties + if "model" not in changed and sp.model is not None: + old_model = self.model + self.model = sp.model + if old_model != self.model: + changed["model"] = old_model + if "system_instruction" not in changed and sp.instructions is not None: + old_si = self.system_instruction + self.system_instruction = sp.instructions + if old_si != self.system_instruction: + changed["system_instruction"] = old_si + + self._sync_top_level_to_sp(self) + + return changed + + # -- from_mapping override ----------------------------------------------- + + @classmethod + def from_mapping( + cls: Type["InworldRealtimeLLMService.Settings"], settings: Mapping[str, Any] + ) -> "InworldRealtimeLLMService.Settings": + """Build a delta from a plain dict, routing SP keys into ``session_properties``. + + Keys that correspond to ``SessionProperties`` fields are collected into + a nested ``session_properties`` value. ``model`` is always routed to + the top-level field. Unknown keys go to ``extra``. + """ + own_field_names = {f.name for f in dataclass_fields(cls)} - {"extra"} + + top: Dict[str, Any] = {} + sp_dict: Dict[str, Any] = {} + extra: Dict[str, Any] = {} + + sp_keys = set(events.SessionProperties.model_fields.keys()) - {"model"} + + for key, value in settings.items(): + canonical = cls._aliases.get(key, key) + if canonical in own_field_names: + top[canonical] = value + elif canonical in sp_keys: + sp_dict[canonical] = value + else: + extra[key] = value + + if sp_dict: + top["session_properties"] = events.SessionProperties(**sp_dict) + + instance = cls(**top) + instance.extra = extra + return instance + + +# Error codes that are non-fatal and should not exit the receive loop. +_NON_FATAL_ERROR_CODES = { + "response_cancel_not_active", + "conversation_already_has_active_response", +} + + +class InworldRealtimeLLMService(LLMService): + """Inworld Realtime LLM service for real-time audio and text communication. + + Implements the Inworld Realtime API with WebSocket communication for + low-latency bidirectional audio and text interactions. The API operates + as a cascade STT/LLM/TTS pipeline under the hood, with built-in semantic + voice activity detection (VAD) for turn management. 
+ + Supports function calling, conversation management, and real-time + transcription. + + Example:: + + llm = InworldRealtimeLLMService( + api_key=os.getenv("INWORLD_API_KEY"), + llm_model="openai/gpt-4.1-nano", + voice="Sarah", + tts_model="inworld-tts-1.5-max", + ) + + For full control over session properties (note: ``session_properties`` + **replaces** all defaults, so provide a complete config):: + + from pipecat.services.inworld.realtime.events import * + + llm = InworldRealtimeLLMService( + api_key=os.getenv("INWORLD_API_KEY"), + settings=InworldRealtimeLLMService.Settings( + session_properties=SessionProperties( + model="openai/gpt-4.1-nano", + temperature=0.7, + audio=AudioConfiguration( + input=AudioInput( + format=PCMAudioFormat(rate=24000), + turn_detection=TurnDetection( + type="semantic_vad", + eagerness="low", + ), + ), + output=AudioOutput( + format=PCMAudioFormat(rate=24000), + voice="Sarah", + model="inworld-tts-1.5-max", + ), + ), + ), + ), + ) + """ + + Settings = InworldRealtimeLLMSettings + _settings: Settings + + adapter_class = InworldRealtimeLLMAdapter + + # Target ~60ms audio chunks when sending to Inworld (16-bit mono). + _AUDIO_CHUNK_TARGET_MS = 60 + + def __init__( + self, + *, + api_key: str, + llm_model: Optional[str] = None, + voice: Optional[str] = None, + tts_model: Optional[str] = None, + stt_model: Optional[str] = None, + base_url: str = "wss://api.inworld.ai/api/v1/realtime/session", + auth_type: Literal["basic", "bearer"] = "basic", + settings: Optional[Settings] = None, + start_audio_paused: bool = False, + **kwargs, + ): + """Initialize the Inworld Realtime LLM service. + + Args: + api_key: Inworld API key for authentication. + llm_model: LLM model to use (e.g. "openai/gpt-4.1-nano"). + Shorthand for ``session_properties.model``. + voice: Voice ID for TTS output (e.g. "Sarah", "Clive"). + Shorthand for ``session_properties.audio.output.voice``. + tts_model: TTS model to use (e.g. "inworld-tts-1.5-max"). + Shorthand for ``session_properties.audio.output.model``. + stt_model: STT model for input transcription + (e.g. "assemblyai/universal-streaming-multilingual"). + Shorthand for ``session_properties.audio.input.transcription.model``. + base_url: WebSocket base URL for the realtime API. + auth_type: Authentication type. ``"basic"`` for server-side API key + auth, ``"bearer"`` for client-side JWT auth. + settings: Full settings for fine-grained control. When + ``session_properties`` is provided in settings, it **replaces** + all defaults wholesale — provide a complete ``SessionProperties`` + in that case. + start_audio_paused: Whether to start with audio input paused. + **kwargs: Additional arguments passed to parent LLMService. 
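+
+        Example (shorthand form; model and voice values illustrative)::
+
+            llm = InworldRealtimeLLMService(
+                api_key=os.getenv("INWORLD_API_KEY"),
+                llm_model="openai/gpt-4.1-nano",
+                voice="Sarah",
+                stt_model="assemblyai/universal-streaming-multilingual",
+            )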
+ """ + default_model = llm_model or "openai/gpt-4.1-mini" + default_voice = voice or "Clive" + default_tts_model = tts_model or "inworld-tts-1.5-max" + default_stt_model = stt_model or "assemblyai/u3-rt-pro" + + default_settings = self.Settings( + model=default_model, + system_instruction=None, + temperature=None, + max_tokens=None, + top_p=None, + top_k=None, + frequency_penalty=None, + presence_penalty=None, + seed=None, + filter_incomplete_user_turns=False, + user_turn_completion_config=None, + session_properties=events.SessionProperties( + model=default_model, + output_modalities=["audio", "text"], + audio=events.AudioConfiguration( + input=events.AudioInput( + format=events.PCMAudioFormat(rate=24000), + transcription=events.InputTranscription(model=default_stt_model), + turn_detection=events.TurnDetection( + type="semantic_vad", + eagerness="low", + create_response=True, + interrupt_response=True, + ), + ), + output=events.AudioOutput( + format=events.PCMAudioFormat(rate=24000), + model=default_tts_model, + voice=default_voice, + ), + ), + ), + ) + + self.Settings._sync_top_level_to_sp(default_settings) + + if settings is not None: + default_settings.apply_update(settings) + + super().__init__( + base_url=base_url, + settings=default_settings, + **kwargs, + ) + + self.api_key = api_key + self.base_url = base_url + self._auth_type = auth_type + + self._audio_input_paused = start_audio_paused + self._audio_buffer = b"" + self._audio_send_logged = False + self._interim_transcription_text = "" + self._websocket = None + self._receive_task = None + self._context: LLMContext = None + self._last_context_message_count = 0 + + self._llm_needs_conversation_setup = True + + self._disconnecting = False + self._api_session_ready = False + self._run_llm_when_api_session_ready = False + + self._current_assistant_response = None + self._current_audio_response = None + self._server_vad_handled_turn = False + + self._messages_added_manually = {} + self._pending_function_calls = {} + self._completed_tool_calls = set() + + self._register_event_handler("on_conversation_item_created") + self._register_event_handler("on_conversation_item_updated") + + def can_generate_metrics(self) -> bool: + """Check if the service can generate usage metrics.""" + return True + + def set_audio_input_paused(self, paused: bool): + """Set whether audio input is paused. + + Args: + paused: True to pause audio input, False to resume. + """ + self._audio_input_paused = paused + + def _get_configured_sample_rate(self, direction: str) -> Optional[int]: + """Get manually configured sample rate for input or output. + + Args: + direction: Either "input" or "output". + + Returns: + Configured sample rate or None if not manually configured. + """ + if not self._settings.session_properties.audio: + return None + + audio_config = ( + self._settings.session_properties.audio.input + if direction == "input" + else self._settings.session_properties.audio.output + ) + + if audio_config and audio_config.format: + if hasattr(audio_config.format, "rate"): + return audio_config.format.rate + elif audio_config.format.type in ("audio/pcmu", "audio/pcma"): + return 8000 + + return None + + def _get_output_sample_rate(self) -> int: + """Get the output sample rate. + + Returns: + Output sample rate in Hz, defaulting to 24000. 
+ """ + rate = self._get_configured_sample_rate("output") + if rate is not None: + return rate + return getattr(self, "_output_sample_rate", 24000) + + async def _handle_interruption(self): + """Handle user interruption of assistant speech. + + Inworld's server-side VAD handles response cancellation and buffer + cleanup automatically, so we only need to clean up local state. + """ + await self._truncate_current_audio_response() + await self.stop_all_metrics() + + if self._current_assistant_response: + await self.push_frame(LLMFullResponseEndFrame()) + await self.push_frame(TTSStoppedFrame()) + + async def _handle_user_started_speaking(self, frame): + """Handle user started speaking event.""" + pass + + async def _handle_user_stopped_speaking(self, frame): + """Handle user stopped speaking event. + + Inworld's server-side VAD handles commit and response creation, + so this is a no-op. Metrics are started in _handle_evt_speech_stopped. + """ + pass + + async def _handle_bot_stopped_speaking(self): + """Handle bot stopped speaking event.""" + self._current_audio_response = None + + async def _truncate_current_audio_response(self): + """Truncates the current audio response (best-effort cleanup).""" + self._current_audio_response = None + + # + # Standard AIService frame handling + # + + def _ensure_audio_config(self, input_sample_rate: int, output_sample_rate: int): + """Ensure session_properties.audio has input and output configs. + + Preserves Inworld-specific fields (turn_detection, voice, model) and + syncs the format sample rates with the transport's actual rates so + Inworld knows the correct input/output sample rates. + + Args: + input_sample_rate: Sample rate for audio input (Hz). + output_sample_rate: Sample rate for audio output (Hz). + """ + self._input_sample_rate = input_sample_rate + self._output_sample_rate = output_sample_rate + props = self._settings.session_properties + if not props.audio: + props.audio = events.AudioConfiguration() + if not props.audio.input: + props.audio.input = events.AudioInput() + if not props.audio.output: + props.audio.output = events.AudioOutput() + if not props.audio.input.format: + props.audio.input.format = events.PCMAudioFormat() + if not props.audio.output.format: + props.audio.output.format = events.PCMAudioFormat() + props.audio.input.format.rate = input_sample_rate + props.audio.output.format.rate = output_sample_rate + + async def start(self, frame: StartFrame): + """Start the service and establish WebSocket connection.""" + await super().start(frame) + self._ensure_audio_config(frame.audio_in_sample_rate, frame.audio_out_sample_rate) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the service and close WebSocket connection.""" + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the service and close WebSocket connection.""" + await super().cancel(frame) + await self._disconnect() + + # + # Frame processing + # + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process incoming frames from the pipeline.""" + await super().process_frame(frame, direction) + + if isinstance(frame, TranscriptionFrame): + pass + elif isinstance(frame, LLMContextFrame): + await self._handle_context(frame.context) + elif isinstance(frame, InputAudioRawFrame): + if not self._audio_input_paused: + await self._send_user_audio(frame) + elif isinstance(frame, InterruptionFrame): + await self._handle_interruption() + elif isinstance(frame, 
UserStartedSpeakingFrame): + await self._handle_user_started_speaking(frame) + elif isinstance(frame, UserStoppedSpeakingFrame): + await self._handle_user_stopped_speaking(frame) + elif isinstance(frame, BotStoppedSpeakingFrame): + await self._handle_bot_stopped_speaking() + elif isinstance(frame, LLMMessagesAppendFrame): + await self._handle_messages_append(frame) + elif isinstance(frame, LLMSetToolsFrame): + await self._send_session_update() + + await self.push_frame(frame, direction) + + async def _handle_context(self, context: LLMContext): + """Handle LLM context updates.""" + if not self._context: + self._context = context + self._last_context_message_count = len(context.get_messages()) + await self._process_completed_function_calls(send_new_results=False) + await self._create_response() + else: + self._context = context + await self._process_completed_function_calls(send_new_results=True) + + # Check for new user messages (e.g. from text input). + # The context is a shared mutable object, so we track the last + # known message count to detect new additions. + messages = self._context.get_messages() + current_count = len(messages) + if current_count > self._last_context_message_count: + last_msg = messages[-1] + self._last_context_message_count = current_count + + # When server-side VAD handled this turn, the server already + # has the user's audio and auto-created a response. Skip + # sending a duplicate text item + response.create. + if self._server_vad_handled_turn: + self._server_vad_handled_turn = False + return + + if last_msg.get("role") == "user": + content = last_msg.get("content", "") + if isinstance(content, list): + content = " ".join( + c.get("text", "") for c in content if c.get("type") == "text" + ) + if content: + item = events.ConversationItem( + role="user", + type="message", + content=[events.ItemContent(type="input_text", text=content)], + ) + await self.send_client_event(events.ConversationItemCreateEvent(item=item)) + await self.start_processing_metrics() + await self.start_ttfb_metrics() + await self.send_client_event(events.ResponseCreateEvent()) + + async def _handle_messages_append(self, frame): + """Handle appending messages to the context (not yet supported).""" + logger.warning(f"{self}: LLMMessagesAppendFrame is not yet supported by Inworld Realtime") + + # + # WebSocket communication + # + + async def send_client_event(self, event: events.ClientEvent): + """Send a client event to the Inworld Realtime API. + + Args: + event: The client event to send. + """ + await self._ws_send(event.model_dump(exclude_none=True)) + + async def _connect(self): + """Establish WebSocket connection to Inworld.""" + try: + if self._websocket: + return + + if self._auth_type == "bearer": + auth_header = f"Bearer {self.api_key}" + else: + auth_header = f"Basic {self.api_key}" + + # Inworld requires key and protocol query parameters + session_key = f"pipecat-realtime-{int(time.time() * 1000)}" + params = urllib.parse.urlencode({"key": session_key, "protocol": "realtime"}) + separator = "&" if "?" in self.base_url else "?" 
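+            # Resulting URI shape (illustrative):
+            #   wss://api.inworld.ai/api/v1/realtime/session?key=pipecat-realtime-<ms>&protocol=realtime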
+ uri = f"{self.base_url}{separator}{params}" + + self._websocket = await websocket_connect( + uri=uri, + additional_headers={ + "Authorization": auth_header, + }, + ) + self._receive_task = self.create_task(self._receive_task_handler()) + except Exception as e: + await self.push_error(error_msg=f"Error connecting to Inworld: {e}", exception=e) + self._websocket = None + + async def _disconnect(self): + """Close WebSocket connection.""" + try: + self._disconnecting = True + self._api_session_ready = False + await self.stop_all_metrics() + + if self._websocket: + await self._websocket.close() + self._websocket = None + + if self._receive_task: + await self.cancel_task(self._receive_task, timeout=1.0) + self._receive_task = None + + self._completed_tool_calls = set() + self._audio_buffer = b"" + self._interim_transcription_text = "" + self._disconnecting = False + except Exception as e: + await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e) + + async def _ws_send(self, realtime_message): + """Send a message over the WebSocket connection.""" + try: + if not self._disconnecting and self._websocket: + await self._websocket.send(json.dumps(realtime_message)) + except Exception as e: + if self._disconnecting or not self._websocket: + return + await self.push_error(error_msg=f"Error sending client event: {e}", exception=e) + + async def _update_settings(self, delta): + """Apply a settings delta, sending a session update when needed.""" + input_rate = self._get_configured_sample_rate("input") + output_rate = self._get_configured_sample_rate("output") + + changed = await super()._update_settings(delta) + + if "session_properties" in changed and input_rate and output_rate: + self._ensure_audio_config(input_rate, output_rate) + + handled = {"session_properties", "system_instruction", "model"} + if changed.keys() & handled: + await self._send_session_update() + self._warn_unhandled_updated_settings(changed.keys() - handled) + return changed + + async def _send_session_update(self): + """Update session settings on the server.""" + settings = self._settings.session_properties + adapter: InworldRealtimeLLMAdapter = self.get_llm_adapter() + + if self._context: + llm_invocation_params = adapter.get_llm_invocation_params( + self._context, system_instruction=self._settings.system_instruction + ) + + if llm_invocation_params["tools"]: + settings.tools = llm_invocation_params["tools"] + + # The adapter resolves conflicts between init-provided and + # context-provided system instructions (preferring init-provided). 
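+ # For example (hypothetical values): an init-time system_instruction of
+ # "Be brief." plus a context system message "Be verbose." resolves to
+ # "Be brief." here, and that is what lands in settings.instructions below.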
+ if llm_invocation_params["system_instruction"]: + settings.instructions = llm_invocation_params["system_instruction"] + + # Convert ToolsSchema to list of dicts if needed + if settings.tools and isinstance(settings.tools, ToolsSchema): + settings.tools = adapter.from_standard_tools(settings.tools) + + settings.provider_data = {"metadata": {"sdk": "pipecat-realtime"}} + + await self.send_client_event(events.SessionUpdateEvent(session=settings)) + + # + # Inbound server event handling + # + + async def _receive_task_handler(self): + """Handle incoming WebSocket messages.""" + async for message in self._websocket: + try: + raw = json.loads(message) + event_type = raw.get("type", "") + except Exception: + logger.warning(f"Failed to decode server message: {message[:200]}") + continue + + # Skip events that don't have a matching Pydantic model + if event_type in ("conversation.item.done",): + continue + + try: + evt = events.parse_server_event(message) + except Exception as e: + logger.warning(f"Failed to parse server event: {e}") + continue + + if evt.type == "ping": + pass + elif evt.type == "session.created": + await self._handle_evt_session_created(evt) + elif evt.type == "session.updated": + await self._handle_evt_session_updated(evt) + elif evt.type == "response.created": + pass + elif evt.type == "response.output_audio.delta": + await self._handle_evt_audio_delta(evt) + elif evt.type == "response.output_audio.done": + await self._handle_evt_audio_done(evt) + elif evt.type in ("response.content_part.added", "response.content_part.done"): + pass + elif evt.type == "response.output_item.added": + await self._handle_evt_conversation_item_added(evt) + elif evt.type == "response.output_item.done": + pass + elif evt.type == "conversation.item.added": + await self._handle_evt_conversation_item_added(evt) + elif evt.type == "conversation.item.input_audio_transcription.delta": + await self._handle_evt_input_audio_transcription_delta(evt) + elif evt.type == "conversation.item.input_audio_transcription.completed": + await self._handle_evt_input_audio_transcription_completed(evt) + elif evt.type == "response.done": + await self._handle_evt_response_done(evt) + elif evt.type == "input_audio_buffer.speech_started": + await self._handle_evt_speech_started(evt) + elif evt.type == "input_audio_buffer.speech_stopped": + await self._handle_evt_speech_stopped(evt) + elif evt.type == "response.output_audio_transcript.delta": + await self._handle_evt_audio_transcript_delta(evt) + elif evt.type == "response.function_call_arguments.delta": + pass + elif evt.type == "response.function_call_arguments.done": + await self._handle_evt_function_call_arguments_done(evt) + elif evt.type == "error": + if evt.error.code in _NON_FATAL_ERROR_CODES: + logger.debug(f"{self} {evt.error.message}") + else: + await self._handle_evt_error(evt) + return + + async def _handle_evt_session_created(self, evt): + """Handle session.created event — first event after connecting.""" + await self._send_session_update() + + async def _handle_evt_session_updated(self, evt): + """Handle session.updated event.""" + self._api_session_ready = True + if self._run_llm_when_api_session_ready: + self._run_llm_when_api_session_ready = False + await self._create_response() + + async def _handle_evt_audio_delta(self, evt): + """Handle audio delta event — streaming audio from assistant.""" + await self.stop_ttfb_metrics() + + if self._current_audio_response and self._current_audio_response.item_id != evt.item_id: + logger.warning( + f"Received a new 
audio delta (item {evt.item_id}) before the previous audio response was closed by a BotStoppedSpeakingFrame."
+ )
+ logger.debug("Discarding the previous audio response state")
+ self._current_audio_response = None
+
+ if not self._current_audio_response:
+ self._current_audio_response = CurrentAudioResponse(
+ item_id=evt.item_id,
+ content_index=evt.content_index,
+ start_time_ms=int(time.time() * 1000),
+ )
+ await self.push_frame(TTSStartedFrame())
+
+ audio = base64.b64decode(evt.delta)
+ self._current_audio_response.total_size += len(audio)
+
+ frame = TTSAudioRawFrame(
+ audio=audio,
+ sample_rate=self._get_output_sample_rate(),
+ num_channels=1,
+ )
+ await self.push_frame(frame)
+
+ async def _handle_evt_audio_done(self, evt):
+ """Handle audio done event."""
+ if self._current_audio_response:
+ await self.push_frame(TTSStoppedFrame())
+
+ async def _handle_evt_conversation_item_added(self, evt):
+ """Handle conversation.item.added and response.output_item.added events."""
+ if evt.item.type == "function_call":
+ if evt.item.call_id not in self._pending_function_calls:
+ self._pending_function_calls[evt.item.call_id] = evt.item
+ else:
+ logger.debug(f"Function call {evt.item.call_id} already tracked, skipping")
+
+ await self._call_event_handler("on_conversation_item_created", evt.item.id, evt.item)
+
+ if self._messages_added_manually.get(evt.item.id):
+ del self._messages_added_manually[evt.item.id]
+ return
+
+ if evt.item.role == "assistant":
+ self._current_assistant_response = evt.item
+ await self.push_frame(LLMFullResponseStartFrame())
+
+ async def _handle_evt_input_audio_transcription_delta(self, evt):
+ """Handle streaming input audio transcription delta.
+
+ Accumulates deltas per item and pushes the running text as an
+ InterimTranscriptionFrame so the UI shows the full partial transcript.
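+
+ Example: successive deltas "Hel", "lo the", "re" push interim frames
+ carrying "Hel", "Hello the", "Hello there".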
+ """ + if evt.delta: + self._interim_transcription_text += evt.delta + await self.push_frame( + InterimTranscriptionFrame(self._interim_transcription_text, "", time_now_iso8601()), + FrameDirection.UPSTREAM, + ) + + async def _handle_evt_input_audio_transcription_completed(self, evt): + """Handle input audio transcription completed event.""" + self._interim_transcription_text = "" + await self._call_event_handler("on_conversation_item_updated", evt.item_id, None) + + transcript = evt.transcript.strip() if evt.transcript else "" + if transcript: + await self.push_frame( + TranscriptionFrame(transcript, "", time_now_iso8601(), result=evt), + FrameDirection.UPSTREAM, + ) + + async def _handle_evt_response_done(self, evt): + """Handle response.done event.""" + usage = evt.usage or evt.response.usage + if usage and usage.total_tokens: + tokens = LLMTokenUsage( + prompt_tokens=usage.input_tokens or 0, + completion_tokens=usage.output_tokens or 0, + total_tokens=usage.total_tokens or 0, + ) + await self.start_llm_usage_metrics(tokens) + + await self.stop_processing_metrics() + await self.push_frame(LLMFullResponseEndFrame()) + self._current_assistant_response = None + + if evt.response.status == "failed": + error_msg = "Response failed" + if evt.response.status_details: + error_msg = str(evt.response.status_details) + await self.push_error(error_msg=error_msg) + return + + for item in evt.response.output: + await self._call_event_handler("on_conversation_item_updated", item.id, item) + + async def _handle_evt_audio_transcript_delta(self, evt): + """Handle audio transcript delta event.""" + if evt.delta: + await self._push_output_transcript_text_frames(evt.delta) + + async def _push_output_transcript_text_frames(self, text: str): + # Push LLMTextFrame for RTVI "bot-llm-text" events (not appended to context + # to avoid duplication since the realtime API manages its own context). + llm_text_frame = LLMTextFrame(text) + llm_text_frame.append_to_context = False + await self.push_frame(llm_text_frame) + + # Push TTSTextFrame for output aggregation + tts_text_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE) + tts_text_frame.includes_inter_frame_spaces = True + await self.push_frame(tts_text_frame) + + async def _handle_evt_function_call_arguments_done(self, evt): + """Handle function call arguments done event.""" + try: + args = json.loads(evt.arguments) + + function_call_item = self._pending_function_calls.get(evt.call_id) + if function_call_item: + del self._pending_function_calls[evt.call_id] + + # Inworld may omit `name` from the done event — resolve from + # the tracked function call item. 
+ function_name = evt.name or function_call_item.name + if not function_name: + logger.warning(f"No function name for call_id: {evt.call_id}") + return + + function_calls = [ + FunctionCallFromLLM( + context=self._context, + tool_call_id=evt.call_id, + function_name=function_name, + arguments=args, + ) + ] + + await self.run_function_calls(function_calls) + logger.debug(f"Processed function call: {function_name}") + else: + logger.warning(f"No tracked function call found for call_id: {evt.call_id}") + + except Exception as e: + logger.error(f"Failed to process function call arguments: {e}") + + async def _handle_evt_speech_started(self, evt): + """Handle speech started event from server-side VAD.""" + await self._truncate_current_audio_response() + await self.broadcast_frame(UserStartedSpeakingFrame) + await self.broadcast_interruption() + + async def _handle_evt_speech_stopped(self, evt): + """Handle speech stopped event from server-side VAD.""" + # Mark that the server is handling this turn (and will auto-create a + # response when create_response=True). This prevents _handle_context + # from sending a duplicate ResponseCreateEvent when the user aggregator + # appends the transcribed text to the context. + self._server_vad_handled_turn = True + await self.start_ttfb_metrics() + await self.start_processing_metrics() + await self.broadcast_frame(UserStoppedSpeakingFrame) + + async def _handle_evt_error(self, evt): + """Handle fatal error event.""" + await self.push_error(error_msg=f"Inworld Realtime Error: {evt.error.message}") + + # + # Response creation + # + + async def reset_conversation(self): + """Reset the conversation by disconnecting and reconnecting. + + This fully resets the server-side conversation state. Audio buffers, + pending function calls, and conversation history are cleared. 
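+
+ Tool results already recorded in the context are re-marked as
+ completed after the reconnect so they are not re-sent to the server.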
+ """ + logger.debug("Resetting Inworld conversation") + await self._disconnect() + + self._llm_needs_conversation_setup = True + await self._process_completed_function_calls(send_new_results=False) + + await self._connect() + + async def _create_response(self): + """Create an assistant response.""" + if not self._api_session_ready: + self._run_llm_when_api_session_ready = True + return + + adapter: InworldRealtimeLLMAdapter = self.get_llm_adapter() + + if self._llm_needs_conversation_setup: + logger.debug( + f"Setting up Inworld conversation with initial messages: " + f"{adapter.get_messages_for_logging(self._context)}" + ) + + llm_invocation_params = adapter.get_llm_invocation_params( + self._context, system_instruction=self._settings.system_instruction + ) + messages = llm_invocation_params["messages"] + + for item in messages: + evt = events.ConversationItemCreateEvent(item=item) + self._messages_added_manually[evt.item.id] = True + await self.send_client_event(evt) + + await self._send_session_update() + self._llm_needs_conversation_setup = False + + logger.debug("Creating Inworld response") + + await self.start_processing_metrics() + await self.start_ttfb_metrics() + + modalities = self._settings.session_properties.output_modalities or ["text", "audio"] + await self.send_client_event( + events.ResponseCreateEvent(response=events.ResponseProperties(modalities=modalities)) + ) + + async def _process_completed_function_calls(self, send_new_results: bool): + """Process completed function calls and send results to the service.""" + sent_new_result = False + + for message in self._context.get_messages(): + if message.get("role") == "tool" and message.get("content") != "IN_PROGRESS": + tool_call_id = message.get("tool_call_id") + if tool_call_id and tool_call_id not in self._completed_tool_calls: + if send_new_results: + sent_new_result = True + await self._send_tool_result(tool_call_id, message.get("content")) + self._completed_tool_calls.add(tool_call_id) + + if sent_new_result: + await self._create_response() + + async def _send_user_audio(self, frame): + """Send user audio to Inworld, buffered to ~60ms chunks.""" + if self._llm_needs_conversation_setup: + return + + if not self._audio_send_logged: + logger.debug( + f"Streaming audio to Inworld: {frame.sample_rate}Hz, " + f"{frame.num_channels}ch, {len(frame.audio)}B/frame" + ) + self._audio_send_logged = True + + # Compute chunk size from actual sample rate (16-bit mono = 2 bytes/sample) + chunk_bytes = int(frame.sample_rate * 2 * self._AUDIO_CHUNK_TARGET_MS / 1000) + + # Accumulate and send in chunks + self._audio_buffer += frame.audio + while len(self._audio_buffer) >= chunk_bytes: + chunk = self._audio_buffer[:chunk_bytes] + self._audio_buffer = self._audio_buffer[chunk_bytes:] + payload = base64.b64encode(chunk).decode("utf-8") + await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload)) + + async def _send_tool_result(self, tool_call_id: str, result: str): + """Send a tool call result to Inworld.""" + item = events.ConversationItem( + type="function_call_output", + call_id=tool_call_id, + output=json.dumps(result, ensure_ascii=False), + ) + await self.send_client_event(events.ConversationItemCreateEvent(item=item)) diff --git a/tests/test_function_calling_adapters.py b/tests/test_function_calling_adapters.py index 348754f47f..621c8d139f 100644 --- a/tests/test_function_calling_adapters.py +++ b/tests/test_function_calling_adapters.py @@ -13,6 +13,7 @@ from pipecat.adapters.services.anthropic_adapter import 
AnthropicLLMAdapter from pipecat.adapters.services.bedrock_adapter import AWSBedrockLLMAdapter from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter +from pipecat.adapters.services.inworld_realtime_adapter import InworldRealtimeLLMAdapter from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter @@ -143,6 +144,32 @@ def test_openai_realtime_adapter(self): ] assert OpenAIRealtimeLLMAdapter().to_provider_tools_format(self.tools_def) == expected + def test_inworld_realtime_adapter(self): + """Test Inworld Realtime adapter format transformation.""" + expected = [ + { + "type": "function", + "name": "get_weather", + "description": "Get the weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city, e.g. San Francisco", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use.", + }, + }, + "required": ["location", "format"], + }, + } + ] + assert InworldRealtimeLLMAdapter().to_provider_tools_format(self.tools_def) == expected + def test_gemini_adapter_with_custom_tools(self): """Test Gemini adapter format transformation.""" search_tool = {"google_search": {}} diff --git a/tests/test_settings.py b/tests/test_settings.py index d790f2901d..6c45d53f51 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -10,6 +10,8 @@ from pipecat.services.deepgram.sagemaker.stt import DeepgramSageMakerSTTSettings from pipecat.services.deepgram.stt import DeepgramSTTService, DeepgramSTTSettings +from pipecat.services.inworld.realtime import events as inworld_events +from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMSettings from pipecat.services.openai.realtime import events from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMSettings from pipecat.services.settings import ( @@ -979,3 +981,200 @@ def test_roundtrip_from_mapping_apply_update(self): assert store.session_properties.instructions == "Be concise." assert store.session_properties.voice == "Eve" assert store.system_instruction == "Be concise." 
+ + +# --------------------------------------------------------------------------- +# InworldRealtimeLLMSettings: apply_update with bidirectional sync +# --------------------------------------------------------------------------- + + +class TestInworldRealtimeSettingsApplyUpdate: + def _make_store(self, **kwargs) -> InworldRealtimeLLMSettings: + """Helper to build a store-mode InworldRealtimeLLMSettings.""" + defaults = dict( + model="openai/gpt-4.1-nano", + system_instruction=None, + temperature=None, + max_tokens=None, + top_p=None, + top_k=None, + frequency_penalty=None, + presence_penalty=None, + seed=None, + filter_incomplete_user_turns=False, + user_turn_completion_config=None, + session_properties=inworld_events.SessionProperties(), + ) + defaults.update(kwargs) + return InworldRealtimeLLMSettings(**defaults) + + def test_top_level_model_syncs_to_sp(self): + """Updating top-level model should propagate to session_properties.model.""" + store = self._make_store() + delta = InworldRealtimeLLMSettings(model="openai/gpt-4.1") + changed = store.apply_update(delta) + + assert "model" in changed + assert store.model == "openai/gpt-4.1" + assert store.session_properties.model == "openai/gpt-4.1" + + def test_top_level_system_instruction_syncs_to_sp(self): + """Updating top-level system_instruction should propagate to session_properties.instructions.""" + store = self._make_store() + delta = InworldRealtimeLLMSettings(system_instruction="Be helpful.") + changed = store.apply_update(delta) + + assert "system_instruction" in changed + assert store.system_instruction == "Be helpful." + assert store.session_properties.instructions == "Be helpful." + + def test_sp_replaces_wholesale(self): + """session_properties in delta replaces the entire stored SP.""" + store = self._make_store( + session_properties=inworld_events.SessionProperties( + output_modalities=["audio", "text"], + instructions="Old instructions.", + ), + system_instruction="Old instructions.", + ) + + new_sp = inworld_events.SessionProperties(output_modalities=["text"]) + delta = InworldRealtimeLLMSettings(session_properties=new_sp) + changed = store.apply_update(delta) + + assert "session_properties" in changed + assert store.session_properties.output_modalities == ["text"] + # model is synced from top-level + assert store.session_properties.model == "openai/gpt-4.1-nano" + + def test_sp_model_syncs_to_top_level(self): + """session_properties.model should sync to top-level model.""" + store = self._make_store() + new_sp = inworld_events.SessionProperties(model="openai/gpt-4.1") + delta = InworldRealtimeLLMSettings(session_properties=new_sp) + changed = store.apply_update(delta) + + assert "model" in changed + assert store.model == "openai/gpt-4.1" + assert store.session_properties.model == "openai/gpt-4.1" + + def test_sp_instructions_syncs_to_top_level(self): + """session_properties.instructions should sync to top-level system_instruction.""" + store = self._make_store() + new_sp = inworld_events.SessionProperties(instructions="New instructions.") + delta = InworldRealtimeLLMSettings(session_properties=new_sp) + changed = store.apply_update(delta) + + assert "system_instruction" in changed + assert store.system_instruction == "New instructions." + assert store.session_properties.instructions == "New instructions." 
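+
+ # The two precedence tests below pin down merge order when one delta
+ # carries both a top-level field and its session_properties twin: for
+ # both synced fields (model and system_instruction), the top-level wins.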
+ + def test_top_level_model_takes_precedence_over_sp_model(self): + """When both model and session_properties.model are in the delta, top-level wins.""" + store = self._make_store() + new_sp = inworld_events.SessionProperties(model="sp-model") + delta = InworldRealtimeLLMSettings(model="top-model", session_properties=new_sp) + store.apply_update(delta) + + assert store.model == "top-model" + assert store.session_properties.model == "top-model" + + def test_top_level_si_takes_precedence_over_sp_instructions(self): + """When both system_instruction and SP.instructions are in delta, top-level wins.""" + store = self._make_store() + new_sp = inworld_events.SessionProperties(instructions="sp instructions") + delta = InworldRealtimeLLMSettings( + system_instruction="top instructions", + session_properties=new_sp, + ) + store.apply_update(delta) + + assert store.system_instruction == "top instructions" + assert store.session_properties.instructions == "top instructions" + + def test_non_synced_field_update_does_not_affect_sp(self): + """Updating a non-synced field like temperature shouldn't touch session_properties.""" + store = self._make_store( + session_properties=inworld_events.SessionProperties(instructions="Keep me."), + system_instruction="Keep me.", + ) + original_sp = store.session_properties + + delta = InworldRealtimeLLMSettings(temperature=0.5) + changed = store.apply_update(delta) + + assert "temperature" in changed + assert store.temperature == 0.5 + # SP should be untouched (same object) + assert store.session_properties is original_sp + assert store.session_properties.instructions == "Keep me." + + +# --------------------------------------------------------------------------- +# InworldRealtimeLLMSettings: from_mapping +# --------------------------------------------------------------------------- + + +class TestInworldRealtimeSettingsFromMapping: + def test_sp_keys_route_to_session_properties(self): + """SessionProperties fields (instructions, output_modalities) route into nested SP.""" + delta = InworldRealtimeLLMSettings.from_mapping( + {"instructions": "Be concise.", "output_modalities": ["text"]} + ) + assert is_given(delta.session_properties) + assert delta.session_properties.instructions == "Be concise." + assert delta.session_properties.output_modalities == ["text"] + + def test_model_routes_to_top_level(self): + """model should go to the top-level field, not session_properties.""" + delta = InworldRealtimeLLMSettings.from_mapping({"model": "openai/gpt-4.1"}) + assert delta.model == "openai/gpt-4.1" + # No session_properties should be created since no SP keys were present + assert not is_given(delta.session_properties) + + def test_unknown_keys_go_to_extra(self): + """Unrecognized keys should land in extra.""" + delta = InworldRealtimeLLMSettings.from_mapping({"unknown_param": 42}) + assert not is_given(delta.model) + assert not is_given(delta.session_properties) + assert delta.extra == {"unknown_param": 42} + + def test_mixed_keys(self): + """model + SP keys + unknown keys are routed correctly.""" + delta = InworldRealtimeLLMSettings.from_mapping( + { + "model": "openai/gpt-4.1", + "instructions": "Be helpful.", + "unknown": "val", + } + ) + assert delta.model == "openai/gpt-4.1" + assert is_given(delta.session_properties) + assert delta.session_properties.instructions == "Be helpful." 
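+ # Unknown keys are preserved verbatim rather than silently dropped: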
+ assert delta.extra == {"unknown": "val"} + + def test_roundtrip_from_mapping_apply_update(self): + """Simulate dict-style update: from_mapping -> apply_update.""" + store = InworldRealtimeLLMSettings( + model="openai/gpt-4.1-nano", + system_instruction=None, + temperature=None, + max_tokens=None, + top_p=None, + top_k=None, + frequency_penalty=None, + presence_penalty=None, + seed=None, + filter_incomplete_user_turns=False, + user_turn_completion_config=None, + session_properties=inworld_events.SessionProperties(), + ) + + raw = {"instructions": "Be concise.", "output_modalities": ["text"]} + delta = InworldRealtimeLLMSettings.from_mapping(raw) + changed = store.apply_update(delta) + + assert "session_properties" in changed + assert store.session_properties.instructions == "Be concise." + assert store.session_properties.output_modalities == ["text"] + assert store.system_instruction == "Be concise." diff --git a/uv.lock b/uv.lock index 1445296ad3..a9b408abad 100644 --- a/uv.lock +++ b/uv.lock @@ -4248,6 +4248,9 @@ heygen = [ hume = [ { name = "hume" }, ] +inworld = [ + { name = "websockets" }, +] koala = [ { name = "pvkoala" }, ] @@ -4470,6 +4473,7 @@ requires-dist = [ { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'google'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'gradium'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'heygen'" }, + { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'inworld'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'lmnt'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'neuphonic'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'openai'" },