summaryrefslogtreecommitdiff
path: root/ext/websocket/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket/stream.rs')
-rw-r--r--ext/websocket/stream.rs63
1 files changed, 63 insertions, 0 deletions
diff --git a/ext/websocket/stream.rs b/ext/websocket/stream.rs
index 6f93406f6..7e36c8147 100644
--- a/ext/websocket/stream.rs
+++ b/ext/websocket/stream.rs
@@ -2,8 +2,12 @@
use bytes::Buf;
use bytes::Bytes;
use deno_net::raw::NetworkStream;
+use h2::RecvStream;
+use h2::SendStream;
use hyper::upgrade::Upgraded;
+use std::io::ErrorKind;
use std::pin::Pin;
+use std::task::ready;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
@@ -13,6 +17,7 @@ use tokio::io::ReadBuf;
pub(crate) enum WsStreamKind {
Upgraded(Upgraded),
Network(NetworkStream),
+ H2(SendStream<Bytes>, RecvStream),
}
pub(crate) struct WebSocketStream {
@@ -54,6 +59,27 @@ impl AsyncRead for WebSocketStream {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_read(cx, buf),
WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_read(cx, buf),
+ WsStreamKind::H2(_, recv) => {
+ let data = ready!(recv.poll_data(cx));
+ let Some(data) = data else {
+ // EOF
+ return Poll::Ready(Ok(()));
+ };
+ let mut data = data.map_err(|e| {
+ std::io::Error::new(std::io::ErrorKind::InvalidData, e)
+ })?;
+ recv.flow_control().release_capacity(data.len()).unwrap();
+ // This looks like the prefix code above -- can we share this?
+ let copy_len = std::cmp::min(data.len(), buf.remaining());
+ // TODO: There should be a way to do following two lines cleaner...
+ buf.put_slice(&data[..copy_len]);
+ data.advance(copy_len);
+ // Put back what's left
+ if !data.is_empty() {
+ self.pre = Some(data);
+ }
+ Poll::Ready(Ok(()))
+ }
}
}
}
@@ -67,6 +93,30 @@ impl AsyncWrite for WebSocketStream {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_write(cx, buf),
WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_write(cx, buf),
+ WsStreamKind::H2(send, _) => {
+ // Zero-length write succeeds
+ if buf.is_empty() {
+ return Poll::Ready(Ok(0));
+ }
+
+ send.reserve_capacity(buf.len());
+ let res = ready!(send.poll_capacity(cx));
+
+ // TODO(mmastrac): the documentation is not entirely clear what to do here, so we'll continue
+ _ = res;
+
+ // We'll try to send whatever we have capacity for
+ let size = std::cmp::min(buf.len(), send.capacity());
+ assert!(size > 0);
+
+ let buf: Bytes = Bytes::copy_from_slice(&buf[0..size]);
+ let len = buf.len();
+ // TODO(mmastrac): surface the h2 error?
+ let res = send
+ .send_data(buf, false)
+ .map_err(|_| std::io::Error::from(ErrorKind::Other));
+ Poll::Ready(res.map(|_| len))
+ }
}
}
@@ -77,6 +127,7 @@ impl AsyncWrite for WebSocketStream {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_flush(cx),
WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_flush(cx),
+ WsStreamKind::H2(..) => Poll::Ready(Ok(())),
}
}
@@ -87,6 +138,13 @@ impl AsyncWrite for WebSocketStream {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_shutdown(cx),
WsStreamKind::Upgraded(stream) => Pin::new(stream).poll_shutdown(cx),
+ WsStreamKind::H2(send, _) => {
+ // TODO(mmastrac): surface the h2 error?
+ let res = send
+ .send_data(Bytes::new(), false)
+ .map_err(|_| std::io::Error::from(ErrorKind::Other));
+ Poll::Ready(res)
+ }
}
}
@@ -94,6 +152,7 @@ impl AsyncWrite for WebSocketStream {
match &self.stream {
WsStreamKind::Network(stream) => stream.is_write_vectored(),
WsStreamKind::Upgraded(stream) => stream.is_write_vectored(),
+ WsStreamKind::H2(..) => false,
}
}
@@ -109,6 +168,10 @@ impl AsyncWrite for WebSocketStream {
WsStreamKind::Upgraded(stream) => {
Pin::new(stream).poll_write_vectored(cx, bufs)
}
+ WsStreamKind::H2(..) => {
+ // TODO(mmastrac): this is possibly just too difficult, but we'll never call it
+ unimplemented!()
+ }
}
}
}