summaryrefslogtreecommitdiff
path: root/extensions/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/websocket')
-rw-r--r--extensions/websocket/01_websocket.js6
-rw-r--r--extensions/websocket/02_websocketstream.js412
-rw-r--r--extensions/websocket/lib.rs109
3 files changed, 517 insertions, 10 deletions
diff --git a/extensions/websocket/01_websocket.js b/extensions/websocket/01_websocket.js
index 7af795197..df8063d21 100644
--- a/extensions/websocket/01_websocket.js
+++ b/extensions/websocket/01_websocket.js
@@ -234,7 +234,11 @@
this[_url] = wsURL.href;
- core.opSync("op_ws_check_permission", this[_url]);
+ core.opSync(
+ "op_ws_check_permission_and_cancel_handle",
+ this[_url],
+ false,
+ );
if (typeof protocols === "string") {
protocols = [protocols];
diff --git a/extensions/websocket/02_websocketstream.js b/extensions/websocket/02_websocketstream.js
new file mode 100644
index 000000000..6290d94a0
--- /dev/null
+++ b/extensions/websocket/02_websocketstream.js
@@ -0,0 +1,412 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+/// <reference path="../../core/internal.d.ts" />
+
+((window) => {
+ const core = window.Deno.core;
+ const webidl = window.__bootstrap.webidl;
+ const { writableStreamClose, Deferred } = window.__bootstrap.streams;
+ const { DOMException } = window.__bootstrap.domException;
+ const { add, remove } = window.__bootstrap.abortSignal;
+
+ const {
+ StringPrototypeEndsWith,
+ StringPrototypeToLowerCase,
+ Symbol,
+ SymbolFor,
+ Set,
+ ArrayPrototypeMap,
+ ArrayPrototypeJoin,
+ PromisePrototypeThen,
+ PromisePrototypeCatch,
+ Uint8Array,
+ TypeError,
+ } = window.__bootstrap.primordials;
+
+ webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
+ "WebSocketStreamOptions",
+ [
+ {
+ key: "protocols",
+ converter: webidl.converters["sequence<USVString>"],
+ get defaultValue() {
+ return [];
+ },
+ },
+ {
+ key: "signal",
+ converter: webidl.converters.AbortSignal,
+ },
+ ],
+ );
+ webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter(
+ "WebSocketCloseInfo",
+ [
+ {
+ key: "code",
+ converter: webidl.converters["unsigned short"],
+ },
+ {
+ key: "reason",
+ converter: webidl.converters.USVString,
+ defaultValue: "",
+ },
+ ],
+ );
+
+ /**
+ * Tries to close the resource (and ignores BadResource errors).
+ * @param {number} rid
+ */
+ function tryClose(rid) {
+ try {
+ core.close(rid);
+ } catch (err) {
+ // Ignore error if the socket has already been closed.
+ if (!(err instanceof Deno.errors.BadResource)) throw err;
+ }
+ }
+
+ const _rid = Symbol("[[rid]]");
+ const _url = Symbol("[[url]]");
+ const _connection = Symbol("[[connection]]");
+ const _closed = Symbol("[[closed]]");
+ const _closing = Symbol("[[closing]]");
+ const _earlyClose = Symbol("[[earlyClose]]");
+ class WebSocketStream {
+ [_rid];
+
+ [_url];
+ get url() {
+ webidl.assertBranded(this, WebSocketStream);
+ return this[_url];
+ }
+
+ constructor(url, options) {
+ this[webidl.brand] = webidl.brand;
+ const prefix = "Failed to construct 'WebSocketStream'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ url = webidl.converters.USVString(url, {
+ prefix,
+ context: "Argument 1",
+ });
+ options = webidl.converters.WebSocketStreamOptions(options, {
+ prefix,
+ context: "Argument 2",
+ });
+
+ const wsURL = new URL(url);
+
+ if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
+ throw new DOMException(
+ "Only ws & wss schemes are allowed in a WebSocket URL.",
+ "SyntaxError",
+ );
+ }
+
+ if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
+ throw new DOMException(
+ "Fragments are not allowed in a WebSocket URL.",
+ "SyntaxError",
+ );
+ }
+
+ this[_url] = wsURL.href;
+
+ if (
+ options.protocols.length !==
+ new Set(
+ ArrayPrototypeMap(
+ options.protocols,
+ (p) => StringPrototypeToLowerCase(p),
+ ),
+ ).size
+ ) {
+ throw new DOMException(
+ "Can't supply multiple times the same protocol.",
+ "SyntaxError",
+ );
+ }
+
+ const cancelRid = core.opSync(
+ "op_ws_check_permission_and_cancel_handle",
+ this[_url],
+ true,
+ );
+
+ if (options.signal?.aborted) {
+ core.close(cancelRid);
+ const err = new DOMException(
+ "This operation was aborted",
+ "AbortError",
+ );
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ } else {
+ const abort = () => {
+ core.close(cancelRid);
+ };
+ options.signal?.[add](abort);
+ PromisePrototypeThen(
+ core.opAsync("op_ws_create", {
+ url: this[_url],
+ protocols: options.protocols
+ ? ArrayPrototypeJoin(options.protocols, ", ")
+ : "",
+ cancelHandle: cancelRid,
+ }),
+ (create) => {
+ options.signal?.[remove](abort);
+ if (this[_earlyClose]) {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_close", {
+ rid: create.rid,
+ }),
+ () => {
+ PromisePrototypeThen(
+ (async () => {
+ while (true) {
+ const { kind } = await core.opAsync(
+ "op_ws_next_event",
+ create.rid,
+ );
+
+ if (kind === "close") {
+ break;
+ }
+ }
+ })(),
+ () => {
+ const err = new DOMException(
+ "Closed while connecting",
+ "NetworkError",
+ );
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ },
+ );
+ },
+ () => {
+ const err = new DOMException(
+ "Closed while connecting",
+ "NetworkError",
+ );
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ },
+ );
+ } else {
+ this[_rid] = create.rid;
+
+ const writable = new WritableStream({
+ write: async (chunk) => {
+ if (typeof chunk === "string") {
+ await core.opAsync("op_ws_send", {
+ rid: this[_rid],
+ kind: "text",
+ text: chunk,
+ });
+ } else if (chunk instanceof Uint8Array) {
+ await core.opAsync("op_ws_send", {
+ rid: this[_rid],
+ kind: "binary",
+ }, chunk);
+ } else {
+ throw new TypeError(
+ "A chunk may only be either a string or an Uint8Array",
+ );
+ }
+ },
+ close: async (reason) => {
+ try {
+ this.close(reason?.code !== undefined ? reason : {});
+ } catch (_) {
+ this.close();
+ }
+ await this.closed;
+ },
+ abort: async (reason) => {
+ try {
+ this.close(reason?.code !== undefined ? reason : {});
+ } catch (_) {
+ this.close();
+ }
+ await this.closed;
+ },
+ });
+ const readable = new ReadableStream({
+ start: (controller) => {
+ PromisePrototypeThen(this.closed, () => {
+ try {
+ controller.close();
+ } catch (_) {
+ // needed to ignore warnings & assertions
+ }
+ try {
+ PromisePrototypeCatch(
+ writableStreamClose(writable),
+ () => {},
+ );
+ } catch (_) {
+ // needed to ignore warnings & assertions
+ }
+ });
+ },
+ pull: async (controller) => {
+ const { kind, value } = await core.opAsync(
+ "op_ws_next_event",
+ this[_rid],
+ );
+
+ switch (kind) {
+ case "string": {
+ controller.enqueue(value);
+ break;
+ }
+ case "binary": {
+ controller.enqueue(value);
+ break;
+ }
+ case "ping": {
+ await core.opAsync("op_ws_send", {
+ rid: this[_rid],
+ kind: "pong",
+ });
+ break;
+ }
+ case "close": {
+ if (this[_closing]) {
+ this[_closed].resolve(value);
+ tryClose(this[_rid]);
+ } else {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_close", {
+ rid: this[_rid],
+ ...value,
+ }),
+ () => {
+ this[_closed].resolve(value);
+ tryClose(this[_rid]);
+ },
+ (err) => {
+ this[_closed].reject(err);
+ controller.error(err);
+ tryClose(this[_rid]);
+ },
+ );
+ }
+ break;
+ }
+ case "error": {
+ const err = new Error(value);
+ this[_closed].reject(err);
+ controller.error(err);
+ tryClose(this[_rid]);
+ break;
+ }
+ }
+ },
+ cancel: async (reason) => {
+ try {
+ this.close(reason?.code !== undefined ? reason : {});
+ } catch (_) {
+ this.close();
+ }
+ await this.closed;
+ },
+ });
+
+ this[_connection].resolve({
+ readable,
+ writable,
+ extensions: create.extensions ?? "",
+ protocol: create.protocol ?? "",
+ });
+ }
+ },
+ (err) => {
+ tryClose(cancelRid);
+ this[_connection].reject(err);
+ this[_closed].reject(err);
+ },
+ );
+ }
+ }
+
+ [_connection] = new Deferred();
+ get connection() {
+ webidl.assertBranded(this, WebSocketStream);
+ return this[_connection].promise;
+ }
+
+ [_earlyClose] = false;
+ [_closing] = false;
+ [_closed] = new Deferred();
+ get closed() {
+ webidl.assertBranded(this, WebSocketStream);
+ return this[_closed].promise;
+ }
+
+ close(closeInfo) {
+ webidl.assertBranded(this, WebSocketStream);
+ closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo, {
+ prefix: "Failed to execute 'close' on 'WebSocketStream'",
+ context: "Argument 1",
+ });
+
+ if (
+ closeInfo.code &&
+ !(closeInfo.code === 1000 ||
+ (3000 <= closeInfo.code && closeInfo.code < 5000))
+ ) {
+ throw new DOMException(
+ "The close code must be either 1000 or in the range of 3000 to 4999.",
+ "InvalidAccessError",
+ );
+ }
+
+ const encoder = new TextEncoder();
+ if (
+ closeInfo.reason && encoder.encode(closeInfo.reason).byteLength > 123
+ ) {
+ throw new DOMException(
+ "The close reason may not be longer than 123 bytes.",
+ "SyntaxError",
+ );
+ }
+
+ let code = closeInfo.code;
+ if (closeInfo.reason && code === undefined) {
+ code = 1000;
+ }
+
+ if (this[_connection].state === "pending") {
+ this[_earlyClose] = true;
+ } else if (this[_closed].state === "pending") {
+ this[_closing] = true;
+ PromisePrototypeCatch(
+ core.opAsync("op_ws_close", {
+ rid: this[_rid],
+ code,
+ reason: closeInfo.reason,
+ }),
+ (err) => {
+ this[_rid] && tryClose(this[_rid]);
+ this[_closed].reject(err);
+ },
+ );
+ }
+ }
+
+ [SymbolFor("Deno.customInspect")](inspect) {
+ return `${this.constructor.name} ${
+ inspect({
+ url: this.url,
+ })
+ }`;
+ }
+ }
+
+ window.__bootstrap.webSocket.WebSocketStream = WebSocketStream;
+})(this);
diff --git a/extensions/websocket/lib.rs b/extensions/websocket/lib.rs
index 896a5f2e2..97e970e85 100644
--- a/extensions/websocket/lib.rs
+++ b/extensions/websocket/lib.rs
@@ -3,7 +3,6 @@
use deno_core::error::bad_resource_id;
use deno_core::error::invalid_hostname;
use deno_core::error::null_opbuf;
-use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::stream::SplitSink;
use deno_core::futures::stream::SplitStream;
@@ -30,6 +29,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::fmt;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
@@ -153,14 +153,26 @@ impl Resource for WsStreamResource {
}
}
+pub struct WsCancelResource(Rc<CancelHandle>);
+
+impl Resource for WsCancelResource {
+ fn name(&self) -> Cow<str> {
+ "webSocketCancel".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.0.cancel()
+ }
+}
+
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
-pub fn op_ws_check_permission<WP>(
+pub fn op_ws_check_permission_and_cancel_handle<WP>(
state: &mut OpState,
url: String,
- _: (),
-) -> Result<(), AnyError>
+ cancel_handle: bool,
+) -> Result<Option<ResourceId>, AnyError>
where
WP: WebSocketPermissions + 'static,
{
@@ -168,7 +180,14 @@ where
.borrow_mut::<WP>()
.check_net_url(&url::Url::parse(&url)?)?;
- Ok(())
+ if cancel_handle {
+ let rid = state
+ .resource_table
+ .add(WsCancelResource(CancelHandle::new_rc()));
+ Ok(Some(rid))
+ } else {
+ Ok(None)
+ }
}
#[derive(Deserialize)]
@@ -176,6 +195,7 @@ where
pub struct CreateArgs {
url: String,
protocols: String,
+ cancel_handle: Option<ResourceId>,
}
#[derive(Serialize)]
@@ -246,14 +266,32 @@ where
_ => unreachable!(),
};
+ let client = client_async(request, socket);
let (stream, response): (WsStream, Response) =
- client_async(request, socket).await.map_err(|err| {
- type_error(format!(
+ if let Some(cancel_rid) = args.cancel_handle {
+ let r = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsCancelResource>(cancel_rid)
+ .ok_or_else(bad_resource_id)?;
+ client
+ .or_cancel(r.0.to_owned())
+ .await
+ .map_err(|_| DomExceptionAbortError::new("connection was aborted"))?
+ } else {
+ client.await
+ }
+ .map_err(|err| {
+ DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {}",
err.to_string()
))
})?;
+ if let Some(cancel_rid) = args.cancel_handle {
+ state.borrow_mut().resource_table.close(cancel_rid);
+ }
+
let (ws_tx, ws_rx) = stream.split();
let resource = WsStreamResource {
stream: WebSocketStreamType::Client {
@@ -398,11 +436,12 @@ pub fn init<P: WebSocketPermissions + 'static>(
.js(include_js_files!(
prefix "deno:extensions/websocket",
"01_websocket.js",
+ "02_websocketstream.js",
))
.ops(vec![
(
- "op_ws_check_permission",
- op_sync(op_ws_check_permission::<P>),
+ "op_ws_check_permission_and_cancel_handle",
+ op_sync(op_ws_check_permission_and_cancel_handle::<P>),
),
("op_ws_create", op_async(op_ws_create::<P>)),
("op_ws_send", op_async(op_ws_send)),
@@ -423,3 +462,55 @@ pub fn init<P: WebSocketPermissions + 'static>(
pub fn get_declaration() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts")
}
+
+#[derive(Debug)]
+pub struct DomExceptionNetworkError {
+ pub msg: String,
+}
+
+impl DomExceptionNetworkError {
+ pub fn new(msg: &str) -> Self {
+ DomExceptionNetworkError {
+ msg: msg.to_string(),
+ }
+ }
+}
+
+impl fmt::Display for DomExceptionNetworkError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad(&self.msg)
+ }
+}
+
+impl std::error::Error for DomExceptionNetworkError {}
+
+pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
+ e.downcast_ref::<DomExceptionNetworkError>()
+ .map(|_| "DOMExceptionNetworkError")
+}
+
+#[derive(Debug)]
+pub struct DomExceptionAbortError {
+ pub msg: String,
+}
+
+impl DomExceptionAbortError {
+ pub fn new(msg: &str) -> Self {
+ DomExceptionAbortError {
+ msg: msg.to_string(),
+ }
+ }
+}
+
+impl fmt::Display for DomExceptionAbortError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad(&self.msg)
+ }
+}
+
+impl std::error::Error for DomExceptionAbortError {}
+
+pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> {
+ e.downcast_ref::<DomExceptionAbortError>()
+ .map(|_| "DOMExceptionAbortError")
+}