diff options
Diffstat (limited to 'ext/websocket')
-rw-r--r-- | ext/websocket/01_websocket.js | 538 | ||||
-rw-r--r-- | ext/websocket/02_websocketstream.js | 412 | ||||
-rw-r--r-- | ext/websocket/Cargo.toml | 24 | ||||
-rw-r--r-- | ext/websocket/README.md | 5 | ||||
-rw-r--r-- | ext/websocket/lib.deno_websocket.d.ts | 112 | ||||
-rw-r--r-- | ext/websocket/lib.rs | 516 |
6 files changed, 1607 insertions, 0 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js new file mode 100644 index 000000000..df8063d21 --- /dev/null +++ b/ext/websocket/01_websocket.js @@ -0,0 +1,538 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +/// <reference path="../../core/internal.d.ts" /> + +((window) => { + const core = window.Deno.core; + const { URL } = window.__bootstrap.url; + const webidl = window.__bootstrap.webidl; + const { HTTP_TOKEN_CODE_POINT_RE } = window.__bootstrap.infra; + const { DOMException } = window.__bootstrap.domException; + const { Blob } = globalThis.__bootstrap.file; + const { + ArrayBuffer, + ArrayBufferIsView, + ArrayPrototypeJoin, + DataView, + ErrorPrototypeToString, + ObjectDefineProperty, + Map, + MapPrototypeGet, + MapPrototypeSet, + Set, + Symbol, + String, + StringPrototypeToLowerCase, + StringPrototypeEndsWith, + FunctionPrototypeCall, + RegExpPrototypeTest, + ObjectDefineProperties, + ArrayPrototypeMap, + ArrayPrototypeSome, + PromisePrototypeThen, + } = window.__bootstrap.primordials; + + webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => { + // Union for (sequence<DOMString> or DOMString) + if (webidl.type(V) === "Object" && V !== null) { + if (V[Symbol.iterator] !== undefined) { + return webidl.converters["sequence<DOMString>"](V, opts); + } + } + return webidl.converters.DOMString(V, opts); + }; + + webidl.converters["WebSocketSend"] = (V, opts) => { + // Union for (Blob or ArrayBufferView or ArrayBuffer or USVString) + if (V instanceof Blob) { + return webidl.converters["Blob"](V, opts); + } + if (typeof V === "object") { + // TODO(littledivy): use primordial for SharedArrayBuffer + if (V instanceof ArrayBuffer || V instanceof SharedArrayBuffer) { + return webidl.converters["ArrayBuffer"](V, opts); + } + if (ArrayBufferIsView(V)) { + return webidl.converters["ArrayBufferView"](V, opts); + } + } + return webidl.converters["USVString"](V, opts); + }; + + const CONNECTING = 0; + const OPEN = 1; + const CLOSING = 2; + const CLOSED = 3; + + /** + * Tries to close the resource (and ignores BadResource errors). + * @param {number} rid + */ + function tryClose(rid) { + try { + core.close(rid); + } catch (err) { + // Ignore error if the socket has already been closed. + if (!(err instanceof Deno.errors.BadResource)) throw err; + } + } + + const handlerSymbol = Symbol("eventHandlers"); + function makeWrappedHandler(handler) { + function wrappedHandler(...args) { + if (typeof wrappedHandler.handler !== "function") { + return; + } + return FunctionPrototypeCall(wrappedHandler.handler, this, ...args); + } + wrappedHandler.handler = handler; + return wrappedHandler; + } + // TODO(lucacasonato) reuse when we can reuse code between web crates + function defineEventHandler(emitter, name) { + // HTML specification section 8.1.5.1 + ObjectDefineProperty(emitter, `on${name}`, { + get() { + if (!this[handlerSymbol]) { + return null; + } + return MapPrototypeGet(this[handlerSymbol], name)?.handler; + }, + set(value) { + if (!this[handlerSymbol]) { + this[handlerSymbol] = new Map(); + } + let handlerWrapper = MapPrototypeGet(this[handlerSymbol], name); + if (handlerWrapper) { + handlerWrapper.handler = value; + } else { + handlerWrapper = makeWrappedHandler(value); + this.addEventListener(name, handlerWrapper); + } + MapPrototypeSet(this[handlerSymbol], name, handlerWrapper); + }, + configurable: true, + enumerable: true, + }); + } + + const _readyState = Symbol("[[readyState]]"); + const _url = Symbol("[[url]]"); + const _rid = Symbol("[[rid]]"); + const _extensions = Symbol("[[extensions]]"); + const _protocol = Symbol("[[protocol]]"); + const _binaryType = Symbol("[[binaryType]]"); + const _bufferedAmount = Symbol("[[bufferedAmount]]"); + const _eventLoop = Symbol("[[eventLoop]]"); + const _server = Symbol("[[server]]"); + class WebSocket extends EventTarget { + [_rid]; + + [_readyState] = CONNECTING; + get readyState() { + webidl.assertBranded(this, WebSocket); + return this[_readyState]; + } + + get CONNECTING() { + webidl.assertBranded(this, WebSocket); + return CONNECTING; + } + get OPEN() { + webidl.assertBranded(this, WebSocket); + return OPEN; + } + get CLOSING() { + webidl.assertBranded(this, WebSocket); + return CLOSING; + } + get CLOSED() { + webidl.assertBranded(this, WebSocket); + return CLOSED; + } + + [_extensions] = ""; + get extensions() { + webidl.assertBranded(this, WebSocket); + return this[_extensions]; + } + + [_protocol] = ""; + get protocol() { + webidl.assertBranded(this, WebSocket); + return this[_protocol]; + } + + [_url] = ""; + get url() { + webidl.assertBranded(this, WebSocket); + return this[_url]; + } + + [_binaryType] = "blob"; + get binaryType() { + webidl.assertBranded(this, WebSocket); + return this[_binaryType]; + } + set binaryType(value) { + webidl.assertBranded(this, WebSocket); + value = webidl.converters.DOMString(value, { + prefix: "Failed to set 'binaryType' on 'WebSocket'", + }); + if (value === "blob" || value === "arraybuffer") { + this[_binaryType] = value; + } + } + + [_bufferedAmount] = 0; + get bufferedAmount() { + webidl.assertBranded(this, WebSocket); + return this[_bufferedAmount]; + } + + constructor(url, protocols = []) { + super(); + this[webidl.brand] = webidl.brand; + const prefix = "Failed to construct 'WebSocket'"; + webidl.requiredArguments(arguments.length, 1, { + prefix, + }); + url = webidl.converters.USVString(url, { + prefix, + context: "Argument 1", + }); + protocols = webidl.converters["sequence<DOMString> or DOMString"]( + protocols, + { + prefix, + context: "Argument 2", + }, + ); + + let wsURL; + + try { + wsURL = new URL(url); + } catch (e) { + throw new DOMException(e.message, "SyntaxError"); + } + + if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") { + throw new DOMException( + "Only ws & wss schemes are allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) { + throw new DOMException( + "Fragments are not allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + this[_url] = wsURL.href; + + core.opSync( + "op_ws_check_permission_and_cancel_handle", + this[_url], + false, + ); + + if (typeof protocols === "string") { + protocols = [protocols]; + } + + if ( + protocols.length !== + new Set( + ArrayPrototypeMap(protocols, (p) => StringPrototypeToLowerCase(p)), + ).size + ) { + throw new DOMException( + "Can't supply multiple times the same protocol.", + "SyntaxError", + ); + } + + if ( + ArrayPrototypeSome( + protocols, + (protocol) => + !RegExpPrototypeTest(HTTP_TOKEN_CODE_POINT_RE, protocol), + ) + ) { + throw new DOMException( + "Invalid protocol value.", + "SyntaxError", + ); + } + + PromisePrototypeThen( + core.opAsync("op_ws_create", { + url: wsURL.href, + protocols: ArrayPrototypeJoin(protocols, ", "), + }), + (create) => { + this[_rid] = create.rid; + this[_extensions] = create.extensions; + this[_protocol] = create.protocol; + + if (this[_readyState] === CLOSING) { + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: this[_rid], + }), + () => { + this[_readyState] = CLOSED; + + const errEvent = new ErrorEvent("error"); + this.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + this.dispatchEvent(event); + tryClose(this[_rid]); + }, + ); + } else { + this[_readyState] = OPEN; + const event = new Event("open"); + this.dispatchEvent(event); + + this[_eventLoop](); + } + }, + (err) => { + this[_readyState] = CLOSED; + + const errorEv = new ErrorEvent( + "error", + { error: err, message: ErrorPrototypeToString(err) }, + ); + this.dispatchEvent(errorEv); + + const closeEv = new CloseEvent("close"); + this.dispatchEvent(closeEv); + }, + ); + } + + send(data) { + webidl.assertBranded(this, WebSocket); + const prefix = "Failed to execute 'send' on 'WebSocket'"; + + webidl.requiredArguments(arguments.length, 1, { + prefix, + }); + data = webidl.converters.WebSocketSend(data, { + prefix, + context: "Argument 1", + }); + + if (this[_readyState] !== OPEN) { + throw new DOMException("readyState not OPEN", "InvalidStateError"); + } + + const sendTypedArray = (ta) => { + this[_bufferedAmount] += ta.byteLength; + PromisePrototypeThen( + core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "binary", + }, ta), + () => { + this[_bufferedAmount] -= ta.byteLength; + }, + ); + }; + + if (data instanceof Blob) { + PromisePrototypeThen( + data.slice().arrayBuffer(), + (ab) => sendTypedArray(new DataView(ab)), + ); + } else if (ArrayBufferIsView(data)) { + sendTypedArray(data); + } else if (data instanceof ArrayBuffer) { + sendTypedArray(new DataView(data)); + } else { + const string = String(data); + const d = core.encode(string); + this[_bufferedAmount] += d.byteLength; + PromisePrototypeThen( + core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "text", + text: string, + }), + () => { + this[_bufferedAmount] -= d.byteLength; + }, + ); + } + } + + close(code = undefined, reason = undefined) { + webidl.assertBranded(this, WebSocket); + const prefix = "Failed to execute 'close' on 'WebSocket'"; + + if (code !== undefined) { + code = webidl.converters["unsigned short"](code, { + prefix, + clamp: true, + context: "Argument 1", + }); + } + + if (reason !== undefined) { + reason = webidl.converters.USVString(reason, { + prefix, + context: "Argument 2", + }); + } + + if (!this[_server]) { + if ( + code !== undefined && + !(code === 1000 || (3000 <= code && code < 5000)) + ) { + throw new DOMException( + "The close code must be either 1000 or in the range of 3000 to 4999.", + "InvalidAccessError", + ); + } + } + + if (reason !== undefined && core.encode(reason).byteLength > 123) { + throw new DOMException( + "The close reason may not be longer than 123 bytes.", + "SyntaxError", + ); + } + + if (this[_readyState] === CONNECTING) { + this[_readyState] = CLOSING; + } else if (this[_readyState] === OPEN) { + this[_readyState] = CLOSING; + + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: this[_rid], + code, + reason, + }), + () => { + this[_readyState] = CLOSED; + const event = new CloseEvent("close", { + wasClean: true, + code: code ?? 1005, + reason, + }); + this.dispatchEvent(event); + tryClose(this[_rid]); + }, + ); + } + } + + async [_eventLoop]() { + while (this[_readyState] === OPEN) { + const { kind, value } = await core.opAsync( + "op_ws_next_event", + this[_rid], + ); + + switch (kind) { + case "string": { + const event = new MessageEvent("message", { + data: value, + origin: this[_url], + }); + this.dispatchEvent(event); + break; + } + case "binary": { + let data; + + if (this.binaryType === "blob") { + data = new Blob([value]); + } else { + data = value.buffer; + } + + const event = new MessageEvent("message", { + data, + origin: this[_url], + }); + this.dispatchEvent(event); + break; + } + case "ping": { + core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "pong", + }); + break; + } + case "close": { + this[_readyState] = CLOSED; + + const event = new CloseEvent("close", { + wasClean: true, + code: value.code, + reason: value.reason, + }); + this.dispatchEvent(event); + tryClose(this[_rid]); + break; + } + case "error": { + this[_readyState] = CLOSED; + + const errorEv = new ErrorEvent("error", { + message: value, + }); + this.dispatchEvent(errorEv); + + const closeEv = new CloseEvent("close"); + this.dispatchEvent(closeEv); + tryClose(this[_rid]); + break; + } + } + } + } + } + + ObjectDefineProperties(WebSocket, { + CONNECTING: { + value: 0, + }, + OPEN: { + value: 1, + }, + CLOSING: { + value: 2, + }, + CLOSED: { + value: 3, + }, + }); + + defineEventHandler(WebSocket.prototype, "message"); + defineEventHandler(WebSocket.prototype, "error"); + defineEventHandler(WebSocket.prototype, "close"); + defineEventHandler(WebSocket.prototype, "open"); + + webidl.configurePrototype(WebSocket); + + window.__bootstrap.webSocket = { + WebSocket, + _rid, + _readyState, + _eventLoop, + _protocol, + _server, + }; +})(this); diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js new file mode 100644 index 000000000..6290d94a0 --- /dev/null +++ b/ext/websocket/02_websocketstream.js @@ -0,0 +1,412 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +/// <reference path="../../core/internal.d.ts" /> + +((window) => { + const core = window.Deno.core; + const webidl = window.__bootstrap.webidl; + const { writableStreamClose, Deferred } = window.__bootstrap.streams; + const { DOMException } = window.__bootstrap.domException; + const { add, remove } = window.__bootstrap.abortSignal; + + const { + StringPrototypeEndsWith, + StringPrototypeToLowerCase, + Symbol, + SymbolFor, + Set, + ArrayPrototypeMap, + ArrayPrototypeJoin, + PromisePrototypeThen, + PromisePrototypeCatch, + Uint8Array, + TypeError, + } = window.__bootstrap.primordials; + + webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter( + "WebSocketStreamOptions", + [ + { + key: "protocols", + converter: webidl.converters["sequence<USVString>"], + get defaultValue() { + return []; + }, + }, + { + key: "signal", + converter: webidl.converters.AbortSignal, + }, + ], + ); + webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter( + "WebSocketCloseInfo", + [ + { + key: "code", + converter: webidl.converters["unsigned short"], + }, + { + key: "reason", + converter: webidl.converters.USVString, + defaultValue: "", + }, + ], + ); + + /** + * Tries to close the resource (and ignores BadResource errors). + * @param {number} rid + */ + function tryClose(rid) { + try { + core.close(rid); + } catch (err) { + // Ignore error if the socket has already been closed. + if (!(err instanceof Deno.errors.BadResource)) throw err; + } + } + + const _rid = Symbol("[[rid]]"); + const _url = Symbol("[[url]]"); + const _connection = Symbol("[[connection]]"); + const _closed = Symbol("[[closed]]"); + const _closing = Symbol("[[closing]]"); + const _earlyClose = Symbol("[[earlyClose]]"); + class WebSocketStream { + [_rid]; + + [_url]; + get url() { + webidl.assertBranded(this, WebSocketStream); + return this[_url]; + } + + constructor(url, options) { + this[webidl.brand] = webidl.brand; + const prefix = "Failed to construct 'WebSocketStream'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + url = webidl.converters.USVString(url, { + prefix, + context: "Argument 1", + }); + options = webidl.converters.WebSocketStreamOptions(options, { + prefix, + context: "Argument 2", + }); + + const wsURL = new URL(url); + + if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") { + throw new DOMException( + "Only ws & wss schemes are allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) { + throw new DOMException( + "Fragments are not allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + this[_url] = wsURL.href; + + if ( + options.protocols.length !== + new Set( + ArrayPrototypeMap( + options.protocols, + (p) => StringPrototypeToLowerCase(p), + ), + ).size + ) { + throw new DOMException( + "Can't supply multiple times the same protocol.", + "SyntaxError", + ); + } + + const cancelRid = core.opSync( + "op_ws_check_permission_and_cancel_handle", + this[_url], + true, + ); + + if (options.signal?.aborted) { + core.close(cancelRid); + const err = new DOMException( + "This operation was aborted", + "AbortError", + ); + this[_connection].reject(err); + this[_closed].reject(err); + } else { + const abort = () => { + core.close(cancelRid); + }; + options.signal?.[add](abort); + PromisePrototypeThen( + core.opAsync("op_ws_create", { + url: this[_url], + protocols: options.protocols + ? ArrayPrototypeJoin(options.protocols, ", ") + : "", + cancelHandle: cancelRid, + }), + (create) => { + options.signal?.[remove](abort); + if (this[_earlyClose]) { + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: create.rid, + }), + () => { + PromisePrototypeThen( + (async () => { + while (true) { + const { kind } = await core.opAsync( + "op_ws_next_event", + create.rid, + ); + + if (kind === "close") { + break; + } + } + })(), + () => { + const err = new DOMException( + "Closed while connecting", + "NetworkError", + ); + this[_connection].reject(err); + this[_closed].reject(err); + }, + ); + }, + () => { + const err = new DOMException( + "Closed while connecting", + "NetworkError", + ); + this[_connection].reject(err); + this[_closed].reject(err); + }, + ); + } else { + this[_rid] = create.rid; + + const writable = new WritableStream({ + write: async (chunk) => { + if (typeof chunk === "string") { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "text", + text: chunk, + }); + } else if (chunk instanceof Uint8Array) { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "binary", + }, chunk); + } else { + throw new TypeError( + "A chunk may only be either a string or an Uint8Array", + ); + } + }, + close: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); + } + await this.closed; + }, + abort: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); + } + await this.closed; + }, + }); + const readable = new ReadableStream({ + start: (controller) => { + PromisePrototypeThen(this.closed, () => { + try { + controller.close(); + } catch (_) { + // needed to ignore warnings & assertions + } + try { + PromisePrototypeCatch( + writableStreamClose(writable), + () => {}, + ); + } catch (_) { + // needed to ignore warnings & assertions + } + }); + }, + pull: async (controller) => { + const { kind, value } = await core.opAsync( + "op_ws_next_event", + this[_rid], + ); + + switch (kind) { + case "string": { + controller.enqueue(value); + break; + } + case "binary": { + controller.enqueue(value); + break; + } + case "ping": { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "pong", + }); + break; + } + case "close": { + if (this[_closing]) { + this[_closed].resolve(value); + tryClose(this[_rid]); + } else { + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: this[_rid], + ...value, + }), + () => { + this[_closed].resolve(value); + tryClose(this[_rid]); + }, + (err) => { + this[_closed].reject(err); + controller.error(err); + tryClose(this[_rid]); + }, + ); + } + break; + } + case "error": { + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + tryClose(this[_rid]); + break; + } + } + }, + cancel: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); + } + await this.closed; + }, + }); + + this[_connection].resolve({ + readable, + writable, + extensions: create.extensions ?? "", + protocol: create.protocol ?? "", + }); + } + }, + (err) => { + tryClose(cancelRid); + this[_connection].reject(err); + this[_closed].reject(err); + }, + ); + } + } + + [_connection] = new Deferred(); + get connection() { + webidl.assertBranded(this, WebSocketStream); + return this[_connection].promise; + } + + [_earlyClose] = false; + [_closing] = false; + [_closed] = new Deferred(); + get closed() { + webidl.assertBranded(this, WebSocketStream); + return this[_closed].promise; + } + + close(closeInfo) { + webidl.assertBranded(this, WebSocketStream); + closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo, { + prefix: "Failed to execute 'close' on 'WebSocketStream'", + context: "Argument 1", + }); + + if ( + closeInfo.code && + !(closeInfo.code === 1000 || + (3000 <= closeInfo.code && closeInfo.code < 5000)) + ) { + throw new DOMException( + "The close code must be either 1000 or in the range of 3000 to 4999.", + "InvalidAccessError", + ); + } + + const encoder = new TextEncoder(); + if ( + closeInfo.reason && encoder.encode(closeInfo.reason).byteLength > 123 + ) { + throw new DOMException( + "The close reason may not be longer than 123 bytes.", + "SyntaxError", + ); + } + + let code = closeInfo.code; + if (closeInfo.reason && code === undefined) { + code = 1000; + } + + if (this[_connection].state === "pending") { + this[_earlyClose] = true; + } else if (this[_closed].state === "pending") { + this[_closing] = true; + PromisePrototypeCatch( + core.opAsync("op_ws_close", { + rid: this[_rid], + code, + reason: closeInfo.reason, + }), + (err) => { + this[_rid] && tryClose(this[_rid]); + this[_closed].reject(err); + }, + ); + } + } + + [SymbolFor("Deno.customInspect")](inspect) { + return `${this.constructor.name} ${ + inspect({ + url: this.url, + }) + }`; + } + } + + window.__bootstrap.webSocket.WebSocketStream = WebSocketStream; +})(this); diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml new file mode 100644 index 000000000..cec440d36 --- /dev/null +++ b/ext/websocket/Cargo.toml @@ -0,0 +1,24 @@ +# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_websocket" +version = "0.19.0" +authors = ["the Deno authors"] +edition = "2018" +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" +description = "Implementation of WebSocket API for Deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { version = "0.96.0", path = "../../core" } +deno_tls = { version = "0.1.0", path = "../tls" } +http = "0.2.4" +hyper = { version = "0.14.9" } +serde = { version = "1.0.126", features = ["derive"] } +tokio = { version = "1.8.1", features = ["full"] } +tokio-rustls = "0.22.0" +tokio-tungstenite = { version = "0.14.0", features = ["rustls-tls"] } diff --git a/ext/websocket/README.md b/ext/websocket/README.md new file mode 100644 index 000000000..d6495f397 --- /dev/null +++ b/ext/websocket/README.md @@ -0,0 +1,5 @@ +# deno_websocket + +This op crate implements the websocket functions of Deno. + +Spec: https://html.spec.whatwg.org/multipage/web-sockets.html diff --git a/ext/websocket/lib.deno_websocket.d.ts b/ext/websocket/lib.deno_websocket.d.ts new file mode 100644 index 000000000..31e5782a6 --- /dev/null +++ b/ext/websocket/lib.deno_websocket.d.ts @@ -0,0 +1,112 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +// deno-lint-ignore-file no-explicit-any + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +interface CloseEventInit extends EventInit { + code?: number; + reason?: string; + wasClean?: boolean; +} + +declare class CloseEvent extends Event { + constructor(type: string, eventInitDict?: CloseEventInit); + /** + * Returns the WebSocket connection close code provided by the server. + */ + readonly code: number; + /** + * Returns the WebSocket connection close reason provided by the server. + */ + readonly reason: string; + /** + * Returns true if the connection closed cleanly; false otherwise. + */ + readonly wasClean: boolean; +} + +interface WebSocketEventMap { + close: CloseEvent; + error: Event; + message: MessageEvent; + open: Event; +} + +/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ +declare class WebSocket extends EventTarget { + constructor(url: string, protocols?: string | string[]); + + static readonly CLOSED: number; + static readonly CLOSING: number; + static readonly CONNECTING: number; + static readonly OPEN: number; + + /** + * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: + * + * Can be set, to change how binary data is returned. The default is "blob". + */ + binaryType: BinaryType; + /** + * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. + * + * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) + */ + readonly bufferedAmount: number; + /** + * Returns the extensions selected by the server, if any. + */ + readonly extensions: string; + onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; + onerror: ((this: WebSocket, ev: Event | ErrorEvent) => any) | null; + onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; + onopen: ((this: WebSocket, ev: Event) => any) | null; + /** + * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. + */ + readonly protocol: string; + /** + * Returns the state of the WebSocket object's connection. It can have the values described below. + */ + readonly readyState: number; + /** + * Returns the URL that was used to establish the WebSocket connection. + */ + readonly url: string; + /** + * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. + */ + close(code?: number, reason?: string): void; + /** + * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView. + */ + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; + readonly CLOSED: number; + readonly CLOSING: number; + readonly CONNECTING: number; + readonly OPEN: number; + addEventListener<K extends keyof WebSocketEventMap>( + type: K, + listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + options?: boolean | AddEventListenerOptions, + ): void; + addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void; + removeEventListener<K extends keyof WebSocketEventMap>( + type: K, + listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + options?: boolean | EventListenerOptions, + ): void; + removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions, + ): void; +} + +type BinaryType = "arraybuffer" | "blob"; diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs new file mode 100644 index 000000000..40bcb7bc4 --- /dev/null +++ b/ext/websocket/lib.rs @@ -0,0 +1,516 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::bad_resource_id; +use deno_core::error::invalid_hostname; +use deno_core::error::null_opbuf; +use deno_core::error::AnyError; +use deno_core::futures::stream::SplitSink; +use deno_core::futures::stream::SplitStream; +use deno_core::futures::SinkExt; +use deno_core::futures::StreamExt; +use deno_core::include_js_files; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::url; +use deno_core::AsyncRefCell; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use deno_tls::create_client_config; +use deno_tls::webpki::DNSNameRef; + +use http::{Method, Request, Uri}; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fmt; +use std::path::PathBuf; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::rustls::RootCertStore; +use tokio_rustls::TlsConnector; +use tokio_tungstenite::tungstenite::{ + handshake::client::Response, protocol::frame::coding::CloseCode, + protocol::CloseFrame, Message, +}; +use tokio_tungstenite::MaybeTlsStream; +use tokio_tungstenite::{client_async, WebSocketStream}; + +pub use tokio_tungstenite; // Re-export tokio_tungstenite + +#[derive(Clone)] +pub struct WsRootStore(pub Option<RootCertStore>); +#[derive(Clone)] +pub struct WsUserAgent(pub String); + +pub trait WebSocketPermissions { + fn check_net_url(&mut self, _url: &url::Url) -> Result<(), AnyError>; +} + +/// `UnsafelyIgnoreCertificateErrors` is a wrapper struct so it can be placed inside `GothamState`; +/// using type alias for a `Option<Vec<String>>` could work, but there's a high chance +/// that there might be another type alias pointing to a `Option<Vec<String>>`, which +/// would override previously used alias. +pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>); + +/// For use with `op_websocket_*` when the user does not want permissions. +pub struct NoWebSocketPermissions; + +impl WebSocketPermissions for NoWebSocketPermissions { + fn check_net_url(&mut self, _url: &url::Url) -> Result<(), AnyError> { + Ok(()) + } +} + +type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; +pub enum WebSocketStreamType { + Client { + tx: AsyncRefCell<SplitSink<WsStream, Message>>, + rx: AsyncRefCell<SplitStream<WsStream>>, + }, + Server { + tx: AsyncRefCell< + SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>, + >, + rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>, + }, +} + +pub struct WsStreamResource { + pub stream: WebSocketStreamType, + // When a `WsStreamResource` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + pub cancel: CancelHandle, +} + +impl WsStreamResource { + async fn send(self: &Rc<Self>, message: Message) -> Result<(), AnyError> { + match self.stream { + WebSocketStreamType::Client { .. } => { + let mut tx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { tx, .. } => tx, + WebSocketStreamType::Server { .. } => unreachable!(), + }) + .borrow_mut() + .await; + tx.send(message).await?; + } + WebSocketStreamType::Server { .. } => { + let mut tx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { .. } => unreachable!(), + WebSocketStreamType::Server { tx, .. } => tx, + }) + .borrow_mut() + .await; + tx.send(message).await?; + } + } + + Ok(()) + } + + async fn next_message( + self: &Rc<Self>, + cancel: RcRef<CancelHandle>, + ) -> Result< + Option<Result<Message, tokio_tungstenite::tungstenite::Error>>, + AnyError, + > { + match &self.stream { + WebSocketStreamType::Client { .. } => { + let mut rx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { rx, .. } => rx, + WebSocketStreamType::Server { .. } => unreachable!(), + }) + .borrow_mut() + .await; + rx.next().or_cancel(cancel).await.map_err(AnyError::from) + } + WebSocketStreamType::Server { .. } => { + let mut rx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { .. } => unreachable!(), + WebSocketStreamType::Server { rx, .. } => rx, + }) + .borrow_mut() + .await; + rx.next().or_cancel(cancel).await.map_err(AnyError::from) + } + } + } +} + +impl Resource for WsStreamResource { + fn name(&self) -> Cow<str> { + "webSocketStream".into() + } +} + +pub struct WsCancelResource(Rc<CancelHandle>); + +impl Resource for WsCancelResource { + fn name(&self) -> Cow<str> { + "webSocketCancel".into() + } + + fn close(self: Rc<Self>) { + self.0.cancel() + } +} + +// This op is needed because creating a WS instance in JavaScript is a sync +// operation and should throw error when permissions are not fulfilled, +// but actual op that connects WS is async. +pub fn op_ws_check_permission_and_cancel_handle<WP>( + state: &mut OpState, + url: String, + cancel_handle: bool, +) -> Result<Option<ResourceId>, AnyError> +where + WP: WebSocketPermissions + 'static, +{ + state + .borrow_mut::<WP>() + .check_net_url(&url::Url::parse(&url)?)?; + + if cancel_handle { + let rid = state + .resource_table + .add(WsCancelResource(CancelHandle::new_rc())); + Ok(Some(rid)) + } else { + Ok(None) + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateArgs { + url: String, + protocols: String, + cancel_handle: Option<ResourceId>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateResponse { + rid: ResourceId, + protocol: String, + extensions: String, +} + +pub async fn op_ws_create<WP>( + state: Rc<RefCell<OpState>>, + args: CreateArgs, + _: (), +) -> Result<CreateResponse, AnyError> +where + WP: WebSocketPermissions + 'static, +{ + { + let mut s = state.borrow_mut(); + s.borrow_mut::<WP>() + .check_net_url(&url::Url::parse(&args.url)?) + .expect( + "Permission check should have been done in op_ws_check_permission", + ); + } + + let unsafely_ignore_certificate_errors = state + .borrow() + .borrow::<UnsafelyIgnoreCertificateErrors>() + .0 + .clone(); + let root_cert_store = state.borrow().borrow::<WsRootStore>().0.clone(); + let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); + let uri: Uri = args.url.parse()?; + let mut request = Request::builder().method(Method::GET).uri(&uri); + + request = request.header("User-Agent", user_agent); + + if !args.protocols.is_empty() { + request = request.header("Sec-WebSocket-Protocol", args.protocols); + } + + let request = request.body(())?; + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let tcp_socket = TcpStream::connect(addr).await?; + + let socket: MaybeTlsStream<TcpStream> = match uri.scheme_str() { + Some("ws") => MaybeTlsStream::Plain(tcp_socket), + Some("wss") => { + let tls_config = create_client_config( + root_cert_store, + None, + unsafely_ignore_certificate_errors, + )?; + let tls_connector = TlsConnector::from(Arc::new(tls_config)); + let dnsname = DNSNameRef::try_from_ascii_str(domain) + .map_err(|_| invalid_hostname(domain))?; + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + MaybeTlsStream::Rustls(tls_socket) + } + _ => unreachable!(), + }; + + let client = client_async(request, socket); + let (stream, response): (WsStream, Response) = + if let Some(cancel_rid) = args.cancel_handle { + let r = state + .borrow_mut() + .resource_table + .get::<WsCancelResource>(cancel_rid) + .ok_or_else(bad_resource_id)?; + client + .or_cancel(r.0.to_owned()) + .await + .map_err(|_| DomExceptionAbortError::new("connection was aborted"))? + } else { + client.await + } + .map_err(|err| { + DomExceptionNetworkError::new(&format!( + "failed to connect to WebSocket: {}", + err.to_string() + )) + })?; + + if let Some(cancel_rid) = args.cancel_handle { + state.borrow_mut().resource_table.close(cancel_rid); + } + + let (ws_tx, ws_rx) = stream.split(); + let resource = WsStreamResource { + stream: WebSocketStreamType::Client { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, + cancel: Default::default(), + }; + let mut state = state.borrow_mut(); + let rid = state.resource_table.add(resource); + + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::<String>(); + Ok(CreateResponse { + rid, + protocol: protocol.to_string(), + extensions, + }) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SendArgs { + rid: ResourceId, + kind: String, + text: Option<String>, +} + +pub async fn op_ws_send( + state: Rc<RefCell<OpState>>, + args: SendArgs, + buf: Option<ZeroCopyBuf>, +) -> Result<(), AnyError> { + let msg = match args.kind.as_str() { + "text" => Message::Text(args.text.unwrap()), + "binary" => Message::Binary(buf.ok_or_else(null_opbuf)?.to_vec()), + "pong" => Message::Pong(vec![]), + _ => unreachable!(), + }; + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(args.rid) + .ok_or_else(bad_resource_id)?; + resource.send(msg).await?; + Ok(()) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CloseArgs { + rid: ResourceId, + code: Option<u16>, + reason: Option<String>, +} + +pub async fn op_ws_close( + state: Rc<RefCell<OpState>>, + args: CloseArgs, + _: (), +) -> Result<(), AnyError> { + let rid = args.rid; + let msg = Message::Close(args.code.map(|c| CloseFrame { + code: CloseCode::from(c), + reason: match args.reason { + Some(reason) => Cow::from(reason), + None => Default::default(), + }, + })); + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + resource.send(msg).await?; + Ok(()) +} + +#[derive(Serialize)] +#[serde(tag = "kind", content = "value", rename_all = "camelCase")] +pub enum NextEventResponse { + String(String), + Binary(ZeroCopyBuf), + Close { code: u16, reason: String }, + Ping, + Pong, + Error(String), + Closed, +} + +pub async fn op_ws_next_event( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<NextEventResponse, AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + + let cancel = RcRef::map(&resource, |r| &r.cancel); + let val = resource.next_message(cancel).await?; + let res = match val { + Some(Ok(Message::Text(text))) => NextEventResponse::String(text), + Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), + Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { + code: frame.code.into(), + reason: frame.reason.to_string(), + }, + Some(Ok(Message::Close(None))) => NextEventResponse::Close { + code: 1005, + reason: String::new(), + }, + Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, + Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, + Some(Err(e)) => NextEventResponse::Error(e.to_string()), + None => { + state.borrow_mut().resource_table.close(rid).unwrap(); + NextEventResponse::Closed + } + }; + Ok(res) +} + +pub fn init<P: WebSocketPermissions + 'static>( + user_agent: String, + root_cert_store: Option<RootCertStore>, + unsafely_ignore_certificate_errors: Option<Vec<String>>, +) -> Extension { + Extension::builder() + .js(include_js_files!( + prefix "deno:ext/websocket", + "01_websocket.js", + "02_websocketstream.js", + )) + .ops(vec![ + ( + "op_ws_check_permission_and_cancel_handle", + op_sync(op_ws_check_permission_and_cancel_handle::<P>), + ), + ("op_ws_create", op_async(op_ws_create::<P>)), + ("op_ws_send", op_async(op_ws_send)), + ("op_ws_close", op_async(op_ws_close)), + ("op_ws_next_event", op_async(op_ws_next_event)), + ]) + .state(move |state| { + state.put::<WsUserAgent>(WsUserAgent(user_agent.clone())); + state.put(UnsafelyIgnoreCertificateErrors( + unsafely_ignore_certificate_errors.clone(), + )); + state.put::<WsRootStore>(WsRootStore(root_cert_store.clone())); + Ok(()) + }) + .build() +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") +} + +#[derive(Debug)] +pub struct DomExceptionNetworkError { + pub msg: String, +} + +impl DomExceptionNetworkError { + pub fn new(msg: &str) -> Self { + DomExceptionNetworkError { + msg: msg.to_string(), + } + } +} + +impl fmt::Display for DomExceptionNetworkError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(&self.msg) + } +} + +impl std::error::Error for DomExceptionNetworkError {} + +pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> { + e.downcast_ref::<DomExceptionNetworkError>() + .map(|_| "DOMExceptionNetworkError") +} + +#[derive(Debug)] +pub struct DomExceptionAbortError { + pub msg: String, +} + +impl DomExceptionAbortError { + pub fn new(msg: &str) -> Self { + DomExceptionAbortError { + msg: msg.to_string(), + } + } +} + +impl fmt::Display for DomExceptionAbortError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(&self.msg) + } +} + +impl std::error::Error for DomExceptionAbortError {} + +pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> { + e.downcast_ref::<DomExceptionAbortError>() + .map(|_| "DOMExceptionAbortError") +} |