Skip to content

Commit 2eec122

Browse files
committed
wip: still fixing the problem
* Related #14 [ci skip]
1 parent 35bdbe4 commit 2eec122

File tree

3 files changed

+114
-30
lines changed

3 files changed

+114
-30
lines changed

src/QUICConnection.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ class QUICConnection extends EventTarget {
4646
protected codeToReason: StreamCodeToReason;
4747
protected maxReadableStreamBytes: number | undefined;
4848
protected maxWritableStreamBytes: number | undefined;
49-
protected destroyingMap: Map<StreamId, QUICStream> = new Map();
5049

5150
// This basically allows one to await this promise
5251
// once resolved, always resolved...
@@ -471,7 +470,6 @@ class QUICConnection extends EventTarget {
471470
quicStream = await QUICStream.createQUICStream({
472471
streamId,
473472
connection: this,
474-
destroyingMap: this.destroyingMap,
475473
codeToReason: this.codeToReason,
476474
reasonToCode: this.reasonToCode,
477475
maxReadableStreamBytes: this.maxReadableStreamBytes,
@@ -482,6 +480,7 @@ class QUICConnection extends EventTarget {
482480
new events.QUICConnectionStreamEvent({ detail: quicStream }),
483481
);
484482
}
483+
this.logger.info('processing read');
485484
quicStream.read();
486485
quicStream.dispatchEvent(new events.QUICStreamReadableEvent());
487486
}
@@ -494,7 +493,6 @@ class QUICConnection extends EventTarget {
494493
connection: this,
495494
codeToReason: this.codeToReason,
496495
reasonToCode: this.reasonToCode,
497-
destroyingMap: this.destroyingMap,
498496
maxReadableStreamBytes: this.maxReadableStreamBytes,
499497
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
500498
});
@@ -503,15 +501,15 @@ class QUICConnection extends EventTarget {
503501
);
504502
}
505503
quicStream.dispatchEvent(new events.QUICStreamWritableEvent());
504+
this.logger.info('processing write');
506505
quicStream.write();
507506
}
508-
// Checking shortlist if streams have finished.
509-
for (const [streamId, stream] of this.destroyingMap) {
510-
if (stream.isFinished()) {
511-
// If it has finished, it will trigger its own clean up.
512-
// Remove the stream from the shortlist.
513-
this.destroyingMap.delete(streamId);
514-
}
507+
for (const [streamId, quicStream] of this.streamMap) {
508+
// Checking if state has changed, we need to check all streams here for two reasons
509+
// 1. Reading can end, but we won't know since it won't be listed as readable.
510+
// 2. Writing can end, but we won't know unless we check here or attempt a write.
511+
if (quicStream[status] === 'destroying') quicStream.isRecvFinished();
512+
quicStream.isSendFinished();
515513
}
516514
}
517515
} finally {
@@ -632,6 +630,13 @@ class QUICConnection extends EventTarget {
632630
this.dispatchEvent(new events.QUICConnectionSendEvent());
633631
}
634632
} finally {
633+
for (const [, quicStream] of this.streamMap) {
634+
// Checking if state has changed, we need to check all streams here for two reasons
635+
// 1. Reading can end, but we won't know since it won't be listed as readable.
636+
// 2. Writing can end, but we won't know unless we check here or attempt a write.
637+
if (quicStream[status] === 'destroying') quicStream.isRecvFinished();
638+
quicStream.isSendFinished();
639+
}
635640
this.logger.debug('SEND FINALLY');
636641
this.checkTimeout();
637642
if (
@@ -689,7 +694,6 @@ class QUICConnection extends EventTarget {
689694
connection: this,
690695
codeToReason: this.codeToReason,
691696
reasonToCode: this.reasonToCode,
692-
destroyingMap: this.destroyingMap,
693697
maxReadableStreamBytes: this.maxReadableStreamBytes,
694698
maxWritableStreamBytes: this.maxWritableStreamBytes,
695699
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}`),

src/QUICStream.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class QUICStream
5353
protected sendFinishedProm = utils.promise<void>();
5454
// This resolves when `streamRecv` results in a `StreamReset(u64)` or a fin flag indicating receiving has ended
5555
protected recvFinishedProm = utils.promise<void>();
56-
protected destroyingMap: Map<StreamId, QUICStream>;
5756

5857
/**
5958
* For `reasonToCode`, return 0 means "unknown reason"
@@ -66,7 +65,6 @@ class QUICStream
6665
public static async createQUICStream({
6766
streamId,
6867
connection,
69-
destroyingMap,
7068
reasonToCode = () => 0,
7169
codeToReason = (type, code) =>
7270
new Error(`${type.toString()} ${code.toString()}`),
@@ -76,7 +74,6 @@ class QUICStream
7674
}: {
7775
streamId: StreamId;
7876
connection: QUICConnection;
79-
destroyingMap: Map<StreamId, QUICStream>;
8077
reasonToCode?: StreamReasonToCode;
8178
codeToReason?: StreamCodeToReason;
8279
maxReadableStreamBytes?: number;
@@ -96,7 +93,6 @@ class QUICStream
9693
connection,
9794
reasonToCode,
9895
codeToReason,
99-
destroyingMap,
10096
maxReadableStreamBytes,
10197
maxWritableStreamBytes,
10298
logger,
@@ -111,7 +107,6 @@ class QUICStream
111107
connection,
112108
reasonToCode,
113109
codeToReason,
114-
destroyingMap,
115110
maxReadableStreamBytes,
116111
maxWritableStreamBytes,
117112
logger,
@@ -120,7 +115,6 @@ class QUICStream
120115
connection: QUICConnection;
121116
reasonToCode: StreamReasonToCode;
122117
codeToReason: StreamCodeToReason;
123-
destroyingMap: Map<StreamId, QUICStream>;
124118
maxReadableStreamBytes: number;
125119
maxWritableStreamBytes: number;
126120
logger: Logger;
@@ -133,7 +127,13 @@ class QUICStream
133127
this.streamMap = connection.streamMap;
134128
this.reasonToCode = reasonToCode;
135129
this.codeToReason = codeToReason;
136-
this.destroyingMap = destroyingMap;
130+
131+
this.sendFinishedProm.p.finally(() =>
132+
logger.info('sendFinishedProm resolved'),
133+
);
134+
this.recvFinishedProm.p.finally(() =>
135+
logger.info('recvFinishedProm resolved'),
136+
);
137137

138138
this.readable = new ReadableStream(
139139
{
@@ -238,16 +238,12 @@ class QUICStream
238238
await this.closeSend(true, e);
239239
}
240240
await this.connection.send();
241-
// Await this.streamSend(new Uint8Array(0), true).catch(e => console.error(e));
242241
this.logger.debug('waiting for underlying streams to finish');
243-
this.destroyingMap.set(this.streamId, this);
244242
this.isFinished();
243+
// We need to wait for the connection to finish before fully destroying
245244
await Promise.all([this.sendFinishedProm.p, this.recvFinishedProm.p]);
246245
this.logger.debug('done waiting for underlying streams to finish');
247246
this.streamMap.delete(this.streamId);
248-
// Remove from the shortlist, just in case
249-
this.destroyingMap.delete(this.streamId);
250-
// We need to wait for the connection to finish before fully destroying
251247
this.dispatchEvent(new events.QUICStreamDestroyEvent());
252248
this.logger.info(`Destroyed ${this.constructor.name}`);
253249
}
@@ -370,7 +366,7 @@ class QUICStream
370366
// This should never be reported... (this branch should be dead code)
371367
return;
372368
} else {
373-
this.logger.debug('Stream reported: error');
369+
this.logger.info('Stream reported: error');
374370
this.logger.error(`Stream reported: error ${e.message}`);
375371
// Signal receiving has ended
376372
this.recvFinishedProm.resolveP();
@@ -396,7 +392,7 @@ class QUICStream
396392
// If fin is true, then that means, the stream is CLOSED
397393
if (fin) {
398394
// This will render `stream.cancel` a noop
399-
this.logger.debug('Stream reported: fin');
395+
this.logger.info('Stream reported: fin');
400396
if (!this._recvClosed) this.readableController.close();
401397
await this.closeRecv();
402398
// Signal receiving has ended
@@ -484,7 +480,7 @@ class QUICStream
484480
): Promise<void> {
485481
// Further closes are NOPs
486482
if (this._recvClosed) return;
487-
this.logger.debug(`Close Recv`);
483+
this.logger.info(`Close Recv`);
488484
// Indicate that the receiving side is closed
489485
this._recvClosed = true;
490486
const code = isError ? await this.reasonToCode('send', reason) : 0;
@@ -518,7 +514,7 @@ class QUICStream
518514
): Promise<void> {
519515
// Further closes are NOPs
520516
if (this._sendClosed) return;
521-
this.logger.debug(`Close Send`);
517+
this.logger.info(`Close Send`);
522518
// Indicate that the sending side is closed
523519
this._sendClosed = true;
524520
// If the QUIC stream is already closed
@@ -551,14 +547,18 @@ class QUICStream
551547
e: Error,
552548
type: 'recv' | 'send',
553549
): Promise<any | null> {
554-
const match =
550+
let match =
555551
e.message.match(/StreamStopped\((.+)\)/) ??
556-
e.message.match(/InvalidStreamState\((.+)\)/) ??
557552
e.message.match(/StreamReset\((.+)\)/);
558553
if (match != null) {
559554
const code = parseInt(match[1]);
560555
return await this.codeToReason(type, code);
561556
}
557+
match = e.message.match(/InvalidStreamState\((.+)\)/);
558+
if (match != null) {
559+
// `InvalidStreamState()` returns the stream Id and not any actual error code
560+
return await this.codeToReason(type, 0);
561+
}
562562
return null;
563563
}
564564
}

tests/concurrency.test.ts

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type { Crypto, Host, Port } from '@';
44
import type { TlsConfig } from '@/config';
55
import { fc, testProp } from '@fast-check/jest';
66
import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger';
7+
import { status } from '@matrixai/async-init/dist/CreateDestroy';
78
import QUICServer from '@/QUICServer';
89
import { promise } from '@/utils';
910
import QUICClient from '@/QUICClient';
@@ -24,6 +25,69 @@ describe('Concurrency tests', () => {
2425
ops: Crypto;
2526
};
2627

28+
// Tracking resources
29+
let sockets: Array<QUICSocket>;
30+
31+
const clearResourcesList = () => {
32+
sockets = [];
33+
};
34+
35+
const listResources = () => {
36+
logger.info(`RESOURCE REPORT`);
37+
for (const [socketNum, socket] of sockets.entries()) {
38+
logger.info(`Socket-${socketNum} status ${socket[status]}`);
39+
const serverConnections = socket.connectionMap.serverConnections;
40+
logger.info(`severConnections ${serverConnections.size}`);
41+
for (const [connId, connection] of serverConnections) {
42+
logger.info(`connectionId-${connId}`);
43+
const serverStreams = connection.streamMap;
44+
for (const [streamId, stream] of serverStreams) {
45+
logger.info(
46+
`streamId-${streamId} ${stream.recvClosed},${stream.sendClosed}`,
47+
);
48+
}
49+
}
50+
51+
const clientConnections = socket.connectionMap.clientConnections;
52+
logger.info(`clientConnections ${clientConnections.size}`);
53+
for (const [connId, connection] of clientConnections) {
54+
logger.info(`connectionId-${connId}`);
55+
const serverStreams = connection.streamMap;
56+
for (const [streamId, stream] of serverStreams) {
57+
logger.info(
58+
`streamId-${streamId} ${stream.recvClosed},${stream.sendClosed}`,
59+
);
60+
}
61+
}
62+
}
63+
};
64+
65+
const destroyResources = async () => {
66+
const socketDestroyProms: Array<Promise<void>> = [];
67+
for (const [, socket] of sockets.entries()) {
68+
const connectionDestroyProms: Array<Promise<void>> = [];
69+
for (const [, connection] of socket.connectionMap) {
70+
const streamDestroyProms: Array<Promise<void>> = [];
71+
for (const [, stream] of connection.streamMap) {
72+
streamDestroyProms.push(stream.destroy());
73+
}
74+
connectionDestroyProms.push(
75+
(async () => {
76+
await Promise.allSettled(streamDestroyProms);
77+
await connection.destroy({ force: true });
78+
})(),
79+
);
80+
}
81+
socketDestroyProms.push(
82+
(async () => {
83+
await Promise.allSettled(connectionDestroyProms);
84+
await socket.stop();
85+
})(),
86+
);
87+
}
88+
return await Promise.allSettled(socketDestroyProms);
89+
};
90+
2791
beforeEach(async () => {
2892
crypto = {
2993
key: await testsUtils.generateKey(),
@@ -33,6 +97,13 @@ describe('Concurrency tests', () => {
3397
randomBytes: testsUtils.randomBytes,
3498
},
3599
};
100+
clearResourcesList();
101+
});
102+
103+
afterEach(async () => {
104+
console.log('AFTER EACH');
105+
listResources();
106+
await destroyResources();
36107
});
37108

38109
/**
@@ -455,7 +526,6 @@ describe('Concurrency tests', () => {
455526
);
456527
}
457528
};
458-
jest.setTimeout(100000);
459529
testProp.only(
460530
'Multiple clients sharing a socket with a server',
461531
[
@@ -474,6 +544,11 @@ describe('Concurrency tests', () => {
474544
serverStreams1,
475545
serverStreams2,
476546
) => {
547+
const clientsInfosA = clientDatas1.map((v) => v.streams.length);
548+
const clientsInfosB = clientDatas2.map((v) => v.streams.length);
549+
logger.info(`clientsA: ${clientsInfosA}`);
550+
logger.info(`clientsB: ${clientsInfosB}`);
551+
477552
const tlsConfig1 = await tlsConfigProm1;
478553
const tlsConfig2 = await tlsConfigProm2;
479554
const cleanUpHoldProm = promise<void>();
@@ -486,6 +561,8 @@ describe('Concurrency tests', () => {
486561
crypto,
487562
logger: logger.getChild('socket'),
488563
});
564+
sockets.push(socket1);
565+
sockets.push(socket2);
489566
await socket1.start({
490567
host: '127.0.0.1' as Host,
491568
});
@@ -556,10 +633,12 @@ describe('Concurrency tests', () => {
556633
logger.info('STARTING TEST');
557634
try {
558635
await (async () => {
636+
console.log('waiting for client proms');
559637
await Promise.all([
560638
Promise.all(clientProms1),
561639
Promise.all(clientProms2),
562640
]);
641+
console.log('DONE waiting for client proms');
563642
// Allow for streams to be negotiated
564643
await sleep(200);
565644
cleanUpHoldProm.resolveP();
@@ -583,6 +662,7 @@ describe('Concurrency tests', () => {
583662
)}`,
584663
);
585664
}
665+
logger.info('CLOSING SOCKETS');
586666
await socket1.stop();
587667
await socket2.stop();
588668
logger.info('TEST FULLY DONE!');

0 commit comments

Comments
 (0)