From cd21cff29942f24ba7d38287186cce64d0e84e56 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 18 Aug 2022 17:35:02 +0530 Subject: feat(ext/flash): An optimized http/1.1 server (#15405) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartek IwaƄczuk Co-authored-by: Ben Noordhuis Co-authored-by: crowlkats Co-authored-by: Ryan Dahl --- ext/http/01_http.js | 16 ++++++++++++++++ ext/http/lib.rs | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) (limited to 'ext/http') diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 7dfe86a75..6df26d09f 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -13,6 +13,7 @@ newInnerRequest, newInnerResponse, fromInnerResponse, + _flash, } = window.__bootstrap.fetch; const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; @@ -475,6 +476,20 @@ } function upgradeHttp(req) { + if (req[_flash]) { + // NOTE(bartlomieju): + // Access these fields so they are cached on `req` object, otherwise + // they wouldn't be available after the connection gets upgraded. + req.url; + req.method; + req.headers; + + const { serverId, streamRid } = req[_flash]; + const connRid = core.ops.op_flash_upgrade_http(streamRid, serverId); + // TODO(@littledivy): return already read first packet too. + return [new TcpConn(connRid), new Uint8Array()]; + } + req[_deferred] = new Deferred(); return req[_deferred].promise; } @@ -483,5 +498,6 @@ HttpConn, upgradeWebSocket, upgradeHttp, + _ws, }; })(this); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 27d277654..d1b38fb42 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -874,6 +874,41 @@ fn op_http_websocket_accept_header(key: String) -> Result { Ok(base64::encode(digest)) } +struct UpgradedStream(hyper::upgrade::Upgraded); +impl tokio::io::AsyncRead for UpgradedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for UpgradedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_shutdown(cx) + } +} + +impl deno_websocket::Upgraded for UpgradedStream {} + #[op] async fn op_http_upgrade_websocket( state: Rc>, @@ -893,7 +928,9 @@ async fn op_http_upgrade_websocket( }; let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; + let ws_rid = + ws_create_server_stream(&state, Box::pin(UpgradedStream(transport))) + .await?; Ok(ws_rid) } -- cgit v1.2.3