Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions src/services/api/__test__/readPartText.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import {ReadableStream as WebReadableStream} from 'stream/web';
import {TextDecoder as NodeTextDecoder, TextEncoder as NodeTextEncoder} from 'util';

import type {MultipartPart} from '@mjackson/multipart-parser';

import {readPartText} from '../streamingPartReader';

/* eslint-disable @typescript-eslint/no-explicit-any */
// jsdom does not ship Web Streams or the Encoding API; install Node's
// implementations, but only where the global is genuinely missing so a
// native implementation is never shadowed.
const globalScope = globalThis as any;
if (globalScope.ReadableStream === undefined) {
    globalScope.ReadableStream = WebReadableStream;
}
if (globalScope.TextEncoder === undefined) {
    globalScope.TextEncoder = NodeTextEncoder;
}
if (globalScope.TextDecoder === undefined) {
    globalScope.TextDecoder = NodeTextDecoder;
}
/* eslint-enable @typescript-eslint/no-explicit-any */

/**
 * Builds a minimal MultipartPart stand-in: `body` replays the given chunks,
 * `headers.contentLength` is whatever the test wants to advertise, and
 * `text()` drains the body and decodes it as UTF-8 (mirroring the real
 * parser's fallback path).
 */
function createFakePart(bodyChunks: Uint8Array[], contentLength: number | null): MultipartPart {
    const body = new ReadableStream<Uint8Array>({
        start(controller) {
            bodyChunks.forEach((chunk) => controller.enqueue(chunk));
            controller.close();
        },
    });

    // Drain the body stream and decode the concatenated bytes.
    const readAll = async (): Promise<string> => {
        const reader = body.getReader();
        const collected: Uint8Array[] = [];
        for (;;) {
            const {done, value} = await reader.read();
            if (done) {
                break;
            }
            collected.push(value);
        }
        const totalBytes = collected.reduce((sum, piece) => sum + piece.byteLength, 0);
        const joined = new Uint8Array(totalBytes);
        let position = 0;
        for (const piece of collected) {
            joined.set(piece, position);
            position += piece.byteLength;
        }
        return new TextDecoder().decode(joined);
    };

    return {
        headers: {contentLength},
        body,
        text: readAll,
    } as unknown as MultipartPart;
}

// Encodes a string to its UTF-8 byte representation.
function toBytes(str: string): Uint8Array {
    return new TextEncoder().encode(str);
}

/**
 * Slices `data` at the given offsets, producing `splitPoints.length + 1`
 * contiguous views (an implicit 0 starts the first slice; the last slice
 * runs to the end of the buffer).
 */
function splitBytes(data: Uint8Array, ...splitPoints: number[]): Uint8Array[] {
    const starts = [0, ...splitPoints];
    // subarray(start, undefined) runs to the end, covering the final slice.
    return starts.map((start, index) => data.subarray(start, starts[index + 1]));
}

// Covers readPartText's buffered path (Content-Length present) across chunk
// layouts, plus its fallback path when Content-Length is absent.
describe('readPartText', () => {
    // Baseline: the whole body arrives in a single ReadableStream chunk.
    test('reads body delivered as a single chunk', async () => {
        const json = '{"event":"SessionCreated"}';
        const bytes = toBytes(json);
        const part = createFakePart([bytes], bytes.byteLength);

        const result = await readPartText(part);
        expect(result).toBe(json);
    });

    // The regression this module was introduced for: a part body split
    // across several network reads must be reassembled before decoding.
    test('accumulates body split across multiple small chunks', async () => {
        const json = '{"meta":{"event":"SessionCreated","node_id":1,"query_id":"q1"}}';
        const bytes = toBytes(json);
        const chunks = splitBytes(bytes, 5, 20, 40);

        // Sanity-check the fixture: 3 split points -> 4 chunks, no bytes lost.
        expect(chunks.length).toBe(4);
        expect(chunks.reduce((sum, c) => sum + c.byteLength, 0)).toBe(bytes.byteLength);

        const part = createFakePart(chunks, bytes.byteLength);
        const result = await readPartText(part);
        expect(result).toBe(json);
    });

    // Degenerate fragmentation: every byte is its own chunk.
    test('accumulates body delivered one byte at a time', async () => {
        const json = '{"x":1}';
        const bytes = toBytes(json);
        const chunks = Array.from(bytes).map((b) => new Uint8Array([b]));

        const part = createFakePart(chunks, bytes.byteLength);
        const result = await readPartText(part);
        expect(result).toBe(json);
    });

    // Without Content-Length the buffered path cannot size its buffer, so
    // readPartText must delegate to part.text().
    test('falls back to part.text() when Content-Length is absent', async () => {
        const json = '{"fallback":true}';
        const bytes = toBytes(json);
        const part = createFakePart([bytes], null);

        const result = await readPartText(part);
        expect(result).toBe(json);
    });

    // A chunk longer than the declared Content-Length must be truncated to
    // the declared size; the trailing padding bytes are ignored.
    test('clamps chunk that exceeds remaining buffer capacity', async () => {
        const json = '{"ok":true}';
        const bytes = toBytes(json);
        const oversized = new Uint8Array(bytes.byteLength + 20);
        oversized.set(bytes, 0);

        const part = createFakePart([oversized], bytes.byteLength);
        const result = await readPartText(part);
        expect(result).toBe(json);
    });

    // Chunks separated by real asynchronous delay (not just microtasks);
    // the part is built inline because the delay lives in the stream itself.
    test('handles async delivery with delays between chunks', async () => {
        const json = '{"delayed":"chunks"}';
        const bytes = toBytes(json);
        const mid = Math.floor(bytes.byteLength / 2);

        const body = new ReadableStream<Uint8Array>({
            async start(controller) {
                controller.enqueue(bytes.subarray(0, mid));
                await new Promise((r) => setTimeout(r, 50));
                controller.enqueue(bytes.subarray(mid));
                controller.close();
            },
        });

        const part = {
            headers: {contentLength: bytes.byteLength},
            body,
            text: () => Promise.resolve(json),
        } as unknown as MultipartPart;

        const result = await readPartText(part);
        expect(result).toBe(json);
    });
});
3 changes: 2 additions & 1 deletion src/services/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {DEV_ENABLE_TRACING_FOR_ALL_REQUESTS} from '../../utils/constants';
import {isRedirectToAuth} from '../../utils/response';

import {BaseYdbAPI} from './base';
import {readPartText} from './streamingPartReader';

const BOUNDARY = 'boundary';

Expand Down Expand Up @@ -92,7 +93,7 @@ export class StreamingAPI extends BaseYdbAPI {
const traceId = response.headers.get('traceresponse')?.split('-')[1];

await parseMultipart(response.body, {boundary: BOUNDARY}, async (part) => {
const text = await part.text();
const text = await readPartText(part);

let chunk: unknown;
try {
Expand Down
29 changes: 29 additions & 0 deletions src/services/api/streamingPartReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type {MultipartPart} from '@mjackson/multipart-parser';

/**
 * Reads the full text body of a multipart part.
 *
 * When a usable Content-Length is present, bytes are accumulated from the
 * part's body stream into a pre-sized buffer until the declared length is
 * reached — this survives bodies that arrive split across several
 * ReadableStream chunks. Without a usable Content-Length, delegates to
 * `part.text()`.
 *
 * @param part - multipart part whose body stream will be consumed.
 * @returns the UTF-8 decoded body text (possibly shorter than declared if
 *   the stream ends early).
 */
export async function readPartText(part: MultipartPart): Promise<string> {
    const contentLength = part.headers.contentLength;
    // Fall back for absent (`null`/`undefined`), malformed (`NaN`), or
    // non-positive lengths. A strict `=== null` check would let NaN through:
    // `NaN <= 0` is false and `new Uint8Array(NaN)` allocates an empty
    // buffer, silently returning '' instead of the real body.
    if (contentLength == null || !Number.isInteger(contentLength) || contentLength <= 0) {
        return part.text();
    }

    const reader = part.body.getReader();
    try {
        const buffer = new Uint8Array(contentLength);
        let offset = 0;

        while (offset < contentLength) {
            const {done, value} = await reader.read();
            if (done) {
                // Stream ended before the declared length; decode what we have.
                break;
            }
            // Clamp a chunk that overruns the declared length; excess bytes
            // beyond Content-Length are intentionally discarded.
            const remaining = contentLength - offset;
            const slice = value.byteLength <= remaining ? value : value.subarray(0, remaining);
            buffer.set(slice, offset);
            offset += slice.byteLength;
        }

        return new TextDecoder().decode(buffer.subarray(0, offset));
    } finally {
        reader.releaseLock();
    }
}
20 changes: 20 additions & 0 deletions tests/suites/tenant/queryEditor/queryStatus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,29 @@ test.describe('Test Query Execution Status', async () => {
}
}

expect(transitions).toContain('Running');
expect(transitions[transitions.length - 1]).toBe('Completed');
});

// Regression test for readPartText: the mock delivers the SessionCreated
// multipart part in two halves (splitSessionPart pauses 100 ms between
// them — see mockStreamingFetch), so the editor must still reach "Running"
// once the accumulated part is parsed, then "Completed" after the data
// chunks finish.
test('Streaming query reaches "Running" when SessionCreated arrives in split chunks', async ({
    page,
}) => {
    const queryEditor = new QueryEditor(page);
    await toggleExperiment(page, 'on', 'Query Streaming');

    await setupMockStreamingFetch(page, {
        totalChunks: 5,
        splitSessionPart: true,
        chunkIntervalMs: 1000,
    });

    await queryEditor.setQuery(testQuery);
    await queryEditor.clickRunButton();

    await expect(queryEditor.waitForStatus('Running')).resolves.toBe(true);
    await expect(queryEditor.waitForStatus('Completed')).resolves.toBe(true);
});

test('Streaming query shows "Failed" status on server error', async ({page}) => {
const queryEditor = new QueryEditor(page);
await toggleExperiment(page, 'on', 'Query Streaming');
Expand Down
88 changes: 61 additions & 27 deletions tests/utils/mockStreamingFetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ export interface MockStreamingOptions {
* errorAfterChunks are set.
*/
errorAfterChunks?: number;
/**
* When true, the SessionCreated part is delivered in two halves with a
* 100 ms pause between them, simulating a partial network delivery.
* Useful for verifying that `readPartText` correctly accumulates bytes
* when the body arrives across multiple ReadableStream chunks.
*/
splitSessionPart?: boolean;
}

/**
Expand All @@ -38,9 +45,15 @@ export async function setupMockStreamingFetch(
const chunkIntervalMs = options.chunkIntervalMs ?? 200;
const totalChunks = options.totalChunks ?? null;
const errorAfterChunks = options.errorAfterChunks ?? null;
const splitSessionPart = options.splitSessionPart ?? false;

await page.evaluate(
({chunkIntervalMs: interval, totalChunks: total, errorAfterChunks: errorAfter}) => {
({
chunkIntervalMs: interval,
totalChunks: total,
errorAfterChunks: errorAfter,
splitSessionPart: splitSession,
}) => {
const originalFetch = window.fetch;
(window as unknown as Record<string, unknown>).__originalFetch = originalFetch;

Expand Down Expand Up @@ -134,45 +147,65 @@ export async function setupMockStreamingFetch(
const chunkLimit = shouldError ? errorAfter : total;

let intervalId: number | undefined;
let splitTimeoutId: ReturnType<typeof setTimeout> | undefined;
let chunkIndex = 0;

const stream = new ReadableStream<Uint8Array>({
start(controller) {
// Send session chunk immediately
controller.enqueue(encodePart(sessionJSON));

// Deliver data chunks at steady intervals
intervalId = window.setInterval(() => {
try {
// Check if we should terminate
if (chunkLimit !== null && chunkIndex >= chunkLimit) {
const sessionPart = encodePart(sessionJSON);

const startDataChunks = () => {
intervalId = window.setInterval(() => {
try {
// Check if we should terminate
if (chunkLimit !== null && chunkIndex >= chunkLimit) {
window.clearInterval(intervalId);
const responseJSON = shouldError
? errorResponseJSON
: queryResponseJSON;
controller.enqueue(encodePart(responseJSON));
controller.enqueue(encodeClosingBoundary());
controller.close();
return;
}

controller.enqueue(encodePart(dataChunkJSON(chunkIndex)));
chunkIndex++;
} catch (error) {
window.clearInterval(intervalId);
const responseJSON = shouldError
? errorResponseJSON
: queryResponseJSON;
controller.enqueue(encodePart(responseJSON));
controller.enqueue(encodeClosingBoundary());
controller.close();
return;
try {
controller.error(
error instanceof Error
? error
: new Error(String(error)),
);
} catch {
// stream may already be errored/closed
}
}
}, interval);
};

controller.enqueue(encodePart(dataChunkJSON(chunkIndex)));
chunkIndex++;
} catch (error) {
window.clearInterval(intervalId);
if (splitSession) {
const mid = Math.floor(sessionPart.byteLength / 2);
controller.enqueue(sessionPart.subarray(0, mid));
splitTimeoutId = setTimeout(() => {
try {
controller.error(
error instanceof Error ? error : new Error(String(error)),
);
controller.enqueue(sessionPart.subarray(mid));
} catch {
// stream may already be errored/closed
return;
}
}
}, interval);
startDataChunks();
}, 100);
} else {
controller.enqueue(sessionPart);
startDataChunks();
}

if (signal) {
const onAbort = () => {
window.clearInterval(intervalId);
clearTimeout(splitTimeoutId);
try {
controller.error(
new DOMException(
Expand All @@ -195,6 +228,7 @@ export async function setupMockStreamingFetch(
},
cancel() {
window.clearInterval(intervalId);
clearTimeout(splitTimeoutId);
},
});

Expand All @@ -206,7 +240,7 @@ export async function setupMockStreamingFetch(
});
}
},
{chunkIntervalMs, totalChunks, errorAfterChunks},
{chunkIntervalMs, totalChunks, errorAfterChunks, splitSessionPart},
);
}

Expand Down
Loading