Skip to content
66 changes: 38 additions & 28 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,6 @@ type Response = {
_chunks: Map<number, SomeChunk<any>>,
_fromJSON: (key: string, value: JSONValue) => any,
_stringDecoder: StringDecoder,
_rowState: RowParserState,
_rowID: number, // parts of a row ID parsed so far
_rowTag: number, // 0 indicates that we're currently parsing the row ID
_rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline.
_buffer: Array<Uint8Array>, // chunks received so far as part of this row
_closed: boolean,
_closedReason: mixed,
_tempRefs: void | TemporaryReferenceSet, // the set temporary references can be resolved from
Expand Down Expand Up @@ -2154,11 +2149,6 @@ function ResponseInstance(
this._chunks = chunks;
this._stringDecoder = createStringDecoder();
this._fromJSON = (null: any);
this._rowState = 0;
this._rowID = 0;
this._rowTag = 0;
this._rowLength = 0;
this._buffer = [];
this._closed = false;
this._closedReason = null;
this._tempRefs = temporaryReferences;
Expand Down Expand Up @@ -2259,6 +2249,24 @@ export function createResponse(
);
}

export type StreamState = {
_rowState: RowParserState,
_rowID: number, // parts of a row ID parsed so far
_rowTag: number, // 0 indicates that we're currently parsing the row ID
_rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline.
_buffer: Array<Uint8Array>, // chunks received so far as part of this row
};

export function createStreamState(): StreamState {
return {
_rowState: 0,
_rowID: 0,
_rowTag: 0,
_rowLength: 0,
_buffer: [],
};
}

function resolveDebugHalt(response: Response, id: number): void {
const chunks = response._chunks;
let chunk = chunks.get(id);
Expand Down Expand Up @@ -3995,6 +4003,7 @@ function processFullStringRow(

export function processBinaryChunk(
weakResponse: WeakResponse,
streamState: StreamState,
chunk: Uint8Array,
): void {
if (hasGCedResponse(weakResponse)) {
Expand All @@ -4003,11 +4012,11 @@ export function processBinaryChunk(
}
const response = unwrapWeakResponse(weakResponse);
let i = 0;
let rowState = response._rowState;
let rowID = response._rowID;
let rowTag = response._rowTag;
let rowLength = response._rowLength;
const buffer = response._buffer;
let rowState = streamState._rowState;
let rowID = streamState._rowID;
let rowTag = streamState._rowTag;
let rowLength = streamState._rowLength;
const buffer = streamState._buffer;
const chunkLength = chunk.length;
while (i < chunkLength) {
let lastIdx = -1;
Expand Down Expand Up @@ -4112,14 +4121,15 @@ export function processBinaryChunk(
break;
}
}
response._rowState = rowState;
response._rowID = rowID;
response._rowTag = rowTag;
response._rowLength = rowLength;
streamState._rowState = rowState;
streamState._rowID = rowID;
streamState._rowTag = rowTag;
streamState._rowLength = rowLength;
}

export function processStringChunk(
weakResponse: WeakResponse,
streamState: StreamState,
chunk: string,
): void {
if (hasGCedResponse(weakResponse)) {
Expand All @@ -4136,11 +4146,11 @@ export function processStringChunk(
// here. Basically, only if Flight Server gave you this string as a chunk,
// you can use it here.
let i = 0;
let rowState = response._rowState;
let rowID = response._rowID;
let rowTag = response._rowTag;
let rowLength = response._rowLength;
const buffer = response._buffer;
let rowState = streamState._rowState;
let rowID = streamState._rowID;
let rowTag = streamState._rowTag;
let rowLength = streamState._rowLength;
const buffer = streamState._buffer;
const chunkLength = chunk.length;
while (i < chunkLength) {
let lastIdx = -1;
Expand Down Expand Up @@ -4264,10 +4274,10 @@ export function processStringChunk(
);
}
}
response._rowState = rowState;
response._rowID = rowID;
response._rowTag = rowTag;
response._rowLength = rowLength;
streamState._rowState = rowState;
streamState._rowID = rowID;
streamState._rowTag = rowTag;
streamState._rowLength = rowLength;
}

function parseModel<T>(response: Response, json: UninitializedModel): T {
Expand Down
4 changes: 3 additions & 1 deletion packages/react-markup/src/ReactMarkupServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {

import {
createResponse as createFlightResponse,
createStreamState as createFlightStreamState,
getRoot as getFlightRoot,
processStringChunk as processFlightStringChunk,
close as closeFlight,
Expand Down Expand Up @@ -80,10 +81,11 @@ export function experimental_renderToHTML(
options?: MarkupOptions,
): Promise<string> {
return new Promise((resolve, reject) => {
const streamState = createFlightStreamState();
const flightDestination = {
push(chunk: string | null): boolean {
if (chunk !== null) {
processFlightStringChunk(flightResponse, chunk);
processFlightStringChunk(flightResponse, streamState, chunk);
} else {
closeFlight(flightResponse);
}
Expand Down
62 changes: 32 additions & 30 deletions packages/react-noop-renderer/src/ReactNoopFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,36 @@ type Source = Array<Uint8Array>;

const decoderOptions = {stream: true};

const {createResponse, processBinaryChunk, getRoot, close} = ReactFlightClient({
createStringDecoder() {
return new TextDecoder();
},
readPartialStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
return decoder.decode(buffer, decoderOptions);
},
readFinalStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
return decoder.decode(buffer);
},
resolveClientReference(bundlerConfig: null, idx: string) {
return idx;
},
prepareDestinationForModule(moduleLoading: null, metadata: string) {},
preloadModule(idx: string) {},
requireModule(idx: string) {
return readModule(idx);
},
parseModel(response: Response, json) {
return JSON.parse(json, response._fromJSON);
},
bindToConsole(methodName, args, badgeName) {
return Function.prototype.bind.apply(
// eslint-disable-next-line react-internal/no-production-logging
console[methodName],
[console].concat(args),
);
},
});
const {createResponse, createStreamState, processBinaryChunk, getRoot, close} =
ReactFlightClient({
createStringDecoder() {
return new TextDecoder();
},
readPartialStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
return decoder.decode(buffer, decoderOptions);
},
readFinalStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
return decoder.decode(buffer);
},
resolveClientReference(bundlerConfig: null, idx: string) {
return idx;
},
prepareDestinationForModule(moduleLoading: null, metadata: string) {},
preloadModule(idx: string) {},
requireModule(idx: string) {
return readModule(idx);
},
parseModel(response: Response, json) {
return JSON.parse(json, response._fromJSON);
},
bindToConsole(methodName, args, badgeName) {
return Function.prototype.bind.apply(
// eslint-disable-next-line react-internal/no-production-logging
console[methodName],
[console].concat(args),
);
},
});

type ReadOptions = {|
findSourceMapURL?: FindSourceMapURLCallback,
Expand All @@ -76,8 +77,9 @@ function read<T>(source: Source, options: ReadOptions): Thenable<T> {
? options.debugChannel.onMessage
: undefined,
);
const streamState = createStreamState();
for (let i = 0; i < source.length; i++) {
processBinaryChunk(response, source[i], 0);
processBinaryChunk(response, streamState, source[i], 0);
}
if (options !== undefined && options.close) {
close(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import type {ReactServerValue} from 'react-client/src/ReactFlightReplyClient';

import {
createResponse,
createStreamState,
getRoot,
reportGlobalError,
processBinaryChunk,
processStringChunk,
close,
injectIntoDevTools,
} from 'react-client/src/ReactFlightClient';
Expand All @@ -44,7 +46,7 @@ type CallServerCallback = <A, T>(string, args: A) => Promise<T>;
export type Options = {
moduleBaseURL?: string,
callServer?: CallServerCallback,
debugChannel?: {writable?: WritableStream, ...},
debugChannel?: {writable?: WritableStream, readable?: ReadableStream, ...},
temporaryReferences?: TemporaryReferenceSet,
findSourceMapURL?: FindSourceMapURLCallback,
replayConsoleLogs?: boolean,
Expand Down Expand Up @@ -96,10 +98,50 @@ function createResponseFromOptions(options: void | Options) {
);
}

function startReadingFromUniversalStream(
response: FlightResponse,
stream: ReadableStream,
): void {
// This is the same as startReadingFromStream except this allows WebSocketStreams which
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
// always allow streams with variable chunk types.
const streamState = createStreamState();
const reader = stream.getReader();
function progress({
done,
value,
}: {
done: boolean,
value: any,
...
}): void | Promise<void> {
if (done) {
close(response);
return;
}
if (value instanceof ArrayBuffer) {
// WebSockets can produce ArrayBuffer values in ReadableStreams.
processBinaryChunk(response, streamState, new Uint8Array(value));
} else if (typeof value === 'string') {
// WebSockets can produce string values in ReadableStreams.
processStringChunk(response, streamState, value);
} else {
processBinaryChunk(response, streamState, value);
}
return reader.read().then(progress).catch(error);
}
function error(e: any) {
reportGlobalError(response, e);
}
reader.read().then(progress).catch(error);
}

function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
function progress({
done,
Expand All @@ -110,25 +152,37 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
close(response);
// If we're the secondary stream, then we don't close the response until the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, buffer);
processBinaryChunk(response, streamState, buffer);
return reader.read().then(progress).catch(error);
}
function error(e: any) {
reportGlobalError(response, e);
}
reader.read().then(progress).catch(error);
}

function createFromReadableStream<T>(
stream: ReadableStream,
options?: Options,
): Thenable<T> {
const response: FlightResponse = createResponseFromOptions(options);
startReadingFromStream(response, stream);
if (
__DEV__ &&
options &&
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromUniversalStream(response, options.debugChannel.readable);
startReadingFromStream(response, stream, true);
} else {
startReadingFromStream(response, stream, false);
}
return getRoot(response);
}

Expand All @@ -139,7 +193,20 @@ function createFromFetch<T>(
const response: FlightResponse = createResponseFromOptions(options);
promiseForResponse.then(
function (r) {
startReadingFromStream(response, (r.body: any));
if (
__DEV__ &&
options &&
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
);
startReadingFromStream(response, (r.body: any), true);
} else {
startReadingFromStream(response, (r.body: any), false);
}
},
function (e) {
reportGlobalError(response, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import type {Readable} from 'stream';

import {
createResponse,
createStreamState,
getRoot,
reportGlobalError,
processStringChunk,
processBinaryChunk,
close,
} from 'react-client/src/ReactFlightClient';
Expand Down Expand Up @@ -78,8 +80,13 @@ function createFromNodeStream<T>(
? options.environmentName
: undefined,
);
const streamState = createStreamState();
stream.on('data', chunk => {
processBinaryChunk(response, chunk);
if (typeof chunk === 'string') {
processStringChunk(response, streamState, chunk);
} else {
processBinaryChunk(response, streamState, chunk);
}
});
stream.on('error', error => {
reportGlobalError(response, error);
Expand Down
Loading
Loading