From 6450334f5bbb059b55005cebfef57fa7969b625e Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Tue, 10 Oct 2023 18:42:31 +0900 Subject: feat(ext/web): cancel support for TransformStream (#20815) --- ext/web/06_streams.js | 121 ++++++++++++++++++++++++++++++++++++++++------ ext/web/lib.deno_web.d.ts | 1 + 2 files changed, 106 insertions(+), 16 deletions(-) (limited to 'ext/web') diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 9c6191fce..38581ac79 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -317,6 +317,7 @@ const _controller = Symbol("[[controller]]"); const _detached = Symbol("[[Detached]]"); const _disturbed = Symbol("[[disturbed]]"); const _errorSteps = Symbol("[[ErrorSteps]]"); +const _finishPromise = Symbol("[[finishPromise]]"); const _flushAlgorithm = Symbol("[[flushAlgorithm]]"); const _globalObject = Symbol("[[globalObject]]"); const _highWaterMark = Symbol("[[highWaterMark]]"); @@ -609,8 +610,7 @@ function initializeTransformStream( } function cancelAlgorithm(reason) { - transformStreamErrorWritableAndUnblockWrite(stream, reason); - return resolvePromiseWith(undefined); + return transformStreamDefaultSourceCancelAlgorithm(stream, reason); } stream[_readable] = createReadableStream( @@ -3690,12 +3690,14 @@ function setUpReadableStreamDefaultReader(reader, stream) { * @param {TransformStreamDefaultController} controller * @param {(chunk: O, controller: TransformStreamDefaultController) => Promise} transformAlgorithm * @param {(controller: TransformStreamDefaultController) => Promise} flushAlgorithm + * @param {(reason: any) => Promise} cancelAlgorithm */ function setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, ) { assert(ObjectPrototypeIsPrototypeOf(TransformStreamPrototype, stream)); assert(stream[_controller] === undefined); @@ -3703,6 +3705,7 @@ function setUpTransformStreamDefaultController( stream[_controller] = controller; controller[_transformAlgorithm] = transformAlgorithm; controller[_flushAlgorithm] = flushAlgorithm; + controller[_cancelAlgorithm] = cancelAlgorithm; } /** @@ -3730,6 +3733,8 @@ function setUpTransformStreamDefaultControllerFromTransformer( }; /** @type {(controller: TransformStreamDefaultController) => Promise} */ let flushAlgorithm = () => resolvePromiseWith(undefined); + /** @type {(reason: any) => Promise} */ + let cancelAlgorithm = () => resolvePromiseWith(undefined); if (transformerDict.transform !== undefined) { transformAlgorithm = (chunk, controller) => webidl.invokeCallbackFunction( @@ -3752,11 +3757,23 @@ function setUpTransformStreamDefaultControllerFromTransformer( true, ); } + if (transformerDict.cancel !== undefined) { + cancelAlgorithm = (reason) => + webidl.invokeCallbackFunction( + transformerDict.cancel, + [reason], + transformer, + webidl.converters["Promise"], + "Failed to call 'cancelAlgorithm' on 'TransformStreamDefaultController'", + true, + ); + } setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, ); } @@ -3938,6 +3955,7 @@ function setUpWritableStreamDefaultWriter(writer, stream) { function transformStreamDefaultControllerClearAlgorithms(controller) { controller[_transformAlgorithm] = undefined; controller[_flushAlgorithm] = undefined; + controller[_cancelAlgorithm] = undefined; } /** @@ -4007,13 +4025,33 @@ function transformStreamDefaultControllerTerminate(controller) { } /** - * @param {TransformStream} stream + * @template I + * @template O + * @param {TransformStream} stream * @param {any=} reason * @returns {Promise} */ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { - transformStreamError(stream, reason); - return resolvePromiseWith(undefined); + const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const readable = stream[_readable]; + controller[_finishPromise] = new Deferred(); + const cancelPromise = controller[_cancelAlgorithm](reason); + transformStreamDefaultControllerClearAlgorithms(controller); + transformPromiseWith(cancelPromise, () => { + if (readable[_state] === "errored") { + controller[_finishPromise].reject(readable[_storedError]); + } else { + readableStreamDefaultControllerError(readable[_controller], reason); + controller[_finishPromise].resolve(undefined); + } + }, (r) => { + readableStreamDefaultControllerError(readable[_controller], r); + controller[_finishPromise].reject(r); + }); + return controller[_finishPromise].promise; } /** @@ -4023,21 +4061,26 @@ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { * @returns {Promise} */ function transformStreamDefaultSinkCloseAlgorithm(stream) { - const readable = stream[_readable]; const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const readable = stream[_readable]; + controller[_finishPromise] = new Deferred(); const flushPromise = controller[_flushAlgorithm](controller); transformStreamDefaultControllerClearAlgorithms(controller); - return transformPromiseWith(flushPromise, () => { + transformPromiseWith(flushPromise, () => { if (readable[_state] === "errored") { - throw readable[_storedError]; + controller[_finishPromise].reject(readable[_storedError]); + } else { + readableStreamDefaultControllerClose(readable[_controller]); + controller[_finishPromise].resolve(undefined); } - readableStreamDefaultControllerClose( - /** @type {ReadableStreamDefaultController} */ readable[_controller], - ); }, (r) => { - transformStreamError(stream, r); - throw readable[_storedError]; + readableStreamDefaultControllerError(readable[_controller], r); + controller[_finishPromise].reject(r); }); + return controller[_finishPromise].promise; } /** @@ -4069,6 +4112,41 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { return transformStreamDefaultControllerPerformTransform(controller, chunk); } +/** + * @template I + * @template O + * @param {TransformStream} stream + * @param {any=} reason + * @returns {Promise} + */ +function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { + const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const writable = stream[_writable]; + controller[_finishPromise] = new Deferred(); + const cancelPromise = controller[_cancelAlgorithm](reason); + transformStreamDefaultControllerClearAlgorithms(controller); + transformPromiseWith(cancelPromise, () => { + if (writable[_state] === "errored") { + controller[_finishPromise].reject(writable[_storedError]); + } else { + writableStreamDefaultControllerErrorIfNeeded( + writable[_controller], + reason, + ); + transformStreamUnblockWrite(stream); + controller[_finishPromise].resolve(undefined); + } + }, (r) => { + writableStreamDefaultControllerErrorIfNeeded(writable[_controller], r); + transformStreamUnblockWrite(stream); + controller[_finishPromise].reject(r); + }); + return controller[_finishPromise].promise; +} + /** * @param {TransformStream} stream * @returns {Promise} @@ -4104,9 +4182,7 @@ function transformStreamErrorWritableAndUnblockWrite(stream, e) { stream[_writable][_controller], e, ); - if (stream[_backpressure] === true) { - transformStreamSetBackpressure(stream, false); - } + transformStreamUnblockWrite(stream); } /** @@ -4122,6 +4198,15 @@ function transformStreamSetBackpressure(stream, backpressure) { stream[_backpressure] = backpressure; } +/** + * @param {TransformStream} stream + */ +function transformStreamUnblockWrite(stream) { + if (stream[_backpressure] === true) { + transformStreamSetBackpressure(stream, false); + } +} + /** * @param {WritableStream} stream * @param {any=} reason @@ -6007,6 +6092,10 @@ const TransformStreamPrototype = TransformStream.prototype; /** @template O */ class TransformStreamDefaultController { + /** @type {(reason: any) => Promise} */ + [_cancelAlgorithm]; + /** @type {Promise | undefined} */ + [_finishPromise]; /** @type {(controller: this) => Promise} */ [_flushAlgorithm]; /** @type {TransformStream} */ diff --git a/ext/web/lib.deno_web.d.ts b/ext/web/lib.deno_web.d.ts index 331c9536b..bcc0c12d8 100644 --- a/ext/web/lib.deno_web.d.ts +++ b/ext/web/lib.deno_web.d.ts @@ -933,6 +933,7 @@ declare interface Transformer { readableType?: undefined; start?: TransformStreamDefaultControllerCallback; transform?: TransformStreamDefaultControllerTransformCallback; + cancel?: (reason: any) => Promise; writableType?: undefined; } -- cgit v1.2.3