summaryrefslogtreecommitdiff
path: root/cli/ops/websocket.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2020-09-06 02:34:02 +0200
committerGitHub <noreply@github.com>2020-09-06 02:34:02 +0200
commitc821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch)
treec429a3c2707a4047fb512443a8468b7e15e5730d /cli/ops/websocket.rs
parent849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff)
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'cli/ops/websocket.rs')
-rw-r--r--cli/ops/websocket.rs279
1 files changed, 125 insertions, 154 deletions
diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs
index 126d67861..131c52179 100644
--- a/cli/ops/websocket.rs
+++ b/cli/ops/websocket.rs
@@ -1,15 +1,16 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::state::State;
use core::task::Poll;
+use deno_core::BufVec;
use deno_core::ErrBox;
-use deno_core::ZeroCopyBuf;
-use deno_core::{CoreIsolate, CoreIsolateState};
-use futures::future::{poll_fn, FutureExt};
+use deno_core::OpRegistry;
+use futures::future::poll_fn;
use futures::StreamExt;
use futures::{ready, SinkExt};
use http::{Method, Request, Uri};
+use serde_derive::Deserialize;
+use serde_json::Value;
use std::borrow::Cow;
use std::fs::File;
use std::io::BufReader;
@@ -25,11 +26,11 @@ use tokio_tungstenite::tungstenite::{
use tokio_tungstenite::{client_async, WebSocketStream};
use webpki::DNSNameRef;
-pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
- i.register_op("op_ws_create", s.stateful_json_op2(op_ws_create));
- i.register_op("op_ws_send", s.stateful_json_op2(op_ws_send));
- i.register_op("op_ws_close", s.stateful_json_op2(op_ws_close));
- i.register_op("op_ws_next_event", s.stateful_json_op2(op_ws_next_event));
+pub fn init(s: &Rc<State>) {
+ s.register_op_json_async("op_ws_create", op_ws_create);
+ s.register_op_json_async("op_ws_send", op_ws_send);
+ s.register_op_json_async("op_ws_close", op_ws_close);
+ s.register_op_json_async("op_ws_next_event", op_ws_next_event);
}
type MaybeTlsStream =
@@ -44,86 +45,81 @@ struct CreateArgs {
protocols: String,
}
-pub fn op_ws_create(
- isolate_state: &mut CoreIsolateState,
- state: &Rc<State>,
+pub async fn op_ws_create(
+ state: Rc<State>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ _bufs: BufVec,
+) -> Result<Value, ErrBox> {
let args: CreateArgs = serde_json::from_value(args)?;
state.check_net_url(&url::Url::parse(&args.url)?)?;
- let resource_table = isolate_state.resource_table.clone();
let ca_file = state.global_state.flags.ca_file.clone();
- let future = async move {
- let uri: Uri = args.url.parse().unwrap();
- let request = Request::builder()
- .method(Method::GET)
- .uri(&uri)
- .header("Sec-WebSocket-Protocol", args.protocols)
- .body(())
- .unwrap();
- let domain = &uri.host().unwrap().to_string();
- let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
- Some("wss") => 443,
- Some("ws") => 80,
- _ => unreachable!(),
- });
- let addr = format!("{}:{}", domain, port);
- let try_socket = TcpStream::connect(addr).await;
- let tcp_socket = match try_socket.map_err(Error::Io) {
- Ok(socket) => socket,
- Err(_) => return Ok(json!({"success": false})),
- };
+ let uri: Uri = args.url.parse().unwrap();
+ let request = Request::builder()
+ .method(Method::GET)
+ .uri(&uri)
+ .header("Sec-WebSocket-Protocol", args.protocols)
+ .body(())
+ .unwrap();
+ let domain = &uri.host().unwrap().to_string();
+ let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
+ Some("wss") => 443,
+ Some("ws") => 80,
+ _ => unreachable!(),
+ });
+ let addr = format!("{}:{}", domain, port);
+ let try_socket = TcpStream::connect(addr).await;
+ let tcp_socket = match try_socket.map_err(Error::Io) {
+ Ok(socket) => socket,
+ Err(_) => return Ok(json!({"success": false})),
+ };
- let socket: MaybeTlsStream = match uri.scheme_str() {
- Some("ws") => StreamSwitcher::Plain(tcp_socket),
- Some("wss") => {
- let mut config = ClientConfig::new();
- config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ let socket: MaybeTlsStream = match uri.scheme_str() {
+ Some("ws") => StreamSwitcher::Plain(tcp_socket),
+ Some("wss") => {
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
- if let Some(path) = ca_file {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- config.root_store.add_pem_file(reader).unwrap();
- }
-
- let tls_connector = TlsConnector::from(Arc::new(config));
- let dnsname =
- DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
- let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?;
- StreamSwitcher::Tls(tls_socket)
+ if let Some(path) = ca_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
}
- _ => unreachable!(),
- };
- let (stream, response): (WsStream, Response) =
- client_async(request, socket).await.unwrap();
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?;
+ StreamSwitcher::Tls(tls_socket)
+ }
+ _ => unreachable!(),
+ };
+
+ let (stream, response): (WsStream, Response) =
+ client_async(request, socket).await.unwrap();
- let rid = {
- let mut resource_table = resource_table.borrow_mut();
- resource_table.add("webSocketStream", Box::new(stream))
- };
+ let rid = state
+ .resource_table
+ .borrow_mut()
+ .add("webSocketStream", Box::new(stream));
- let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
- Some(header) => header.to_str().unwrap(),
- None => "",
- };
- let extensions = response
- .headers()
- .get_all("Sec-WebSocket-Extensions")
- .iter()
- .map(|header| header.to_str().unwrap())
- .collect::<String>();
- Ok(json!({
+ let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
+ Some(header) => header.to_str().unwrap(),
+ None => "",
+ };
+ let extensions = response
+ .headers()
+ .get_all("Sec-WebSocket-Extensions")
+ .iter()
+ .map(|header| header.to_str().unwrap())
+ .collect::<String>();
+ Ok(json!({
"success": true,
"rid": rid,
"protocol": protocol,
"extensions": extensions
- }))
- };
- Ok(JsonOp::Async(future.boxed_local()))
+ }))
}
#[derive(Deserialize)]
@@ -133,23 +129,21 @@ struct SendArgs {
text: Option<String>,
}
-pub fn op_ws_send(
- isolate_state: &mut CoreIsolateState,
- _state: &Rc<State>,
+pub async fn op_ws_send(
+ state: Rc<State>,
args: Value,
- zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ bufs: BufVec,
+) -> Result<Value, ErrBox> {
let args: SendArgs = serde_json::from_value(args)?;
let mut maybe_msg = Some(match args.text {
Some(text) => Message::Text(text),
- None => Message::Binary(zero_copy[0].to_owned().to_vec()),
+ None => Message::Binary(bufs[0].to_vec()),
});
- let resource_table = isolate_state.resource_table.clone();
let rid = args.rid;
- let future = poll_fn(move |cx| {
- let mut resource_table = resource_table.borrow_mut();
+ poll_fn(move |cx| {
+ let mut resource_table = state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<WsStream>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
@@ -164,8 +158,8 @@ pub fn op_ws_send(
ready!(stream.poll_flush_unpin(cx)).unwrap();
Poll::Ready(Ok(json!({})))
- });
- Ok(JsonOp::Async(future.boxed_local()))
+ })
+ .await
}
#[derive(Deserialize)]
@@ -176,14 +170,12 @@ struct CloseArgs {
reason: Option<String>,
}
-pub fn op_ws_close(
- isolate_state: &mut CoreIsolateState,
- _state: &Rc<State>,
+pub async fn op_ws_close(
+ state: Rc<State>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ _bufs: BufVec,
+) -> Result<Value, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;
- let resource_table = isolate_state.resource_table.clone();
let rid = args.rid;
let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
code: CloseCode::from(c),
@@ -193,8 +185,8 @@ pub fn op_ws_close(
},
})));
- let future = poll_fn(move |cx| {
- let mut resource_table = resource_table.borrow_mut();
+ poll_fn(move |cx| {
+ let mut resource_table = state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<WsStream>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
@@ -210,9 +202,8 @@ pub fn op_ws_close(
ready!(stream.poll_close_unpin(cx)).unwrap();
Poll::Ready(Ok(json!({})))
- });
-
- Ok(JsonOp::Async(future.boxed_local()))
+ })
+ .await
}
#[derive(Deserialize)]
@@ -221,68 +212,48 @@ struct NextEventArgs {
rid: u32,
}
-pub fn op_ws_next_event(
- isolate_state: &mut CoreIsolateState,
- _state: &Rc<State>,
+pub async fn op_ws_next_event(
+ state: Rc<State>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ _bufs: BufVec,
+) -> Result<Value, ErrBox> {
let args: NextEventArgs = serde_json::from_value(args)?;
- let resource_table = isolate_state.resource_table.clone();
- let future = poll_fn(move |cx| {
- let mut resource_table = resource_table.borrow_mut();
+ poll_fn(move |cx| {
+ let mut resource_table = state.resource_table.borrow_mut();
let stream = resource_table
.get_mut::<WsStream>(args.rid)
.ok_or_else(ErrBox::bad_resource_id)?;
-
- stream.poll_next_unpin(cx).map(|val| {
- match val {
- Some(val) => {
- match val {
- Ok(message) => {
- match message {
- Message::Text(text) => Ok(json!({
- "type": "string",
- "data": text
- })),
- Message::Binary(data) => {
- Ok(json!({ //TODO: don't use json to send binary data
- "type": "binary",
- "data": data
- }))
- }
- Message::Close(frame) => {
- if let Some(frame) = frame {
- let code: u16 = frame.code.into();
- Ok(json!({
- "type": "close",
- "code": code,
- "reason": frame.reason.as_ref()
- }))
- } else {
- Ok(json!({ "type": "close" }))
- }
- }
- Message::Ping(_) => Ok(json!({"type": "ping"})),
- Message::Pong(_) => Ok(json!({"type": "pong"})),
- }
- }
- Err(_) => Ok(json!({
- "type": "error",
- })),
+ stream
+ .poll_next_unpin(cx)
+ .map(|val| {
+ match val {
+ Some(Ok(Message::Text(text))) => json!({
+ "type": "string",
+ "data": text
+ }),
+ Some(Ok(Message::Binary(data))) => {
+ // TODO(ry): don't use json to send binary data.
+ json!({
+ "type": "binary",
+ "data": data
+ })
+ }
+ Some(Ok(Message::Close(Some(frame)))) => json!({
+ "type": "close",
+ "code": u16::from(frame.code),
+ "reason": frame.reason.as_ref()
+ }),
+ Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
+ Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
+ Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
+ Some(Err(_)) => json!({"type": "error"}),
+ None => {
+ resource_table.close(args.rid).unwrap();
+ json!({"type": "closed"})
}
}
- None => {
- resource_table
- .close(args.rid)
- .ok_or_else(ErrBox::bad_resource_id)?;
- Ok(json!({
- "type": "closed",
- }))
- }
- }
- })
- });
-
- Ok(JsonOp::Async(future.boxed_local()))
+ })
+ .map(Ok)
+ })
+ .await
}