diff options
Diffstat (limited to 'ext/websocket/stream.rs')
-rw-r--r-- | ext/websocket/stream.rs | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/ext/websocket/stream.rs b/ext/websocket/stream.rs index 6f93406f6..7e36c8147 100644 --- a/ext/websocket/stream.rs +++ b/ext/websocket/stream.rs @@ -2,8 +2,12 @@ use bytes::Buf; use bytes::Bytes; use deno_net::raw::NetworkStream; +use h2::RecvStream; +use h2::SendStream; use hyper::upgrade::Upgraded; +use std::io::ErrorKind; use std::pin::Pin; +use std::task::ready; use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; @@ -13,6 +17,7 @@ use tokio::io::ReadBuf; pub(crate) enum WsStreamKind { Upgraded(Upgraded), Network(NetworkStream), + H2(SendStream<Bytes>, RecvStream), } pub(crate) struct WebSocketStream { @@ -54,6 +59,27 @@ impl AsyncRead for WebSocketStream { match &mut self.stream { WsStreamKind::Network(stream) => Pin::new(stream).poll_read(cx, buf), WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_read(cx, buf), + WsStreamKind::H2(_, recv) => { + let data = ready!(recv.poll_data(cx)); + let Some(data) = data else { + // EOF + return Poll::Ready(Ok(())); + }; + let mut data = data.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::InvalidData, e) + })?; + recv.flow_control().release_capacity(data.len()).unwrap(); + // This looks like the prefix code above -- can we share this? + let copy_len = std::cmp::min(data.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&data[..copy_len]); + data.advance(copy_len); + // Put back what's left + if !data.is_empty() { + self.pre = Some(data); + } + Poll::Ready(Ok(())) + } } } } @@ -67,6 +93,30 @@ impl AsyncWrite for WebSocketStream { match &mut self.stream { WsStreamKind::Network(stream) => Pin::new(stream).poll_write(cx, buf), WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_write(cx, buf), + WsStreamKind::H2(send, _) => { + // Zero-length write succeeds + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + + send.reserve_capacity(buf.len()); + let res = ready!(send.poll_capacity(cx)); + + // TODO(mmastrac): the documentation is not entirely clear what to do here, so we'll continue + _ = res; + + // We'll try to send whatever we have capacity for + let size = std::cmp::min(buf.len(), send.capacity()); + assert!(size > 0); + + let buf: Bytes = Bytes::copy_from_slice(&buf[0..size]); + let len = buf.len(); + // TODO(mmastrac): surface the h2 error? + let res = send + .send_data(buf, false) + .map_err(|_| std::io::Error::from(ErrorKind::Other)); + Poll::Ready(res.map(|_| len)) + } } } @@ -77,6 +127,7 @@ impl AsyncWrite for WebSocketStream { match &mut self.stream { WsStreamKind::Network(stream) => Pin::new(stream).poll_flush(cx), WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_flush(cx), + WsStreamKind::H2(..) => Poll::Ready(Ok(())), } } @@ -87,6 +138,13 @@ impl AsyncWrite for WebSocketStream { match &mut self.stream { WsStreamKind::Network(stream) => Pin::new(stream).poll_shutdown(cx), WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_shutdown(cx), + WsStreamKind::H2(send, _) => { + // TODO(mmastrac): surface the h2 error? + let res = send + .send_data(Bytes::new(), false) + .map_err(|_| std::io::Error::from(ErrorKind::Other)); + Poll::Ready(res) + } } } @@ -94,6 +152,7 @@ impl AsyncWrite for WebSocketStream { match &self.stream { WsStreamKind::Network(stream) => stream.is_write_vectored(), WsStreamKind::Upgraded(stream) => stream.is_write_vectored(), + WsStreamKind::H2(..) => false, } } @@ -109,6 +168,10 @@ impl AsyncWrite for WebSocketStream { WsStreamKind::Upgraded(stream) => { Pin::new(stream).poll_write_vectored(cx, bufs) } + WsStreamKind::H2(..) => { + // TODO(mmastrac): this is possibly just too difficult, but we'll never call it + unimplemented!() + } } } } |