summaryrefslogtreecommitdiff
path: root/ext/websocket/02_websocketstream.js
diff options
context:
space:
mode:
authorLeo Kettmeir <crowlkats@toaxl.com>2023-02-07 20:22:46 +0100
committerGitHub <noreply@github.com>2023-02-07 20:22:46 +0100
commitb4aa1530970f7b9cc4e6f2f27e077852c4e178d3 (patch)
tree3d008912affe8550692183bd2697a386db5e3c79 /ext/websocket/02_websocketstream.js
parent65500f36e870b4ada3996b06aa287e30177d21a3 (diff)
refactor: Use ES modules for internal runtime code (#17648)
This PR refactors all internal js files (except core) to be written as ES modules. `__bootstrap`has been mostly replaced with static imports in form in `internal:[path to file from repo root]`. To specify if files are ESM, an `esm` method has been added to `Extension`, similar to the `js` method. A new ModuleLoader called `InternalModuleLoader` has been added to enable the loading of internal specifiers, which is used in all situations except when a snapshot is only loaded, and not a new one is created from it. --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/websocket/02_websocketstream.js')
-rw-r--r--ext/websocket/02_websocketstream.js778
1 files changed, 388 insertions, 390 deletions
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 5d7e47cc4..b3d21297f 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -1,426 +1,424 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-"use strict";
/// <reference path="../../core/internal.d.ts" />
-((window) => {
- const core = window.Deno.core;
- const ops = core.ops;
- const webidl = window.__bootstrap.webidl;
- const { writableStreamClose, Deferred } = window.__bootstrap.streams;
- const { DOMException } = window.__bootstrap.domException;
- const { add, remove } = window.__bootstrap.abortSignal;
- const { headersFromHeaderList, headerListFromHeaders, fillHeaders } =
- window.__bootstrap.headers;
-
- const {
- ArrayPrototypeJoin,
- ArrayPrototypeMap,
- Error,
- ObjectPrototypeIsPrototypeOf,
- PromisePrototypeCatch,
- PromisePrototypeThen,
- Set,
- StringPrototypeEndsWith,
- StringPrototypeToLowerCase,
- Symbol,
- SymbolFor,
- TypeError,
- Uint8ArrayPrototype,
- } = window.__bootstrap.primordials;
-
- webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
- "WebSocketStreamOptions",
- [
- {
- key: "protocols",
- converter: webidl.converters["sequence<USVString>"],
- get defaultValue() {
- return [];
- },
- },
- {
- key: "signal",
- converter: webidl.converters.AbortSignal,
- },
- {
- key: "headers",
- converter: webidl.converters.HeadersInit,
+const core = globalThis.Deno.core;
+const ops = core.ops;
+import * as webidl from "internal:ext/webidl/00_webidl.js";
+import { Deferred, writableStreamClose } from "internal:ext/web/06_streams.js";
+import DOMException from "internal:ext/web/01_dom_exception.js";
+import { add, remove } from "internal:ext/web/03_abort_signal.js";
+import {
+ fillHeaders,
+ headerListFromHeaders,
+ headersFromHeaderList,
+} from "internal:ext/fetch/20_headers.js";
+const primordials = globalThis.__bootstrap.primordials;
+const {
+ ArrayPrototypeJoin,
+ ArrayPrototypeMap,
+ Error,
+ ObjectPrototypeIsPrototypeOf,
+ PromisePrototypeCatch,
+ PromisePrototypeThen,
+ Set,
+ StringPrototypeEndsWith,
+ StringPrototypeToLowerCase,
+ Symbol,
+ SymbolFor,
+ TypeError,
+ Uint8ArrayPrototype,
+} = primordials;
+
+webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
+ "WebSocketStreamOptions",
+ [
+ {
+ key: "protocols",
+ converter: webidl.converters["sequence<USVString>"],
+ get defaultValue() {
+ return [];
},
- ],
- );
- webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter(
- "WebSocketCloseInfo",
- [
- {
- key: "code",
- converter: webidl.converters["unsigned short"],
- },
- {
- key: "reason",
- converter: webidl.converters.USVString,
- defaultValue: "",
- },
- ],
- );
-
- const CLOSE_RESPONSE_TIMEOUT = 5000;
-
- const _rid = Symbol("[[rid]]");
- const _url = Symbol("[[url]]");
- const _connection = Symbol("[[connection]]");
- const _closed = Symbol("[[closed]]");
- const _earlyClose = Symbol("[[earlyClose]]");
- const _closeSent = Symbol("[[closeSent]]");
- class WebSocketStream {
- [_rid];
-
- [_url];
- get url() {
- webidl.assertBranded(this, WebSocketStreamPrototype);
- return this[_url];
+ },
+ {
+ key: "signal",
+ converter: webidl.converters.AbortSignal,
+ },
+ {
+ key: "headers",
+ converter: webidl.converters.HeadersInit,
+ },
+ ],
+);
+webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter(
+ "WebSocketCloseInfo",
+ [
+ {
+ key: "code",
+ converter: webidl.converters["unsigned short"],
+ },
+ {
+ key: "reason",
+ converter: webidl.converters.USVString,
+ defaultValue: "",
+ },
+ ],
+);
+
+const CLOSE_RESPONSE_TIMEOUT = 5000;
+
+const _rid = Symbol("[[rid]]");
+const _url = Symbol("[[url]]");
+const _connection = Symbol("[[connection]]");
+const _closed = Symbol("[[closed]]");
+const _earlyClose = Symbol("[[earlyClose]]");
+const _closeSent = Symbol("[[closeSent]]");
+class WebSocketStream {
+ [_rid];
+
+ [_url];
+ get url() {
+ webidl.assertBranded(this, WebSocketStreamPrototype);
+ return this[_url];
+ }
+
+ constructor(url, options) {
+ this[webidl.brand] = webidl.brand;
+ const prefix = "Failed to construct 'WebSocketStream'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ url = webidl.converters.USVString(url, {
+ prefix,
+ context: "Argument 1",
+ });
+ options = webidl.converters.WebSocketStreamOptions(options, {
+ prefix,
+ context: "Argument 2",
+ });
+
+ const wsURL = new URL(url);
+
+ if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
+ throw new DOMException(
+ "Only ws & wss schemes are allowed in a WebSocket URL.",
+ "SyntaxError",
+ );
}
- constructor(url, options) {
- this[webidl.brand] = webidl.brand;
- const prefix = "Failed to construct 'WebSocketStream'";
- webidl.requiredArguments(arguments.length, 1, { prefix });
- url = webidl.converters.USVString(url, {
- prefix,
- context: "Argument 1",
- });
- options = webidl.converters.WebSocketStreamOptions(options, {
- prefix,
- context: "Argument 2",
- });
-
- const wsURL = new URL(url);
-
- if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
- throw new DOMException(
- "Only ws & wss schemes are allowed in a WebSocket URL.",
- "SyntaxError",
- );
- }
-
- if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
- throw new DOMException(
- "Fragments are not allowed in a WebSocket URL.",
- "SyntaxError",
- );
- }
-
- this[_url] = wsURL.href;
-
- if (
- options.protocols.length !==
- new Set(
- ArrayPrototypeMap(
- options.protocols,
- (p) => StringPrototypeToLowerCase(p),
- ),
- ).size
- ) {
- throw new DOMException(
- "Can't supply multiple times the same protocol.",
- "SyntaxError",
- );
- }
-
- const headers = headersFromHeaderList([], "request");
- if (options.headers !== undefined) {
- fillHeaders(headers, options.headers);
- }
-
- const cancelRid = ops.op_ws_check_permission_and_cancel_handle(
- "WebSocketStream.abort()",
- this[_url],
- true,
+ if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
+ throw new DOMException(
+ "Fragments are not allowed in a WebSocket URL.",
+ "SyntaxError",
);
+ }
- if (options.signal?.aborted) {
- core.close(cancelRid);
- const err = options.signal.reason;
- this[_connection].reject(err);
- this[_closed].reject(err);
- } else {
- const abort = () => {
- core.close(cancelRid);
- };
- options.signal?.[add](abort);
- PromisePrototypeThen(
- core.opAsync(
- "op_ws_create",
- "new WebSocketStream()",
- this[_url],
- options.protocols
- ? ArrayPrototypeJoin(options.protocols, ", ")
- : "",
- cancelRid,
- headerListFromHeaders(headers),
+ this[_url] = wsURL.href;
+
+ if (
+ options.protocols.length !==
+ new Set(
+ ArrayPrototypeMap(
+ options.protocols,
+ (p) => StringPrototypeToLowerCase(p),
),
- (create) => {
- options.signal?.[remove](abort);
- if (this[_earlyClose]) {
- PromisePrototypeThen(
- core.opAsync("op_ws_close", create.rid),
- () => {
- PromisePrototypeThen(
- (async () => {
- while (true) {
- const { kind } = await core.opAsync(
- "op_ws_next_event",
- create.rid,
- );
-
- if (kind === "close") {
- break;
- }
- }
- })(),
- () => {
- const err = new DOMException(
- "Closed while connecting",
- "NetworkError",
+ ).size
+ ) {
+ throw new DOMException(
+ "Can't supply multiple times the same protocol.",
+ "SyntaxError",
+ );
+ }
+
+ const headers = headersFromHeaderList([], "request");
+ if (options.headers !== undefined) {
+ fillHeaders(headers, options.headers);
+ }
+
+ const cancelRid = ops.op_ws_check_permission_and_cancel_handle(
+ "WebSocketStream.abort()",
+ this[_url],
+ true,
+ );
+
+ if (options.signal?.aborted) {
+ core.close(cancelRid);
+ const err = options.signal.reason;
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ } else {
+ const abort = () => {
+ core.close(cancelRid);
+ };
+ options.signal?.[add](abort);
+ PromisePrototypeThen(
+ core.opAsync(
+ "op_ws_create",
+ "new WebSocketStream()",
+ this[_url],
+ options.protocols ? ArrayPrototypeJoin(options.protocols, ", ") : "",
+ cancelRid,
+ headerListFromHeaders(headers),
+ ),
+ (create) => {
+ options.signal?.[remove](abort);
+ if (this[_earlyClose]) {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_close", create.rid),
+ () => {
+ PromisePrototypeThen(
+ (async () => {
+ while (true) {
+ const { kind } = await core.opAsync(
+ "op_ws_next_event",
+ create.rid,
);
- this[_connection].reject(err);
- this[_closed].reject(err);
- },
- );
- },
- () => {
- const err = new DOMException(
- "Closed while connecting",
- "NetworkError",
+
+ if (kind === "close") {
+ break;
+ }
+ }
+ })(),
+ () => {
+ const err = new DOMException(
+ "Closed while connecting",
+ "NetworkError",
+ );
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ },
+ );
+ },
+ () => {
+ const err = new DOMException(
+ "Closed while connecting",
+ "NetworkError",
+ );
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ },
+ );
+ } else {
+ this[_rid] = create.rid;
+
+ const writable = new WritableStream({
+ write: async (chunk) => {
+ if (typeof chunk === "string") {
+ await core.opAsync("op_ws_send", this[_rid], {
+ kind: "text",
+ value: chunk,
+ });
+ } else if (
+ ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)
+ ) {
+ await core.opAsync("op_ws_send", this[_rid], {
+ kind: "binary",
+ value: chunk,
+ }, chunk);
+ } else {
+ throw new TypeError(
+ "A chunk may only be either a string or an Uint8Array",
);
- this[_connection].reject(err);
- this[_closed].reject(err);
- },
+ }
+ },
+ close: async (reason) => {
+ try {
+ this.close(reason?.code !== undefined ? reason : {});
+ } catch (_) {
+ this.close();
+ }
+ await this.closed;
+ },
+ abort: async (reason) => {
+ try {
+ this.close(reason?.code !== undefined ? reason : {});
+ } catch (_) {
+ this.close();
+ }
+ await this.closed;
+ },
+ });
+ const pull = async (controller) => {
+ const { kind, value } = await core.opAsync(
+ "op_ws_next_event",
+ this[_rid],
);
- } else {
- this[_rid] = create.rid;
-
- const writable = new WritableStream({
- write: async (chunk) => {
- if (typeof chunk === "string") {
- await core.opAsync("op_ws_send", this[_rid], {
- kind: "text",
- value: chunk,
- });
- } else if (
- ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)
- ) {
- await core.opAsync("op_ws_send", this[_rid], {
- kind: "binary",
- value: chunk,
- }, chunk);
- } else {
- throw new TypeError(
- "A chunk may only be either a string or an Uint8Array",
- );
- }
- },
- close: async (reason) => {
+
+ switch (kind) {
+ case "string": {
+ controller.enqueue(value);
+ break;
+ }
+ case "binary": {
+ controller.enqueue(value);
+ break;
+ }
+ case "ping": {
+ await core.opAsync("op_ws_send", this[_rid], {
+ kind: "pong",
+ });
+ await pull(controller);
+ break;
+ }
+ case "closed":
+ case "close": {
+ this[_closed].resolve(value);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ case "error": {
+ const err = new Error(value);
+ this[_closed].reject(err);
+ controller.error(err);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ }
+
+ if (
+ this[_closeSent].state === "fulfilled" &&
+ this[_closed].state === "pending"
+ ) {
+ if (
+ new Date().getTime() - await this[_closeSent].promise <=
+ CLOSE_RESPONSE_TIMEOUT
+ ) {
+ return pull(controller);
+ }
+
+ this[_closed].resolve(value);
+ core.tryClose(this[_rid]);
+ }
+ };
+ const readable = new ReadableStream({
+ start: (controller) => {
+ PromisePrototypeThen(this.closed, () => {
try {
- this.close(reason?.code !== undefined ? reason : {});
+ controller.close();
} catch (_) {
- this.close();
+ // needed to ignore warnings & assertions
}
- await this.closed;
- },
- abort: async (reason) => {
try {
- this.close(reason?.code !== undefined ? reason : {});
+ PromisePrototypeCatch(
+ writableStreamClose(writable),
+ () => {},
+ );
} catch (_) {
- this.close();
+ // needed to ignore warnings & assertions
}
- await this.closed;
- },
- });
- const pull = async (controller) => {
- const { kind, value } = await core.opAsync(
- "op_ws_next_event",
- this[_rid],
- );
+ });
- switch (kind) {
- case "string": {
- controller.enqueue(value);
- break;
- }
- case "binary": {
- controller.enqueue(value);
- break;
- }
- case "ping": {
- await core.opAsync("op_ws_send", this[_rid], {
- kind: "pong",
- });
- await pull(controller);
- break;
- }
- case "closed":
- case "close": {
- this[_closed].resolve(value);
- core.tryClose(this[_rid]);
- break;
- }
- case "error": {
- const err = new Error(value);
- this[_closed].reject(err);
- controller.error(err);
- core.tryClose(this[_rid]);
- break;
+ PromisePrototypeThen(this[_closeSent].promise, () => {
+ if (this[_closed].state === "pending") {
+ return pull(controller);
}
+ });
+ },
+ pull,
+ cancel: async (reason) => {
+ try {
+ this.close(reason?.code !== undefined ? reason : {});
+ } catch (_) {
+ this.close();
}
+ await this.closed;
+ },
+ });
+
+ this[_connection].resolve({
+ readable,
+ writable,
+ extensions: create.extensions ?? "",
+ protocol: create.protocol ?? "",
+ });
+ }
+ },
+ (err) => {
+ if (ObjectPrototypeIsPrototypeOf(core.InterruptedPrototype, err)) {
+ // The signal was aborted.
+ err = options.signal.reason;
+ } else {
+ core.tryClose(cancelRid);
+ }
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ },
+ );
+ }
+ }
- if (
- this[_closeSent].state === "fulfilled" &&
- this[_closed].state === "pending"
- ) {
- if (
- new Date().getTime() - await this[_closeSent].promise <=
- CLOSE_RESPONSE_TIMEOUT
- ) {
- return pull(controller);
- }
+ [_connection] = new Deferred();
+ get connection() {
+ webidl.assertBranded(this, WebSocketStreamPrototype);
+ return this[_connection].promise;
+ }
- this[_closed].resolve(value);
- core.tryClose(this[_rid]);
- }
- };
- const readable = new ReadableStream({
- start: (controller) => {
- PromisePrototypeThen(this.closed, () => {
- try {
- controller.close();
- } catch (_) {
- // needed to ignore warnings & assertions
- }
- try {
- PromisePrototypeCatch(
- writableStreamClose(writable),
- () => {},
- );
- } catch (_) {
- // needed to ignore warnings & assertions
- }
- });
+ [_earlyClose] = false;
+ [_closed] = new Deferred();
+ [_closeSent] = new Deferred();
+ get closed() {
+ webidl.assertBranded(this, WebSocketStreamPrototype);
+ return this[_closed].promise;
+ }
- PromisePrototypeThen(this[_closeSent].promise, () => {
- if (this[_closed].state === "pending") {
- return pull(controller);
- }
- });
- },
- pull,
- cancel: async (reason) => {
- try {
- this.close(reason?.code !== undefined ? reason : {});
- } catch (_) {
- this.close();
- }
- await this.closed;
- },
- });
-
- this[_connection].resolve({
- readable,
- writable,
- extensions: create.extensions ?? "",
- protocol: create.protocol ?? "",
- });
- }
- },
- (err) => {
- if (ObjectPrototypeIsPrototypeOf(core.InterruptedPrototype, err)) {
- // The signal was aborted.
- err = options.signal.reason;
- } else {
- core.tryClose(cancelRid);
- }
- this[_connection].reject(err);
- this[_closed].reject(err);
- },
- );
- }
+ close(closeInfo) {
+ webidl.assertBranded(this, WebSocketStreamPrototype);
+ closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo, {
+ prefix: "Failed to execute 'close' on 'WebSocketStream'",
+ context: "Argument 1",
+ });
+
+ if (
+ closeInfo.code &&
+ !(closeInfo.code === 1000 ||
+ (3000 <= closeInfo.code && closeInfo.code < 5000))
+ ) {
+ throw new DOMException(
+ "The close code must be either 1000 or in the range of 3000 to 4999.",
+ "InvalidAccessError",
+ );
}
- [_connection] = new Deferred();
- get connection() {
- webidl.assertBranded(this, WebSocketStreamPrototype);
- return this[_connection].promise;
+ const encoder = new TextEncoder();
+ if (
+ closeInfo.reason && encoder.encode(closeInfo.reason).byteLength > 123
+ ) {
+ throw new DOMException(
+ "The close reason may not be longer than 123 bytes.",
+ "SyntaxError",
+ );
}
- [_earlyClose] = false;
- [_closed] = new Deferred();
- [_closeSent] = new Deferred();
- get closed() {
- webidl.assertBranded(this, WebSocketStreamPrototype);
- return this[_closed].promise;
+ let code = closeInfo.code;
+ if (closeInfo.reason && code === undefined) {
+ code = 1000;
}
- close(closeInfo) {
- webidl.assertBranded(this, WebSocketStreamPrototype);
- closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo, {
- prefix: "Failed to execute 'close' on 'WebSocketStream'",
- context: "Argument 1",
- });
-
- if (
- closeInfo.code &&
- !(closeInfo.code === 1000 ||
- (3000 <= closeInfo.code && closeInfo.code < 5000))
- ) {
- throw new DOMException(
- "The close code must be either 1000 or in the range of 3000 to 4999.",
- "InvalidAccessError",
- );
- }
-
- const encoder = new TextEncoder();
- if (
- closeInfo.reason && encoder.encode(closeInfo.reason).byteLength > 123
- ) {
- throw new DOMException(
- "The close reason may not be longer than 123 bytes.",
- "SyntaxError",
- );
- }
-
- let code = closeInfo.code;
- if (closeInfo.reason && code === undefined) {
- code = 1000;
- }
-
- if (this[_connection].state === "pending") {
- this[_earlyClose] = true;
- } else if (this[_closed].state === "pending") {
- PromisePrototypeThen(
- core.opAsync("op_ws_close", this[_rid], code, closeInfo.reason),
- () => {
- setTimeout(() => {
- this[_closeSent].resolve(new Date().getTime());
- }, 0);
- },
- (err) => {
- this[_rid] && core.tryClose(this[_rid]);
- this[_closed].reject(err);
- },
- );
- }
+ if (this[_connection].state === "pending") {
+ this[_earlyClose] = true;
+ } else if (this[_closed].state === "pending") {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_close", this[_rid], code, closeInfo.reason),
+ () => {
+ setTimeout(() => {
+ this[_closeSent].resolve(new Date().getTime());
+ }, 0);
+ },
+ (err) => {
+ this[_rid] && core.tryClose(this[_rid]);
+ this[_closed].reject(err);
+ },
+ );
}
+ }
- [SymbolFor("Deno.customInspect")](inspect) {
- return `${this.constructor.name} ${
- inspect({
- url: this.url,
- })
- }`;
- }
+ [SymbolFor("Deno.customInspect")](inspect) {
+ return `${this.constructor.name} ${
+ inspect({
+ url: this.url,
+ })
+ }`;
}
+}
- const WebSocketStreamPrototype = WebSocketStream.prototype;
+const WebSocketStreamPrototype = WebSocketStream.prototype;
- window.__bootstrap.webSocket.WebSocketStream = WebSocketStream;
-})(this);
+export { WebSocketStream };