summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2021-07-05 12:18:41 +0200
committerGitHub <noreply@github.com>2021-07-05 12:18:41 +0200
commit3ee0c36453a2591139b7e35882e04c1e706e9253 (patch)
treeddce3758ee3f7ab70103efc997ba9f6bde242bbe
parentc4cc353d594edae121fd1e8fcd5c85d4ae0d0988 (diff)
refactor: introduce primordials for web/streams (#11251)
-rw-r--r--core/00_primordials.js3
-rw-r--r--core/internal.d.ts1
-rw-r--r--extensions/web/06_streams.js191
3 files changed, 109 insertions, 86 deletions
diff --git a/core/00_primordials.js b/core/00_primordials.js
index 63b6730db..c6132620e 100644
--- a/core/00_primordials.js
+++ b/core/00_primordials.js
@@ -170,6 +170,9 @@
// Create copy of isNaN
primordials[isNaN.name] = isNaN;
+ // Create copy of queueMicrotask
+ primordials["queueMicrotask"] = queueMicrotask;
+
// Create copies of URI handling functions
[
decodeURI,
diff --git a/core/internal.d.ts b/core/internal.d.ts
index 8e52f4bf1..e582eb359 100644
--- a/core/internal.d.ts
+++ b/core/internal.d.ts
@@ -632,6 +632,7 @@ declare namespace __bootstrap {
export const ObjectPrototypeToLocaleString: UncurryThis<
typeof Object.prototype.toLocaleString
>;
+ export const queueMicrotask: typeof globalThis.queueMicrotask;
export const RangeError: typeof globalThis.RangeError;
export const RangeErrorLength: typeof RangeError.length;
export const RangeErrorName: typeof RangeError.name;
diff --git a/extensions/web/06_streams.js b/extensions/web/06_streams.js
index 48585e8be..388b7b13c 100644
--- a/extensions/web/06_streams.js
+++ b/extensions/web/06_streams.js
@@ -9,6 +9,33 @@
((window) => {
const webidl = window.__bootstrap.webidl;
+ // TODO(lucacasonato): get AbortSignal from __bootstrap.
+ const {
+ ArrayPrototypeMap,
+ ArrayPrototypePush,
+ ArrayPrototypeShift,
+ Error,
+ NumberIsInteger,
+ NumberIsNaN,
+ ObjectCreate,
+ ObjectDefineProperties,
+ ObjectDefineProperty,
+ ObjectGetPrototypeOf,
+ ObjectSetPrototypeOf,
+ Promise,
+ PromiseAll,
+ PromisePrototypeThen,
+ PromiseReject,
+ queueMicrotask,
+ RangeError,
+ SymbolAsyncIterator,
+ TypeError,
+ Uint8Array,
+ WeakMap,
+ WeakMapPrototypeGet,
+ WeakMapPrototypeHas,
+ WeakMapPrototypeSet,
+ } = globalThis.__bootstrap.primordials;
const { DOMException } = window.__bootstrap.domException;
class AssertionError extends Error {
@@ -78,29 +105,13 @@
}
}
- const originalPromise = Promise;
- const originalPromiseThen = Promise.prototype.then;
-
- /**
- * @template T
- * @template TResult1
- * @template TResult2
- * @param {Promise<T>} promise
- * @param {(value: T) => TResult1 | PromiseLike<TResult1>} onFulfilled
- * @param {(reason: any) => TResult2 | PromiseLike<TResult2>=} onRejected
- * @returns {Promise<TResult1 | TResult2>}
- */
- function performPromiseThen(promise, onFulfilled, onRejected) {
- return originalPromiseThen.call(promise, onFulfilled, onRejected);
- }
-
/**
* @template T
* @param {T | PromiseLike<T>} value
* @returns {Promise<T>}
*/
function resolvePromiseWith(value) {
- return new originalPromise((resolve) => resolve(value));
+ return new Promise((resolve) => resolve(value));
}
/** @param {any} e */
@@ -114,7 +125,7 @@
/** @param {Promise<any>} promise */
function setPromiseIsHandledToTrue(promise) {
- performPromiseThen(promise, undefined, rethrowAssertionErrorRejection);
+ PromisePrototypeThen(promise, undefined, rethrowAssertionErrorRejection);
}
/**
@@ -127,7 +138,7 @@
* @returns {Promise<TResult1 | TResult2>}
*/
function transformPromiseWith(promise, fulfillmentHandler, rejectionHandler) {
- return performPromiseThen(promise, fulfillmentHandler, rejectionHandler);
+ return PromisePrototypeThen(promise, fulfillmentHandler, rejectionHandler);
}
/**
@@ -162,8 +173,8 @@
* @returns {void}
*/
function uponPromise(promise, onFulfilled, onRejected) {
- performPromiseThen(
- performPromiseThen(promise, onFulfilled, onRejected),
+ PromisePrototypeThen(
+ PromisePrototypeThen(promise, onFulfilled, onRejected),
undefined,
rethrowAssertionErrorRejection,
);
@@ -335,7 +346,7 @@
function dequeueValue(container) {
assert(_queue in container && _queueTotalSize in container);
assert(container[_queue].length);
- const valueWithSize = container[_queue].shift();
+ const valueWithSize = ArrayPrototypeShift(container[_queue]);
container[_queueTotalSize] -= valueWithSize.size;
if (container[_queueTotalSize] < 0) {
container[_queueTotalSize] = 0;
@@ -358,7 +369,7 @@
if (size === Infinity) {
throw RangeError("chunk size is invalid");
}
- container[_queue].push({ value, size });
+ ArrayPrototypePush(container[_queue], { value, size });
container[_queueTotalSize] += size;
}
@@ -371,7 +382,7 @@
return defaultHWM;
}
const highWaterMark = strategy.highWaterMark;
- if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
+ if (NumberIsNaN(highWaterMark) || highWaterMark < 0) {
throw RangeError(
`Expected highWaterMark to be a positive number or Infinity, got "${highWaterMark}".`,
);
@@ -492,7 +503,7 @@
if (typeof v !== "number") {
return false;
}
- if (Number.isNaN(v)) {
+ if (NumberIsNaN(v)) {
return false;
}
if (v < 0) {
@@ -589,7 +600,8 @@
/** @type {Promise<void>} */
const pullPromise = controller[_pullAlgorithm](controller);
setPromiseIsHandledToTrue(
- pullPromise.then(
+ PromisePrototypeThen(
+ pullPromise,
() => {
controller[_pulling] = false;
if (controller[_pullAgain]) {
@@ -707,7 +719,7 @@
byteOffset,
byteLength,
) {
- controller[_queue].push({ buffer, byteOffset, byteLength });
+ ArrayPrototypePush(controller[_queue], { buffer, byteOffset, byteLength });
controller[_queueTotalSize] += byteLength;
}
@@ -787,7 +799,7 @@
function readableStreamAddReadRequest(stream, readRequest) {
assert(isReadableStreamDefaultReader(stream[_reader]));
assert(stream[_state] === "readable");
- stream[_reader][_readRequests].push(readRequest);
+ ArrayPrototypePush(stream[_reader][_readRequests], readRequest);
}
/**
@@ -802,12 +814,12 @@
return resolvePromiseWith(undefined);
}
if (stream[_state] === "errored") {
- return Promise.reject(stream[_storedError]);
+ return PromiseReject(stream[_storedError]);
}
readableStreamClose(stream);
/** @type {Promise<void>} */
const sourceCancelPromise = stream[_controller][_cancelSteps](reason);
- return sourceCancelPromise.then(() => undefined);
+ return PromisePrototypeThen(sourceCancelPromise, () => undefined);
}
/**
@@ -1064,7 +1076,7 @@
const reader = stream[_reader];
assert(reader[_readRequests].length);
/** @type {ReadRequest<R>} */
- const readRequest = reader[_readRequests].shift();
+ const readRequest = ArrayPrototypeShift(reader[_readRequests]);
if (done) {
readRequest.closeSteps();
} else {
@@ -1138,7 +1150,7 @@
/** @type {Array<() => Promise<void>>} */
const actions = [];
if (preventAbort === false) {
- actions.push(() => {
+ ArrayPrototypePush(actions, () => {
if (dest[_state] === "writable") {
return writableStreamAbort(dest, error);
} else {
@@ -1147,7 +1159,7 @@
});
}
if (preventCancel === false) {
- actions.push(() => {
+ ArrayPrototypePush(actions, () => {
if (source[_state] === "readable") {
return readableStreamCancel(source, error);
} else {
@@ -1156,7 +1168,7 @@
});
}
shutdownWithAction(
- () => Promise.all(actions.map((action) => action())),
+ () => PromiseAll(ArrayPrototypeMap(actions, (action) => action())),
true,
error,
);
@@ -1166,6 +1178,7 @@
abortAlgorithm();
return promise.promise;
}
+ // TODO(lucacasonato): use the internal API to listen for abort.
signal.addEventListener("abort", abortAlgorithm);
}
@@ -1370,6 +1383,7 @@
readableStreamReaderGenericRelease(reader);
if (signal !== undefined) {
+ // TODO(lucacasonato): use the internal API to remove the listener.
signal.removeEventListener("abort", abortAlgorithm);
}
if (isError) {
@@ -1478,6 +1492,8 @@
const value1 = value;
const value2 = value;
+ // TODO(lucacasonato): respect clonedForBranch2.
+
if (canceled1 === false) {
readableStreamDefaultControllerEnqueue(
/** @type {ReadableStreamDefaultController<any>} */ (branch1[
@@ -1578,7 +1594,9 @@
]),
r,
);
- cancelPromise.resolve(undefined);
+ if (canceled1 === false || canceled2 === false) {
+ cancelPromise.resolve(undefined);
+ }
});
return [branch1, branch2];
@@ -1604,7 +1622,7 @@
) {
assert(stream[_controller] === undefined);
if (autoAllocateChunkSize !== undefined) {
- assert(Number.isInteger(autoAllocateChunkSize));
+ assert(NumberIsInteger(autoAllocateChunkSize));
assert(autoAllocateChunkSize >= 0);
}
controller[_stream] = stream;
@@ -1621,7 +1639,8 @@
const startResult = startAlgorithm();
const startPromise = resolvePromiseWith(startResult);
setPromiseIsHandledToTrue(
- startPromise.then(
+ PromisePrototypeThen(
+ startPromise,
() => {
controller[_started] = true;
assert(controller[_pulling] === false);
@@ -1877,7 +1896,7 @@
try {
transformStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
- return Promise.reject(e);
+ return PromiseReject(e);
}
return resolvePromiseWith(undefined);
};
@@ -2333,7 +2352,7 @@
assert(stream[_state] === "writable");
/** @type {Deferred<void>} */
const deferred = new Deferred();
- stream[_writeRequests].push(deferred);
+ ArrayPrototypePush(stream[_writeRequests], deferred);
return deferred.promise;
}
@@ -2344,7 +2363,7 @@
function writableStreamClose(stream) {
const state = stream[_state];
if (state === "closed" || state === "errored") {
- return Promise.reject(
+ return PromiseReject(
new TypeError("Writable stream is closed or errored."),
);
}
@@ -2599,7 +2618,7 @@
return resolvePromiseWith(undefined);
}
if (state === "errored") {
- return Promise.reject(stream[_storedError]);
+ return PromiseReject(stream[_storedError]);
}
assert(state === "writable" || state === "erroring");
return writableStreamDefaultWriterClose(writer);
@@ -2690,21 +2709,21 @@
chunk,
);
if (stream !== writer[_stream]) {
- return Promise.reject(new TypeError("Writer's stream is unexpected."));
+ return PromiseReject(new TypeError("Writer's stream is unexpected."));
}
const state = stream[_state];
if (state === "errored") {
- return Promise.reject(stream[_storedError]);
+ return PromiseReject(stream[_storedError]);
}
if (
writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed"
) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("The stream is closing or is closed."),
);
}
if (state === "erroring") {
- return Promise.reject(stream[_storedError]);
+ return PromiseReject(stream[_storedError]);
}
assert(state === "writable");
const promise = writableStreamAddWriteRequest(stream);
@@ -2899,8 +2918,8 @@
* @returns {IteratorResult<T>}
*/
function createIteratorResult(value, done) {
- const result = Object.create(null);
- Object.defineProperties(result, {
+ const result = ObjectCreate(null);
+ ObjectDefineProperties(result, {
value: { value, writable: true, enumerable: true, configurable: true },
done: {
value: done,
@@ -2913,18 +2932,18 @@
}
/** @type {AsyncIterator<unknown, unknown>} */
- const asyncIteratorPrototype = Object.getPrototypeOf(
- Object.getPrototypeOf(async function* () {}).prototype,
+ const asyncIteratorPrototype = ObjectGetPrototypeOf(
+ ObjectGetPrototypeOf(async function* () {}).prototype,
);
/** @type {AsyncIterator<unknown>} */
- const readableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
+ const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
/** @returns {Promise<IteratorResult<unknown>>} */
next() {
/** @type {ReadableStreamDefaultReader} */
const reader = this[_reader];
if (reader[_stream] === undefined) {
- return Promise.reject(
+ return PromiseReject(
new TypeError(
"Cannot get the next iteration result once the reader has been released.",
),
@@ -2995,7 +3014,7 @@
get size() {
webidl.assertBranded(this, ByteLengthQueuingStrategy);
initializeByteLengthSizeFunction(this[_globalObject]);
- return byteSizeFunctionWeakMap.get(this[_globalObject]);
+ return WeakMapPrototypeGet(byteSizeFunctionWeakMap, this[_globalObject]);
}
[Symbol.for("Deno.customInspect")](inspect) {
@@ -3015,11 +3034,11 @@
const byteSizeFunctionWeakMap = new WeakMap();
function initializeByteLengthSizeFunction(globalObject) {
- if (byteSizeFunctionWeakMap.has(globalObject)) {
+ if (WeakMapPrototypeHas(byteSizeFunctionWeakMap, globalObject)) {
return;
}
const size = (chunk) => chunk.byteLength;
- byteSizeFunctionWeakMap.set(globalObject, size);
+ WeakMapPrototypeSet(byteSizeFunctionWeakMap, globalObject, size);
}
class CountQueuingStrategy {
@@ -3046,7 +3065,7 @@
get size() {
webidl.assertBranded(this, CountQueuingStrategy);
initializeCountSizeFunction(this[_globalObject]);
- return countSizeFunctionWeakMap.get(this[_globalObject]);
+ return WeakMapPrototypeGet(countSizeFunctionWeakMap, this[_globalObject]);
}
[Symbol.for("Deno.customInspect")](inspect) {
@@ -3067,11 +3086,11 @@
/** @param {typeof globalThis} globalObject */
function initializeCountSizeFunction(globalObject) {
- if (countSizeFunctionWeakMap.has(globalObject)) {
+ if (WeakMapPrototypeHas(countSizeFunctionWeakMap, globalObject)) {
return;
}
const size = () => 1;
- countSizeFunctionWeakMap.set(globalObject, size);
+ WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}
/** @template R */
@@ -3159,10 +3178,10 @@
reason = webidl.converters.any(reason);
}
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (isReadableStreamLocked(this)) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("Cannot cancel a locked ReadableStream."),
);
}
@@ -3175,7 +3194,7 @@
* @returns {AsyncIterableIterator<R>}
*/
getIterator(options = {}) {
- return this[Symbol.asyncIterator](options);
+ return this[SymbolAsyncIterator](options);
}
/**
@@ -3254,16 +3273,16 @@
context: "Argument 2",
});
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
const { preventClose, preventAbort, preventCancel, signal } = options;
if (isReadableStreamLocked(this)) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("ReadableStream is already locked."),
);
}
if (isWritableStreamLocked(destination)) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("destination WritableStream is already locked."),
);
}
@@ -3296,7 +3315,7 @@
context: "Argument 1",
});
/** @type {AsyncIterableIterator<R>} */
- const iterator = Object.create(readableStreamAsyncIteratorPrototype);
+ const iterator = ObjectCreate(readableStreamAsyncIteratorPrototype);
const reader = acquireReadableStreamDefaultReader(this);
iterator[_reader] = reader;
iterator[_preventCancel] = options.preventCancel;
@@ -3313,9 +3332,9 @@
}
// TODO(lucacasonato): should be moved to webidl crate
- ReadableStream.prototype[Symbol.asyncIterator] =
+ ReadableStream.prototype[SymbolAsyncIterator] =
ReadableStream.prototype.values;
- Object.defineProperty(ReadableStream.prototype, Symbol.asyncIterator, {
+ ObjectDefineProperty(ReadableStream.prototype, SymbolAsyncIterator, {
writable: true,
enumerable: false,
configurable: true,
@@ -3353,10 +3372,10 @@
try {
webidl.assertBranded(this, ReadableStreamDefaultReader);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (this[_stream] === undefined) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("Reader has no associated stream."),
);
}
@@ -3396,7 +3415,7 @@
try {
webidl.assertBranded(this, ReadableStreamDefaultReader);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
return this[_closedPromise].promise;
}
@@ -3412,11 +3431,11 @@
reason = webidl.converters.any(reason);
}
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (this[_stream] === undefined) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("Reader has no associated stream."),
);
}
@@ -3573,7 +3592,7 @@
assert(readableStreamHasDefaultReader(stream));
if (this[_queueTotalSize] > 0) {
assert(readableStreamGetNumReadRequests(stream) === 0);
- const entry = this[_queue].shift();
+ const entry = ArrayPrototypeShift(this[_queue]);
this[_queueTotalSize] -= entry.byteLength;
readableByteStreamControllerHandleQueueDrain(this);
const view = new Uint8Array(
@@ -3978,13 +3997,13 @@
try {
webidl.assertBranded(this, WritableStream);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (reason !== undefined) {
reason = webidl.converters.any(reason);
}
if (isWritableStreamLocked(this)) {
- return Promise.reject(
+ return PromiseReject(
new TypeError(
"The writable stream is locked, therefore cannot be aborted.",
),
@@ -3998,17 +4017,17 @@
try {
webidl.assertBranded(this, WritableStream);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (isWritableStreamLocked(this)) {
- return Promise.reject(
+ return PromiseReject(
new TypeError(
"The writable stream is locked, therefore cannot be closed.",
),
);
}
if (writableStreamCloseQueuedOrInFlight(this) === true) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("The writable stream is already closing."),
);
}
@@ -4062,7 +4081,7 @@
try {
webidl.assertBranded(this, WritableStreamDefaultWriter);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
return this[_closedPromise].promise;
}
@@ -4083,7 +4102,7 @@
try {
webidl.assertBranded(this, WritableStreamDefaultWriter);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
return this[_readyPromise].promise;
}
@@ -4096,13 +4115,13 @@
try {
webidl.assertBranded(this, WritableStreamDefaultWriter);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (reason !== undefined) {
reason = webidl.converters.any(reason);
}
if (this[_stream] === undefined) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("A writable stream is not associated with the writer."),
);
}
@@ -4114,16 +4133,16 @@
try {
webidl.assertBranded(this, WritableStreamDefaultWriter);
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
const stream = this[_stream];
if (stream === undefined) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("A writable stream is not associated with the writer."),
);
}
if (writableStreamCloseQueuedOrInFlight(stream) === true) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("The associated stream is already closing."),
);
}
@@ -4152,10 +4171,10 @@
chunk = webidl.converters.any(chunk);
}
} catch (err) {
- return Promise.reject(err);
+ return PromiseReject(err);
}
if (this[_stream] === undefined) {
- return Promise.reject(
+ return PromiseReject(
new TypeError("A writable stream is not associate with the writer."),
);
}