diff options
Diffstat (limited to 'op_crates/websocket')
-rw-r--r-- | op_crates/websocket/01_websocket.js | 401 | ||||
-rw-r--r-- | op_crates/websocket/Cargo.toml | 24 | ||||
-rw-r--r-- | op_crates/websocket/README.md | 5 | ||||
-rw-r--r-- | op_crates/websocket/lib.deno_websocket.d.ts | 112 | ||||
-rw-r--r-- | op_crates/websocket/lib.rs | 367 |
5 files changed, 0 insertions, 909 deletions
diff --git a/op_crates/websocket/01_websocket.js b/op_crates/websocket/01_websocket.js deleted file mode 100644 index c77af4566..000000000 --- a/op_crates/websocket/01_websocket.js +++ /dev/null @@ -1,401 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const core = window.Deno.core; - - // provided by "deno_web" - const { URL } = window.__bootstrap.url; - - const CONNECTING = 0; - const OPEN = 1; - const CLOSING = 2; - const CLOSED = 3; - - function requiredArguments( - name, - length, - required, - ) { - if (length < required) { - const errMsg = `${name} requires at least ${required} argument${ - required === 1 ? "" : "s" - }, but only ${length} present`; - throw new TypeError(errMsg); - } - } - - /** - * Tries to close the resource (and ignores BadResource errors). - * @param {number} rid - */ - function tryClose(rid) { - try { - core.close(rid); - } catch (err) { - // Ignore error if the socket has already been closed. - if (!(err instanceof Deno.errors.BadResource)) throw err; - } - } - - const handlerSymbol = Symbol("eventHandlers"); - function makeWrappedHandler(handler) { - function wrappedHandler(...args) { - if (typeof wrappedHandler.handler !== "function") { - return; - } - return wrappedHandler.handler.call(this, ...args); - } - wrappedHandler.handler = handler; - return wrappedHandler; - } - // TODO(lucacasonato) reuse when we can reuse code between web crates - function defineEventHandler(emitter, name) { - // HTML specification section 8.1.5.1 - Object.defineProperty(emitter, `on${name}`, { - get() { - return this[handlerSymbol]?.get(name)?.handler; - }, - set(value) { - if (!this[handlerSymbol]) { - this[handlerSymbol] = new Map(); - } - let handlerWrapper = this[handlerSymbol]?.get(name); - if (handlerWrapper) { - handlerWrapper.handler = value; - } else { - handlerWrapper = makeWrappedHandler(value); - this.addEventListener(name, handlerWrapper); - } - this[handlerSymbol].set(name, handlerWrapper); - }, - configurable: true, - enumerable: true, - }); - } - - class WebSocket extends EventTarget { - #readyState = CONNECTING; - - constructor(url, protocols = []) { - super(); - requiredArguments("WebSocket", arguments.length, 1); - - const wsURL = new URL(url); - - if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") { - throw new DOMException( - "Only ws & wss schemes are allowed in a WebSocket URL.", - "SyntaxError", - ); - } - - if (wsURL.hash !== "" || wsURL.href.endsWith("#")) { - throw new DOMException( - "Fragments are not allowed in a WebSocket URL.", - "SyntaxError", - ); - } - - this.#url = wsURL.href; - - core.opSync("op_ws_check_permission", this.#url); - - if (protocols && typeof protocols === "string") { - protocols = [protocols]; - } - - if ( - protocols.some((x) => protocols.indexOf(x) !== protocols.lastIndexOf(x)) - ) { - throw new DOMException( - "Can't supply multiple times the same protocol.", - "SyntaxError", - ); - } - - core.opAsync("op_ws_create", { - url: wsURL.href, - protocols: protocols.join(", "), - }).then((create) => { - if (create.success) { - this.#rid = create.rid; - this.#extensions = create.extensions; - this.#protocol = create.protocol; - - if (this.#readyState === CLOSING) { - core.opAsync("op_ws_close", { - rid: this.#rid, - }).then(() => { - this.#readyState = CLOSED; - - const errEvent = new ErrorEvent("error"); - errEvent.target = this; - this.dispatchEvent(errEvent); - - const event = new CloseEvent("close"); - event.target = this; - this.dispatchEvent(event); - tryClose(this.#rid); - }); - } else { - this.#readyState = OPEN; - const event = new Event("open"); - event.target = this; - this.dispatchEvent(event); - - this.#eventLoop(); - } - } else { - this.#readyState = CLOSED; - - const errEvent = new ErrorEvent("error"); - errEvent.target = this; - this.dispatchEvent(errEvent); - - const closeEvent = new CloseEvent("close"); - closeEvent.target = this; - this.dispatchEvent(closeEvent); - } - }).catch((err) => { - this.#readyState = CLOSED; - - const errorEv = new ErrorEvent( - "error", - { error: err, message: err.toString() }, - ); - errorEv.target = this; - this.dispatchEvent(errorEv); - - const closeEv = new CloseEvent("close"); - closeEv.target = this; - this.dispatchEvent(closeEv); - }); - } - - get CONNECTING() { - return CONNECTING; - } - get OPEN() { - return OPEN; - } - get CLOSING() { - return CLOSING; - } - get CLOSED() { - return CLOSED; - } - - get readyState() { - return this.#readyState; - } - - #extensions = ""; - #protocol = ""; - #url = ""; - #rid; - - get extensions() { - return this.#extensions; - } - get protocol() { - return this.#protocol; - } - - #binaryType = "blob"; - get binaryType() { - return this.#binaryType; - } - set binaryType(value) { - if (value === "blob" || value === "arraybuffer") { - this.#binaryType = value; - } - } - #bufferedAmount = 0; - get bufferedAmount() { - return this.#bufferedAmount; - } - - get url() { - return this.#url; - } - - send(data) { - requiredArguments("WebSocket.send", arguments.length, 1); - - if (this.#readyState != OPEN) { - throw Error("readyState not OPEN"); - } - - const sendTypedArray = (ta) => { - this.#bufferedAmount += ta.size; - core.opAsync("op_ws_send", { - rid: this.#rid, - kind: "binary", - }, ta).then(() => { - this.#bufferedAmount -= ta.size; - }); - }; - - if (data instanceof Blob) { - data.slice().arrayBuffer().then((ab) => - sendTypedArray(new DataView(ab)) - ); - } else if ( - data instanceof Int8Array || data instanceof Int16Array || - data instanceof Int32Array || data instanceof Uint8Array || - data instanceof Uint16Array || data instanceof Uint32Array || - data instanceof Uint8ClampedArray || data instanceof Float32Array || - data instanceof Float64Array || data instanceof DataView - ) { - sendTypedArray(data); - } else if (data instanceof ArrayBuffer) { - sendTypedArray(new DataView(data)); - } else { - const string = String(data); - const encoder = new TextEncoder(); - const d = encoder.encode(string); - this.#bufferedAmount += d.size; - core.opAsync("op_ws_send", { - rid: this.#rid, - kind: "text", - text: string, - }).then(() => { - this.#bufferedAmount -= d.size; - }); - } - } - - close(code, reason) { - if (code && !(code === 1000 || (3000 <= code && code < 5000))) { - throw new DOMException( - "The close code must be either 1000 or in the range of 3000 to 4999.", - "NotSupportedError", - ); - } - - const encoder = new TextEncoder(); - if (reason && encoder.encode(reason).byteLength > 123) { - throw new DOMException( - "The close reason may not be longer than 123 bytes.", - "SyntaxError", - ); - } - - if (this.#readyState === CONNECTING) { - this.#readyState = CLOSING; - } else if (this.#readyState === OPEN) { - this.#readyState = CLOSING; - - core.opAsync("op_ws_close", { - rid: this.#rid, - code, - reason, - }).then(() => { - this.#readyState = CLOSED; - const event = new CloseEvent("close", { - wasClean: true, - code, - reason, - }); - event.target = this; - this.dispatchEvent(event); - tryClose(this.#rid); - }); - } - } - - async #eventLoop() { - while (this.#readyState === OPEN) { - const { kind, value } = await core.opAsync( - "op_ws_next_event", - this.#rid, - ); - - switch (kind) { - case "string": { - const event = new MessageEvent("message", { - data: value, - origin: this.#url, - }); - event.target = this; - this.dispatchEvent(event); - break; - } - case "binary": { - let data; - - if (this.binaryType === "blob") { - data = new Blob([value]); - } else { - data = value.buffer; - } - - const event = new MessageEvent("message", { - data, - origin: this.#url, - }); - event.target = this; - this.dispatchEvent(event); - break; - } - case "ping": { - core.opAsync("op_ws_send", { - rid: this.#rid, - kind: "pong", - }); - break; - } - case "close": { - this.#readyState = CLOSED; - - const event = new CloseEvent("close", { - wasClean: true, - code: value.code, - reason: value.reason, - }); - event.target = this; - this.dispatchEvent(event); - tryClose(this.#rid); - break; - } - case "error": { - this.#readyState = CLOSED; - - const errorEv = new ErrorEvent("error"); - errorEv.target = this; - this.dispatchEvent(errorEv); - - const closeEv = new CloseEvent("close"); - closeEv.target = this; - this.dispatchEvent(closeEv); - tryClose(this.#rid); - break; - } - } - } - } - } - - Object.defineProperties(WebSocket, { - CONNECTING: { - value: 0, - }, - OPEN: { - value: 1, - }, - CLOSING: { - value: 2, - }, - CLOSED: { - value: 3, - }, - }); - - defineEventHandler(WebSocket.prototype, "message"); - defineEventHandler(WebSocket.prototype, "error"); - defineEventHandler(WebSocket.prototype, "close"); - defineEventHandler(WebSocket.prototype, "open"); - - window.__bootstrap.webSocket = { WebSocket }; -})(this); diff --git a/op_crates/websocket/Cargo.toml b/op_crates/websocket/Cargo.toml deleted file mode 100644 index 516401204..000000000 --- a/op_crates/websocket/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -[package] -name = "deno_websocket" -version = "0.10.0" -edition = "2018" -description = "Implementation of WebSocket API for Deno" -authors = ["the Deno authors"] -license = "MIT" -readme = "README.md" -repository = "https://github.com/denoland/deno" - -[lib] -path = "lib.rs" - -[dependencies] -deno_core = { version = "0.86.0", path = "../../core" } -http = "0.2.3" -serde = { version = "1.0.125", features = ["derive"] } -tokio = { version = "1.4.0", features = ["full"] } -tokio-rustls = "0.22.0" -tokio-tungstenite = { version = "0.14.0", features = ["rustls-tls"] } -webpki = "0.21.4" -webpki-roots = "0.21.1" diff --git a/op_crates/websocket/README.md b/op_crates/websocket/README.md deleted file mode 100644 index d6495f397..000000000 --- a/op_crates/websocket/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# deno_websocket - -This op crate implements the websocket functions of Deno. - -Spec: https://html.spec.whatwg.org/multipage/web-sockets.html diff --git a/op_crates/websocket/lib.deno_websocket.d.ts b/op_crates/websocket/lib.deno_websocket.d.ts deleted file mode 100644 index 31e5782a6..000000000 --- a/op_crates/websocket/lib.deno_websocket.d.ts +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -// deno-lint-ignore-file no-explicit-any - -/// <reference no-default-lib="true" /> -/// <reference lib="esnext" /> - -interface CloseEventInit extends EventInit { - code?: number; - reason?: string; - wasClean?: boolean; -} - -declare class CloseEvent extends Event { - constructor(type: string, eventInitDict?: CloseEventInit); - /** - * Returns the WebSocket connection close code provided by the server. - */ - readonly code: number; - /** - * Returns the WebSocket connection close reason provided by the server. - */ - readonly reason: string; - /** - * Returns true if the connection closed cleanly; false otherwise. - */ - readonly wasClean: boolean; -} - -interface WebSocketEventMap { - close: CloseEvent; - error: Event; - message: MessageEvent; - open: Event; -} - -/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ -declare class WebSocket extends EventTarget { - constructor(url: string, protocols?: string | string[]); - - static readonly CLOSED: number; - static readonly CLOSING: number; - static readonly CONNECTING: number; - static readonly OPEN: number; - - /** - * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: - * - * Can be set, to change how binary data is returned. The default is "blob". - */ - binaryType: BinaryType; - /** - * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. - * - * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) - */ - readonly bufferedAmount: number; - /** - * Returns the extensions selected by the server, if any. - */ - readonly extensions: string; - onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; - onerror: ((this: WebSocket, ev: Event | ErrorEvent) => any) | null; - onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; - onopen: ((this: WebSocket, ev: Event) => any) | null; - /** - * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. - */ - readonly protocol: string; - /** - * Returns the state of the WebSocket object's connection. It can have the values described below. - */ - readonly readyState: number; - /** - * Returns the URL that was used to establish the WebSocket connection. - */ - readonly url: string; - /** - * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. - */ - close(code?: number, reason?: string): void; - /** - * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView. - */ - send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; - readonly CLOSED: number; - readonly CLOSING: number; - readonly CONNECTING: number; - readonly OPEN: number; - addEventListener<K extends keyof WebSocketEventMap>( - type: K, - listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, - options?: boolean | AddEventListenerOptions, - ): void; - addEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | AddEventListenerOptions, - ): void; - removeEventListener<K extends keyof WebSocketEventMap>( - type: K, - listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, - options?: boolean | EventListenerOptions, - ): void; - removeEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | EventListenerOptions, - ): void; -} - -type BinaryType = "arraybuffer" | "blob"; diff --git a/op_crates/websocket/lib.rs b/op_crates/websocket/lib.rs deleted file mode 100644 index acf823775..000000000 --- a/op_crates/websocket/lib.rs +++ /dev/null @@ -1,367 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::bad_resource_id; -use deno_core::error::invalid_hostname; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::stream::SplitSink; -use deno_core::futures::stream::SplitStream; -use deno_core::futures::SinkExt; -use deno_core::futures::StreamExt; -use deno_core::include_js_files; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::url; -use deno_core::AsyncRefCell; -use deno_core::CancelFuture; -use deno_core::CancelHandle; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; - -use http::{Method, Request, Uri}; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::io::BufReader; -use std::io::Cursor; -use std::path::PathBuf; -use std::rc::Rc; -use std::sync::Arc; -use tokio::net::TcpStream; -use tokio_rustls::{rustls::ClientConfig, TlsConnector}; -use tokio_tungstenite::tungstenite::Error as TungsteniteError; -use tokio_tungstenite::tungstenite::{ - handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, Message, -}; -use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::{client_async, WebSocketStream}; -use webpki::DNSNameRef; - -pub use tokio_tungstenite; // Re-export tokio_tungstenite - -#[derive(Clone)] -pub struct WsCaData(pub Vec<u8>); -#[derive(Clone)] -pub struct WsUserAgent(pub String); - -pub trait WebSocketPermissions { - fn check_net_url(&mut self, _url: &url::Url) -> Result<(), AnyError>; -} - -/// For use with `op_websocket_*` when the user does not want permissions. -pub struct NoWebSocketPermissions; - -impl WebSocketPermissions for NoWebSocketPermissions { - fn check_net_url(&mut self, _url: &url::Url) -> Result<(), AnyError> { - Ok(()) - } -} - -type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; -struct WsStreamResource { - tx: AsyncRefCell<SplitSink<WsStream, Message>>, - rx: AsyncRefCell<SplitStream<WsStream>>, - // When a `WsStreamResource` resource is closed, all pending 'read' ops are - // canceled, while 'write' ops are allowed to complete. Therefore only - // 'read' futures are attached to this cancel handle. - cancel: CancelHandle, -} - -impl Resource for WsStreamResource { - fn name(&self) -> Cow<str> { - "webSocketStream".into() - } -} - -impl WsStreamResource {} - -// This op is needed because creating a WS instance in JavaScript is a sync -// operation and should throw error when permissions are not fulfilled, -// but actual op that connects WS is async. -pub fn op_ws_check_permission<WP>( - state: &mut OpState, - url: String, - _zero_copy: Option<ZeroCopyBuf>, -) -> Result<(), AnyError> -where - WP: WebSocketPermissions + 'static, -{ - state - .borrow_mut::<WP>() - .check_net_url(&url::Url::parse(&url)?)?; - - Ok(()) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateArgs { - url: String, - protocols: String, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateResponse { - success: bool, - rid: Option<ResourceId>, - protocol: Option<String>, - extensions: Option<String>, -} - -pub async fn op_ws_create<WP>( - state: Rc<RefCell<OpState>>, - args: CreateArgs, - _bufs: Option<ZeroCopyBuf>, -) -> Result<CreateResponse, AnyError> -where - WP: WebSocketPermissions + 'static, -{ - { - let mut s = state.borrow_mut(); - s.borrow_mut::<WP>() - .check_net_url(&url::Url::parse(&args.url)?) - .expect( - "Permission check should have been done in op_ws_check_permission", - ); - } - - let ws_ca_data = state.borrow().try_borrow::<WsCaData>().cloned(); - let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); - let uri: Uri = args.url.parse()?; - let mut request = Request::builder().method(Method::GET).uri(&uri); - - request = request.header("User-Agent", user_agent); - - if !args.protocols.is_empty() { - request = request.header("Sec-WebSocket-Protocol", args.protocols); - } - - let request = request.body(())?; - let domain = &uri.host().unwrap().to_string(); - let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { - Some("wss") => 443, - Some("ws") => 80, - _ => unreachable!(), - }); - let addr = format!("{}:{}", domain, port); - let try_socket = TcpStream::connect(addr).await; - let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { - Ok(socket) => socket, - Err(_) => { - return Ok(CreateResponse { - success: false, - rid: None, - protocol: None, - extensions: None, - }) - } - }; - - let socket: MaybeTlsStream<TcpStream> = match uri.scheme_str() { - Some("ws") => MaybeTlsStream::Plain(tcp_socket), - Some("wss") => { - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - - if let Some(ws_ca_data) = ws_ca_data { - let reader = &mut BufReader::new(Cursor::new(ws_ca_data.0)); - config.root_store.add_pem_file(reader).unwrap(); - } - - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = DNSNameRef::try_from_ascii_str(domain) - .map_err(|_| invalid_hostname(domain))?; - let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; - MaybeTlsStream::Rustls(tls_socket) - } - _ => unreachable!(), - }; - - let (stream, response): (WsStream, Response) = - client_async(request, socket).await.map_err(|err| { - type_error(format!( - "failed to connect to WebSocket: {}", - err.to_string() - )) - })?; - - let (ws_tx, ws_rx) = stream.split(); - let resource = WsStreamResource { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - cancel: Default::default(), - }; - let mut state = state.borrow_mut(); - let rid = state.resource_table.add(resource); - - let protocol = match response.headers().get("Sec-WebSocket-Protocol") { - Some(header) => header.to_str().unwrap(), - None => "", - }; - let extensions = response - .headers() - .get_all("Sec-WebSocket-Extensions") - .iter() - .map(|header| header.to_str().unwrap()) - .collect::<String>(); - Ok(CreateResponse { - success: true, - rid: Some(rid), - protocol: Some(protocol.to_string()), - extensions: Some(extensions), - }) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SendArgs { - rid: ResourceId, - kind: String, - text: Option<String>, -} - -pub async fn op_ws_send( - state: Rc<RefCell<OpState>>, - args: SendArgs, - buf: Option<ZeroCopyBuf>, -) -> Result<(), AnyError> { - let msg = match args.kind.as_str() { - "text" => Message::Text(args.text.unwrap()), - "binary" => Message::Binary(buf.ok_or_else(null_opbuf)?.to_vec()), - "pong" => Message::Pong(vec![]), - _ => unreachable!(), - }; - let rid = args.rid; - - let resource = state - .borrow_mut() - .resource_table - .get::<WsStreamResource>(rid) - .ok_or_else(bad_resource_id)?; - let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; - tx.send(msg).await?; - Ok(()) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CloseArgs { - rid: ResourceId, - code: Option<u16>, - reason: Option<String>, -} - -pub async fn op_ws_close( - state: Rc<RefCell<OpState>>, - args: CloseArgs, - _bufs: Option<ZeroCopyBuf>, -) -> Result<(), AnyError> { - let rid = args.rid; - let msg = Message::Close(args.code.map(|c| CloseFrame { - code: CloseCode::from(c), - reason: match args.reason { - Some(reason) => Cow::from(reason), - None => Default::default(), - }, - })); - - let resource = state - .borrow_mut() - .resource_table - .get::<WsStreamResource>(rid) - .ok_or_else(bad_resource_id)?; - let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; - tx.send(msg).await?; - Ok(()) -} - -#[derive(Serialize)] -#[serde(tag = "kind", content = "value", rename_all = "camelCase")] -pub enum NextEventResponse { - String(String), - Binary(ZeroCopyBuf), - Close { code: u16, reason: String }, - Ping, - Pong, - Error, - Closed, -} - -pub async fn op_ws_next_event( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _bufs: Option<ZeroCopyBuf>, -) -> Result<NextEventResponse, AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::<WsStreamResource>(rid) - .ok_or_else(bad_resource_id)?; - - let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let val = rx.next().or_cancel(cancel).await?; - let res = match val { - Some(Ok(Message::Text(text))) => NextEventResponse::String(text), - Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), - Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { - code: frame.code.into(), - reason: frame.reason.to_string(), - }, - Some(Ok(Message::Close(None))) => NextEventResponse::Close { - code: 1005, - reason: String::new(), - }, - Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, - Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, - Some(Err(_)) => NextEventResponse::Error, - None => { - state.borrow_mut().resource_table.close(rid).unwrap(); - NextEventResponse::Closed - } - }; - Ok(res) -} - -pub fn init<P: WebSocketPermissions + 'static>( - user_agent: String, - ca_data: Option<Vec<u8>>, -) -> Extension { - Extension::builder() - .js(include_js_files!( - prefix "deno:op_crates/websocket", - "01_websocket.js", - )) - .ops(vec![ - ( - "op_ws_check_permission", - op_sync(op_ws_check_permission::<P>), - ), - ("op_ws_create", op_async(op_ws_create::<P>)), - ("op_ws_send", op_async(op_ws_send)), - ("op_ws_close", op_async(op_ws_close)), - ("op_ws_next_event", op_async(op_ws_next_event)), - ]) - .state(move |state| { - state.put::<WsUserAgent>(WsUserAgent(user_agent.clone())); - if let Some(ca_data) = ca_data.clone() { - state.put::<WsCaData>(WsCaData(ca_data)); - } - Ok(()) - }) - .build() -} - -pub fn get_declaration() -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") -} |