diff options
author | Luca Casonato <hello@lcas.dev> | 2021-11-09 12:10:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-09 12:10:21 +0100 |
commit | 75793baae83123f890442c5d32e3dd38eb18ce1c (patch) | |
tree | 0d14bd5edbe28c3beebe9f0944437a89e0e3f724 /ext/http/lib.rs | |
parent | 31fde9deba6d4ca55293d60a030babd8d4ce12af (diff) |
Revert "refactor(ext/http): rewrite hyper integration and fix bug (#12332)" (#12704)
This reverts commit 5b1e537446454f6332de44adbeb6a15ff072c2fa.
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 891 |
1 files changed, 463 insertions, 428 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 5a14f845f..aae6415cb 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,29 +1,17 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use bytes::Bytes; -use deno_core::error::custom_error; +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::channel::oneshot; -use deno_core::futures::future::pending; -use deno_core::futures::future::select; -use deno_core::futures::future::Either; -use deno_core::futures::future::Pending; -use deno_core::futures::future::RemoteHandle; -use deno_core::futures::future::Shared; -use deno_core::futures::never::Never; -use deno_core::futures::pin_mut; -use deno_core::futures::ready; -use deno_core::futures::stream::Peekable; +use deno_core::futures::future::poll_fn; use deno_core::futures::FutureExt; +use deno_core::futures::Stream; use deno_core::futures::StreamExt; -use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; use deno_core::ByteString; -use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -33,31 +21,33 @@ use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; -use deno_websocket::ws_create_server_stream; +use hyper::body::HttpBody; +use hyper::header::CONNECTION; +use hyper::header::SEC_WEBSOCKET_KEY; +use hyper::header::SEC_WEBSOCKET_VERSION; +use hyper::header::UPGRADE; +use hyper::http; use hyper::server::conn::Http; -use hyper::service::Service; +use hyper::service::Service as HyperService; use hyper::Body; +use hyper::Method; use hyper::Request; use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; -use std::cmp::min; -use std::error::Error; use std::future::Future; -use std::io; -use std::mem::replace; -use std::mem::take; use std::net::SocketAddr; use std::pin::Pin; use std::rc::Rc; -use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; -use tokio::task::spawn_local; +use tokio::sync::oneshot; +use tokio_util::io::StreamReader; pub fn init() -> Extension { Extension::builder() @@ -66,11 +56,11 @@ pub fn init() -> Extension { "01_http.js", )) .ops(vec![ - ("op_http_accept", op_async(op_http_accept)), - ("op_http_read", op_async(op_http_read)), - ("op_http_write_headers", op_async(op_http_write_headers)), - ("op_http_write", op_async(op_http_write)), - ("op_http_shutdown", op_async(op_http_shutdown)), + ("op_http_request_next", op_async(op_http_request_next)), + ("op_http_request_read", op_async(op_http_request_read)), + ("op_http_response", op_async(op_http_response)), + ("op_http_response_write", op_async(op_http_response_write)), + ("op_http_response_close", op_async(op_http_response_close)), ( "op_http_websocket_accept_header", op_sync(op_http_websocket_accept_header), @@ -83,247 +73,86 @@ pub fn init() -> Extension { .build() } -struct HttpConnResource { - addr: SocketAddr, - scheme: &'static str, - acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>, - closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>, - cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops. +struct ServiceInner { + request: Request<Body>, + response_tx: oneshot::Sender<Response<Body>>, } -impl HttpConnResource { - fn new<S>(io: S, scheme: &'static str, addr: SocketAddr) -> Self - where - S: AsyncRead + AsyncWrite + Unpin + Send + 'static, - { - let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>(); - let service = HttpService::new(acceptors_rx); - - let conn_fut = Http::new() - .with_executor(LocalExecutor) - .serve_connection(io, service) - .with_upgrades(); - - // When the cancel handle is used, the connection shuts down gracefully. - // No new HTTP streams will be accepted, but existing streams will be able - // to continue operating and eventually shut down cleanly. - let cancel_handle = CancelHandle::new_rc(); - let shutdown_fut = never().or_cancel(&cancel_handle).fuse(); - - // A local task that polls the hyper connection future to completion. - let task_fut = async move { - pin_mut!(shutdown_fut); - pin_mut!(conn_fut); - let result = match select(conn_fut, shutdown_fut).await { - Either::Left((result, _)) => result, - Either::Right((_, mut conn_fut)) => { - conn_fut.as_mut().graceful_shutdown(); - conn_fut.await - } - }; - filter_enotconn(result).map_err(Arc::from) - }; - let (task_fut, closed_fut) = task_fut.remote_handle(); - let closed_fut = closed_fut.shared(); - spawn_local(task_fut); - - Self { - addr, - scheme, - acceptors_tx, - closed_fut, - cancel_handle, - } - } - - // Accepts a new incoming HTTP request. - async fn accept( - self: &Rc<Self>, - ) -> Result<Option<HttpStreamResource>, AnyError> { - let fut = async { - let (request_tx, request_rx) = oneshot::channel(); - let (response_tx, response_rx) = oneshot::channel(); - - let acceptor = HttpAcceptor::new(request_tx, response_rx); - self.acceptors_tx.unbounded_send(acceptor).ok()?; - - let request = request_rx.await.ok()?; - let stream = HttpStreamResource::new(self, request, response_tx); - Some(stream) - }; - - async { - match fut.await { - Some(stream) => Ok(Some(stream)), - // Return the connection error, if any. - None => self.closed().map_ok(|_| None).await, - } - } - .try_or_cancel(&self.cancel_handle) - .await - } - - /// A future that completes when this HTTP connection is closed or errors. - async fn closed(&self) -> Result<(), AnyError> { - self.closed_fut.clone().map_err(AnyError::from).await - } - - fn scheme(&self) -> &'static str { - self.scheme - } - - fn addr(&self) -> SocketAddr { - self.addr - } +#[derive(Clone, Default)] +struct Service { + inner: Rc<RefCell<Option<ServiceInner>>>, + waker: Rc<deno_core::futures::task::AtomicWaker>, } -impl Resource for HttpConnResource { - fn name(&self) -> Cow<str> { - "httpConn".into() - } - - fn close(self: Rc<Self>) { - self.cancel_handle.cancel(); - } -} - -/// Creates a new HttpConn resource which uses `io` as its transport. -pub fn http_create_conn_resource<S>( - state: &mut OpState, - io: S, - addr: SocketAddr, - scheme: &'static str, -) -> Result<ResourceId, AnyError> -where - S: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - let conn = HttpConnResource::new(io, scheme, addr); - let rid = state.resource_table.add(conn); - Ok(rid) -} - -/// An object that implements the `hyper::Service` trait, through which Hyper -/// delivers incoming HTTP requests. -struct HttpService { - acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>, -} - -impl HttpService { - fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self { - let acceptors_rx = acceptors_rx.peekable(); - Self { acceptors_rx } - } -} - -impl Service<Request<Body>> for HttpService { +impl HyperService<Request<Body>> for Service { type Response = Response<Body>; - type Error = oneshot::Canceled; - type Future = oneshot::Receiver<Response<Body>>; + type Error = http::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; fn poll_ready( &mut self, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - let acceptors_rx = Pin::new(&mut self.acceptors_rx); - let result = ready!(acceptors_rx.poll_peek(cx)) - .map(|_| ()) - .ok_or(oneshot::Canceled); - Poll::Ready(result) + if self.inner.borrow().is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } } - fn call(&mut self, request: Request<Body>) -> Self::Future { - let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap(); - acceptor.call(request) + fn call(&mut self, req: Request<Body>) -> Self::Future { + let (resp_tx, resp_rx) = oneshot::channel(); + self.inner.borrow_mut().replace(ServiceInner { + request: req, + response_tx: resp_tx, + }); + + async move { + resp_rx.await.or_else(|_| + // Fallback dummy response in case sender was dropped due to closed conn + Response::builder() + .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) + .body(vec![].into())) + } + .boxed_local() } } -/// A pair of one-shot channels which first transfer a HTTP request from the -/// Hyper service to the HttpConn resource, and then take the Response back to -/// the service. -struct HttpAcceptor { - request_tx: oneshot::Sender<Request<Body>>, - response_rx: oneshot::Receiver<Response<Body>>, -} - -impl HttpAcceptor { - fn new( - request_tx: oneshot::Sender<Request<Body>>, - response_rx: oneshot::Receiver<Response<Body>>, - ) -> Self { - Self { - request_tx, - response_rx, - } - } +type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; - fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> { - let Self { - request_tx, - response_rx, - } = self; - request_tx - .send(request) - .map(|_| response_rx) - .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver. - } +struct Conn { + scheme: &'static str, + addr: SocketAddr, + conn: Rc<RefCell<ConnFuture>>, } -/// A resource representing a single HTTP request/response stream. -struct HttpStreamResource { - conn: Rc<HttpConnResource>, - rd: AsyncRefCell<HttpRequestReader>, - wr: AsyncRefCell<HttpResponseWriter>, - cancel_handle: CancelHandle, +struct ConnResource { + hyper_connection: Conn, + deno_service: Service, + cancel: CancelHandle, } -impl HttpStreamResource { - fn new( - conn: &Rc<HttpConnResource>, - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, - ) -> Self { - Self { - conn: conn.clone(), - rd: HttpRequestReader::Headers(request).into(), - wr: HttpResponseWriter::Headers(response_tx).into(), - cancel_handle: CancelHandle::new(), - } +impl ConnResource { + // TODO(ry) impl Future for ConnResource? + fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { + self + .hyper_connection + .conn + .borrow_mut() + .poll_unpin(cx) + .map_err(AnyError::from) } } -impl Resource for HttpStreamResource { +impl Resource for ConnResource { fn name(&self) -> Cow<str> { - "httpStream".into() + "httpConnection".into() } fn close(self: Rc<Self>) { - self.cancel_handle.cancel(); - } -} - -/// The read half of an HTTP stream. -enum HttpRequestReader { - Headers(Request<Body>), - Body(Peekable<Body>), - Closed, -} - -impl Default for HttpRequestReader { - fn default() -> Self { - Self::Closed - } -} - -/// The write half of an HTTP stream. -enum HttpResponseWriter { - Headers(oneshot::Sender<Response<Body>>), - Body(hyper::body::Sender), - Closed, -} - -impl Default for HttpResponseWriter { - fn default() -> Self { - Self::Closed + self.cancel.cancel() } } @@ -331,7 +160,9 @@ impl Default for HttpResponseWriter { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // stream_rid: + // request_rid: + Option<ResourceId>, + // response_sender_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -343,40 +174,111 @@ struct NextRequestResponse( String, ); -async fn op_http_accept( +async fn op_http_request_next( state: Rc<RefCell<OpState>>, - rid: ResourceId, + conn_rid: ResourceId, _: (), ) -> Result<Option<NextRequestResponse>, AnyError> { - let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(conn_rid)?; + + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + + poll_fn(|cx| { + conn_resource.deno_service.waker.register(cx.waker()); + + // Check if conn is open/close/errored + let (conn_closed, conn_result) = match conn_resource.poll(cx) { + Poll::Pending => (false, Ok(())), + Poll::Ready(Ok(())) => (true, Ok(())), + Poll::Ready(Err(e)) => { + if should_ignore_error(&e) { + (true, Ok(())) + } else { + (true, Err(e)) + } + } + }; + // Drop conn resource if closed + if conn_closed { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state + .borrow_mut() + .resource_table + .take::<ConnResource>(conn_rid); + + // Fail with err if unexpected conn error, early return None otherwise + return Poll::Ready(conn_result.map(|_| None)); + } - let stream = match conn.accept().await { - Ok(Some(stream)) => Rc::new(stream), - Ok(None) => return Ok(None), - Err(err) => return Err(err), - }; + if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() { + let Conn { scheme, addr, .. } = conn_resource.hyper_connection; + let mut state = state.borrow_mut(); + let next = + prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?; + Poll::Ready(Ok(Some(next))) + } else { + Poll::Pending + } + }) + .try_or_cancel(cancel) + .await + .map_err(AnyError::from) +} - let rd = RcRef::map(&stream, |r| &r.rd).borrow().await; - let request = match &*rd { - HttpRequestReader::Headers(request) => request, - _ => unreachable!(), +fn prepare_next_request( + state: &mut OpState, + conn_rid: ResourceId, + request_resource: ServiceInner, + scheme: &'static str, + addr: SocketAddr, +) -> Result<NextRequestResponse, AnyError> { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + let headers = req_headers(&req); + let url = req_url(&req, scheme, addr)?; + + let is_websocket = is_websocket_request(&req); + let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD); + let has_body = + is_websocket || (can_have_body && req.size_hint().exact() != Some(0)); + + let maybe_request_rid = if has_body { + let request_rid = state.resource_table.add(RequestResource { + conn_rid, + inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), + cancel: CancelHandle::default(), + }); + Some(request_rid) + } else { + None }; - let method = request.method().to_string(); - let headers = req_headers(request); - let url = req_url(request, conn.scheme(), conn.addr()); - - let stream_rid = state.borrow_mut().resource_table.add_rc(stream); - - let r = NextRequestResponse(stream_rid, method, headers, url); - Ok(Some(r)) + let response_sender_rid = state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); + + Ok(NextRequestResponse( + maybe_request_rid, + response_sender_rid, + method, + headers, + url, + )) } fn req_url( req: &hyper::Request<hyper::Body>, scheme: &'static str, addr: SocketAddr, -) -> String { +) -> Result<String, AnyError> { let host: Cow<str> = if let Some(auth) = req.uri().authority() { match addr.port() { 443 if scheme == "https" => Cow::Borrowed(auth.host()), @@ -386,22 +288,12 @@ fn req_url( } else if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { - match host.to_str() { - Ok(host) => Cow::Borrowed(host), - Err(_) => Cow::Owned( - host - .as_bytes() - .iter() - .cloned() - .map(char::from) - .collect::<String>(), - ), - } + Cow::Borrowed(host.to_str()?) } else { Cow::Owned(addr.to_string()) }; let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - [scheme, "://", &host, path].concat() + Ok([scheme, "://", &host, path].concat()) } fn req_headers( @@ -435,6 +327,68 @@ fn req_headers( headers } +fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool { + req.version() == hyper::Version::HTTP_11 + && req.method() == hyper::Method::GET + && req.headers().contains_key(&SEC_WEBSOCKET_KEY) + && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13" + && header(req.headers(), &UPGRADE) + .split(|c| *c == b' ' || *c == b',') + .any(|token| token.eq_ignore_ascii_case(b"websocket")) + && header(req.headers(), &CONNECTION) + .split(|c| *c == b' ' || *c == b',') + .any(|token| token.eq_ignore_ascii_case(b"upgrade")) +} + +fn header<'a>( + h: &'a hyper::http::HeaderMap, + name: &hyper::header::HeaderName, +) -> &'a [u8] { + h.get(name) + .map(hyper::header::HeaderValue::as_bytes) + .unwrap_or_default() +} + +fn should_ignore_error(e: &AnyError) -> bool { + if let Some(e) = e.downcast_ref::<hyper::Error>() { + use std::error::Error; + if let Some(std_err) = e.source() { + if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return true; + } + } + } + } + false +} + +pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>( + state: &mut OpState, + io: IO, + addr: SocketAddr, + scheme: &'static str, +) -> Result<ResourceId, AnyError> { + let deno_service = Service::default(); + + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(io, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); + let conn_resource = ConnResource { + hyper_connection: Conn { + scheme, + addr, + conn: Rc::new(RefCell::new(conn)), + }, + deno_service, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + Ok(rid) +} + // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Deserialize)] struct RespondArgs( @@ -446,16 +400,27 @@ struct RespondArgs( Vec<(ByteString, ByteString)>, ); -async fn op_http_write_headers( +async fn op_http_response( state: Rc<RefCell<OpState>>, args: RespondArgs, data: Option<StringOrBuffer>, -) -> Result<(), AnyError> { +) -> Result<Option<ResourceId>, AnyError> { let RespondArgs(rid, status, headers) = args; - let stream = state + + let response_sender = state .borrow_mut() .resource_table - .get::<HttpStreamResource>(rid)?; + .take::<ResponseSenderResource>(rid)?; + let response_sender = Rc::try_unwrap(response_sender) + .ok() + .expect("multiple op_http_respond ongoing"); + + let conn_rid = response_sender.conn_rid; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(conn_rid)?; let mut builder = Response::builder().status(status); @@ -464,138 +429,171 @@ async fn op_http_write_headers( builder = builder.header(key.as_ref(), value.as_ref()); } - let body: Response<Body>; - let new_wr: HttpResponseWriter; + let (maybe_response_body_rid, res) = if let Some(d) = data { + // If a body is passed, we use it, and don't return a body for streaming. + (None, builder.body(d.into_bytes().into())?) + } else { + // If no body is passed, we return a writer for streaming the body. + let (sender, body) = Body::channel(); + let res = builder.body(body)?; - match data { - Some(data) => { - // If a buffer was passed, we use it to construct a response body. - body = builder.body(data.into_bytes().into())?; - new_wr = HttpResponseWriter::Closed; - } - None => { - // If no buffer was passed, the caller will stream the response body. - let (body_tx, body_rx) = Body::channel(); - body = builder.body(body_rx)?; - new_wr = HttpResponseWriter::Body(body_tx); - } - } + let response_body_rid = + state.borrow_mut().resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + conn_rid, + }); - let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - let response_tx = match replace(&mut *old_wr, new_wr) { - HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), + (Some(response_body_rid), res) }; - match response_tx.send(body) { - Ok(_) => Ok(()), - Err(_) => { - stream.conn.closed().await?; - Err(http_error("connection closed while sending response")) + // oneshot::Sender::send(v) returns |v| on error, not an error object. + // The only failure mode is the receiver already having dropped its end + // of the channel. + if response_sender.sender.send(res).is_err() { + if let Some(rid) = maybe_response_body_rid { + let _ = state + .borrow_mut() + .resource_table + .take::<ResponseBodyResource>(rid); } + return Err(type_error("internal communication error")); } -} -async fn op_http_write( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - buf: ZeroCopyBuf, -) -> Result<(), AnyError> { - let stream = state - .borrow() - .resource_table - .get::<HttpStreamResource>(rid)?; - let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - - loop { - let body_tx = match &mut *wr { - HttpResponseWriter::Body(body_tx) => body_tx, - HttpResponseWriter::Headers(_) => { - break Err(http_error("no response headers")) - } - HttpResponseWriter::Closed => { - break Err(http_error("response already completed")) - } - }; - - let bytes = Bytes::copy_from_slice(&buf[..]); - match body_tx.send_data(bytes).await { - Ok(_) => break Ok(()), - Err(err) => { - // Don't return "channel closed", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - assert!(err.is_closed()); - stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; - } + let result = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => { + state.borrow_mut().resource_table.close(conn_rid).ok(); + Poll::Ready(x) + } + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + + if let Err(e) = result { + if let Some(rid) = maybe_response_body_rid { + let _ = state + .borrow_mut() + .resource_table + .take::<ResponseBodyResource>(rid); } + return Err(e); } + + if maybe_response_body_rid.is_none() { + conn_resource.deno_service.waker.wake(); + } + Ok(maybe_response_body_rid) } -/// Gracefully closes the write half of the HTTP stream. Note that this does not -/// remove the HTTP stream resource from the resource table; it still has to be -/// closed with `Deno.core.close()`. -async fn op_http_shutdown( +async fn op_http_response_close( state: Rc<RefCell<OpState>>, rid: ResourceId, _: (), ) -> Result<(), AnyError> { - let stream = state + let resource = state + .borrow_mut() + .resource_table + .take::<ResponseBodyResource>(rid)?; + + let conn_resource = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; - let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - take(&mut *wr); - Ok(()) + .get::<ConnResource>(resource.conn_rid)?; + drop(resource); + + let r = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + conn_resource.deno_service.waker.wake(); + r } -async fn op_http_read( +async fn op_http_request_read( state: Rc<RefCell<OpState>>, rid: ResourceId, - mut buf: ZeroCopyBuf, + mut data: ZeroCopyBuf, ) -> Result<usize, AnyError> { - let stream = state - .borrow_mut() + let resource = state + .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(body) => break body, - HttpRequestReader::Closed => return Ok(0), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let body = request.into_body().peekable(); - *rd = HttpRequestReader::Body(body); - } - _ => unreachable!(), - }; + .get::<RequestResource>(rid as u32)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid)?; + + let mut inner = RcRef::map(resource.clone(), |r| &r.inner) + .borrow_mut() + .await; + + if let RequestOrStreamReader::Request(req) = &mut *inner { + let req = req.take().unwrap(); + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let reader = StreamReader::new(stream); + *inner = RequestOrStreamReader::StreamReader(reader); }; - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok(len); - } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok(0), - } - } + let reader = match &mut *inner { + RequestOrStreamReader::StreamReader(reader) => reader, + _ => unreachable!(), }; - let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await + let cancel = RcRef::map(resource, |r| &r.cancel); + + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + read_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await +} + +async fn op_http_response_write( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<ResponseBodyResource>(rid as u32)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid)?; + + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local(); + + poll_fn(|cx| { + let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); + + // Poll connection so the data is flushed + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + r + }) + .await?; + + Ok(()) } fn op_http_websocket_accept_header( @@ -615,22 +613,86 @@ async fn op_http_upgrade_websocket( rid: ResourceId, _: (), ) -> Result<ResourceId, AnyError> { - let stream = state + let req_resource = state .borrow_mut() .resource_table - .get::<HttpStreamResource>(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + .take::<RequestResource>(rid)?; + + let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; + + if let RequestOrStreamReader::Request(req) = inner.as_mut() { + let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; + let stream = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + + let (ws_tx, ws_rx) = stream.split(); + let rid = + state + .borrow_mut() + .resource_table + .add(deno_websocket::WsStreamResource { + stream: deno_websocket::WebSocketStreamType::Server { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, + cancel: Default::default(), + }); + + Ok(rid) + } else { + Err(bad_resource_id()) + } +} - let request = match &mut *rd { - HttpRequestReader::Headers(request) => request, - _ => { - return Err(http_error("cannot upgrade because request body was used")) - } - }; +type BytesStream = + Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; + +enum RequestOrStreamReader { + Request(Option<Request<hyper::Body>>), + StreamReader(StreamReader<BytesStream, bytes::Bytes>), +} + +struct RequestResource { + conn_rid: ResourceId, + inner: AsyncRefCell<RequestOrStreamReader>, + cancel: CancelHandle, +} - let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; - Ok(ws_rid) +impl Resource for RequestResource { + fn name(&self) -> Cow<str> { + "request".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +struct ResponseSenderResource { + sender: oneshot::Sender<Response<Body>>, + conn_rid: ResourceId, +} + +impl Resource for ResponseSenderResource { + fn name(&self) -> Cow<str> { + "responseSender".into() + } +} + +struct ResponseBodyResource { + body: AsyncRefCell<hyper::body::Sender>, + conn_rid: ResourceId, +} + +impl Resource for ResponseBodyResource { + fn name(&self) -> Cow<str> { + "responseBody".into() + } } // Needed so hyper can use non Send futures @@ -643,33 +705,6 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); - } -} - -fn http_error(message: &'static str) -> AnyError { - custom_error("Http", message) -} - -/// Filters out the ever-surprising 'shutdown ENOTCONN' errors. -fn filter_enotconn( - result: Result<(), hyper::Error>, -) -> Result<(), hyper::Error> { - if result - .as_ref() - .err() - .and_then(|err| err.source()) - .and_then(|err| err.downcast_ref::<io::Error>()) - .filter(|err| err.kind() == io::ErrorKind::NotConnected) - .is_some() - { - Ok(()) - } else { - result + tokio::task::spawn_local(fut); } } - -/// Create a future that is forever pending. -fn never() -> Pending<Never> { - pending() -} |