diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-09-06 02:34:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-06 02:34:02 +0200 |
commit | c821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch) | |
tree | c429a3c2707a4047fb512443a8468b7e15e5730d /cli/ops/websocket.rs | |
parent | 849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff) |
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'cli/ops/websocket.rs')
-rw-r--r-- | cli/ops/websocket.rs | 279 |
1 files changed, 125 insertions, 154 deletions
diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs index 126d67861..131c52179 100644 --- a/cli/ops/websocket.rs +++ b/cli/ops/websocket.rs @@ -1,15 +1,16 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::state::State; use core::task::Poll; +use deno_core::BufVec; use deno_core::ErrBox; -use deno_core::ZeroCopyBuf; -use deno_core::{CoreIsolate, CoreIsolateState}; -use futures::future::{poll_fn, FutureExt}; +use deno_core::OpRegistry; +use futures::future::poll_fn; use futures::StreamExt; use futures::{ready, SinkExt}; use http::{Method, Request, Uri}; +use serde_derive::Deserialize; +use serde_json::Value; use std::borrow::Cow; use std::fs::File; use std::io::BufReader; @@ -25,11 +26,11 @@ use tokio_tungstenite::tungstenite::{ use tokio_tungstenite::{client_async, WebSocketStream}; use webpki::DNSNameRef; -pub fn init(i: &mut CoreIsolate, s: &Rc<State>) { - i.register_op("op_ws_create", s.stateful_json_op2(op_ws_create)); - i.register_op("op_ws_send", s.stateful_json_op2(op_ws_send)); - i.register_op("op_ws_close", s.stateful_json_op2(op_ws_close)); - i.register_op("op_ws_next_event", s.stateful_json_op2(op_ws_next_event)); +pub fn init(s: &Rc<State>) { + s.register_op_json_async("op_ws_create", op_ws_create); + s.register_op_json_async("op_ws_send", op_ws_send); + s.register_op_json_async("op_ws_close", op_ws_close); + s.register_op_json_async("op_ws_next_event", op_ws_next_event); } type MaybeTlsStream = @@ -44,86 +45,81 @@ struct CreateArgs { protocols: String, } -pub fn op_ws_create( - isolate_state: &mut CoreIsolateState, - state: &Rc<State>, +pub async fn op_ws_create( + state: Rc<State>, args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + _bufs: BufVec, +) -> Result<Value, ErrBox> { let args: CreateArgs = serde_json::from_value(args)?; state.check_net_url(&url::Url::parse(&args.url)?)?; - let resource_table = isolate_state.resource_table.clone(); let ca_file = state.global_state.flags.ca_file.clone(); - let future = async move { - let uri: Uri = args.url.parse().unwrap(); - let request = Request::builder() - .method(Method::GET) - .uri(&uri) - .header("Sec-WebSocket-Protocol", args.protocols) - .body(()) - .unwrap(); - let domain = &uri.host().unwrap().to_string(); - let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { - Some("wss") => 443, - Some("ws") => 80, - _ => unreachable!(), - }); - let addr = format!("{}:{}", domain, port); - let try_socket = TcpStream::connect(addr).await; - let tcp_socket = match try_socket.map_err(Error::Io) { - Ok(socket) => socket, - Err(_) => return Ok(json!({"success": false})), - }; + let uri: Uri = args.url.parse().unwrap(); + let request = Request::builder() + .method(Method::GET) + .uri(&uri) + .header("Sec-WebSocket-Protocol", args.protocols) + .body(()) + .unwrap(); + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let try_socket = TcpStream::connect(addr).await; + let tcp_socket = match try_socket.map_err(Error::Io) { + Ok(socket) => socket, + Err(_) => return Ok(json!({"success": false})), + }; - let socket: MaybeTlsStream = match uri.scheme_str() { - Some("ws") => StreamSwitcher::Plain(tcp_socket), - Some("wss") => { - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let socket: MaybeTlsStream = match uri.scheme_str() { + Some("ws") => StreamSwitcher::Plain(tcp_socket), + Some("wss") => { + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = ca_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } - - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; - StreamSwitcher::Tls(tls_socket) + if let Some(path) = ca_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); } - _ => unreachable!(), - }; - let (stream, response): (WsStream, Response) = - client_async(request, socket).await.unwrap(); + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + StreamSwitcher::Tls(tls_socket) + } + _ => unreachable!(), + }; + + let (stream, response): (WsStream, Response) = + client_async(request, socket).await.unwrap(); - let rid = { - let mut resource_table = resource_table.borrow_mut(); - resource_table.add("webSocketStream", Box::new(stream)) - }; + let rid = state + .resource_table + .borrow_mut() + .add("webSocketStream", Box::new(stream)); - let protocol = match response.headers().get("Sec-WebSocket-Protocol") { - Some(header) => header.to_str().unwrap(), - None => "", - }; - let extensions = response - .headers() - .get_all("Sec-WebSocket-Extensions") - .iter() - .map(|header| header.to_str().unwrap()) - .collect::<String>(); - Ok(json!({ + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::<String>(); + Ok(json!({ "success": true, "rid": rid, "protocol": protocol, "extensions": extensions - })) - }; - Ok(JsonOp::Async(future.boxed_local())) + })) } #[derive(Deserialize)] @@ -133,23 +129,21 @@ struct SendArgs { text: Option<String>, } -pub fn op_ws_send( - isolate_state: &mut CoreIsolateState, - _state: &Rc<State>, +pub async fn op_ws_send( + state: Rc<State>, args: Value, - zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + bufs: BufVec, +) -> Result<Value, ErrBox> { let args: SendArgs = serde_json::from_value(args)?; let mut maybe_msg = Some(match args.text { Some(text) => Message::Text(text), - None => Message::Binary(zero_copy[0].to_owned().to_vec()), + None => Message::Binary(bufs[0].to_vec()), }); - let resource_table = isolate_state.resource_table.clone(); let rid = args.rid; - let future = poll_fn(move |cx| { - let mut resource_table = resource_table.borrow_mut(); + poll_fn(move |cx| { + let mut resource_table = state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<WsStream>(rid) .ok_or_else(ErrBox::bad_resource_id)?; @@ -164,8 +158,8 @@ pub fn op_ws_send( ready!(stream.poll_flush_unpin(cx)).unwrap(); Poll::Ready(Ok(json!({}))) - }); - Ok(JsonOp::Async(future.boxed_local())) + }) + .await } #[derive(Deserialize)] @@ -176,14 +170,12 @@ struct CloseArgs { reason: Option<String>, } -pub fn op_ws_close( - isolate_state: &mut CoreIsolateState, - _state: &Rc<State>, +pub async fn op_ws_close( + state: Rc<State>, args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + _bufs: BufVec, +) -> Result<Value, ErrBox> { let args: CloseArgs = serde_json::from_value(args)?; - let resource_table = isolate_state.resource_table.clone(); let rid = args.rid; let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { code: CloseCode::from(c), @@ -193,8 +185,8 @@ pub fn op_ws_close( }, }))); - let future = poll_fn(move |cx| { - let mut resource_table = resource_table.borrow_mut(); + poll_fn(move |cx| { + let mut resource_table = state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<WsStream>(rid) .ok_or_else(ErrBox::bad_resource_id)?; @@ -210,9 +202,8 @@ pub fn op_ws_close( ready!(stream.poll_close_unpin(cx)).unwrap(); Poll::Ready(Ok(json!({}))) - }); - - Ok(JsonOp::Async(future.boxed_local())) + }) + .await } #[derive(Deserialize)] @@ -221,68 +212,48 @@ struct NextEventArgs { rid: u32, } -pub fn op_ws_next_event( - isolate_state: &mut CoreIsolateState, - _state: &Rc<State>, +pub async fn op_ws_next_event( + state: Rc<State>, args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + _bufs: BufVec, +) -> Result<Value, ErrBox> { let args: NextEventArgs = serde_json::from_value(args)?; - let resource_table = isolate_state.resource_table.clone(); - let future = poll_fn(move |cx| { - let mut resource_table = resource_table.borrow_mut(); + poll_fn(move |cx| { + let mut resource_table = state.resource_table.borrow_mut(); let stream = resource_table .get_mut::<WsStream>(args.rid) .ok_or_else(ErrBox::bad_resource_id)?; - - stream.poll_next_unpin(cx).map(|val| { - match val { - Some(val) => { - match val { - Ok(message) => { - match message { - Message::Text(text) => Ok(json!({ - "type": "string", - "data": text - })), - Message::Binary(data) => { - Ok(json!({ //TODO: don't use json to send binary data - "type": "binary", - "data": data - })) - } - Message::Close(frame) => { - if let Some(frame) = frame { - let code: u16 = frame.code.into(); - Ok(json!({ - "type": "close", - "code": code, - "reason": frame.reason.as_ref() - })) - } else { - Ok(json!({ "type": "close" })) - } - } - Message::Ping(_) => Ok(json!({"type": "ping"})), - Message::Pong(_) => Ok(json!({"type": "pong"})), - } - } - Err(_) => Ok(json!({ - "type": "error", - })), + stream + .poll_next_unpin(cx) + .map(|val| { + match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data + }) + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) } } - None => { - resource_table - .close(args.rid) - .ok_or_else(ErrBox::bad_resource_id)?; - Ok(json!({ - "type": "closed", - })) - } - } - }) - }); - - Ok(JsonOp::Async(future.boxed_local())) + }) + .map(Ok) + }) + .await } |