diff options
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index d469b5aaf..ba626a45a 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -34,12 +34,13 @@ use std::sync::Arc; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::TlsConnector; +use tokio_tungstenite::client_async; use tokio_tungstenite::tungstenite::{ handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, Message, + protocol::CloseFrame, protocol::Role, Message, }; use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::{client_async, WebSocketStream}; +use tokio_tungstenite::WebSocketStream; pub use tokio_tungstenite; // Re-export tokio_tungstenite @@ -72,6 +73,27 @@ pub enum WebSocketStreamType { }, } +pub async fn ws_create_server_stream( + state: &Rc<RefCell<OpState>>, + transport: hyper::upgrade::Upgraded, +) -> Result<ResourceId, AnyError> { + let ws_stream = + WebSocketStream::from_raw_socket(transport, Role::Server, None).await; + let (ws_tx, ws_rx) = ws_stream.split(); + + let ws_resource = WsStreamResource { + stream: WebSocketStreamType::Server { + tx: AsyncRefCell::new(ws_tx), + rx: AsyncRefCell::new(ws_rx), + }, + cancel: Default::default(), + }; + + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add(ws_resource); + Ok(rid) +} + pub struct WsStreamResource { pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are |