Skip to content

Commit aac37a2

Browse files
committed
streams: implement TransformStream cleanup using "transformer.cancel"
Fixes: #49971
1 parent 8e814e3 commit aac37a2

File tree

1 file changed

+105
-10
lines changed

1 file changed

+105
-10
lines changed

lib/internal/webstreams/transformstream.js

Lines changed: 105 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const {
4747
nonOpFlush,
4848
kType,
4949
kState,
50+
nonOpCancel,
5051
} = require('internal/webstreams/util');
5152

5253
const {
@@ -384,8 +385,7 @@ function initializeTransformStream(
384385
return transformStreamDefaultSourcePullAlgorithm(stream);
385386
},
386387
cancel(reason) {
387-
transformStreamErrorWritableAndUnblockWrite(stream, reason);
388-
return PromiseResolve();
388+
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
389389
},
390390
}, {
391391
highWaterMark: readableHighWaterMark,
@@ -427,6 +427,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
427427
writableStreamDefaultControllerErrorIfNeeded(
428428
writable[kState].controller,
429429
error);
430+
transformStreamUnblockWrite(stream);
431+
}
432+
433+
function transformStreamUnblockWrite(stream) {
430434
if (stream[kState].backpressure)
431435
transformStreamSetBackpressure(stream, false);
432436
}
@@ -443,13 +447,15 @@ function setupTransformStreamDefaultController(
443447
stream,
444448
controller,
445449
transformAlgorithm,
446-
flushAlgorithm) {
450+
flushAlgorithm,
451+
cancelAlgorithm) {
447452
assert(isTransformStream(stream));
448453
assert(stream[kState].controller === undefined);
449454
controller[kState] = {
450455
stream,
451456
transformAlgorithm,
452457
flushAlgorithm,
458+
cancelAlgorithm,
453459
};
454460
stream[kState].controller = controller;
455461
}
@@ -460,21 +466,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
460466
const controller = new TransformStreamDefaultController(kSkipThrow);
461467
const transform = transformer?.transform || defaultTransformAlgorithm;
462468
const flush = transformer?.flush || nonOpFlush;
469+
const cancel = transformer?.cancel || nonOpCancel;
463470
const transformAlgorithm =
464471
FunctionPrototypeBind(transform, transformer);
465472
const flushAlgorithm =
466473
FunctionPrototypeBind(flush, transformer);
474+
const cancelAlgorithm =
475+
FunctionPrototypeBind(cancel, transformer);
467476

468477
setupTransformStreamDefaultController(
469478
stream,
470479
controller,
471480
transformAlgorithm,
472-
flushAlgorithm);
481+
flushAlgorithm,
482+
cancelAlgorithm);
473483
}
474484

475485
function transformStreamDefaultControllerClearAlgorithms(controller) {
476486
controller[kState].transformAlgorithm = undefined;
477487
controller[kState].flushAlgorithm = undefined;
488+
controller[kState].cancelAlgorithm = undefined;
478489
}
479490

480491
function transformStreamDefaultControllerEnqueue(controller, chunk) {
@@ -563,7 +574,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
563574
}
564575

565576
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
566-
transformStreamError(stream, reason);
577+
const {
578+
controller,
579+
readable,
580+
} = stream[kState];
581+
582+
if (controller[kState].finishPromise !== undefined) {
583+
return controller[kState].finishPromise
584+
}
585+
586+
const { promise, resolve, reject } = createDeferredPromise();
587+
controller[kState].finishPromise = promise;
588+
const cancelPromise = ensureIsPromise(
589+
controller[kState].cancelAlgorithm,
590+
controller,
591+
reason);
592+
transformStreamDefaultControllerClearAlgorithms(controller);
593+
594+
PromisePrototypeThen(
595+
cancelPromise,
596+
() => {
597+
if (readable[kState].state === 'errored')
598+
reject(readable[kState].storedError);
599+
else {
600+
readableStreamDefaultControllerError(readable[kState].controller, reason);
601+
resolve();
602+
}
603+
},
604+
(error) => {
605+
readableStreamDefaultControllerError(readable[kState].controller, error);
606+
reject(error);
607+
}
608+
);
609+
610+
return controller[kState].finishPromise;
567611
}
568612

569613
function transformStreamDefaultSinkCloseAlgorithm(stream) {
@@ -572,23 +616,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
572616
controller,
573617
} = stream[kState];
574618

619+
if (controller[kState].finishPromise !== undefined) {
620+
return controller[kState].finishPromise
621+
}
622+
const { promise, resolve, reject } = createDeferredPromise();
623+
controller[kState].finishPromise = promise;
575624
const flushPromise =
576625
ensureIsPromise(
577626
controller[kState].flushAlgorithm,
578627
controller,
579628
controller);
580629
transformStreamDefaultControllerClearAlgorithms(controller);
581-
return PromisePrototypeThen(
630+
PromisePrototypeThen(
582631
flushPromise,
583632
() => {
584633
if (readable[kState].state === 'errored')
585-
throw readable[kState].storedError;
586-
readableStreamDefaultControllerClose(readable[kState].controller);
634+
reject(readable[kState].storedError);
635+
else {
636+
readableStreamDefaultControllerClose(readable[kState].controller);
637+
resolve();
638+
}
587639
},
588640
(error) => {
589-
transformStreamError(stream, error);
590-
throw readable[kState].storedError;
641+
readableStreamDefaultControllerError(readable[kState].controller, error);
642+
reject(error);
591643
});
644+
return controller[kState].finishPromise;
592645
}
593646

594647
function transformStreamDefaultSourcePullAlgorithm(stream) {
@@ -598,6 +651,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
598651
return stream[kState].backpressureChange.promise;
599652
}
600653

654+
function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
655+
const {
656+
controller,
657+
writable,
658+
} = stream[kState];
659+
660+
if (controller[kState].finishPromise !== undefined) {
661+
return controller[kState].finishPromise;
662+
}
663+
664+
const { promise, resolve, reject } = createDeferredPromise();
665+
controller[kState].finishPromise = promise;
666+
const cancelPromise = ensureIsPromise(
667+
controller[kState].cancelAlgorithm,
668+
controller,
669+
reason);
670+
transformStreamDefaultControllerClearAlgorithms(controller);
671+
672+
PromisePrototypeThen(cancelPromise,
673+
() => {
674+
if (writable[kState].state === 'errored')
675+
reject(writable[kState].storedError);
676+
else {
677+
writableStreamDefaultControllerErrorIfNeeded(
678+
writable[kState].controller,
679+
reason);
680+
transformStreamUnblockWrite(stream);
681+
resolve();
682+
}
683+
},
684+
(error) => {
685+
writableStreamDefaultControllerErrorIfNeeded(
686+
writable[kState].controller,
687+
error);
688+
transformStreamUnblockWrite(stream);
689+
reject(error);
690+
}
691+
);
692+
693+
return controller[kState].finishPromise
694+
}
695+
601696
module.exports = {
602697
TransformStream,
603698
TransformStreamDefaultController,

0 commit comments

Comments
 (0)