diff options
Diffstat (limited to 'extensions/net/ops_http.rs')
-rw-r--r-- | extensions/net/ops_http.rs | 188 |
1 files changed, 147 insertions, 41 deletions
diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs index 54e06c3a7..782ec91d0 100644 --- a/extensions/net/ops_http.rs +++ b/extensions/net/ops_http.rs @@ -2,7 +2,6 @@ use crate::io::TcpStreamResource; use crate::io::TlsStreamResource; -use crate::ops_tls::TlsStream; use deno_core::error::bad_resource_id; use deno_core::error::null_opbuf; use deno_core::error::type_error; @@ -25,7 +24,6 @@ use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use hyper::body::HttpBody; use hyper::http; -use hyper::server::conn::Connection; use hyper::server::conn::Http; use hyper::service::Service as HyperService; use hyper::Body; @@ -42,7 +40,6 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncReadExt; -use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio_util::io::StreamReader; @@ -54,6 +51,14 @@ pub fn init() -> Vec<OpPair> { ("op_http_response", op_async(op_http_response)), ("op_http_response_write", op_async(op_http_response_write)), ("op_http_response_close", op_async(op_http_response_close)), + ( + "op_http_websocket_accept_header", + op_sync(op_http_websocket_accept_header), + ), + ( + "op_http_upgrade_websocket", + op_async(op_http_upgrade_websocket), + ), ] } @@ -97,13 +102,15 @@ impl HyperService<Request<Body>> for Service { } } -enum ConnType { - Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>), - Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>), +type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; + +struct Conn { + scheme: &'static str, + conn: Rc<RefCell<ConnFuture>>, } struct ConnResource { - hyper_connection: ConnType, + hyper_connection: Conn, deno_service: Service, addr: SocketAddr, cancel: CancelHandle, @@ -112,11 +119,12 @@ struct ConnResource { impl ConnResource { // TODO(ry) impl Future for ConnResource? fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { - match &self.hyper_connection { - ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), - ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), - } - .map_err(AnyError::from) + self + .hyper_connection + .conn + .borrow_mut() + .poll_unpin(cx) + .map_err(AnyError::from) } } @@ -134,7 +142,7 @@ impl Resource for ConnResource { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // request_body_rid: + // request_rid: Option<ResourceId>, // response_sender_rid: ResourceId, @@ -207,12 +215,7 @@ async fn op_http_request_next( } let url = { - let scheme = { - match conn_resource.hyper_connection { - ConnType::Tcp(_) => "http", - ConnType::Tls(_) => "https", - } - }; + let scheme = &conn_resource.hyper_connection.scheme; let host: Cow<str> = if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { @@ -224,24 +227,35 @@ async fn op_http_request_next( format!("{}://{}{}", scheme, host, path) }; + let is_websocket_request = req + .headers() + .get(hyper::header::CONNECTION) + .and_then(|v| { + v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s)) + }) + .unwrap_or(false) + && req + .headers() + .get(hyper::header::UPGRADE) + .and_then(|v| { + v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s)) + }) + .unwrap_or(false); + let has_body = if let Some(exact_size) = req.size_hint().exact() { exact_size > 0 } else { true }; - let maybe_request_body_rid = if has_body { - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let stream_reader = StreamReader::new(stream); + let maybe_request_rid = if is_websocket_request || has_body { let mut state = state.borrow_mut(); - let request_body_rid = state.resource_table.add(RequestBodyResource { + let request_rid = state.resource_table.add(RequestResource { conn_rid, - reader: AsyncRefCell::new(stream_reader), + inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), cancel: CancelHandle::default(), }); - Some(request_body_rid) + Some(request_rid) } else { None }; @@ -254,7 +268,7 @@ async fn op_http_request_next( }); Poll::Ready(Ok(Some(NextRequestResponse( - maybe_request_body_rid, + maybe_request_rid, response_sender_rid, method, headers, @@ -303,9 +317,14 @@ fn op_http_start( let addr = tcp_stream.local_addr()?; let hyper_connection = Http::new() .with_executor(LocalExecutor) - .serve_connection(tcp_stream, deno_service.clone()); + .serve_connection(tcp_stream, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); let conn_resource = ConnResource { - hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), + hyper_connection: Conn { + conn: Rc::new(RefCell::new(conn)), + scheme: "http", + }, deno_service, addr, cancel: CancelHandle::default(), @@ -326,9 +345,14 @@ fn op_http_start( let hyper_connection = Http::new() .with_executor(LocalExecutor) - .serve_connection(tls_stream, deno_service.clone()); + .serve_connection(tls_stream, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); let conn_resource = ConnResource { - hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), + hyper_connection: Conn { + conn: Rc::new(RefCell::new(conn)), + scheme: "https", + }, deno_service, addr, cancel: CancelHandle::default(), @@ -367,10 +391,12 @@ async fn op_http_response( .ok() .expect("multiple op_http_respond ongoing"); + let conn_rid = response_sender.conn_rid; + let conn_resource = state .borrow() .resource_table - .get::<ConnResource>(response_sender.conn_rid) + .get::<ConnResource>(conn_rid) .ok_or_else(bad_resource_id)?; let mut builder = Response::builder().status(status); @@ -393,7 +419,7 @@ async fn op_http_response( let response_body_rid = state.borrow_mut().resource_table.add(ResponseBodyResource { body: AsyncRefCell::new(sender), - conn_rid: response_sender.conn_rid, + conn_rid, }); Some(response_body_rid) @@ -407,7 +433,10 @@ async fn op_http_response( } poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), + Poll::Ready(x) => { + state.borrow_mut().resource_table.close(conn_rid); + Poll::Ready(x) + } Poll::Pending => Poll::Ready(Ok(())), }) .await?; @@ -455,7 +484,7 @@ async fn op_http_request_read( let resource = state .borrow() .resource_table - .get::<RequestBodyResource>(rid as u32) + .get::<RequestResource>(rid as u32) .ok_or_else(bad_resource_id)?; let conn_resource = state @@ -464,8 +493,26 @@ async fn op_http_request_read( .get::<ConnResource>(resource.conn_rid) .ok_or_else(bad_resource_id)?; - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; + let mut inner = RcRef::map(resource.clone(), |r| &r.inner) + .borrow_mut() + .await; + + if let RequestOrStreamReader::Request(req) = &mut *inner { + let req = req.take().unwrap(); + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let reader = StreamReader::new(stream); + *inner = RequestOrStreamReader::StreamReader(reader); + }; + + let reader = match &mut *inner { + RequestOrStreamReader::StreamReader(reader) => reader, + _ => unreachable!(), + }; + let cancel = RcRef::map(resource, |r| &r.cancel); + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); poll_fn(|cx| { @@ -521,18 +568,77 @@ async fn op_http_response_write( Ok(()) } +fn op_http_websocket_accept_header( + _: &mut OpState, + key: String, + _: (), +) -> Result<String, AnyError> { + let digest = ring::digest::digest( + &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, + format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(), + ); + Ok(base64::encode(digest)) +} + +async fn op_http_upgrade_websocket( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<ResourceId, AnyError> { + let req_resource = state + .borrow_mut() + .resource_table + .take::<RequestResource>(rid) + .ok_or_else(bad_resource_id)?; + + let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; + + if let RequestOrStreamReader::Request(req) = inner.as_mut() { + let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; + let stream = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + + let (ws_tx, ws_rx) = stream.split(); + let rid = + state + .borrow_mut() + .resource_table + .add(deno_websocket::WsStreamResource { + stream: deno_websocket::WebSocketStreamType::Server { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, + cancel: Default::default(), + }); + + Ok(rid) + } else { + Err(bad_resource_id()) + } +} + type BytesStream = Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; -struct RequestBodyResource { +enum RequestOrStreamReader { + Request(Option<Request<hyper::Body>>), + StreamReader(StreamReader<BytesStream, bytes::Bytes>), +} + +struct RequestResource { conn_rid: ResourceId, - reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, + inner: AsyncRefCell<RequestOrStreamReader>, cancel: CancelHandle, } -impl Resource for RequestBodyResource { +impl Resource for RequestResource { fn name(&self) -> Cow<str> { - "requestBody".into() + "request".into() } fn close(self: Rc<Self>) { |