[API Proposal]: Add APIs to WebSocket which allow it to be read as a Stream #111217

@christothes

UPD 17-06-2025 by @CarnaViire
Updated API shape for the second review round:

namespace System.Net.WebSockets;

public class WebSocketStream : Stream
{
    internal WebSocketStream();

    public WebSocket WebSocket { get; }

    public static WebSocketStream Create(
        WebSocket webSocket,
        WebSocketMessageType writeMessageType,
        bool ownsWebSocket = false); // if ownsWebSocket == true, (default) closeTimeout = 60s

    public static WebSocketStream Create( // implicit ownsWebSocket = true
        WebSocket webSocket,
        WebSocketMessageType writeMessageType,
        TimeSpan closeTimeout); // if closeTimeout == 0, Dispose aborts the WebSocket

    public static WebSocketStream CreateWritableMessageStream(
        WebSocket webSocket,
        WebSocketMessageType writeMessageType);

    public static WebSocketStream CreateReadableMessageStream(WebSocket webSocket);

    ... // relevant Stream overrides
}
⤵️ Diff from the previous version

Previous (approved) version: #111217 (comment)

public class WebSocketStream : Stream
{
    internal WebSocketStream();

    public WebSocket WebSocket { get; }

    public static WebSocketStream Create(
        WebSocket webSocket,
+       WebSocketMessageType writeMessageType,
        bool ownsWebSocket = false);

+   public static WebSocketStream Create(
+       WebSocket webSocket,
+       WebSocketMessageType writeMessageType,
+       TimeSpan closeTimeout);

    public static WebSocketStream CreateWritableMessageStream(
        WebSocket webSocket,
+       WebSocketMessageType writeMessageType);

    public static WebSocketStream CreateReadableMessageStream(WebSocket webSocket);

    ... // relevant Stream overrides
}
⤵️ Usage examples
  1. Duplex stream, continuously reading/writing Text data without frame alignment (e.g. STOMP)
using Stream transportStream = WebSocketStream.Create(connectedWebSocket, WebSocketMessageType.Text, ownsWebSocket: true);
// ...
await frame.SerializeToStreamAsync(transportStream, cancellationToken); // integration with Stream-based APIs
// ...
using var transportReader = new StreamReader(transportStream, leaveOpen: true); // seamless UTF-8 conversion for text
var line = await transportReader.ReadLineAsync(cancellationToken);              // protocols with new line separators
// ...
  2. Duplex stream, continuously reading/writing Binary data without frame alignment (e.g. AMQP)
// using Stream transportStream = new NetworkStream(tcpSocket, ownsSocket: true); // stream as an easy transport abstraction
using Stream transportStream = WebSocketStream.Create(connectedWebSocket, WebSocketMessageType.Binary, closeTimeout: TimeSpan.FromSeconds(10));
// ...
await transportStream.WriteAsync(sendPayload, cancellationToken);
// ...
var receivePayload = new byte[payloadLength];
await transportStream.ReadExactlyAsync(receivePayload, cancellationToken); // convenience methods
  3. Single message read stream, parsing JSON
using Stream messageStream = WebSocketStream.CreateReadableMessageStream(connectedWebSocket);
var appMessage = await JsonSerializer.DeserializeAsync<AppMessage>(messageStream); // DeserializeAsync reads until EOS
  4. Single message write stream, binary serialization
public async Task SendMessageAsync(AppMessage message, CancellationToken cancellationToken)
{
    using Stream messageStream = WebSocketStream.CreateWritableMessageStream(_connectedWebSocket, WebSocketMessageType.Binary);
    foreach (ReadOnlyMemory<byte> chunk in message.SplitToChunks())
    {
        await messageStream.WriteAsync(chunk, cancellationToken);
    }
} // implicit EOM sent on messageStream.Dispose()

EDITED by @stephentoub 4/7/2025 to add API for review:

+public sealed class WebSocketStream : Stream
+{
+    public WebSocketStream(WebSocket webSocket, bool ownsWebSocket = false);
+    public WebSocket WebSocket { get; }
+
+    ... // relevant Stream overrides
+}

Open questions:

  • Name and polarity of the boolean. NetworkStream uses ownsSocket = false, but while it's the closest in relation to this class, it's also fairly unique in this approach. Most other .NET types use leaveOpen = false.
  • There's a possible further design expansion, where you could create a Stream to represent writing out a single message in parts, in which case all writes on the stream would be with EndOfMessage:false and then closing the stream would output an empty message with EndOfMessage:true. If we add that subsequently, how would we want it to show up? Should we instead have a factory method WebSocketStream.Create, and then later add a WebSocketStream.CreateMessage?
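
To make the single-message idea concrete, here is a rough sketch of the frame sequence such a stream would be expected to produce, written directly against the existing WebSocket.SendAsync API. SendMessageInPartsAsync is a hypothetical helper used only for illustration and is not part of the proposal; the real implementation may well differ.

```csharp
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the proposed API): emits the frames a
// single-message write stream would be expected to send.
static async Task SendMessageInPartsAsync(
    WebSocket webSocket,
    IEnumerable<ReadOnlyMemory<byte>> chunks,
    CancellationToken cancellationToken)
{
    foreach (ReadOnlyMemory<byte> chunk in chunks)
    {
        // Each write on the stream would map to a frame with endOfMessage: false.
        await webSocket.SendAsync(chunk, WebSocketMessageType.Binary, endOfMessage: false, cancellationToken);
    }

    // Closing the stream would map to an empty final frame with endOfMessage: true.
    await webSocket.SendAsync(ReadOnlyMemory<byte>.Empty, WebSocketMessageType.Binary, endOfMessage: true, cancellationToken);
}
```

The empty terminating frame is what lets the stream stay unaware of which write is the last one, at the cost of one extra frame per message.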

Background and motivation

Utilizing WebSockets is a convenient approach to writing real-time audio processing code for ASP.NET applications. One such scenario is implementing a real-time conversation with OpenAI.

The SendInputAudioAsync method of OpenAI's real-time API accepts a Stream as input, which leaves it up to the developer to write a custom Stream implementation that reads from an underlying WebSocket. It would be a nice enhancement to the WebSocket APIs if one could wrap read operations in a Stream.
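
For context, the core of such a hand-written wrapper is roughly the read routine below. This is a minimal sketch under stated assumptions: ReadAsStreamAsync is a hypothetical name, it would be called from a custom Stream's ReadAsync override, and it ignores message boundaries and does not complete the close handshake.

```csharp
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: maps WebSocket receive semantics onto Stream read semantics.
static async ValueTask<int> ReadAsStreamAsync(
    WebSocket webSocket,
    Memory<byte> buffer,
    CancellationToken cancellationToken)
{
    if (buffer.IsEmpty)
    {
        return 0;
    }

    while (true)
    {
        ValueWebSocketReceiveResult result = await webSocket.ReceiveAsync(buffer, cancellationToken);

        // A close frame is reported to the Stream consumer as end of stream.
        if (result.MessageType == WebSocketMessageType.Close)
        {
            return 0;
        }

        // Skip empty frames: returning 0 here would be misread as end of stream.
        if (result.Count > 0)
        {
            return result.Count;
        }
    }
}
```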

API Proposal

public class WebSocket
{
    public Stream AsStream();
}

API Usage

using var webSocket = await context.WebSockets.AcceptWebSocketAsync();
using RealtimeConversationSession session = await InitSession(realtime);
// <...>
using var stream = webSocket.AsStream();
await session.SendInputAudioAsync(stream);

Alternative Designs

No response

Risks

WebSocket doesn’t provide synchronous methods for wire-based operations, so all of the Stream sync APIs (including Dispose, which presumably would need to not just Dispose the WebSocket but also CloseAsync it) would be sync-over-async.
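
As an illustration of that sync-over-async concern, a synchronous Dispose that owns the WebSocket might look roughly like the sketch below. CloseAndDispose is a hypothetical helper, and the zero-timeout and fallback behavior are assumptions for illustration, not a description of the actual implementation.

```csharp
using System;
using System.Net.WebSockets;
using System.Threading;

// Hypothetical helper: what a synchronous Dispose might have to do when it owns the WebSocket.
static void CloseAndDispose(WebSocket webSocket, TimeSpan closeTimeout)
{
    try
    {
        if (closeTimeout <= TimeSpan.Zero)
        {
            // Assumption: a zero timeout skips the close handshake entirely.
            webSocket.Abort();
            return;
        }

        using var cts = new CancellationTokenSource(closeTimeout);

        // Sync-over-async: the calling thread blocks until the close handshake completes.
        webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cts.Token)
            .GetAwaiter()
            .GetResult();
    }
    catch (Exception ex) when (ex is OperationCanceledException or WebSocketException)
    {
        // The handshake timed out or failed: fall back to aborting the connection.
        webSocket.Abort();
    }
    finally
    {
        webSocket.Dispose();
    }
}
```

Blocking like this can tie up a thread for the whole close timeout, which is one reason to prefer DisposeAsync where the caller can use it.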
