summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/Cargo.toml4
-rw-r--r--cli/dts/lib.deno.shared_globals.d.ts98
-rw-r--r--cli/ops/mod.rs1
-rw-r--r--cli/ops/websocket.rs288
-rw-r--r--cli/rt/11_workers.js15
-rw-r--r--cli/rt/27_websocket.js305
-rw-r--r--cli/rt/99_main.js6
-rw-r--r--cli/tests/integration_tests.rs21
-rw-r--r--cli/tests/websocket_test.ts259
-rw-r--r--cli/web_worker.rs1
-rw-r--r--cli/worker.rs1
-rw-r--r--op_crates/web/01_event.js51
-rw-r--r--test_util/src/lib.rs23
13 files changed, 1054 insertions, 19 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index cd2ef0a84..f67158310 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -69,6 +69,8 @@ tempfile = "3.1.0"
termcolor = "1.1.0"
tokio = { version = "0.2.22", features = ["full"] }
tokio-rustls = "0.14.0"
+# Keep in-sync with warp.
+tokio-tungstenite = { version = "0.11.0" }
url = "2.1.1"
webpki = "0.21.3"
webpki-roots = "0.19.0"
@@ -89,8 +91,6 @@ nix = "0.17.0"
# Used in benchmark
chrono = "0.4"
os_pipe = "0.9.2"
-# Used for testing inspector. Keep in-sync with warp.
-tokio-tungstenite = { version = "0.11.0", features = ["connect"] }
test_util = { path = "../test_util" }
[package.metadata.winres]
diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts
index a903a0693..8e97fe014 100644
--- a/cli/dts/lib.deno.shared_globals.d.ts
+++ b/cli/dts/lib.deno.shared_globals.d.ts
@@ -595,7 +595,7 @@ declare class Console {
*
* Since we write to stdout, we can't display anything interactive
* we just fall back to `console.dir`.
- *
+ *
* > [Console.dirxml](https://developer.mozilla.org/en-US/docs/Web/API/Console/dirxml)
* > by Mozilla Contributors is licensed under CC-BY-SA 2.5.
*/
@@ -1436,3 +1436,99 @@ interface ErrorConstructor {
// TODO(nayeemrmn): Support `Error.prepareStackTrace()`. We currently use this
// internally in a way that makes it unavailable for users.
}
+
+interface CloseEventInit extends EventInit {
+ code?: number;
+ reason?: string;
+ wasClean?: boolean;
+}
+
+interface CloseEvent extends Event {
+ /**
+ * Returns the WebSocket connection close code provided by the server.
+ */
+ readonly code: number;
+ /**
+ * Returns the WebSocket connection close reason provided by the server.
+ */
+ readonly reason: string;
+ /**
+ * Returns true if the connection closed cleanly; false otherwise.
+ */
+ readonly wasClean: boolean;
+}
+
+declare var CloseEvent: {
+ prototype: CloseEvent;
+ new(type: string, eventInitDict?: CloseEventInit): CloseEvent;
+};
+
+interface WebSocketEventMap {
+ "close": CloseEvent;
+ "error": Event;
+ "message": MessageEvent;
+ "open": Event;
+}
+
+/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */
+interface WebSocket extends EventTarget {
+ /**
+ * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts:
+ *
+ * Can be set, to change how binary data is returned. The default is "blob".
+ */
+ binaryType: BinaryType;
+ /**
+ * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network.
+ *
+ * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.)
+ */
+ readonly bufferedAmount: number;
+ /**
+ * Returns the extensions selected by the server, if any.
+ */
+ readonly extensions: string;
+ onclose: ((this: WebSocket, ev: CloseEvent) => any) | null;
+ onerror: ((this: WebSocket, ev: Event) => any) | null;
+ onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null;
+ onopen: ((this: WebSocket, ev: Event) => any) | null;
+ /**
+ * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation.
+ */
+ readonly protocol: string;
+ /**
+ * Returns the state of the WebSocket object's connection. It can have the values described below.
+ */
+ readonly readyState: number;
+ /**
+ * Returns the URL that was used to establish the WebSocket connection.
+ */
+ readonly url: string;
+ /**
+ * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason.
+ */
+ close(code?: number, reason?: string): void;
+ /**
+ * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView.
+ */
+ send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void;
+ readonly CLOSED: number;
+ readonly CLOSING: number;
+ readonly CONNECTING: number;
+ readonly OPEN: number;
+ addEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void;
+ addEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | AddEventListenerOptions): void;
+ removeEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | EventListenerOptions): void;
+ removeEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | EventListenerOptions): void;
+}
+
+declare var WebSocket: {
+ prototype: WebSocket;
+ new(url: string, protocols?: string | string[]): WebSocket;
+ readonly CLOSED: number;
+ readonly CLOSING: number;
+ readonly CONNECTING: number;
+ readonly OPEN: number;
+};
+
+type BinaryType = "arraybuffer" | "blob";
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index f8fcea1b5..bc6b4f377 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -33,4 +33,5 @@ pub mod timers;
pub mod tls;
pub mod tty;
pub mod web_worker;
+pub mod websocket;
pub mod worker_host;
diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs
new file mode 100644
index 000000000..126d67861
--- /dev/null
+++ b/cli/ops/websocket.rs
@@ -0,0 +1,288 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use super::dispatch_json::{Deserialize, JsonOp, Value};
+use crate::state::State;
+use core::task::Poll;
+use deno_core::ErrBox;
+use deno_core::ZeroCopyBuf;
+use deno_core::{CoreIsolate, CoreIsolateState};
+use futures::future::{poll_fn, FutureExt};
+use futures::StreamExt;
+use futures::{ready, SinkExt};
+use http::{Method, Request, Uri};
+use std::borrow::Cow;
+use std::fs::File;
+use std::io::BufReader;
+use std::rc::Rc;
+use std::sync::Arc;
+use tokio::net::TcpStream;
+use tokio_rustls::{rustls::ClientConfig, TlsConnector};
+use tokio_tungstenite::stream::Stream as StreamSwitcher;
+use tokio_tungstenite::tungstenite::{
+ handshake::client::Response, protocol::frame::coding::CloseCode,
+ protocol::CloseFrame, Error, Message,
+};
+use tokio_tungstenite::{client_async, WebSocketStream};
+use webpki::DNSNameRef;
+
+pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
+ i.register_op("op_ws_create", s.stateful_json_op2(op_ws_create));
+ i.register_op("op_ws_send", s.stateful_json_op2(op_ws_send));
+ i.register_op("op_ws_close", s.stateful_json_op2(op_ws_close));
+ i.register_op("op_ws_next_event", s.stateful_json_op2(op_ws_next_event));
+}
+
+type MaybeTlsStream =
+ StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>;
+
+type WsStream = WebSocketStream<MaybeTlsStream>;
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateArgs {
+ url: String,
+ protocols: String,
+}
+
+pub fn op_ws_create(
+ isolate_state: &mut CoreIsolateState,
+ state: &Rc<State>,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<JsonOp, ErrBox> {
+ let args: CreateArgs = serde_json::from_value(args)?;
+ state.check_net_url(&url::Url::parse(&args.url)?)?;
+ let resource_table = isolate_state.resource_table.clone();
+ let ca_file = state.global_state.flags.ca_file.clone();
+ let future = async move {
+ let uri: Uri = args.url.parse().unwrap();
+ let request = Request::builder()
+ .method(Method::GET)
+ .uri(&uri)
+ .header("Sec-WebSocket-Protocol", args.protocols)
+ .body(())
+ .unwrap();
+ let domain = &uri.host().unwrap().to_string();
+ let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
+ Some("wss") => 443,
+ Some("ws") => 80,
+ _ => unreachable!(),
+ });
+ let addr = format!("{}:{}", domain, port);
+ let try_socket = TcpStream::connect(addr).await;
+ let tcp_socket = match try_socket.map_err(Error::Io) {
+ Ok(socket) => socket,
+ Err(_) => return Ok(json!({"success": false})),
+ };
+
+ let socket: MaybeTlsStream = match uri.scheme_str() {
+ Some("ws") => StreamSwitcher::Plain(tcp_socket),
+ Some("wss") => {
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+
+ if let Some(path) = ca_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?;
+ StreamSwitcher::Tls(tls_socket)
+ }
+ _ => unreachable!(),
+ };
+
+ let (stream, response): (WsStream, Response) =
+ client_async(request, socket).await.unwrap();
+
+ let rid = {
+ let mut resource_table = resource_table.borrow_mut();
+ resource_table.add("webSocketStream", Box::new(stream))
+ };
+
+ let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
+ Some(header) => header.to_str().unwrap(),
+ None => "",
+ };
+ let extensions = response
+ .headers()
+ .get_all("Sec-WebSocket-Extensions")
+ .iter()
+ .map(|header| header.to_str().unwrap())
+ .collect::<String>();
+ Ok(json!({
+ "success": true,
+ "rid": rid,
+ "protocol": protocol,
+ "extensions": extensions
+ }))
+ };
+ Ok(JsonOp::Async(future.boxed_local()))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SendArgs {
+ rid: u32,
+ text: Option<String>,
+}
+
+pub fn op_ws_send(
+ isolate_state: &mut CoreIsolateState,
+ _state: &Rc<State>,
+ args: Value,
+ zero_copy: &mut [ZeroCopyBuf],
+) -> Result<JsonOp, ErrBox> {
+ let args: SendArgs = serde_json::from_value(args)?;
+
+ let mut maybe_msg = Some(match args.text {
+ Some(text) => Message::Text(text),
+ None => Message::Binary(zero_copy[0].to_owned().to_vec()),
+ });
+ let resource_table = isolate_state.resource_table.clone();
+ let rid = args.rid;
+
+ let future = poll_fn(move |cx| {
+ let mut resource_table = resource_table.borrow_mut();
+ let stream = resource_table
+ .get_mut::<WsStream>(rid)
+ .ok_or_else(ErrBox::bad_resource_id)?;
+
+ // TODO(ry) Handle errors below instead of unwrap.
+ // Need to map tungstenite::error::Error to ErrBox.
+
+ ready!(stream.poll_ready_unpin(cx)).unwrap();
+ if let Some(msg) = maybe_msg.take() {
+ stream.start_send_unpin(msg).unwrap();
+ }
+ ready!(stream.poll_flush_unpin(cx)).unwrap();
+
+ Poll::Ready(Ok(json!({})))
+ });
+ Ok(JsonOp::Async(future.boxed_local()))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CloseArgs {
+ rid: u32,
+ code: Option<u16>,
+ reason: Option<String>,
+}
+
+pub fn op_ws_close(
+ isolate_state: &mut CoreIsolateState,
+ _state: &Rc<State>,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<JsonOp, ErrBox> {
+ let args: CloseArgs = serde_json::from_value(args)?;
+ let resource_table = isolate_state.resource_table.clone();
+ let rid = args.rid;
+ let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
+ code: CloseCode::from(c),
+ reason: match args.reason {
+ Some(reason) => Cow::from(reason),
+ None => Default::default(),
+ },
+ })));
+
+ let future = poll_fn(move |cx| {
+ let mut resource_table = resource_table.borrow_mut();
+ let stream = resource_table
+ .get_mut::<WsStream>(rid)
+ .ok_or_else(ErrBox::bad_resource_id)?;
+
+ // TODO(ry) Handle errors below instead of unwrap.
+ // Need to map tungstenite::error::Error to ErrBox.
+
+ ready!(stream.poll_ready_unpin(cx)).unwrap();
+ if let Some(msg) = maybe_msg.take() {
+ stream.start_send_unpin(msg).unwrap();
+ }
+ ready!(stream.poll_flush_unpin(cx)).unwrap();
+ ready!(stream.poll_close_unpin(cx)).unwrap();
+
+ Poll::Ready(Ok(json!({})))
+ });
+
+ Ok(JsonOp::Async(future.boxed_local()))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct NextEventArgs {
+ rid: u32,
+}
+
+pub fn op_ws_next_event(
+ isolate_state: &mut CoreIsolateState,
+ _state: &Rc<State>,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<JsonOp, ErrBox> {
+ let args: NextEventArgs = serde_json::from_value(args)?;
+ let resource_table = isolate_state.resource_table.clone();
+ let future = poll_fn(move |cx| {
+ let mut resource_table = resource_table.borrow_mut();
+ let stream = resource_table
+ .get_mut::<WsStream>(args.rid)
+ .ok_or_else(ErrBox::bad_resource_id)?;
+
+ stream.poll_next_unpin(cx).map(|val| {
+ match val {
+ Some(val) => {
+ match val {
+ Ok(message) => {
+ match message {
+ Message::Text(text) => Ok(json!({
+ "type": "string",
+ "data": text
+ })),
+ Message::Binary(data) => {
+ Ok(json!({ //TODO: don't use json to send binary data
+ "type": "binary",
+ "data": data
+ }))
+ }
+ Message::Close(frame) => {
+ if let Some(frame) = frame {
+ let code: u16 = frame.code.into();
+ Ok(json!({
+ "type": "close",
+ "code": code,
+ "reason": frame.reason.as_ref()
+ }))
+ } else {
+ Ok(json!({ "type": "close" }))
+ }
+ }
+ Message::Ping(_) => Ok(json!({"type": "ping"})),
+ Message::Pong(_) => Ok(json!({"type": "pong"})),
+ }
+ }
+ Err(_) => Ok(json!({
+ "type": "error",
+ })),
+ }
+ }
+ None => {
+ resource_table
+ .close(args.rid)
+ .ok_or_else(ErrBox::bad_resource_id)?;
+ Ok(json!({
+ "type": "closed",
+ }))
+ }
+ }
+ })
+ });
+
+ Ok(JsonOp::Async(future.boxed_local()))
+}
diff --git a/cli/rt/11_workers.js b/cli/rt/11_workers.js
index 8ae0d5ad5..9a23e3dd8 100644
--- a/cli/rt/11_workers.js
+++ b/cli/rt/11_workers.js
@@ -39,20 +39,6 @@
const encoder = new TextEncoder();
const decoder = new TextDecoder();
- class MessageEvent extends Event {
- constructor(type, eventInitDict) {
- super(type, {
- bubbles: eventInitDict?.bubbles ?? false,
- cancelable: eventInitDict?.cancelable ?? false,
- composed: eventInitDict?.composed ?? false,
- });
-
- this.data = eventInitDict?.data ?? null;
- this.origin = eventInitDict?.origin ?? "";
- this.lastEventId = eventInitDict?.lastEventId ?? "";
- }
- }
-
function encodeMessage(data) {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
@@ -226,6 +212,5 @@
window.__bootstrap.worker = {
Worker,
- MessageEvent,
};
})(this);
diff --git a/cli/rt/27_websocket.js b/cli/rt/27_websocket.js
new file mode 100644
index 000000000..0b113ebca
--- /dev/null
+++ b/cli/rt/27_websocket.js
@@ -0,0 +1,305 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+((window) => {
+ const { sendAsync } = window.__bootstrap.dispatchJson;
+ const { close } = window.__bootstrap.resources;
+ const { requiredArguments } = window.__bootstrap.webUtil;
+ const CONNECTING = 0;
+ const OPEN = 1;
+ const CLOSING = 2;
+ const CLOSED = 3;
+
+ class WebSocket extends EventTarget {
+ #readyState = CONNECTING;
+
+ constructor(url, protocols = []) {
+ super();
+ requiredArguments("WebSocket", arguments.length, 1);
+
+ const wsURL = new URL(url);
+
+ if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
+ throw new DOMException(
+ "Only ws & wss schemes are allowed in a WebSocket URL.",
+ "SyntaxError",
+ );
+ }
+
+ if (wsURL.hash !== "" || wsURL.href.endsWith("#")) {
+ throw new DOMException(
+ "Fragments are not allowed in a WebSocket URL.",
+ "SyntaxError",
+ );
+ }
+
+ this.#url = wsURL.href;
+
+ if (protocols && typeof protocols === "string") {
+ protocols = [protocols];
+ }
+
+ if (
+ protocols.some((x) => protocols.indexOf(x) !== protocols.lastIndexOf(x))
+ ) {
+ throw new DOMException(
+ "Can't supply multiple times the same protocol.",
+ "SyntaxError",
+ );
+ }
+
+ sendAsync("op_ws_create", {
+ url: wsURL.href,
+ protocols: protocols.join("; "),
+ }).then((create) => {
+ if (create.success) {
+ this.#rid = create.rid;
+ this.#extensions = create.extensions;
+ this.#protocol = create.protocol;
+
+ if (this.#readyState === CLOSING) {
+ sendAsync("op_ws_close", {
+ rid: this.#rid,
+ }).then(() => {
+ this.#readyState = CLOSED;
+
+ const errEvent = new Event("error");
+ errEvent.target = this;
+ this.onerror?.(errEvent);
+ this.dispatchEvent(errEvent);
+
+ const event = new CloseEvent("close");
+ event.target = this;
+ this.onclose?.(event);
+ this.dispatchEvent(event);
+ close(this.#rid);
+ });
+
+ const event = new Event("error");
+ event.target = this;
+ this.onerror?.(event);
+ this.dispatchEvent(event);
+ } else {
+ this.#readyState = OPEN;
+ const event = new Event("open");
+ event.target = this;
+ this.onopen?.(event);
+ this.dispatchEvent(event);
+
+ this.#eventLoop();
+ }
+ } else {
+ this.#readyState = CLOSED;
+
+ const errEvent = new Event("error");
+ errEvent.target = this;
+ this.onerror?.(errEvent);
+ this.dispatchEvent(errEvent);
+
+ const closeEvent = new CloseEvent("close");
+ closeEvent.target = this;
+ this.onclose?.(closeEvent);
+ this.dispatchEvent(closeEvent);
+ }
+ });
+ }
+
+ get CONNECTING() {
+ return CONNECTING;
+ }
+ get OPEN() {
+ return OPEN;
+ }
+ get CLOSING() {
+ return CLOSING;
+ }
+ get CLOSED() {
+ return CLOSED;
+ }
+
+ get readyState() {
+ return this.#readyState;
+ }
+
+ #extensions = "";
+ #protocol = "";
+ #url = "";
+ #rid;
+
+ get extensions() {
+ return this.#extensions;
+ }
+ get protocol() {
+ return this.#protocol;
+ }
+
+ #binaryType = "blob";
+ get binaryType() {
+ return this.#binaryType;
+ }
+ set binaryType(value) {
+ if (value === "blob" || value === "arraybuffer") {
+ this.#binaryType = value;
+ }
+ }
+ #bufferedAmount = 0;
+ get bufferedAmount() {
+ return this.#bufferedAmount;
+ }
+
+ get url() {
+ return this.#url;
+ }
+
+ onopen = () => {};
+ onerror = () => {};
+ onclose = () => {};
+ onmessage = () => {};
+
+ send(data) {
+ requiredArguments("WebSocket.send", arguments.length, 1);
+
+ if (this.#readyState != OPEN) {
+ throw Error("readyState not OPEN");
+ }
+
+ const sendTypedArray = (ta) => {
+ this.#bufferedAmount += ta.size;
+ sendAsync("op_ws_send", {
+ rid: this.#rid,
+ }, ta).then(() => {
+ this.#bufferedAmount -= ta.size;
+ });
+ };
+
+ if (data instanceof Blob) {
+ data.slice().arrayBuffer().then((ab) =>
+ sendTypedArray(new DataView(ab))
+ );
+ } else if (
+ data instanceof Int8Array || data instanceof Int16Array ||
+ data instanceof Int32Array || data instanceof Uint8Array ||
+ data instanceof Uint16Array || data instanceof Uint32Array ||
+ data instanceof Uint8ClampedArray || data instanceof Float32Array ||
+ data instanceof Float64Array || data instanceof DataView
+ ) {
+ sendTypedArray(data);
+ } else if (data instanceof ArrayBuffer) {
+ sendTypedArray(new DataView(data));
+ } else {
+ const string = String(data);
+ const encoder = new TextEncoder();
+ const d = encoder.encode(string);
+ this.#bufferedAmount += d.size;
+ sendAsync("op_ws_send", {
+ rid: this.#rid,
+ text: string,
+ }).then(() => {
+ this.#bufferedAmount -= d.size;
+ });
+ }
+ }
+
+ close(code, reason) {
+ if (code && (code !== 1000 && !(3000 <= code > 5000))) {
+ throw new DOMException(
+ "The close code must be either 1000 or in the range of 3000 to 4999.",
+ "NotSupportedError",
+ );
+ }
+
+ const encoder = new TextEncoder();
+ if (reason && encoder.encode(reason).byteLength > 123) {
+ throw new DOMException(
+ "The close reason may not be longer than 123 bytes.",
+ "SyntaxError",
+ );
+ }
+
+ if (this.#readyState === CONNECTING) {
+ this.#readyState = CLOSING;
+ } else if (this.#readyState === OPEN) {
+ this.#readyState = CLOSING;
+
+ sendAsync("op_ws_close", {
+ rid: this.#rid,
+ code,
+ reason,
+ }).then(() => {
+ this.#readyState = CLOSED;
+ const event = new CloseEvent("close", {
+ wasClean: true,
+ code,
+ reason,
+ });
+ event.target = this;
+ this.onclose?.(event);
+ this.dispatchEvent(event);
+ close(this.#rid);
+ });
+ }
+ }
+
+ async #eventLoop() {
+ if (this.#readyState === OPEN) {
+ const message = await sendAsync("op_ws_next_event", { rid: this.#rid });
+ if (message.type === "string" || message.type === "binary") {
+ let data;
+
+ if (message.type === "string") {
+ data = message.data;
+ } else {
+ if (this.binaryType === "blob") {
+ data = new Blob([new Uint8Array(message.data)]);
+ } else {
+ data = new Uint8Array(message.data).buffer;
+ }
+ }
+
+ const event = new MessageEvent("message", {
+ data,
+ origin: this.#url,
+ });
+ event.target = this;
+ this.onmessage?.(event);
+ this.dispatchEvent(event);
+
+ this.#eventLoop();
+ } else if (message.type === "close") {
+ this.#readyState = CLOSED;
+ const event = new CloseEvent("close", {
+ wasClean: true,
+ code: message.code,
+ reason: message.reason,
+ });
+ event.target = this;
+ this.onclose?.(event);
+ this.dispatchEvent(event);
+ } else if (message.type === "error") {
+ const event = new Event("error");
+ event.target = this;
+ this.onerror?.(event);
+ this.dispatchEvent(event);
+ }
+ }
+ }
+ }
+
+ Object.defineProperties(WebSocket, {
+ CONNECTING: {
+ value: 0,
+ },
+ OPEN: {
+ value: 1,
+ },
+ CLOSING: {
+ value: 2,
+ },
+ CLOSED: {
+ value: 3,
+ },
+ });
+
+ window.__bootstrap.webSocket = {
+ WebSocket,
+ };
+})(this);
diff --git a/cli/rt/99_main.js b/cli/rt/99_main.js
index c6d6d1395..ede47980c 100644
--- a/cli/rt/99_main.js
+++ b/cli/rt/99_main.js
@@ -30,6 +30,7 @@ delete Object.prototype.__proto__;
const progressEvent = window.__bootstrap.progressEvent;
const fileReader = window.__bootstrap.fileReader;
const formData = window.__bootstrap.formData;
+ const webSocket = window.__bootstrap.webSocket;
const request = window.__bootstrap.request;
const fetch = window.__bootstrap.fetch;
const denoNs = window.__bootstrap.denoNs;
@@ -80,7 +81,7 @@ delete Object.prototype.__proto__;
let isClosing = false;
async function workerMessageRecvCallback(data) {
- const msgEvent = new worker.MessageEvent("message", {
+ const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});
@@ -232,10 +233,13 @@ delete Object.prototype.__proto__;
CustomEvent: util.nonEnumerable(CustomEvent),
DOMException: util.nonEnumerable(DOMException),
ErrorEvent: util.nonEnumerable(ErrorEvent),
+ CloseEvent: util.nonEnumerable(CloseEvent),
+ MessageEvent: util.nonEnumerable(MessageEvent),
Event: util.nonEnumerable(Event),
EventTarget: util.nonEnumerable(EventTarget),
Headers: util.nonEnumerable(headers.Headers),
FormData: util.nonEnumerable(formData.FormData),
+ WebSocket: util.nonEnumerable(webSocket.WebSocket),
ReadableStream: util.nonEnumerable(streams.ReadableStream),
Request: util.nonEnumerable(request.Request),
Response: util.nonEnumerable(fetch.Response),
diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs
index 08c68a9ae..94d410dcc 100644
--- a/cli/tests/integration_tests.rs
+++ b/cli/tests/integration_tests.rs
@@ -3205,6 +3205,27 @@ async fn inspector_runtime_evaluate_does_not_crash() {
}
#[test]
+fn websocket() {
+ let _g = util::http_server();
+
+ let script = util::tests_path().join("websocket_test.ts");
+ let root_ca = util::tests_path().join("tls/RootCA.pem");
+ let status = util::deno_cmd()
+ .arg("test")
+ .arg("--unstable")
+ .arg("--allow-net")
+ .arg("--cert")
+ .arg(root_ca)
+ .arg(script)
+ .spawn()
+ .unwrap()
+ .wait()
+ .unwrap();
+
+ assert!(status.success());
+}
+
+#[test]
fn exec_path() {
let output = util::deno_cmd()
.current_dir(util::root_path())
diff --git a/cli/tests/websocket_test.ts b/cli/tests/websocket_test.ts
new file mode 100644
index 000000000..0fb9af951
--- /dev/null
+++ b/cli/tests/websocket_test.ts
@@ -0,0 +1,259 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import {
+ assertEquals,
+ assertThrows,
+ createResolvable,
+ fail,
+} from "./unit/test_util.ts";
+
+Deno.test("invalid scheme", () => {
+ assertThrows(() => new WebSocket("foo://localhost:4242"));
+});
+
+Deno.test("fragment", () => {
+ assertThrows(() => new WebSocket("ws://localhost:4242/#"));
+ assertThrows(() => new WebSocket("ws://localhost:4242/#foo"));
+});
+
+Deno.test("duplicate protocols", () => {
+ assertThrows(() => new WebSocket("ws://localhost:4242", ["foo", "foo"]));
+});
+
+Deno.test("invalid server", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:2121");
+ let err = false;
+ ws.onerror = (): void => {
+ err = true;
+ };
+ ws.onclose = (): void => {
+ if (err) {
+ promise.resolve();
+ } else {
+ fail();
+ }
+ };
+ ws.onopen = (): void => fail();
+ await promise;
+});
+
+Deno.test("connect & close", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => {
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("connect & abort", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.close();
+ let err = false;
+ ws.onerror = (): void => {
+ err = true;
+ };
+ ws.onclose = (): void => {
+ if (err) {
+ promise.resolve();
+ } else {
+ fail();
+ }
+ };
+ ws.onopen = (): void => fail();
+ await promise;
+});
+
+Deno.test("connect & close custom valid code", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.close(1000);
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("connect & close custom invalid code", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => {
+ assertThrows(() => ws.close(1001));
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("connect & close custom valid reason", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.close(1000, "foo");
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("connect & close custom invalid reason", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => {
+ assertThrows(() => ws.close(1000, "".padEnd(124, "o")));
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo string", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send("foo");
+ ws.onmessage = (e): void => {
+ assertEquals(e.data, "foo");
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo string tls", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("wss://localhost:4243");
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send("foo");
+ ws.onmessage = (e): void => {
+ assertEquals(e.data, "foo");
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo blob with binaryType blob", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ const blob = new Blob(["foo"]);
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send(blob);
+ ws.onmessage = (e): void => {
+ e.data.text().then((actual: string) => {
+ blob.text().then((expected) => {
+ assertEquals(actual, expected);
+ });
+ });
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo blob with binaryType arraybuffer", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.binaryType = "arraybuffer";
+ const blob = new Blob(["foo"]);
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send(blob);
+ ws.onmessage = (e): void => {
+ blob.arrayBuffer().then((expected) => {
+ assertEquals(e.data, expected);
+ });
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo uint8array with binaryType blob", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ const uint = new Uint8Array([102, 111, 111]);
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send(uint);
+ ws.onmessage = (e): void => {
+ e.data.arrayBuffer().then((actual: ArrayBuffer) => {
+ assertEquals(actual, uint.buffer);
+ });
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo uint8array with binaryType arraybuffer", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.binaryType = "arraybuffer";
+ const uint = new Uint8Array([102, 111, 111]);
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send(uint);
+ ws.onmessage = (e): void => {
+ assertEquals(e.data, uint.buffer);
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo arraybuffer with binaryType blob", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ const buffer = new ArrayBuffer(3);
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send(buffer);
+ ws.onmessage = (e): void => {
+ e.data.arrayBuffer().then((actual: ArrayBuffer) => {
+ assertEquals(actual, buffer);
+ });
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test("echo arraybuffer with binaryType arraybuffer", async () => {
+ const promise = createResolvable();
+ const ws = new WebSocket("ws://localhost:4242");
+ ws.binaryType = "arraybuffer";
+ const buffer = new ArrayBuffer(3);
+ ws.onerror = (): void => fail();
+ ws.onopen = (): void => ws.send(buffer);
+ ws.onmessage = (e): void => {
+ assertEquals(e.data, buffer);
+ ws.close();
+ };
+ ws.onclose = (): void => {
+ promise.resolve();
+ };
+ await promise;
+});
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index 8d9c00499..9b42ebe32 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -127,6 +127,7 @@ impl WebWorker {
ops::errors::init(isolate, &state);
ops::timers::init(isolate, &state);
ops::fetch::init(isolate, &state);
+ ops::websocket::init(isolate, &state);
if has_deno_namespace {
ops::runtime_compiler::init(isolate, &state);
diff --git a/cli/worker.rs b/cli/worker.rs
index 0129242e6..3773871dc 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -263,6 +263,7 @@ impl MainWorker {
ops::runtime_compiler::init(isolate, &state);
ops::errors::init(isolate, &state);
ops::fetch::init(isolate, &state);
+ ops::websocket::init(isolate, &state);
ops::fs::init(isolate, &state);
ops::fs_events::init(isolate, &state);
ops::idna::init(isolate, &state);
diff --git a/op_crates/web/01_event.js b/op_crates/web/01_event.js
index 48899e6fd..5b21d4402 100644
--- a/op_crates/web/01_event.js
+++ b/op_crates/web/01_event.js
@@ -1011,6 +1011,55 @@
"error",
]);
+ class CloseEvent extends Event {
+ #wasClean = "";
+ #code = "";
+ #reason = "";
+
+ get wasClean() {
+ return this.#wasClean;
+ }
+ get code() {
+ return this.#code;
+ }
+ get reason() {
+ return this.#reason;
+ }
+
+ constructor(type, {
+ bubbles,
+ cancelable,
+ composed,
+ wasClean = false,
+ code = 0,
+ reason = "",
+ } = {}) {
+ super(type, {
+ bubbles: bubbles,
+ cancelable: cancelable,
+ composed: composed,
+ });
+
+ this.#wasClean = wasClean;
+ this.#code = code;
+ this.#reason = reason;
+ }
+ }
+
+ class MessageEvent extends Event {
+ constructor(type, eventInitDict) {
+ super(type, {
+ bubbles: eventInitDict?.bubbles ?? false,
+ cancelable: eventInitDict?.cancelable ?? false,
+ composed: eventInitDict?.composed ?? false,
+ });
+
+ this.data = eventInitDict?.data ?? null;
+ this.origin = eventInitDict?.origin ?? "";
+ this.lastEventId = eventInitDict?.lastEventId ?? "";
+ }
+ }
+
class CustomEvent extends Event {
#detail = null;
@@ -1037,6 +1086,8 @@
window.Event = Event;
window.EventTarget = EventTarget;
window.ErrorEvent = ErrorEvent;
+ window.CloseEvent = CloseEvent;
+ window.MessageEvent = MessageEvent;
window.CustomEvent = CustomEvent;
window.dispatchEvent = EventTarget.prototype.dispatchEvent;
window.addEventListener = EventTarget.prototype.addEventListener;
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs
index 1f8548e34..832c40f59 100644
--- a/test_util/src/lib.rs
+++ b/test_util/src/lib.rs
@@ -39,6 +39,8 @@ const DOUBLE_REDIRECTS_PORT: u16 = 4548;
const INF_REDIRECTS_PORT: u16 = 4549;
const REDIRECT_ABSOLUTE_PORT: u16 = 4550;
const HTTPS_PORT: u16 = 5545;
+const WS_PORT: u16 = 4242;
+const WSS_PORT: u16 = 4243;
pub const PERMISSION_VARIANTS: [&str; 5] =
["read", "write", "env", "net", "run"];
@@ -135,6 +137,25 @@ pub async fn run_all_servers() {
let redirect_server_fut =
warp::serve(routes).bind(([127, 0, 0, 1], REDIRECT_PORT));
+ let websocket_route = warp::ws().map(|ws: warp::ws::Ws| {
+ ws.on_upgrade(|websocket| {
+ use futures::stream::StreamExt;
+ let (tx, rx) = websocket.split();
+ rx.forward(tx).map(|result| {
+ if let Err(e) = result {
+ println!("websocket server error: {:?}", e);
+ }
+ })
+ })
+ });
+ let ws_server_fut =
+ warp::serve(websocket_route).bind(([127, 0, 0, 1], WS_PORT));
+ let wss_server_fut = warp::serve(websocket_route)
+ .tls()
+ .cert_path("std/http/testdata/tls/localhost.crt")
+ .key_path("std/http/testdata/tls/localhost.key")
+ .bind(([127, 0, 0, 1], WSS_PORT));
+
let routes = warp::path::full().map(|path: warp::path::FullPath| {
let p = path.as_str();
assert_eq!(&p[0..1], "/");
@@ -426,6 +447,8 @@ pub async fn run_all_servers() {
http_fut,
https_fut,
redirect_server_fut,
+ ws_server_fut,
+ wss_server_fut,
another_redirect_server_fut,
inf_redirect_server_fut,
double_redirect_server_fut,