diff options
author | Luca Casonato <lucacasonato@yahoo.com> | 2021-01-06 16:57:28 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-06 16:57:28 +0100 |
commit | 2e18fcebcc2ee931ee952ac2fe2175d6ec7acf69 (patch) | |
tree | 352e53d98cd46604e7661ba26df51dc9c0498b2a /op_crates | |
parent | 1959aca2a978642b73ea815b9b89dd3219830cef (diff) |
refactor: move WebSocket API to an op_crate (#9026)
Diffstat (limited to 'op_crates')
-rw-r--r-- | op_crates/websocket/01_websocket.js | 376 | ||||
-rw-r--r-- | op_crates/websocket/Cargo.toml | 24 | ||||
-rw-r--r-- | op_crates/websocket/README.md | 5 | ||||
-rw-r--r-- | op_crates/websocket/lib.deno_websocket.d.ts | 112 | ||||
-rw-r--r-- | op_crates/websocket/lib.rs | 347 |
5 files changed, 864 insertions, 0 deletions
diff --git a/op_crates/websocket/01_websocket.js b/op_crates/websocket/01_websocket.js new file mode 100644 index 000000000..6cc23eb95 --- /dev/null +++ b/op_crates/websocket/01_websocket.js @@ -0,0 +1,376 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +((window) => { + const core = window.Deno.core; + + // provided by "deno_web" + const { URL } = window.__bootstrap.url; + + const CONNECTING = 0; + const OPEN = 1; + const CLOSING = 2; + const CLOSED = 3; + + function requiredArguments( + name, + length, + required, + ) { + if (length < required) { + const errMsg = `${name} requires at least ${required} argument${ + required === 1 ? "" : "s" + }, but only ${length} present`; + throw new TypeError(errMsg); + } + } + + const handlerSymbol = Symbol("eventHandlers"); + function makeWrappedHandler(handler) { + function wrappedHandler(...args) { + if (typeof wrappedHandler.handler !== "function") { + return; + } + return wrappedHandler.handler.call(this, ...args); + } + wrappedHandler.handler = handler; + return wrappedHandler; + } + // TODO(lucacasonato) reuse when we can reuse code between web crates + function defineEventHandler(emitter, name) { + // HTML specification section 8.1.5.1 + Object.defineProperty(emitter, `on${name}`, { + get() { + return this[handlerSymbol]?.get(name)?.handler; + }, + set(value) { + if (!this[handlerSymbol]) { + this[handlerSymbol] = new Map(); + } + let handlerWrapper = this[handlerSymbol]?.get(name); + if (handlerWrapper) { + handlerWrapper.handler = value; + } else { + handlerWrapper = makeWrappedHandler(value); + this.addEventListener(name, handlerWrapper); + } + this[handlerSymbol].set(name, handlerWrapper); + }, + configurable: true, + enumerable: true, + }); + } + + class WebSocket extends EventTarget { + #readyState = CONNECTING; + + constructor(url, protocols = []) { + super(); + requiredArguments("WebSocket", arguments.length, 1); + + const wsURL = new URL(url); + + if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") { + throw new DOMException( + "Only ws & wss schemes are allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + if (wsURL.hash !== "" || wsURL.href.endsWith("#")) { + throw new DOMException( + "Fragments are not allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + this.#url = wsURL.href; + + core.jsonOpSync("op_ws_check_permission", { + url: this.#url, + }); + + if (protocols && typeof protocols === "string") { + protocols = [protocols]; + } + + if ( + protocols.some((x) => protocols.indexOf(x) !== protocols.lastIndexOf(x)) + ) { + throw new DOMException( + "Can't supply multiple times the same protocol.", + "SyntaxError", + ); + } + + core.jsonOpAsync("op_ws_create", { + url: wsURL.href, + protocols: protocols.join(", "), + }).then((create) => { + if (create.success) { + this.#rid = create.rid; + this.#extensions = create.extensions; + this.#protocol = create.protocol; + + if (this.#readyState === CLOSING) { + core.jsonOpAsync("op_ws_close", { + rid: this.#rid, + }).then(() => { + this.#readyState = CLOSED; + + const errEvent = new ErrorEvent("error"); + errEvent.target = this; + this.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + event.target = this; + this.dispatchEvent(event); + core.close(this.#rid); + }); + } else { + this.#readyState = OPEN; + const event = new Event("open"); + event.target = this; + this.dispatchEvent(event); + + this.#eventLoop(); + } + } else { + this.#readyState = CLOSED; + + const errEvent = new ErrorEvent("error"); + errEvent.target = this; + this.dispatchEvent(errEvent); + + const closeEvent = new CloseEvent("close"); + closeEvent.target = this; + this.dispatchEvent(closeEvent); + } + }).catch((err) => { + this.#readyState = CLOSED; + + const errorEv = new ErrorEvent( + "error", + { error: err, message: err.toString() }, + ); + errorEv.target = this; + this.dispatchEvent(errorEv); + + const closeEv = new CloseEvent("close"); + closeEv.target = this; + this.dispatchEvent(closeEv); + }); + } + + get CONNECTING() { + return CONNECTING; + } + get OPEN() { + return OPEN; + } + get CLOSING() { + return CLOSING; + } + get CLOSED() { + return CLOSED; + } + + get readyState() { + return this.#readyState; + } + + #extensions = ""; + #protocol = ""; + #url = ""; + #rid; + + get extensions() { + return this.#extensions; + } + get protocol() { + return this.#protocol; + } + + #binaryType = "blob"; + get binaryType() { + return this.#binaryType; + } + set binaryType(value) { + if (value === "blob" || value === "arraybuffer") { + this.#binaryType = value; + } + } + #bufferedAmount = 0; + get bufferedAmount() { + return this.#bufferedAmount; + } + + get url() { + return this.#url; + } + + send(data) { + requiredArguments("WebSocket.send", arguments.length, 1); + + if (this.#readyState != OPEN) { + throw Error("readyState not OPEN"); + } + + const sendTypedArray = (ta) => { + this.#bufferedAmount += ta.size; + core.jsonOpAsync("op_ws_send", { + rid: this.#rid, + kind: "binary", + }, ta).then(() => { + this.#bufferedAmount -= ta.size; + }); + }; + + if (data instanceof Blob) { + data.slice().arrayBuffer().then((ab) => + sendTypedArray(new DataView(ab)) + ); + } else if ( + data instanceof Int8Array || data instanceof Int16Array || + data instanceof Int32Array || data instanceof Uint8Array || + data instanceof Uint16Array || data instanceof Uint32Array || + data instanceof Uint8ClampedArray || data instanceof Float32Array || + data instanceof Float64Array || data instanceof DataView + ) { + sendTypedArray(data); + } else if (data instanceof ArrayBuffer) { + sendTypedArray(new DataView(data)); + } else { + const string = String(data); + const encoder = new TextEncoder(); + const d = encoder.encode(string); + this.#bufferedAmount += d.size; + core.jsonOpAsync("op_ws_send", { + rid: this.#rid, + kind: "text", + text: string, + }).then(() => { + this.#bufferedAmount -= d.size; + }); + } + } + + close(code, reason) { + if (code && (code !== 1000 && !(3000 <= code > 5000))) { + throw new DOMException( + "The close code must be either 1000 or in the range of 3000 to 4999.", + "NotSupportedError", + ); + } + + const encoder = new TextEncoder(); + if (reason && encoder.encode(reason).byteLength > 123) { + throw new DOMException( + "The close reason may not be longer than 123 bytes.", + "SyntaxError", + ); + } + + if (this.#readyState === CONNECTING) { + this.#readyState = CLOSING; + } else if (this.#readyState === OPEN) { + this.#readyState = CLOSING; + + core.jsonOpAsync("op_ws_close", { + rid: this.#rid, + code, + reason, + }).then(() => { + this.#readyState = CLOSED; + const event = new CloseEvent("close", { + wasClean: true, + code, + reason, + }); + event.target = this; + this.dispatchEvent(event); + core.close(this.#rid); + }); + } + } + + async #eventLoop() { + if (this.#readyState === OPEN) { + const message = await core.jsonOpAsync( + "op_ws_next_event", + { rid: this.#rid }, + ); + if (message.type === "string" || message.type === "binary") { + let data; + + if (message.type === "string") { + data = message.data; + } else { + if (this.binaryType === "blob") { + data = new Blob([new Uint8Array(message.data)]); + } else { + data = new Uint8Array(message.data).buffer; + } + } + + const event = new MessageEvent("message", { + data, + origin: this.#url, + }); + event.target = this; + this.dispatchEvent(event); + + this.#eventLoop(); + } else if (message.type === "ping") { + core.jsonOpAsync("op_ws_send", { + rid: this.#rid, + kind: "pong", + }); + + this.#eventLoop(); + } else if (message.type === "close") { + this.#readyState = CLOSED; + const event = new CloseEvent("close", { + wasClean: true, + code: message.code, + reason: message.reason, + }); + event.target = this; + this.dispatchEvent(event); + } else if (message.type === "error") { + this.#readyState = CLOSED; + + const errorEv = new ErrorEvent("error"); + errorEv.target = this; + this.dispatchEvent(errorEv); + + this.#readyState = CLOSED; + const closeEv = new CloseEvent("close"); + closeEv.target = this; + this.dispatchEvent(closeEv); + } + } + } + } + + Object.defineProperties(WebSocket, { + CONNECTING: { + value: 0, + }, + OPEN: { + value: 1, + }, + CLOSING: { + value: 2, + }, + CLOSED: { + value: 3, + }, + }); + + defineEventHandler(WebSocket.prototype, "message"); + defineEventHandler(WebSocket.prototype, "error"); + defineEventHandler(WebSocket.prototype, "close"); + defineEventHandler(WebSocket.prototype, "open"); + + window.__bootstrap.webSocket = { WebSocket }; +})(this); diff --git a/op_crates/websocket/Cargo.toml b/op_crates/websocket/Cargo.toml new file mode 100644 index 000000000..20fba9804 --- /dev/null +++ b/op_crates/websocket/Cargo.toml @@ -0,0 +1,24 @@ +# Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_websocket" +version = "0.1.0" +edition = "2018" +description = "Implementation of WebSocket API for Deno" +authors = ["the Deno authors"] +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { version = "0.75.0", path = "../../core" } +http = "0.2.1" +tokio = { version = "0.2.22", features = ["full"] } +tokio-rustls = "0.14.1" +tokio-tungstenite = "0.11.0" +serde = { version = "1.0.116", features = ["derive"] } +webpki = "0.21.3" +webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'. diff --git a/op_crates/websocket/README.md b/op_crates/websocket/README.md new file mode 100644 index 000000000..d6495f397 --- /dev/null +++ b/op_crates/websocket/README.md @@ -0,0 +1,5 @@ +# deno_websocket + +This op crate implements the websocket functions of Deno. + +Spec: https://html.spec.whatwg.org/multipage/web-sockets.html diff --git a/op_crates/websocket/lib.deno_websocket.d.ts b/op_crates/websocket/lib.deno_websocket.d.ts new file mode 100644 index 000000000..d47665c8b --- /dev/null +++ b/op_crates/websocket/lib.deno_websocket.d.ts @@ -0,0 +1,112 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +// deno-lint-ignore-file no-explicit-any + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +interface CloseEventInit extends EventInit { + code?: number; + reason?: string; + wasClean?: boolean; +} + +declare class CloseEvent extends Event { + constructor(type: string, eventInitDict?: CloseEventInit); + /** + * Returns the WebSocket connection close code provided by the server. + */ + readonly code: number; + /** + * Returns the WebSocket connection close reason provided by the server. + */ + readonly reason: string; + /** + * Returns true if the connection closed cleanly; false otherwise. + */ + readonly wasClean: boolean; +} + +interface WebSocketEventMap { + close: CloseEvent; + error: Event; + message: MessageEvent; + open: Event; +} + +/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ +declare class WebSocket extends EventTarget { + constructor(url: string, protocols?: string | string[]); + + static readonly CLOSED: number; + static readonly CLOSING: number; + static readonly CONNECTING: number; + static readonly OPEN: number; + + /** + * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: + * + * Can be set, to change how binary data is returned. The default is "blob". + */ + binaryType: BinaryType; + /** + * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. + * + * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) + */ + readonly bufferedAmount: number; + /** + * Returns the extensions selected by the server, if any. + */ + readonly extensions: string; + onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; + onerror: ((this: WebSocket, ev: Event | ErrorEvent) => any) | null; + onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; + onopen: ((this: WebSocket, ev: Event) => any) | null; + /** + * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. + */ + readonly protocol: string; + /** + * Returns the state of the WebSocket object's connection. It can have the values described below. + */ + readonly readyState: number; + /** + * Returns the URL that was used to establish the WebSocket connection. + */ + readonly url: string; + /** + * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. + */ + close(code?: number, reason?: string): void; + /** + * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView. + */ + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; + readonly CLOSED: number; + readonly CLOSING: number; + readonly CONNECTING: number; + readonly OPEN: number; + addEventListener<K extends keyof WebSocketEventMap>( + type: K, + listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + options?: boolean | AddEventListenerOptions, + ): void; + addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void; + removeEventListener<K extends keyof WebSocketEventMap>( + type: K, + listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + options?: boolean | EventListenerOptions, + ): void; + removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions, + ): void; +} + +type BinaryType = "arraybuffer" | "blob"; diff --git a/op_crates/websocket/lib.rs b/op_crates/websocket/lib.rs new file mode 100644 index 000000000..b688fe9fd --- /dev/null +++ b/op_crates/websocket/lib.rs @@ -0,0 +1,347 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::stream::SplitSink; +use deno_core::futures::stream::SplitStream; +use deno_core::futures::SinkExt; +use deno_core::futures::StreamExt; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::url; +use deno_core::AsyncRefCell; +use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::JsRuntime; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::{serde_json, ZeroCopyBuf}; + +use http::{Method, Request, Uri}; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::io::BufReader; +use std::io::Cursor; +use std::path::PathBuf; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use tokio_tungstenite::stream::Stream as StreamSwitcher; +use tokio_tungstenite::tungstenite::Error as TungsteniteError; +use tokio_tungstenite::tungstenite::{ + handshake::client::Response, protocol::frame::coding::CloseCode, + protocol::CloseFrame, Message, +}; +use tokio_tungstenite::{client_async, WebSocketStream}; +use webpki::DNSNameRef; + +pub use tokio_tungstenite; // Re-export tokio_tungstenite + +#[derive(Clone)] +pub struct WsCaData(pub Vec<u8>); +#[derive(Clone)] +pub struct WsUserAgent(pub String); + +pub trait WebSocketPermissions { + fn check_net_url(&self, _url: &url::Url) -> Result<(), AnyError>; +} + +/// For use with `op_websocket_*` when the user does not want permissions. +pub struct NoWebSocketPermissions; + +impl WebSocketPermissions for NoWebSocketPermissions { + fn check_net_url(&self, _url: &url::Url) -> Result<(), AnyError> { + Ok(()) + } +} + +type MaybeTlsStream = + StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; + +type WsStream = WebSocketStream<MaybeTlsStream>; +struct WsStreamResource { + tx: AsyncRefCell<SplitSink<WsStream, Message>>, + rx: AsyncRefCell<SplitStream<WsStream>>, + // When a `WsStreamResource` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, +} + +impl Resource for WsStreamResource { + fn name(&self) -> Cow<str> { + "webSocketStream".into() + } +} + +impl WsStreamResource {} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CheckPermissionArgs { + url: String, +} + +// This op is needed because creating a WS instance in JavaScript is a sync +// operation and should throw error when permissions are not fullfiled, +// but actual op that connects WS is async. +pub fn op_ws_check_permission<WP>( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> +where + WP: WebSocketPermissions + 'static, +{ + let args: CheckPermissionArgs = serde_json::from_value(args)?; + + state + .borrow::<WP>() + .check_net_url(&url::Url::parse(&args.url)?)?; + + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateArgs { + url: String, + protocols: String, +} + +pub async fn op_ws_create<WP>( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> +where + WP: WebSocketPermissions + 'static, +{ + let args: CreateArgs = serde_json::from_value(args)?; + + { + let s = state.borrow(); + s.borrow::<WP>() + .check_net_url(&url::Url::parse(&args.url)?) + .expect( + "Permission check should have been done in op_ws_check_permission", + ); + } + + let ws_ca_data = state.borrow().try_borrow::<WsCaData>().cloned(); + let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); + let uri: Uri = args.url.parse()?; + let mut request = Request::builder().method(Method::GET).uri(&uri); + + request = request.header("User-Agent", user_agent); + + if !args.protocols.is_empty() { + request = request.header("Sec-WebSocket-Protocol", args.protocols); + } + + let request = request.body(())?; + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let try_socket = TcpStream::connect(addr).await; + let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { + Ok(socket) => socket, + Err(_) => return Ok(json!({"success": false})), + }; + + let socket: MaybeTlsStream = match uri.scheme_str() { + Some("ws") => StreamSwitcher::Plain(tcp_socket), + Some("wss") => { + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + + if let Some(ws_ca_data) = ws_ca_data { + let reader = &mut BufReader::new(Cursor::new(ws_ca_data.0)); + config.root_store.add_pem_file(reader).unwrap(); + } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + StreamSwitcher::Tls(tls_socket) + } + _ => unreachable!(), + }; + + let (stream, response): (WsStream, Response) = + client_async(request, socket).await.map_err(|err| { + type_error(format!( + "failed to connect to WebSocket: {}", + err.to_string() + )) + })?; + + let (ws_tx, ws_rx) = stream.split(); + let resource = WsStreamResource { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + cancel: Default::default(), + }; + let mut state = state.borrow_mut(); + let rid = state.resource_table.add(resource); + + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::<String>(); + Ok(json!({ + "success": true, + "rid": rid, + "protocol": protocol, + "extensions": extensions + })) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendArgs { + rid: u32, + kind: String, + text: Option<String>, +} + +pub async fn op_ws_send( + state: Rc<RefCell<OpState>>, + args: Value, + bufs: BufVec, +) -> Result<Value, AnyError> { + let args: SendArgs = serde_json::from_value(args)?; + + let msg = match args.kind.as_str() { + "text" => Message::Text(args.text.unwrap()), + "binary" => Message::Binary(bufs[0].to_vec()), + "pong" => Message::Pong(vec![]), + _ => unreachable!(), + }; + let rid = args.rid; + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CloseArgs { + rid: u32, + code: Option<u16>, + reason: Option<String>, +} + +pub async fn op_ws_close( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: CloseArgs = serde_json::from_value(args)?; + let rid = args.rid; + let msg = Message::Close(args.code.map(|c| CloseFrame { + code: CloseCode::from(c), + reason: match args.reason { + Some(reason) => Cow::from(reason), + None => Default::default(), + }, + })); + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct NextEventArgs { + rid: u32, +} + +pub async fn op_ws_next_event( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: NextEventArgs = serde_json::from_value(args)?; + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(args.rid) + .ok_or_else(bad_resource_id)?; + + let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let val = rx.next().or_cancel(cancel).await?; + let res = match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data + }) + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.borrow_mut().resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + }; + Ok(res) +} + +/// Load and execute the javascript code. +pub fn init(isolate: &mut JsRuntime) { + isolate + .execute( + "deno:op_crates/websocket/01_websocket.js", + include_str!("01_websocket.js"), + ) + .unwrap(); +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") +} |