diff options
author | Luca Casonato <hello@lcas.dev> | 2023-10-10 18:42:31 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-10 18:42:31 +0900 |
commit | 6450334f5bbb059b55005cebfef57fa7969b625e (patch) | |
tree | e8ebce7f641a9f19cb297a27b5deb1446155efb0 /ext/web/06_streams.js | |
parent | 6bbccb72d7153f95688b63391672f2e2b979eeed (diff) |
feat(ext/web): cancel support for TransformStream (#20815)
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 121 |
1 files changed, 105 insertions, 16 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 9c6191fce..38581ac79 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -317,6 +317,7 @@ const _controller = Symbol("[[controller]]"); const _detached = Symbol("[[Detached]]"); const _disturbed = Symbol("[[disturbed]]"); const _errorSteps = Symbol("[[ErrorSteps]]"); +const _finishPromise = Symbol("[[finishPromise]]"); const _flushAlgorithm = Symbol("[[flushAlgorithm]]"); const _globalObject = Symbol("[[globalObject]]"); const _highWaterMark = Symbol("[[highWaterMark]]"); @@ -609,8 +610,7 @@ function initializeTransformStream( } function cancelAlgorithm(reason) { - transformStreamErrorWritableAndUnblockWrite(stream, reason); - return resolvePromiseWith(undefined); + return transformStreamDefaultSourceCancelAlgorithm(stream, reason); } stream[_readable] = createReadableStream( @@ -3690,12 +3690,14 @@ function setUpReadableStreamDefaultReader(reader, stream) { * @param {TransformStreamDefaultController<O>} controller * @param {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} transformAlgorithm * @param {(controller: TransformStreamDefaultController<O>) => Promise<void>} flushAlgorithm + * @param {(reason: any) => Promise<void>} cancelAlgorithm */ function setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, ) { assert(ObjectPrototypeIsPrototypeOf(TransformStreamPrototype, stream)); assert(stream[_controller] === undefined); @@ -3703,6 +3705,7 @@ function setUpTransformStreamDefaultController( stream[_controller] = controller; controller[_transformAlgorithm] = transformAlgorithm; controller[_flushAlgorithm] = flushAlgorithm; + controller[_cancelAlgorithm] = cancelAlgorithm; } /** @@ -3730,6 +3733,8 @@ function setUpTransformStreamDefaultControllerFromTransformer( }; /** @type {(controller: TransformStreamDefaultController<O>) => Promise<void>} */ let flushAlgorithm = () => resolvePromiseWith(undefined); + /** @type {(reason: any) => Promise<void>} */ + let cancelAlgorithm = () => resolvePromiseWith(undefined); if (transformerDict.transform !== undefined) { transformAlgorithm = (chunk, controller) => webidl.invokeCallbackFunction( @@ -3752,11 +3757,23 @@ function setUpTransformStreamDefaultControllerFromTransformer( true, ); } + if (transformerDict.cancel !== undefined) { + cancelAlgorithm = (reason) => + webidl.invokeCallbackFunction( + transformerDict.cancel, + [reason], + transformer, + webidl.converters["Promise<undefined>"], + "Failed to call 'cancelAlgorithm' on 'TransformStreamDefaultController'", + true, + ); + } setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, ); } @@ -3938,6 +3955,7 @@ function setUpWritableStreamDefaultWriter(writer, stream) { function transformStreamDefaultControllerClearAlgorithms(controller) { controller[_transformAlgorithm] = undefined; controller[_flushAlgorithm] = undefined; + controller[_cancelAlgorithm] = undefined; } /** @@ -4007,13 +4025,33 @@ function transformStreamDefaultControllerTerminate(controller) { } /** - * @param {TransformStream} stream + * @template I + * @template O + * @param {TransformStream<I, O>} stream * @param {any=} reason * @returns {Promise<void>} */ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { - transformStreamError(stream, reason); - return resolvePromiseWith(undefined); + const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const readable = stream[_readable]; + controller[_finishPromise] = new Deferred(); + const cancelPromise = controller[_cancelAlgorithm](reason); + transformStreamDefaultControllerClearAlgorithms(controller); + transformPromiseWith(cancelPromise, () => { + if (readable[_state] === "errored") { + controller[_finishPromise].reject(readable[_storedError]); + } else { + readableStreamDefaultControllerError(readable[_controller], reason); + controller[_finishPromise].resolve(undefined); + } + }, (r) => { + readableStreamDefaultControllerError(readable[_controller], r); + controller[_finishPromise].reject(r); + }); + return controller[_finishPromise].promise; } /** @@ -4023,21 +4061,26 @@ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { * @returns {Promise<void>} */ function transformStreamDefaultSinkCloseAlgorithm(stream) { - const readable = stream[_readable]; const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const readable = stream[_readable]; + controller[_finishPromise] = new Deferred(); const flushPromise = controller[_flushAlgorithm](controller); transformStreamDefaultControllerClearAlgorithms(controller); - return transformPromiseWith(flushPromise, () => { + transformPromiseWith(flushPromise, () => { if (readable[_state] === "errored") { - throw readable[_storedError]; + controller[_finishPromise].reject(readable[_storedError]); + } else { + readableStreamDefaultControllerClose(readable[_controller]); + controller[_finishPromise].resolve(undefined); } - readableStreamDefaultControllerClose( - /** @type {ReadableStreamDefaultController} */ readable[_controller], - ); }, (r) => { - transformStreamError(stream, r); - throw readable[_storedError]; + readableStreamDefaultControllerError(readable[_controller], r); + controller[_finishPromise].reject(r); }); + return controller[_finishPromise].promise; } /** @@ -4070,6 +4113,41 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { } /** + * @template I + * @template O + * @param {TransformStream<I, O>} stream + * @param {any=} reason + * @returns {Promise<void>} + */ +function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { + const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const writable = stream[_writable]; + controller[_finishPromise] = new Deferred(); + const cancelPromise = controller[_cancelAlgorithm](reason); + transformStreamDefaultControllerClearAlgorithms(controller); + transformPromiseWith(cancelPromise, () => { + if (writable[_state] === "errored") { + controller[_finishPromise].reject(writable[_storedError]); + } else { + writableStreamDefaultControllerErrorIfNeeded( + writable[_controller], + reason, + ); + transformStreamUnblockWrite(stream); + controller[_finishPromise].resolve(undefined); + } + }, (r) => { + writableStreamDefaultControllerErrorIfNeeded(writable[_controller], r); + transformStreamUnblockWrite(stream); + controller[_finishPromise].reject(r); + }); + return controller[_finishPromise].promise; +} + +/** * @param {TransformStream} stream * @returns {Promise<void>} */ @@ -4104,9 +4182,7 @@ function transformStreamErrorWritableAndUnblockWrite(stream, e) { stream[_writable][_controller], e, ); - if (stream[_backpressure] === true) { - transformStreamSetBackpressure(stream, false); - } + transformStreamUnblockWrite(stream); } /** @@ -4123,6 +4199,15 @@ function transformStreamSetBackpressure(stream, backpressure) { } /** + * @param {TransformStream} stream + */ +function transformStreamUnblockWrite(stream) { + if (stream[_backpressure] === true) { + transformStreamSetBackpressure(stream, false); + } +} + +/** * @param {WritableStream} stream * @param {any=} reason * @returns {Promise<void>} @@ -6007,6 +6092,10 @@ const TransformStreamPrototype = TransformStream.prototype; /** @template O */ class TransformStreamDefaultController { + /** @type {(reason: any) => Promise<void>} */ + [_cancelAlgorithm]; + /** @type {Promise<void> | undefined} */ + [_finishPromise]; /** @type {(controller: this) => Promise<void>} */ [_flushAlgorithm]; /** @type {TransformStream<O>} */ |