Skip to content

Add auto chunking for text streams #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/curvy-moons-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

Add auto chunking for text streams
23 changes: 8 additions & 15 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { ReadableStream } from 'node:stream/web';
import { log } from '../log.js';
import type { DataStream_Chunk } from '../proto/room_pb.js';
import { bigIntToNumber } from '../utils.js';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamChunk, TextStreamInfo } from './types.js';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types.js';

abstract class BaseStreamReader<T extends BaseStreamInfo> {
protected reader: ReadableStream<DataStream_Chunk>;
Expand Down Expand Up @@ -124,7 +124,7 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
const decoder = new TextDecoder();

return {
next: async (): Promise<IteratorResult<TextStreamChunk>> => {
next: async (): Promise<IteratorResult<string>> => {
try {
const { done, value } = await reader.read();
if (done) {
Expand All @@ -133,14 +133,7 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
this.handleChunkReceived(value);
return {
done: false,
value: {
index: bigIntToNumber(value.chunkIndex)!,
current: decoder.decode(value.content!),
collected: Array.from(this.receivedChunks.values())
.sort((a, b) => bigIntToNumber(a.chunkIndex!) - bigIntToNumber(b.chunkIndex!))
.map((chunk) => decoder.decode(chunk.content!))
.join(''),
},
value: decoder.decode(value.content!),
};
}
} catch (error) {
Expand All @@ -149,18 +142,18 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
}
},

return(): IteratorResult<TextStreamChunk> {
return(): IteratorResult<string> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<string> {
let latestString: string = '';
for await (const { collected } of this) {
latestString = collected;
let finalString: string = '';
for await (const chunk of this) {
finalString += chunk;
}
return latestString;
return finalString;
}
}
10 changes: 5 additions & 5 deletions packages/livekit-rtc/src/data_streams/stream_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ import type { WritableStream } from 'node:stream/web';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types.js';

class BaseStreamWriter<T, InfoType extends BaseStreamInfo> {
protected writableStream: WritableStream<[T, number?]>;
protected writableStream: WritableStream<T>;

protected defaultWriter: WritableStreamDefaultWriter<[T, number?]>;
protected defaultWriter: WritableStreamDefaultWriter<T>;

protected onClose?: () => void;

readonly info: InfoType;

constructor(writableStream: WritableStream<[T, number?]>, info: InfoType, onClose?: () => void) {
constructor(writableStream: WritableStream<T>, info: InfoType, onClose?: () => void) {
this.writableStream = writableStream;
this.defaultWriter = writableStream.getWriter();
this.onClose = onClose;
this.info = info;
}

write(chunk: T, chunkIndex?: number): Promise<void> {
return this.defaultWriter.write([chunk, chunkIndex]);
write(chunk: T): Promise<void> {
return this.defaultWriter.write(chunk);
}

async close() {
Expand Down
6 changes: 0 additions & 6 deletions packages/livekit-rtc/src/data_streams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ export type ByteStreamInfo = BaseStreamInfo & {

export type TextStreamInfo = BaseStreamInfo;

export type TextStreamChunk = {
index: number;
current: string;
collected: string;
};

export interface DataStreamOptions {
topic?: string;
destinationIdentities?: Array<string>;
Expand Down
36 changes: 11 additions & 25 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,40 +293,29 @@ export class LocalParticipant extends Participant {

await this.sendStreamHeader(headerReq);

let chunkId = 0;
let nextChunkId = 0;
const localHandle = this.ffi_handle.handle;
const sendTrailer = this.sendStreamTrailer;
const sendChunk = this.sendStreamChunk;

const writableStream = new WritableStream<[string, number?]>({
const writableStream = new WritableStream<string>({
// Implement the sink
write([textChunk, overrideChunkId]) {
const textInBytes = new TextEncoder().encode(textChunk);

if (textInBytes.byteLength > STREAM_CHUNK_SIZE) {
this.abort?.();
throw new Error('chunk size too large');
}

return new Promise(async (resolve) => {
async write(text) {
for (const textChunk in splitUtf8(text, STREAM_CHUNK_SIZE)) {
const chunkRequest = new SendStreamChunkRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
chunk: new DataStream_Chunk({
content: textInBytes,
content: new TextEncoder().encode(textChunk),
streamId,
chunkIndex: numberToBigInt(overrideChunkId ?? chunkId),
chunkIndex: numberToBigInt(nextChunkId),
}),
});

await sendChunk(chunkRequest);

if (!overrideChunkId) {
chunkId += 1;
resolve();
}
});
nextChunkId += 1;
}
},
async close() {
const trailerReq = new SendStreamTrailerRequest({
Expand Down Expand Up @@ -360,10 +349,7 @@ export class LocalParticipant extends Participant {
},
) {
const writer = await this.streamText(options);

for (const chunk in splitUtf8(text, STREAM_CHUNK_SIZE)) {
await writer.write(chunk);
}
await writer.write(text);
await writer.close();
return writer.info;
}
Expand Down Expand Up @@ -419,8 +405,8 @@ export class LocalParticipant extends Participant {
const sendChunk = this.sendStreamChunk;
const writeMutex = new Mutex();

const writableStream = new WritableStream<[Uint8Array, number?]>({
async write([chunk]) {
const writableStream = new WritableStream<Uint8Array>({
async write(chunk) {
const unlock = await writeMutex.lock();

let byteOffset = 0;
Expand Down
Loading