summaryrefslogtreecommitdiff
path: root/ext/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket')
-rw-r--r--ext/websocket/lib.rs23
1 files changed, 14 insertions, 9 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 0e642be3f..515f798ac 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -34,8 +34,11 @@ use std::cell::RefCell;
use std::convert::TryFrom;
use std::fmt;
use std::path::PathBuf;
+use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::rustls::ServerName;
@@ -67,23 +70,25 @@ pub trait WebSocketPermissions {
/// would override previously used alias.
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
-type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
+type ClientWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
+type ServerWsStream = WebSocketStream<Pin<Box<dyn Upgraded>>>;
+
pub enum WebSocketStreamType {
Client {
- tx: AsyncRefCell<SplitSink<WsStream, Message>>,
- rx: AsyncRefCell<SplitStream<WsStream>>,
+ tx: AsyncRefCell<SplitSink<ClientWsStream, Message>>,
+ rx: AsyncRefCell<SplitStream<ClientWsStream>>,
},
Server {
- tx: AsyncRefCell<
- SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>,
- >,
- rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>,
+ tx: AsyncRefCell<SplitSink<ServerWsStream, Message>>,
+ rx: AsyncRefCell<SplitStream<ServerWsStream>>,
},
}
+pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
+
pub async fn ws_create_server_stream(
state: &Rc<RefCell<OpState>>,
- transport: hyper::upgrade::Upgraded,
+ transport: Pin<Box<dyn Upgraded>>,
) -> Result<ResourceId, AnyError> {
let ws_stream = WebSocketStream::from_raw_socket(
transport,
@@ -340,7 +345,7 @@ where
..Default::default()
}),
);
- let (stream, response): (WsStream, Response) =
+ let (stream, response): (ClientWsStream, Response) =
if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource.0.to_owned()).await?
} else {