diff options
Diffstat (limited to 'extensions/websocket')
-rw-r--r-- | extensions/websocket/01_websocket.js | 13 | ||||
-rw-r--r-- | extensions/websocket/Cargo.toml | 1 | ||||
-rw-r--r-- | extensions/websocket/lib.rs | 98 |
3 files changed, 92 insertions, 20 deletions
diff --git a/extensions/websocket/01_websocket.js b/extensions/websocket/01_websocket.js index f6e285b76..7caff579e 100644 --- a/extensions/websocket/01_websocket.js +++ b/extensions/websocket/01_websocket.js @@ -124,6 +124,7 @@ const _protocol = Symbol("[[protocol]]"); const _binaryType = Symbol("[[binaryType]]"); const _bufferedAmount = Symbol("[[bufferedAmount]]"); + const _eventLoop = Symbol("[[eventLoop]]"); class WebSocket extends EventTarget { [_rid]; @@ -294,7 +295,7 @@ const event = new Event("open"); this.dispatchEvent(event); - this.#eventLoop(); + this[_eventLoop](); } }, (err) => { @@ -427,7 +428,7 @@ } } - async #eventLoop() { + async [_eventLoop]() { while (this[_readyState] === OPEN) { const { kind, value } = await core.opAsync( "op_ws_next_event", @@ -518,5 +519,11 @@ webidl.configurePrototype(WebSocket); - window.__bootstrap.webSocket = { WebSocket }; + window.__bootstrap.webSocket = { + WebSocket, + _rid, + _readyState, + _eventLoop, + _protocol, + }; })(this); diff --git a/extensions/websocket/Cargo.toml b/extensions/websocket/Cargo.toml index 81b57d3f2..0ca38e773 100644 --- a/extensions/websocket/Cargo.toml +++ b/extensions/websocket/Cargo.toml @@ -20,5 +20,6 @@ serde = { version = "1.0.125", features = ["derive"] } tokio = { version = "1.8.0", features = ["full"] } tokio-rustls = "0.22.0" tokio-tungstenite = { version = "0.14.0", features = ["rustls-tls"] } +hyper = { version = "0.14.9" } webpki = "0.21.4" webpki-roots = "0.21.1" diff --git a/extensions/websocket/lib.rs b/extensions/websocket/lib.rs index c6752d23b..f5bf15c79 100644 --- a/extensions/websocket/lib.rs +++ b/extensions/websocket/lib.rs @@ -64,13 +64,81 @@ impl WebSocketPermissions for NoWebSocketPermissions { } type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; -struct WsStreamResource { - tx: AsyncRefCell<SplitSink<WsStream, Message>>, - rx: AsyncRefCell<SplitStream<WsStream>>, +pub enum WebSocketStreamType { + Client { + tx: AsyncRefCell<SplitSink<WsStream, Message>>, + rx: AsyncRefCell<SplitStream<WsStream>>, + }, + Server { + tx: AsyncRefCell< + SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>, + >, + rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>, + }, +} + +pub struct WsStreamResource { + pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are // canceled, while 'write' ops are allowed to complete. Therefore only // 'read' futures are attached to this cancel handle. - cancel: CancelHandle, + pub cancel: CancelHandle, +} + +impl WsStreamResource { + async fn send(self: &Rc<Self>, message: Message) -> Result<(), AnyError> { + match self.stream { + WebSocketStreamType::Client { .. } => { + let mut tx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { tx, .. } => tx, + WebSocketStreamType::Server { .. } => unreachable!(), + }) + .borrow_mut() + .await; + tx.send(message).await?; + } + WebSocketStreamType::Server { .. } => { + let mut tx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { .. } => unreachable!(), + WebSocketStreamType::Server { tx, .. } => tx, + }) + .borrow_mut() + .await; + tx.send(message).await?; + } + } + + Ok(()) + } + + async fn next_message( + self: &Rc<Self>, + cancel: RcRef<CancelHandle>, + ) -> Result< + Option<Result<Message, tokio_tungstenite::tungstenite::Error>>, + AnyError, + > { + match &self.stream { + WebSocketStreamType::Client { .. } => { + let mut rx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { rx, .. } => rx, + WebSocketStreamType::Server { .. } => unreachable!(), + }) + .borrow_mut() + .await; + rx.next().or_cancel(cancel).await.map_err(AnyError::from) + } + WebSocketStreamType::Server { .. } => { + let mut rx = RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { .. } => unreachable!(), + WebSocketStreamType::Server { rx, .. } => rx, + }) + .borrow_mut() + .await; + rx.next().or_cancel(cancel).await.map_err(AnyError::from) + } + } + } } impl Resource for WsStreamResource { @@ -79,8 +147,6 @@ impl Resource for WsStreamResource { } } -impl WsStreamResource {} - // This op is needed because creating a WS instance in JavaScript is a sync // operation and should throw error when permissions are not fulfilled, // but actual op that connects WS is async. @@ -184,8 +250,10 @@ where let (ws_tx, ws_rx) = stream.split(); let resource = WsStreamResource { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), + stream: WebSocketStreamType::Client { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, cancel: Default::default(), }; let mut state = state.borrow_mut(); @@ -227,15 +295,13 @@ pub async fn op_ws_send( "pong" => Message::Pong(vec![]), _ => unreachable!(), }; - let rid = args.rid; let resource = state .borrow_mut() .resource_table - .get::<WsStreamResource>(rid) + .get::<WsStreamResource>(args.rid) .ok_or_else(bad_resource_id)?; - let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; - tx.send(msg).await?; + resource.send(msg).await?; Ok(()) } @@ -266,8 +332,7 @@ pub async fn op_ws_close( .resource_table .get::<WsStreamResource>(rid) .ok_or_else(bad_resource_id)?; - let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; - tx.send(msg).await?; + resource.send(msg).await?; Ok(()) } @@ -294,9 +359,8 @@ pub async fn op_ws_next_event( .get::<WsStreamResource>(rid) .ok_or_else(bad_resource_id)?; - let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let val = rx.next().or_cancel(cancel).await?; + let cancel = RcRef::map(&resource, |r| &r.cancel); + let val = resource.next_message(cancel).await?; let res = match val { Some(Ok(Message::Text(text))) => NextEventResponse::String(text), Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), |