summaryrefslogtreecommitdiff
path: root/ext/http
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http')
-rw-r--r--ext/http/01_http.js16
-rw-r--r--ext/http/lib.rs39
2 files changed, 54 insertions, 1 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 7dfe86a75..6df26d09f 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -13,6 +13,7 @@
newInnerRequest,
newInnerResponse,
fromInnerResponse,
+ _flash,
} = window.__bootstrap.fetch;
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
@@ -475,6 +476,20 @@
}
function upgradeHttp(req) {
+ if (req[_flash]) {
+ // NOTE(bartlomieju):
+ // Access these fields so they are cached on `req` object, otherwise
+ // they wouldn't be available after the connection gets upgraded.
+ req.url;
+ req.method;
+ req.headers;
+
+ const { serverId, streamRid } = req[_flash];
+ const connRid = core.ops.op_flash_upgrade_http(streamRid, serverId);
+ // TODO(@littledivy): return already read first packet too.
+ return [new TcpConn(connRid), new Uint8Array()];
+ }
+
req[_deferred] = new Deferred();
return req[_deferred].promise;
}
@@ -483,5 +498,6 @@
HttpConn,
upgradeWebSocket,
upgradeHttp,
+ _ws,
};
})(this);
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 27d277654..d1b38fb42 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -874,6 +874,41 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
Ok(base64::encode(digest))
}
+struct UpgradedStream(hyper::upgrade::Upgraded);
+impl tokio::io::AsyncRead for UpgradedStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
+ }
+}
+
+impl tokio::io::AsyncWrite for UpgradedStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_flush(cx)
+ }
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
+ }
+}
+
+impl deno_websocket::Upgraded for UpgradedStream {}
+
#[op]
async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
@@ -893,7 +928,9 @@ async fn op_http_upgrade_websocket(
};
let transport = hyper::upgrade::on(request).await?;
- let ws_rid = ws_create_server_stream(&state, transport).await?;
+ let ws_rid =
+ ws_create_server_stream(&state, Box::pin(UpgradedStream(transport)))
+ .await?;
Ok(ws_rid)
}