diff options
-rw-r--r-- | cli/Cargo.toml | 4 | ||||
-rw-r--r-- | cli/dts/lib.deno.shared_globals.d.ts | 98 | ||||
-rw-r--r-- | cli/ops/mod.rs | 1 | ||||
-rw-r--r-- | cli/ops/websocket.rs | 288 | ||||
-rw-r--r-- | cli/rt/11_workers.js | 15 | ||||
-rw-r--r-- | cli/rt/27_websocket.js | 305 | ||||
-rw-r--r-- | cli/rt/99_main.js | 6 | ||||
-rw-r--r-- | cli/tests/integration_tests.rs | 21 | ||||
-rw-r--r-- | cli/tests/websocket_test.ts | 259 | ||||
-rw-r--r-- | cli/web_worker.rs | 1 | ||||
-rw-r--r-- | cli/worker.rs | 1 | ||||
-rw-r--r-- | op_crates/web/01_event.js | 51 | ||||
-rw-r--r-- | test_util/src/lib.rs | 23 |
13 files changed, 1054 insertions, 19 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml index cd2ef0a84..f67158310 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -69,6 +69,8 @@ tempfile = "3.1.0" termcolor = "1.1.0" tokio = { version = "0.2.22", features = ["full"] } tokio-rustls = "0.14.0" +# Keep in-sync with warp. +tokio-tungstenite = { version = "0.11.0" } url = "2.1.1" webpki = "0.21.3" webpki-roots = "0.19.0" @@ -89,8 +91,6 @@ nix = "0.17.0" # Used in benchmark chrono = "0.4" os_pipe = "0.9.2" -# Used for testing inspector. Keep in-sync with warp. -tokio-tungstenite = { version = "0.11.0", features = ["connect"] } test_util = { path = "../test_util" } [package.metadata.winres] diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts index a903a0693..8e97fe014 100644 --- a/cli/dts/lib.deno.shared_globals.d.ts +++ b/cli/dts/lib.deno.shared_globals.d.ts @@ -595,7 +595,7 @@ declare class Console { * * Since we write to stdout, we can't display anything interactive * we just fall back to `console.dir`. - * + * * > [Console.dirxml](https://developer.mozilla.org/en-US/docs/Web/API/Console/dirxml) * > by Mozilla Contributors is licensed under CC-BY-SA 2.5. */ @@ -1436,3 +1436,99 @@ interface ErrorConstructor { // TODO(nayeemrmn): Support `Error.prepareStackTrace()`. We currently use this // internally in a way that makes it unavailable for users. } + +interface CloseEventInit extends EventInit { + code?: number; + reason?: string; + wasClean?: boolean; +} + +interface CloseEvent extends Event { + /** + * Returns the WebSocket connection close code provided by the server. + */ + readonly code: number; + /** + * Returns the WebSocket connection close reason provided by the server. + */ + readonly reason: string; + /** + * Returns true if the connection closed cleanly; false otherwise. + */ + readonly wasClean: boolean; +} + +declare var CloseEvent: { + prototype: CloseEvent; + new(type: string, eventInitDict?: CloseEventInit): CloseEvent; +}; + +interface WebSocketEventMap { + "close": CloseEvent; + "error": Event; + "message": MessageEvent; + "open": Event; +} + +/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ +interface WebSocket extends EventTarget { + /** + * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: + * + * Can be set, to change how binary data is returned. The default is "blob". + */ + binaryType: BinaryType; + /** + * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. + * + * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) + */ + readonly bufferedAmount: number; + /** + * Returns the extensions selected by the server, if any. + */ + readonly extensions: string; + onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; + onerror: ((this: WebSocket, ev: Event) => any) | null; + onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; + onopen: ((this: WebSocket, ev: Event) => any) | null; + /** + * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. + */ + readonly protocol: string; + /** + * Returns the state of the WebSocket object's connection. It can have the values described below. + */ + readonly readyState: number; + /** + * Returns the URL that was used to establish the WebSocket connection. + */ + readonly url: string; + /** + * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. + */ + close(code?: number, reason?: string): void; + /** + * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView. + */ + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; + readonly CLOSED: number; + readonly CLOSING: number; + readonly CONNECTING: number; + readonly OPEN: number; + addEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void; + addEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | AddEventListenerOptions): void; + removeEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | EventListenerOptions): void; + removeEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | EventListenerOptions): void; +} + +declare var WebSocket: { + prototype: WebSocket; + new(url: string, protocols?: string | string[]): WebSocket; + readonly CLOSED: number; + readonly CLOSING: number; + readonly CONNECTING: number; + readonly OPEN: number; +}; + +type BinaryType = "arraybuffer" | "blob"; diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index f8fcea1b5..bc6b4f377 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -33,4 +33,5 @@ pub mod timers; pub mod tls; pub mod tty; pub mod web_worker; +pub mod websocket; pub mod worker_host; diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs new file mode 100644 index 000000000..126d67861 --- /dev/null +++ b/cli/ops/websocket.rs @@ -0,0 +1,288 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::state::State; +use core::task::Poll; +use deno_core::ErrBox; +use deno_core::ZeroCopyBuf; +use deno_core::{CoreIsolate, CoreIsolateState}; +use futures::future::{poll_fn, FutureExt}; +use futures::StreamExt; +use futures::{ready, SinkExt}; +use http::{Method, Request, Uri}; +use std::borrow::Cow; +use std::fs::File; +use std::io::BufReader; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use tokio_tungstenite::stream::Stream as StreamSwitcher; +use tokio_tungstenite::tungstenite::{ + handshake::client::Response, protocol::frame::coding::CloseCode, + protocol::CloseFrame, Error, Message, +}; +use tokio_tungstenite::{client_async, WebSocketStream}; +use webpki::DNSNameRef; + +pub fn init(i: &mut CoreIsolate, s: &Rc<State>) { + i.register_op("op_ws_create", s.stateful_json_op2(op_ws_create)); + i.register_op("op_ws_send", s.stateful_json_op2(op_ws_send)); + i.register_op("op_ws_close", s.stateful_json_op2(op_ws_close)); + i.register_op("op_ws_next_event", s.stateful_json_op2(op_ws_next_event)); +} + +type MaybeTlsStream = + StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; + +type WsStream = WebSocketStream<MaybeTlsStream>; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateArgs { + url: String, + protocols: String, +} + +pub fn op_ws_create( + isolate_state: &mut CoreIsolateState, + state: &Rc<State>, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<JsonOp, ErrBox> { + let args: CreateArgs = serde_json::from_value(args)?; + state.check_net_url(&url::Url::parse(&args.url)?)?; + let resource_table = isolate_state.resource_table.clone(); + let ca_file = state.global_state.flags.ca_file.clone(); + let future = async move { + let uri: Uri = args.url.parse().unwrap(); + let request = Request::builder() + .method(Method::GET) + .uri(&uri) + .header("Sec-WebSocket-Protocol", args.protocols) + .body(()) + .unwrap(); + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let try_socket = TcpStream::connect(addr).await; + let tcp_socket = match try_socket.map_err(Error::Io) { + Ok(socket) => socket, + Err(_) => return Ok(json!({"success": false})), + }; + + let socket: MaybeTlsStream = match uri.scheme_str() { + Some("ws") => StreamSwitcher::Plain(tcp_socket), + Some("wss") => { + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + + if let Some(path) = ca_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); + } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + StreamSwitcher::Tls(tls_socket) + } + _ => unreachable!(), + }; + + let (stream, response): (WsStream, Response) = + client_async(request, socket).await.unwrap(); + + let rid = { + let mut resource_table = resource_table.borrow_mut(); + resource_table.add("webSocketStream", Box::new(stream)) + }; + + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::<String>(); + Ok(json!({ + "success": true, + "rid": rid, + "protocol": protocol, + "extensions": extensions + })) + }; + Ok(JsonOp::Async(future.boxed_local())) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendArgs { + rid: u32, + text: Option<String>, +} + +pub fn op_ws_send( + isolate_state: &mut CoreIsolateState, + _state: &Rc<State>, + args: Value, + zero_copy: &mut [ZeroCopyBuf], +) -> Result<JsonOp, ErrBox> { + let args: SendArgs = serde_json::from_value(args)?; + + let mut maybe_msg = Some(match args.text { + Some(text) => Message::Text(text), + None => Message::Binary(zero_copy[0].to_owned().to_vec()), + }); + let resource_table = isolate_state.resource_table.clone(); + let rid = args.rid; + + let future = poll_fn(move |cx| { + let mut resource_table = resource_table.borrow_mut(); + let stream = resource_table + .get_mut::<WsStream>(rid) + .ok_or_else(ErrBox::bad_resource_id)?; + + // TODO(ry) Handle errors below instead of unwrap. + // Need to map tungstenite::error::Error to ErrBox. + + ready!(stream.poll_ready_unpin(cx)).unwrap(); + if let Some(msg) = maybe_msg.take() { + stream.start_send_unpin(msg).unwrap(); + } + ready!(stream.poll_flush_unpin(cx)).unwrap(); + + Poll::Ready(Ok(json!({}))) + }); + Ok(JsonOp::Async(future.boxed_local())) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CloseArgs { + rid: u32, + code: Option<u16>, + reason: Option<String>, +} + +pub fn op_ws_close( + isolate_state: &mut CoreIsolateState, + _state: &Rc<State>, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<JsonOp, ErrBox> { + let args: CloseArgs = serde_json::from_value(args)?; + let resource_table = isolate_state.resource_table.clone(); + let rid = args.rid; + let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { + code: CloseCode::from(c), + reason: match args.reason { + Some(reason) => Cow::from(reason), + None => Default::default(), + }, + }))); + + let future = poll_fn(move |cx| { + let mut resource_table = resource_table.borrow_mut(); + let stream = resource_table + .get_mut::<WsStream>(rid) + .ok_or_else(ErrBox::bad_resource_id)?; + + // TODO(ry) Handle errors below instead of unwrap. + // Need to map tungstenite::error::Error to ErrBox. + + ready!(stream.poll_ready_unpin(cx)).unwrap(); + if let Some(msg) = maybe_msg.take() { + stream.start_send_unpin(msg).unwrap(); + } + ready!(stream.poll_flush_unpin(cx)).unwrap(); + ready!(stream.poll_close_unpin(cx)).unwrap(); + + Poll::Ready(Ok(json!({}))) + }); + + Ok(JsonOp::Async(future.boxed_local())) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct NextEventArgs { + rid: u32, +} + +pub fn op_ws_next_event( + isolate_state: &mut CoreIsolateState, + _state: &Rc<State>, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<JsonOp, ErrBox> { + let args: NextEventArgs = serde_json::from_value(args)?; + let resource_table = isolate_state.resource_table.clone(); + let future = poll_fn(move |cx| { + let mut resource_table = resource_table.borrow_mut(); + let stream = resource_table + .get_mut::<WsStream>(args.rid) + .ok_or_else(ErrBox::bad_resource_id)?; + + stream.poll_next_unpin(cx).map(|val| { + match val { + Some(val) => { + match val { + Ok(message) => { + match message { + Message::Text(text) => Ok(json!({ + "type": "string", + "data": text + })), + Message::Binary(data) => { + Ok(json!({ //TODO: don't use json to send binary data + "type": "binary", + "data": data + })) + } + Message::Close(frame) => { + if let Some(frame) = frame { + let code: u16 = frame.code.into(); + Ok(json!({ + "type": "close", + "code": code, + "reason": frame.reason.as_ref() + })) + } else { + Ok(json!({ "type": "close" })) + } + } + Message::Ping(_) => Ok(json!({"type": "ping"})), + Message::Pong(_) => Ok(json!({"type": "pong"})), + } + } + Err(_) => Ok(json!({ + "type": "error", + })), + } + } + None => { + resource_table + .close(args.rid) + .ok_or_else(ErrBox::bad_resource_id)?; + Ok(json!({ + "type": "closed", + })) + } + } + }) + }); + + Ok(JsonOp::Async(future.boxed_local())) +} diff --git a/cli/rt/11_workers.js b/cli/rt/11_workers.js index 8ae0d5ad5..9a23e3dd8 100644 --- a/cli/rt/11_workers.js +++ b/cli/rt/11_workers.js @@ -39,20 +39,6 @@ const encoder = new TextEncoder(); const decoder = new TextDecoder(); - class MessageEvent extends Event { - constructor(type, eventInitDict) { - super(type, { - bubbles: eventInitDict?.bubbles ?? false, - cancelable: eventInitDict?.cancelable ?? false, - composed: eventInitDict?.composed ?? false, - }); - - this.data = eventInitDict?.data ?? null; - this.origin = eventInitDict?.origin ?? ""; - this.lastEventId = eventInitDict?.lastEventId ?? ""; - } - } - function encodeMessage(data) { const dataJson = JSON.stringify(data); return encoder.encode(dataJson); @@ -226,6 +212,5 @@ window.__bootstrap.worker = { Worker, - MessageEvent, }; })(this); diff --git a/cli/rt/27_websocket.js b/cli/rt/27_websocket.js new file mode 100644 index 000000000..0b113ebca --- /dev/null +++ b/cli/rt/27_websocket.js @@ -0,0 +1,305 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +((window) => { + const { sendAsync } = window.__bootstrap.dispatchJson; + const { close } = window.__bootstrap.resources; + const { requiredArguments } = window.__bootstrap.webUtil; + const CONNECTING = 0; + const OPEN = 1; + const CLOSING = 2; + const CLOSED = 3; + + class WebSocket extends EventTarget { + #readyState = CONNECTING; + + constructor(url, protocols = []) { + super(); + requiredArguments("WebSocket", arguments.length, 1); + + const wsURL = new URL(url); + + if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") { + throw new DOMException( + "Only ws & wss schemes are allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + if (wsURL.hash !== "" || wsURL.href.endsWith("#")) { + throw new DOMException( + "Fragments are not allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + this.#url = wsURL.href; + + if (protocols && typeof protocols === "string") { + protocols = [protocols]; + } + + if ( + protocols.some((x) => protocols.indexOf(x) !== protocols.lastIndexOf(x)) + ) { + throw new DOMException( + "Can't supply multiple times the same protocol.", + "SyntaxError", + ); + } + + sendAsync("op_ws_create", { + url: wsURL.href, + protocols: protocols.join("; "), + }).then((create) => { + if (create.success) { + this.#rid = create.rid; + this.#extensions = create.extensions; + this.#protocol = create.protocol; + + if (this.#readyState === CLOSING) { + sendAsync("op_ws_close", { + rid: this.#rid, + }).then(() => { + this.#readyState = CLOSED; + + const errEvent = new Event("error"); + errEvent.target = this; + this.onerror?.(errEvent); + this.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + event.target = this; + this.onclose?.(event); + this.dispatchEvent(event); + close(this.#rid); + }); + + const event = new Event("error"); + event.target = this; + this.onerror?.(event); + this.dispatchEvent(event); + } else { + this.#readyState = OPEN; + const event = new Event("open"); + event.target = this; + this.onopen?.(event); + this.dispatchEvent(event); + + this.#eventLoop(); + } + } else { + this.#readyState = CLOSED; + + const errEvent = new Event("error"); + errEvent.target = this; + this.onerror?.(errEvent); + this.dispatchEvent(errEvent); + + const closeEvent = new CloseEvent("close"); + closeEvent.target = this; + this.onclose?.(closeEvent); + this.dispatchEvent(closeEvent); + } + }); + } + + get CONNECTING() { + return CONNECTING; + } + get OPEN() { + return OPEN; + } + get CLOSING() { + return CLOSING; + } + get CLOSED() { + return CLOSED; + } + + get readyState() { + return this.#readyState; + } + + #extensions = ""; + #protocol = ""; + #url = ""; + #rid; + + get extensions() { + return this.#extensions; + } + get protocol() { + return this.#protocol; + } + + #binaryType = "blob"; + get binaryType() { + return this.#binaryType; + } + set binaryType(value) { + if (value === "blob" || value === "arraybuffer") { + this.#binaryType = value; + } + } + #bufferedAmount = 0; + get bufferedAmount() { + return this.#bufferedAmount; + } + + get url() { + return this.#url; + } + + onopen = () => {}; + onerror = () => {}; + onclose = () => {}; + onmessage = () => {}; + + send(data) { + requiredArguments("WebSocket.send", arguments.length, 1); + + if (this.#readyState != OPEN) { + throw Error("readyState not OPEN"); + } + + const sendTypedArray = (ta) => { + this.#bufferedAmount += ta.size; + sendAsync("op_ws_send", { + rid: this.#rid, + }, ta).then(() => { + this.#bufferedAmount -= ta.size; + }); + }; + + if (data instanceof Blob) { + data.slice().arrayBuffer().then((ab) => + sendTypedArray(new DataView(ab)) + ); + } else if ( + data instanceof Int8Array || data instanceof Int16Array || + data instanceof Int32Array || data instanceof Uint8Array || + data instanceof Uint16Array || data instanceof Uint32Array || + data instanceof Uint8ClampedArray || data instanceof Float32Array || + data instanceof Float64Array || data instanceof DataView + ) { + sendTypedArray(data); + } else if (data instanceof ArrayBuffer) { + sendTypedArray(new DataView(data)); + } else { + const string = String(data); + const encoder = new TextEncoder(); + const d = encoder.encode(string); + this.#bufferedAmount += d.size; + sendAsync("op_ws_send", { + rid: this.#rid, + text: string, + }).then(() => { + this.#bufferedAmount -= d.size; + }); + } + } + + close(code, reason) { + if (code && (code !== 1000 && !(3000 <= code > 5000))) { + throw new DOMException( + "The close code must be either 1000 or in the range of 3000 to 4999.", + "NotSupportedError", + ); + } + + const encoder = new TextEncoder(); + if (reason && encoder.encode(reason).byteLength > 123) { + throw new DOMException( + "The close reason may not be longer than 123 bytes.", + "SyntaxError", + ); + } + + if (this.#readyState === CONNECTING) { + this.#readyState = CLOSING; + } else if (this.#readyState === OPEN) { + this.#readyState = CLOSING; + + sendAsync("op_ws_close", { + rid: this.#rid, + code, + reason, + }).then(() => { + this.#readyState = CLOSED; + const event = new CloseEvent("close", { + wasClean: true, + code, + reason, + }); + event.target = this; + this.onclose?.(event); + this.dispatchEvent(event); + close(this.#rid); + }); + } + } + + async #eventLoop() { + if (this.#readyState === OPEN) { + const message = await sendAsync("op_ws_next_event", { rid: this.#rid }); + if (message.type === "string" || message.type === "binary") { + let data; + + if (message.type === "string") { + data = message.data; + } else { + if (this.binaryType === "blob") { + data = new Blob([new Uint8Array(message.data)]); + } else { + data = new Uint8Array(message.data).buffer; + } + } + + const event = new MessageEvent("message", { + data, + origin: this.#url, + }); + event.target = this; + this.onmessage?.(event); + this.dispatchEvent(event); + + this.#eventLoop(); + } else if (message.type === "close") { + this.#readyState = CLOSED; + const event = new CloseEvent("close", { + wasClean: true, + code: message.code, + reason: message.reason, + }); + event.target = this; + this.onclose?.(event); + this.dispatchEvent(event); + } else if (message.type === "error") { + const event = new Event("error"); + event.target = this; + this.onerror?.(event); + this.dispatchEvent(event); + } + } + } + } + + Object.defineProperties(WebSocket, { + CONNECTING: { + value: 0, + }, + OPEN: { + value: 1, + }, + CLOSING: { + value: 2, + }, + CLOSED: { + value: 3, + }, + }); + + window.__bootstrap.webSocket = { + WebSocket, + }; +})(this); diff --git a/cli/rt/99_main.js b/cli/rt/99_main.js index c6d6d1395..ede47980c 100644 --- a/cli/rt/99_main.js +++ b/cli/rt/99_main.js @@ -30,6 +30,7 @@ delete Object.prototype.__proto__; const progressEvent = window.__bootstrap.progressEvent; const fileReader = window.__bootstrap.fileReader; const formData = window.__bootstrap.formData; + const webSocket = window.__bootstrap.webSocket; const request = window.__bootstrap.request; const fetch = window.__bootstrap.fetch; const denoNs = window.__bootstrap.denoNs; @@ -80,7 +81,7 @@ delete Object.prototype.__proto__; let isClosing = false; async function workerMessageRecvCallback(data) { - const msgEvent = new worker.MessageEvent("message", { + const msgEvent = new MessageEvent("message", { cancelable: false, data, }); @@ -232,10 +233,13 @@ delete Object.prototype.__proto__; CustomEvent: util.nonEnumerable(CustomEvent), DOMException: util.nonEnumerable(DOMException), ErrorEvent: util.nonEnumerable(ErrorEvent), + CloseEvent: util.nonEnumerable(CloseEvent), + MessageEvent: util.nonEnumerable(MessageEvent), Event: util.nonEnumerable(Event), EventTarget: util.nonEnumerable(EventTarget), Headers: util.nonEnumerable(headers.Headers), FormData: util.nonEnumerable(formData.FormData), + WebSocket: util.nonEnumerable(webSocket.WebSocket), ReadableStream: util.nonEnumerable(streams.ReadableStream), Request: util.nonEnumerable(request.Request), Response: util.nonEnumerable(fetch.Response), diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 08c68a9ae..94d410dcc 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -3205,6 +3205,27 @@ async fn inspector_runtime_evaluate_does_not_crash() { } #[test] +fn websocket() { + let _g = util::http_server(); + + let script = util::tests_path().join("websocket_test.ts"); + let root_ca = util::tests_path().join("tls/RootCA.pem"); + let status = util::deno_cmd() + .arg("test") + .arg("--unstable") + .arg("--allow-net") + .arg("--cert") + .arg(root_ca) + .arg(script) + .spawn() + .unwrap() + .wait() + .unwrap(); + + assert!(status.success()); +} + +#[test] fn exec_path() { let output = util::deno_cmd() .current_dir(util::root_path()) diff --git a/cli/tests/websocket_test.ts b/cli/tests/websocket_test.ts new file mode 100644 index 000000000..0fb9af951 --- /dev/null +++ b/cli/tests/websocket_test.ts @@ -0,0 +1,259 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { + assertEquals, + assertThrows, + createResolvable, + fail, +} from "./unit/test_util.ts"; + +Deno.test("invalid scheme", () => { + assertThrows(() => new WebSocket("foo://localhost:4242")); +}); + +Deno.test("fragment", () => { + assertThrows(() => new WebSocket("ws://localhost:4242/#")); + assertThrows(() => new WebSocket("ws://localhost:4242/#foo")); +}); + +Deno.test("duplicate protocols", () => { + assertThrows(() => new WebSocket("ws://localhost:4242", ["foo", "foo"])); +}); + +Deno.test("invalid server", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:2121"); + let err = false; + ws.onerror = (): void => { + err = true; + }; + ws.onclose = (): void => { + if (err) { + promise.resolve(); + } else { + fail(); + } + }; + ws.onopen = (): void => fail(); + await promise; +}); + +Deno.test("connect & close", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => { + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("connect & abort", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.close(); + let err = false; + ws.onerror = (): void => { + err = true; + }; + ws.onclose = (): void => { + if (err) { + promise.resolve(); + } else { + fail(); + } + }; + ws.onopen = (): void => fail(); + await promise; +}); + +Deno.test("connect & close custom valid code", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.close(1000); + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("connect & close custom invalid code", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => { + assertThrows(() => ws.close(1001)); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("connect & close custom valid reason", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.close(1000, "foo"); + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("connect & close custom invalid reason", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => { + assertThrows(() => ws.close(1000, "".padEnd(124, "o"))); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo string", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send("foo"); + ws.onmessage = (e): void => { + assertEquals(e.data, "foo"); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo string tls", async () => { + const promise = createResolvable(); + const ws = new WebSocket("wss://localhost:4243"); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send("foo"); + ws.onmessage = (e): void => { + assertEquals(e.data, "foo"); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo blob with binaryType blob", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + const blob = new Blob(["foo"]); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send(blob); + ws.onmessage = (e): void => { + e.data.text().then((actual: string) => { + blob.text().then((expected) => { + assertEquals(actual, expected); + }); + }); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo blob with binaryType arraybuffer", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.binaryType = "arraybuffer"; + const blob = new Blob(["foo"]); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send(blob); + ws.onmessage = (e): void => { + blob.arrayBuffer().then((expected) => { + assertEquals(e.data, expected); + }); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo uint8array with binaryType blob", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + const uint = new Uint8Array([102, 111, 111]); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send(uint); + ws.onmessage = (e): void => { + e.data.arrayBuffer().then((actual: ArrayBuffer) => { + assertEquals(actual, uint.buffer); + }); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo uint8array with binaryType arraybuffer", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.binaryType = "arraybuffer"; + const uint = new Uint8Array([102, 111, 111]); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send(uint); + ws.onmessage = (e): void => { + assertEquals(e.data, uint.buffer); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo arraybuffer with binaryType blob", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + const buffer = new ArrayBuffer(3); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send(buffer); + ws.onmessage = (e): void => { + e.data.arrayBuffer().then((actual: ArrayBuffer) => { + assertEquals(actual, buffer); + }); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); + +Deno.test("echo arraybuffer with binaryType arraybuffer", async () => { + const promise = createResolvable(); + const ws = new WebSocket("ws://localhost:4242"); + ws.binaryType = "arraybuffer"; + const buffer = new ArrayBuffer(3); + ws.onerror = (): void => fail(); + ws.onopen = (): void => ws.send(buffer); + ws.onmessage = (e): void => { + assertEquals(e.data, buffer); + ws.close(); + }; + ws.onclose = (): void => { + promise.resolve(); + }; + await promise; +}); diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 8d9c00499..9b42ebe32 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -127,6 +127,7 @@ impl WebWorker { ops::errors::init(isolate, &state); ops::timers::init(isolate, &state); ops::fetch::init(isolate, &state); + ops::websocket::init(isolate, &state); if has_deno_namespace { ops::runtime_compiler::init(isolate, &state); diff --git a/cli/worker.rs b/cli/worker.rs index 0129242e6..3773871dc 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -263,6 +263,7 @@ impl MainWorker { ops::runtime_compiler::init(isolate, &state); ops::errors::init(isolate, &state); ops::fetch::init(isolate, &state); + ops::websocket::init(isolate, &state); ops::fs::init(isolate, &state); ops::fs_events::init(isolate, &state); ops::idna::init(isolate, &state); diff --git a/op_crates/web/01_event.js b/op_crates/web/01_event.js index 48899e6fd..5b21d4402 100644 --- a/op_crates/web/01_event.js +++ b/op_crates/web/01_event.js @@ -1011,6 +1011,55 @@ "error", ]); + class CloseEvent extends Event { + #wasClean = ""; + #code = ""; + #reason = ""; + + get wasClean() { + return this.#wasClean; + } + get code() { + return this.#code; + } + get reason() { + return this.#reason; + } + + constructor(type, { + bubbles, + cancelable, + composed, + wasClean = false, + code = 0, + reason = "", + } = {}) { + super(type, { + bubbles: bubbles, + cancelable: cancelable, + composed: composed, + }); + + this.#wasClean = wasClean; + this.#code = code; + this.#reason = reason; + } + } + + class MessageEvent extends Event { + constructor(type, eventInitDict) { + super(type, { + bubbles: eventInitDict?.bubbles ?? false, + cancelable: eventInitDict?.cancelable ?? false, + composed: eventInitDict?.composed ?? false, + }); + + this.data = eventInitDict?.data ?? null; + this.origin = eventInitDict?.origin ?? ""; + this.lastEventId = eventInitDict?.lastEventId ?? ""; + } + } + class CustomEvent extends Event { #detail = null; @@ -1037,6 +1086,8 @@ window.Event = Event; window.EventTarget = EventTarget; window.ErrorEvent = ErrorEvent; + window.CloseEvent = CloseEvent; + window.MessageEvent = MessageEvent; window.CustomEvent = CustomEvent; window.dispatchEvent = EventTarget.prototype.dispatchEvent; window.addEventListener = EventTarget.prototype.addEventListener; diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 1f8548e34..832c40f59 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -39,6 +39,8 @@ const DOUBLE_REDIRECTS_PORT: u16 = 4548; const INF_REDIRECTS_PORT: u16 = 4549; const REDIRECT_ABSOLUTE_PORT: u16 = 4550; const HTTPS_PORT: u16 = 5545; +const WS_PORT: u16 = 4242; +const WSS_PORT: u16 = 4243; pub const PERMISSION_VARIANTS: [&str; 5] = ["read", "write", "env", "net", "run"]; @@ -135,6 +137,25 @@ pub async fn run_all_servers() { let redirect_server_fut = warp::serve(routes).bind(([127, 0, 0, 1], REDIRECT_PORT)); + let websocket_route = warp::ws().map(|ws: warp::ws::Ws| { + ws.on_upgrade(|websocket| { + use futures::stream::StreamExt; + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + println!("websocket server error: {:?}", e); + } + }) + }) + }); + let ws_server_fut = + warp::serve(websocket_route).bind(([127, 0, 0, 1], WS_PORT)); + let wss_server_fut = warp::serve(websocket_route) + .tls() + .cert_path("std/http/testdata/tls/localhost.crt") + .key_path("std/http/testdata/tls/localhost.key") + .bind(([127, 0, 0, 1], WSS_PORT)); + let routes = warp::path::full().map(|path: warp::path::FullPath| { let p = path.as_str(); assert_eq!(&p[0..1], "/"); @@ -426,6 +447,8 @@ pub async fn run_all_servers() { http_fut, https_fut, redirect_server_fut, + ws_server_fut, + wss_server_fut, another_redirect_server_fut, inf_redirect_server_fut, double_redirect_server_fut, |