diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-13 19:45:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-13 19:45:53 +0100 |
commit | 2e74f164b6dcf0ecbf8dd38fba9fae550d784bd0 (patch) | |
tree | 61abe8e09d5331ace5d9de529f0e2737a8e05dbb /runtime/ops/websocket.rs | |
parent | 84ef9bd21fb48fb6b5fbc8dafc3de9f361bade3b (diff) |
refactor: deno_runtime crate (#8640)
This commit moves Deno JS runtime, ops, permissions and
inspector implementation to new "deno_runtime" crate located
in "runtime/" directory.
Details in "runtime/README.md".
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'runtime/ops/websocket.rs')
-rw-r--r-- | runtime/ops/websocket.rs | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs new file mode 100644 index 000000000..a8c591a33 --- /dev/null +++ b/runtime/ops/websocket.rs @@ -0,0 +1,326 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::permissions::Permissions; +use core::task::Poll; +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::StreamExt; +use deno_core::futures::{ready, SinkExt}; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::url; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::{serde_json, ZeroCopyBuf}; +use http::{Method, Request, Uri}; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::File; +use std::io::BufReader; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use tokio_tungstenite::stream::Stream as StreamSwitcher; +use tokio_tungstenite::tungstenite::Error as TungsteniteError; +use tokio_tungstenite::tungstenite::{ + handshake::client::Response, protocol::frame::coding::CloseCode, + protocol::CloseFrame, Message, +}; +use tokio_tungstenite::{client_async, WebSocketStream}; +use webpki::DNSNameRef; + +#[derive(Clone)] +struct WsCaFile(String); +#[derive(Clone)] +struct WsUserAgent(String); + +pub fn init( + rt: &mut deno_core::JsRuntime, + maybe_ca_file: Option<&str>, + user_agent: String, +) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + if let Some(ca_file) = maybe_ca_file { + state.put::<WsCaFile>(WsCaFile(ca_file.to_string())); + } + state.put::<WsUserAgent>(WsUserAgent(user_agent)); + } + super::reg_json_sync(rt, "op_ws_check_permission", op_ws_check_permission); + super::reg_json_async(rt, "op_ws_create", op_ws_create); + super::reg_json_async(rt, "op_ws_send", op_ws_send); + super::reg_json_async(rt, "op_ws_close", op_ws_close); + super::reg_json_async(rt, "op_ws_next_event", op_ws_next_event); +} + +type MaybeTlsStream = + StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; + +type WsStream = WebSocketStream<MaybeTlsStream>; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CheckPermissionArgs { + url: String, +} + +// This op is needed because creating a WS instance in JavaScript is a sync +// operation and should throw error when permissions are not fullfiled, +// but actual op that connects WS is async. +pub fn op_ws_check_permission( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: CheckPermissionArgs = serde_json::from_value(args)?; + + state + .borrow::<Permissions>() + .check_net_url(&url::Url::parse(&args.url)?)?; + + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateArgs { + url: String, + protocols: String, +} + +pub async fn op_ws_create( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: CreateArgs = serde_json::from_value(args)?; + + { + let s = state.borrow(); + s.borrow::<Permissions>() + .check_net_url(&url::Url::parse(&args.url)?) + .expect( + "Permission check should have been done in op_ws_check_permission", + ); + } + + let maybe_ca_file = state.borrow().try_borrow::<WsCaFile>().cloned(); + let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); + let uri: Uri = args.url.parse()?; + let mut request = Request::builder().method(Method::GET).uri(&uri); + + request = request.header("User-Agent", user_agent); + + if !args.protocols.is_empty() { + request = request.header("Sec-WebSocket-Protocol", args.protocols); + } + + let request = request.body(())?; + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let try_socket = TcpStream::connect(addr).await; + let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { + Ok(socket) => socket, + Err(_) => return Ok(json!({"success": false})), + }; + + let socket: MaybeTlsStream = match uri.scheme_str() { + Some("ws") => StreamSwitcher::Plain(tcp_socket), + Some("wss") => { + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + + if let Some(ws_ca_file) = maybe_ca_file { + let key_file = File::open(ws_ca_file.0)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); + } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + StreamSwitcher::Tls(tls_socket) + } + _ => unreachable!(), + }; + + let (stream, response): (WsStream, Response) = + client_async(request, socket).await.map_err(|err| { + type_error(format!( + "failed to connect to WebSocket: {}", + err.to_string() + )) + })?; + + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add("webSocketStream", Box::new(stream)); + + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::<String>(); + Ok(json!({ + "success": true, + "rid": rid, + "protocol": protocol, + "extensions": extensions + })) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendArgs { + rid: u32, + text: Option<String>, +} + +pub async fn op_ws_send( + state: Rc<RefCell<OpState>>, + args: Value, + bufs: BufVec, +) -> Result<Value, AnyError> { + let args: SendArgs = serde_json::from_value(args)?; + + let mut maybe_msg = Some(match args.text { + Some(text) => Message::Text(text), + None => Message::Binary(bufs[0].to_vec()), + }); + let rid = args.rid; + + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let stream = state + .resource_table + .get_mut::<WsStream>(rid) + .ok_or_else(bad_resource_id)?; + + // TODO(ry) Handle errors below instead of unwrap. + // Need to map `TungsteniteError` to `AnyError`. + ready!(stream.poll_ready_unpin(cx)).unwrap(); + if let Some(msg) = maybe_msg.take() { + stream.start_send_unpin(msg).unwrap(); + } + ready!(stream.poll_flush_unpin(cx)).unwrap(); + + Poll::Ready(Ok(json!({}))) + }) + .await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CloseArgs { + rid: u32, + code: Option<u16>, + reason: Option<String>, +} + +pub async fn op_ws_close( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: CloseArgs = serde_json::from_value(args)?; + let rid = args.rid; + let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { + code: CloseCode::from(c), + reason: match args.reason { + Some(reason) => Cow::from(reason), + None => Default::default(), + }, + }))); + + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let stream = state + .resource_table + .get_mut::<WsStream>(rid) + .ok_or_else(bad_resource_id)?; + + // TODO(ry) Handle errors below instead of unwrap. + // Need to map `TungsteniteError` to `AnyError`. + ready!(stream.poll_ready_unpin(cx)).unwrap(); + if let Some(msg) = maybe_msg.take() { + stream.start_send_unpin(msg).unwrap(); + } + ready!(stream.poll_flush_unpin(cx)).unwrap(); + ready!(stream.poll_close_unpin(cx)).unwrap(); + + Poll::Ready(Ok(json!({}))) + }) + .await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct NextEventArgs { + rid: u32, +} + +pub async fn op_ws_next_event( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: NextEventArgs = serde_json::from_value(args)?; + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let stream = state + .resource_table + .get_mut::<WsStream>(args.rid) + .ok_or_else(bad_resource_id)?; + stream + .poll_next_unpin(cx) + .map(|val| { + match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data + }) + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + } + }) + .map(Ok) + }) + .await +} |