diff options
| author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-16 17:14:12 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-12-16 17:14:12 +0100 |
| commit | 6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch) | |
| tree | 5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops/websocket.rs | |
| parent | 9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff) | |
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table
and "AsyncRefCell".
Old implementation of resource table was completely
removed and all code referencing it was updated to use
new system.
Diffstat (limited to 'runtime/ops/websocket.rs')
| -rw-r--r-- | runtime/ops/websocket.rs | 173 |
1 files changed, 88 insertions, 85 deletions
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs index a8c591a33..d805f307b 100644 --- a/runtime/ops/websocket.rs +++ b/runtime/ops/websocket.rs @@ -1,18 +1,23 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::permissions::Permissions; -use core::task::Poll; use deno_core::error::bad_resource_id; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; +use deno_core::futures::stream::SplitSink; +use deno_core::futures::stream::SplitStream; +use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; -use deno_core::futures::{ready, SinkExt}; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::url; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::{serde_json, ZeroCopyBuf}; use http::{Method, Request, Uri}; use serde::Deserialize; @@ -62,6 +67,22 @@ type MaybeTlsStream = StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; type WsStream = WebSocketStream<MaybeTlsStream>; +struct WsStreamResource { + tx: AsyncRefCell<SplitSink<WsStream, Message>>, + rx: AsyncRefCell<SplitStream<WsStream>>, + // When a `WsStreamResource` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, +} + +impl Resource for WsStreamResource { + fn name(&self) -> Cow<str> { + "webSocketStream".into() + } +} + +impl WsStreamResource {} #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -165,10 +186,14 @@ pub async fn op_ws_create( )) })?; + let (ws_tx, ws_rx) = stream.split(); + let resource = WsStreamResource { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + cancel: Default::default(), + }; let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add("webSocketStream", Box::new(stream)); + let rid = state.resource_table.add(resource); let protocol = match response.headers().get("Sec-WebSocket-Protocol") { Some(header) => header.to_str().unwrap(), @@ -202,30 +227,21 @@ pub async fn op_ws_send( ) -> Result<Value, AnyError> { let args: SendArgs = serde_json::from_value(args)?; - let mut maybe_msg = Some(match args.text { + let msg = match args.text { Some(text) => Message::Text(text), None => Message::Binary(bufs[0].to_vec()), - }); + }; let rid = args.rid; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - - Poll::Ready(Ok(json!({}))) - }) - .await + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + eprintln!("sent!"); + Ok(json!({})) } #[derive(Deserialize)] @@ -243,33 +259,22 @@ pub async fn op_ws_close( ) -> Result<Value, AnyError> { let args: CloseArgs = serde_json::from_value(args)?; let rid = args.rid; - let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { + let msg = Message::Close(args.code.map(|c| CloseFrame { code: CloseCode::from(c), reason: match args.reason { Some(reason) => Cow::from(reason), None => Default::default(), }, - }))); - - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - ready!(stream.poll_close_unpin(cx)).unwrap(); + })); - Poll::Ready(Ok(json!({}))) - }) - .await + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) } #[derive(Deserialize)] @@ -284,43 +289,41 @@ pub async fn op_ws_next_event( _bufs: BufVec, ) -> Result<Value, AnyError> { let args: NextEventArgs = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(args.rid) - .ok_or_else(bad_resource_id)?; - stream - .poll_next_unpin(cx) - .map(|val| { - match val { - Some(Ok(Message::Text(text))) => json!({ - "type": "string", - "data": text - }), - Some(Ok(Message::Binary(data))) => { - // TODO(ry): don't use json to send binary data. - json!({ - "type": "binary", - "data": data - }) - } - Some(Ok(Message::Close(Some(frame)))) => json!({ - "type": "close", - "code": u16::from(frame.code), - "reason": frame.reason.as_ref() - }), - Some(Ok(Message::Close(None))) => json!({ "type": "close" }), - Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), - Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), - Some(Err(_)) => json!({"type": "error"}), - None => { - state.resource_table.close(args.rid).unwrap(); - json!({"type": "closed"}) - } - } + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(args.rid) + .ok_or_else(bad_resource_id)?; + + let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let val = rx.next().or_cancel(cancel).await?; + let res = match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data }) - .map(Ok) - }) - .await + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.borrow_mut().resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + }; + Ok(res) } |
