diff options
Diffstat (limited to 'runtime/ops/websocket.rs')
-rw-r--r-- | runtime/ops/websocket.rs | 173 |
1 files changed, 88 insertions, 85 deletions
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs index a8c591a33..d805f307b 100644 --- a/runtime/ops/websocket.rs +++ b/runtime/ops/websocket.rs @@ -1,18 +1,23 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::permissions::Permissions; -use core::task::Poll; use deno_core::error::bad_resource_id; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; +use deno_core::futures::stream::SplitSink; +use deno_core::futures::stream::SplitStream; +use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; -use deno_core::futures::{ready, SinkExt}; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::url; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::{serde_json, ZeroCopyBuf}; use http::{Method, Request, Uri}; use serde::Deserialize; @@ -62,6 +67,22 @@ type MaybeTlsStream = StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; type WsStream = WebSocketStream<MaybeTlsStream>; +struct WsStreamResource { + tx: AsyncRefCell<SplitSink<WsStream, Message>>, + rx: AsyncRefCell<SplitStream<WsStream>>, + // When a `WsStreamResource` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, +} + +impl Resource for WsStreamResource { + fn name(&self) -> Cow<str> { + "webSocketStream".into() + } +} + +impl WsStreamResource {} #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -165,10 +186,14 @@ pub async fn op_ws_create( )) })?; + let (ws_tx, ws_rx) = stream.split(); + let resource = WsStreamResource { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + cancel: Default::default(), + }; let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add("webSocketStream", Box::new(stream)); + let rid = state.resource_table.add(resource); let protocol = match response.headers().get("Sec-WebSocket-Protocol") { Some(header) => header.to_str().unwrap(), @@ -202,30 +227,21 @@ pub async fn op_ws_send( ) -> Result<Value, AnyError> { let args: SendArgs = serde_json::from_value(args)?; - let mut maybe_msg = Some(match args.text { + let msg = match args.text { Some(text) => Message::Text(text), None => Message::Binary(bufs[0].to_vec()), - }); + }; let rid = args.rid; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - - Poll::Ready(Ok(json!({}))) - }) - .await + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + eprintln!("sent!"); + Ok(json!({})) } #[derive(Deserialize)] @@ -243,33 +259,22 @@ pub async fn op_ws_close( ) -> Result<Value, AnyError> { let args: CloseArgs = serde_json::from_value(args)?; let rid = args.rid; - let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { + let msg = Message::Close(args.code.map(|c| CloseFrame { code: CloseCode::from(c), reason: match args.reason { Some(reason) => Cow::from(reason), None => Default::default(), }, - }))); - - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - ready!(stream.poll_close_unpin(cx)).unwrap(); + })); - Poll::Ready(Ok(json!({}))) - }) - .await + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) } #[derive(Deserialize)] @@ -284,43 +289,41 @@ pub async fn op_ws_next_event( _bufs: BufVec, ) -> Result<Value, AnyError> { let args: NextEventArgs = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(args.rid) - .ok_or_else(bad_resource_id)?; - stream - .poll_next_unpin(cx) - .map(|val| { - match val { - Some(Ok(Message::Text(text))) => json!({ - "type": "string", - "data": text - }), - Some(Ok(Message::Binary(data))) => { - // TODO(ry): don't use json to send binary data. - json!({ - "type": "binary", - "data": data - }) - } - Some(Ok(Message::Close(Some(frame)))) => json!({ - "type": "close", - "code": u16::from(frame.code), - "reason": frame.reason.as_ref() - }), - Some(Ok(Message::Close(None))) => json!({ "type": "close" }), - Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), - Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), - Some(Err(_)) => json!({"type": "error"}), - None => { - state.resource_table.close(args.rid).unwrap(); - json!({"type": "closed"}) - } - } + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(args.rid) + .ok_or_else(bad_resource_id)?; + + let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let val = rx.next().or_cancel(cancel).await?; + let res = match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data }) - .map(Ok) - }) - .await + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.borrow_mut().resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + }; + Ok(res) } |