summaryrefslogtreecommitdiff
path: root/runtime/ops/websocket.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-12-13 19:45:53 +0100
committerGitHub <noreply@github.com>2020-12-13 19:45:53 +0100
commit2e74f164b6dcf0ecbf8dd38fba9fae550d784bd0 (patch)
tree61abe8e09d5331ace5d9de529f0e2737a8e05dbb /runtime/ops/websocket.rs
parent84ef9bd21fb48fb6b5fbc8dafc3de9f361bade3b (diff)
refactor: deno_runtime crate (#8640)
This commit moves Deno JS runtime, ops, permissions and inspector implementation to new "deno_runtime" crate located in "runtime/" directory. Details in "runtime/README.md". Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'runtime/ops/websocket.rs')
-rw-r--r--runtime/ops/websocket.rs326
1 files changed, 326 insertions, 0 deletions
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs
new file mode 100644
index 000000000..a8c591a33
--- /dev/null
+++ b/runtime/ops/websocket.rs
@@ -0,0 +1,326 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use core::task::Poll;
+use deno_core::error::bad_resource_id;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::StreamExt;
+use deno_core::futures::{ready, SinkExt};
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::url;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::{serde_json, ZeroCopyBuf};
+use http::{Method, Request, Uri};
+use serde::Deserialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::fs::File;
+use std::io::BufReader;
+use std::rc::Rc;
+use std::sync::Arc;
+use tokio::net::TcpStream;
+use tokio_rustls::{rustls::ClientConfig, TlsConnector};
+use tokio_tungstenite::stream::Stream as StreamSwitcher;
+use tokio_tungstenite::tungstenite::Error as TungsteniteError;
+use tokio_tungstenite::tungstenite::{
+ handshake::client::Response, protocol::frame::coding::CloseCode,
+ protocol::CloseFrame, Message,
+};
+use tokio_tungstenite::{client_async, WebSocketStream};
+use webpki::DNSNameRef;
+
+#[derive(Clone)]
+struct WsCaFile(String);
+#[derive(Clone)]
+struct WsUserAgent(String);
+
+pub fn init(
+ rt: &mut deno_core::JsRuntime,
+ maybe_ca_file: Option<&str>,
+ user_agent: String,
+) {
+ {
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ if let Some(ca_file) = maybe_ca_file {
+ state.put::<WsCaFile>(WsCaFile(ca_file.to_string()));
+ }
+ state.put::<WsUserAgent>(WsUserAgent(user_agent));
+ }
+ super::reg_json_sync(rt, "op_ws_check_permission", op_ws_check_permission);
+ super::reg_json_async(rt, "op_ws_create", op_ws_create);
+ super::reg_json_async(rt, "op_ws_send", op_ws_send);
+ super::reg_json_async(rt, "op_ws_close", op_ws_close);
+ super::reg_json_async(rt, "op_ws_next_event", op_ws_next_event);
+}
+
+type MaybeTlsStream =
+ StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>;
+
+type WsStream = WebSocketStream<MaybeTlsStream>;
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CheckPermissionArgs {
+ url: String,
+}
+
+// This op is needed because creating a WS instance in JavaScript is a sync
+// operation and should throw error when permissions are not fullfiled,
+// but actual op that connects WS is async.
+pub fn op_ws_check_permission(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: CheckPermissionArgs = serde_json::from_value(args)?;
+
+ state
+ .borrow::<Permissions>()
+ .check_net_url(&url::Url::parse(&args.url)?)?;
+
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateArgs {
+ url: String,
+ protocols: String,
+}
+
+pub async fn op_ws_create(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: CreateArgs = serde_json::from_value(args)?;
+
+ {
+ let s = state.borrow();
+ s.borrow::<Permissions>()
+ .check_net_url(&url::Url::parse(&args.url)?)
+ .expect(
+ "Permission check should have been done in op_ws_check_permission",
+ );
+ }
+
+ let maybe_ca_file = state.borrow().try_borrow::<WsCaFile>().cloned();
+ let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
+ let uri: Uri = args.url.parse()?;
+ let mut request = Request::builder().method(Method::GET).uri(&uri);
+
+ request = request.header("User-Agent", user_agent);
+
+ if !args.protocols.is_empty() {
+ request = request.header("Sec-WebSocket-Protocol", args.protocols);
+ }
+
+ let request = request.body(())?;
+ let domain = &uri.host().unwrap().to_string();
+ let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
+ Some("wss") => 443,
+ Some("ws") => 80,
+ _ => unreachable!(),
+ });
+ let addr = format!("{}:{}", domain, port);
+ let try_socket = TcpStream::connect(addr).await;
+ let tcp_socket = match try_socket.map_err(TungsteniteError::Io) {
+ Ok(socket) => socket,
+ Err(_) => return Ok(json!({"success": false})),
+ };
+
+ let socket: MaybeTlsStream = match uri.scheme_str() {
+ Some("ws") => StreamSwitcher::Plain(tcp_socket),
+ Some("wss") => {
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+
+ if let Some(ws_ca_file) = maybe_ca_file {
+ let key_file = File::open(ws_ca_file.0)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?;
+ StreamSwitcher::Tls(tls_socket)
+ }
+ _ => unreachable!(),
+ };
+
+ let (stream, response): (WsStream, Response) =
+ client_async(request, socket).await.map_err(|err| {
+ type_error(format!(
+ "failed to connect to WebSocket: {}",
+ err.to_string()
+ ))
+ })?;
+
+ let mut state = state.borrow_mut();
+ let rid = state
+ .resource_table
+ .add("webSocketStream", Box::new(stream));
+
+ let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
+ Some(header) => header.to_str().unwrap(),
+ None => "",
+ };
+ let extensions = response
+ .headers()
+ .get_all("Sec-WebSocket-Extensions")
+ .iter()
+ .map(|header| header.to_str().unwrap())
+ .collect::<String>();
+ Ok(json!({
+ "success": true,
+ "rid": rid,
+ "protocol": protocol,
+ "extensions": extensions
+ }))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SendArgs {
+ rid: u32,
+ text: Option<String>,
+}
+
+pub async fn op_ws_send(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: SendArgs = serde_json::from_value(args)?;
+
+ let mut maybe_msg = Some(match args.text {
+ Some(text) => Message::Text(text),
+ None => Message::Binary(bufs[0].to_vec()),
+ });
+ let rid = args.rid;
+
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let stream = state
+ .resource_table
+ .get_mut::<WsStream>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // TODO(ry) Handle errors below instead of unwrap.
+ // Need to map `TungsteniteError` to `AnyError`.
+ ready!(stream.poll_ready_unpin(cx)).unwrap();
+ if let Some(msg) = maybe_msg.take() {
+ stream.start_send_unpin(msg).unwrap();
+ }
+ ready!(stream.poll_flush_unpin(cx)).unwrap();
+
+ Poll::Ready(Ok(json!({})))
+ })
+ .await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CloseArgs {
+ rid: u32,
+ code: Option<u16>,
+ reason: Option<String>,
+}
+
+pub async fn op_ws_close(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: CloseArgs = serde_json::from_value(args)?;
+ let rid = args.rid;
+ let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
+ code: CloseCode::from(c),
+ reason: match args.reason {
+ Some(reason) => Cow::from(reason),
+ None => Default::default(),
+ },
+ })));
+
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let stream = state
+ .resource_table
+ .get_mut::<WsStream>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // TODO(ry) Handle errors below instead of unwrap.
+ // Need to map `TungsteniteError` to `AnyError`.
+ ready!(stream.poll_ready_unpin(cx)).unwrap();
+ if let Some(msg) = maybe_msg.take() {
+ stream.start_send_unpin(msg).unwrap();
+ }
+ ready!(stream.poll_flush_unpin(cx)).unwrap();
+ ready!(stream.poll_close_unpin(cx)).unwrap();
+
+ Poll::Ready(Ok(json!({})))
+ })
+ .await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct NextEventArgs {
+ rid: u32,
+}
+
+pub async fn op_ws_next_event(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: NextEventArgs = serde_json::from_value(args)?;
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let stream = state
+ .resource_table
+ .get_mut::<WsStream>(args.rid)
+ .ok_or_else(bad_resource_id)?;
+ stream
+ .poll_next_unpin(cx)
+ .map(|val| {
+ match val {
+ Some(Ok(Message::Text(text))) => json!({
+ "type": "string",
+ "data": text
+ }),
+ Some(Ok(Message::Binary(data))) => {
+ // TODO(ry): don't use json to send binary data.
+ json!({
+ "type": "binary",
+ "data": data
+ })
+ }
+ Some(Ok(Message::Close(Some(frame)))) => json!({
+ "type": "close",
+ "code": u16::from(frame.code),
+ "reason": frame.reason.as_ref()
+ }),
+ Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
+ Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
+ Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
+ Some(Err(_)) => json!({"type": "error"}),
+ None => {
+ state.resource_table.close(args.rid).unwrap();
+ json!({"type": "closed"})
+ }
+ }
+ })
+ .map(Ok)
+ })
+ .await
+}