summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-10-10 18:42:31 +0900
committerGitHub <noreply@github.com>2023-10-10 18:42:31 +0900
commit6450334f5bbb059b55005cebfef57fa7969b625e (patch)
treee8ebce7f641a9f19cb297a27b5deb1446155efb0 /ext/web/06_streams.js
parent6bbccb72d7153f95688b63391672f2e2b979eeed (diff)
feat(ext/web): cancel support for TransformStream (#20815)
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js121
1 files changed, 105 insertions, 16 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 9c6191fce..38581ac79 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -317,6 +317,7 @@ const _controller = Symbol("[[controller]]");
const _detached = Symbol("[[Detached]]");
const _disturbed = Symbol("[[disturbed]]");
const _errorSteps = Symbol("[[ErrorSteps]]");
+const _finishPromise = Symbol("[[finishPromise]]");
const _flushAlgorithm = Symbol("[[flushAlgorithm]]");
const _globalObject = Symbol("[[globalObject]]");
const _highWaterMark = Symbol("[[highWaterMark]]");
@@ -609,8 +610,7 @@ function initializeTransformStream(
}
function cancelAlgorithm(reason) {
- transformStreamErrorWritableAndUnblockWrite(stream, reason);
- return resolvePromiseWith(undefined);
+ return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
}
stream[_readable] = createReadableStream(
@@ -3690,12 +3690,14 @@ function setUpReadableStreamDefaultReader(reader, stream) {
* @param {TransformStreamDefaultController<O>} controller
* @param {(chunk: O, controller: TransformStreamDefaultController<O>) => Promise<void>} transformAlgorithm
* @param {(controller: TransformStreamDefaultController<O>) => Promise<void>} flushAlgorithm
+ * @param {(reason: any) => Promise<void>} cancelAlgorithm
*/
function setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
+ cancelAlgorithm,
) {
assert(ObjectPrototypeIsPrototypeOf(TransformStreamPrototype, stream));
assert(stream[_controller] === undefined);
@@ -3703,6 +3705,7 @@ function setUpTransformStreamDefaultController(
stream[_controller] = controller;
controller[_transformAlgorithm] = transformAlgorithm;
controller[_flushAlgorithm] = flushAlgorithm;
+ controller[_cancelAlgorithm] = cancelAlgorithm;
}
/**
@@ -3730,6 +3733,8 @@ function setUpTransformStreamDefaultControllerFromTransformer(
};
/** @type {(controller: TransformStreamDefaultController<O>) => Promise<void>} */
let flushAlgorithm = () => resolvePromiseWith(undefined);
+ /** @type {(reason: any) => Promise<void>} */
+ let cancelAlgorithm = () => resolvePromiseWith(undefined);
if (transformerDict.transform !== undefined) {
transformAlgorithm = (chunk, controller) =>
webidl.invokeCallbackFunction(
@@ -3752,11 +3757,23 @@ function setUpTransformStreamDefaultControllerFromTransformer(
true,
);
}
+ if (transformerDict.cancel !== undefined) {
+ cancelAlgorithm = (reason) =>
+ webidl.invokeCallbackFunction(
+ transformerDict.cancel,
+ [reason],
+ transformer,
+ webidl.converters["Promise<undefined>"],
+ "Failed to call 'cancelAlgorithm' on 'TransformStreamDefaultController'",
+ true,
+ );
+ }
setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
+ cancelAlgorithm,
);
}
@@ -3938,6 +3955,7 @@ function setUpWritableStreamDefaultWriter(writer, stream) {
function transformStreamDefaultControllerClearAlgorithms(controller) {
controller[_transformAlgorithm] = undefined;
controller[_flushAlgorithm] = undefined;
+ controller[_cancelAlgorithm] = undefined;
}
/**
@@ -4007,13 +4025,33 @@ function transformStreamDefaultControllerTerminate(controller) {
}
/**
- * @param {TransformStream} stream
+ * @template I
+ * @template O
+ * @param {TransformStream<I, O>} stream
* @param {any=} reason
* @returns {Promise<void>}
*/
function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
- transformStreamError(stream, reason);
- return resolvePromiseWith(undefined);
+ const controller = stream[_controller];
+ if (controller[_finishPromise] !== undefined) {
+ return controller[_finishPromise].promise;
+ }
+ const readable = stream[_readable];
+ controller[_finishPromise] = new Deferred();
+ const cancelPromise = controller[_cancelAlgorithm](reason);
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ transformPromiseWith(cancelPromise, () => {
+ if (readable[_state] === "errored") {
+ controller[_finishPromise].reject(readable[_storedError]);
+ } else {
+ readableStreamDefaultControllerError(readable[_controller], reason);
+ controller[_finishPromise].resolve(undefined);
+ }
+ }, (r) => {
+ readableStreamDefaultControllerError(readable[_controller], r);
+ controller[_finishPromise].reject(r);
+ });
+ return controller[_finishPromise].promise;
}
/**
@@ -4023,21 +4061,26 @@ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
* @returns {Promise<void>}
*/
function transformStreamDefaultSinkCloseAlgorithm(stream) {
- const readable = stream[_readable];
const controller = stream[_controller];
+ if (controller[_finishPromise] !== undefined) {
+ return controller[_finishPromise].promise;
+ }
+ const readable = stream[_readable];
+ controller[_finishPromise] = new Deferred();
const flushPromise = controller[_flushAlgorithm](controller);
transformStreamDefaultControllerClearAlgorithms(controller);
- return transformPromiseWith(flushPromise, () => {
+ transformPromiseWith(flushPromise, () => {
if (readable[_state] === "errored") {
- throw readable[_storedError];
+ controller[_finishPromise].reject(readable[_storedError]);
+ } else {
+ readableStreamDefaultControllerClose(readable[_controller]);
+ controller[_finishPromise].resolve(undefined);
}
- readableStreamDefaultControllerClose(
- /** @type {ReadableStreamDefaultController} */ readable[_controller],
- );
}, (r) => {
- transformStreamError(stream, r);
- throw readable[_storedError];
+ readableStreamDefaultControllerError(readable[_controller], r);
+ controller[_finishPromise].reject(r);
});
+ return controller[_finishPromise].promise;
}
/**
@@ -4070,6 +4113,41 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
}
/**
+ * @template I
+ * @template O
+ * @param {TransformStream<I, O>} stream
+ * @param {any=} reason
+ * @returns {Promise<void>}
+ */
+function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
+ const controller = stream[_controller];
+ if (controller[_finishPromise] !== undefined) {
+ return controller[_finishPromise].promise;
+ }
+ const writable = stream[_writable];
+ controller[_finishPromise] = new Deferred();
+ const cancelPromise = controller[_cancelAlgorithm](reason);
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ transformPromiseWith(cancelPromise, () => {
+ if (writable[_state] === "errored") {
+ controller[_finishPromise].reject(writable[_storedError]);
+ } else {
+ writableStreamDefaultControllerErrorIfNeeded(
+ writable[_controller],
+ reason,
+ );
+ transformStreamUnblockWrite(stream);
+ controller[_finishPromise].resolve(undefined);
+ }
+ }, (r) => {
+ writableStreamDefaultControllerErrorIfNeeded(writable[_controller], r);
+ transformStreamUnblockWrite(stream);
+ controller[_finishPromise].reject(r);
+ });
+ return controller[_finishPromise].promise;
+}
+
+/**
* @param {TransformStream} stream
* @returns {Promise<void>}
*/
@@ -4104,9 +4182,7 @@ function transformStreamErrorWritableAndUnblockWrite(stream, e) {
stream[_writable][_controller],
e,
);
- if (stream[_backpressure] === true) {
- transformStreamSetBackpressure(stream, false);
- }
+ transformStreamUnblockWrite(stream);
}
/**
@@ -4123,6 +4199,15 @@ function transformStreamSetBackpressure(stream, backpressure) {
}
/**
+ * @param {TransformStream} stream
+ */
+function transformStreamUnblockWrite(stream) {
+ if (stream[_backpressure] === true) {
+ transformStreamSetBackpressure(stream, false);
+ }
+}
+
+/**
* @param {WritableStream} stream
* @param {any=} reason
* @returns {Promise<void>}
@@ -6007,6 +6092,10 @@ const TransformStreamPrototype = TransformStream.prototype;
/** @template O */
class TransformStreamDefaultController {
+ /** @type {(reason: any) => Promise<void>} */
+ [_cancelAlgorithm];
+ /** @type {Promise<void> | undefined} */
+ [_finishPromise];
/** @type {(controller: this) => Promise<void>} */
[_flushAlgorithm];
/** @type {TransformStream<O>} */