diff options
author | Bert Belder <bertbelder@gmail.com> | 2021-10-04 18:50:40 -0700 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2021-11-10 14:51:43 -0800 |
commit | 72a6231a614e71a57c4f8ce5f9de68ab97171dd1 (patch) | |
tree | b19bbd749ad67f606ef331fee00bfe2b34477633 /ext/http/lib.rs | |
parent | 0cc8a9741a16efe3e37167731238b33d26887fd0 (diff) |
refactor(ext/http): rewrite hyper integration and fix bug (#12732)
Fixes: #12193
Fixes: #12251
Closes: #12714
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 891 |
1 files changed, 428 insertions, 463 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index aae6415cb..5a14f845f 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,17 +1,29 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use deno_core::error::bad_resource_id; -use deno_core::error::type_error; +use bytes::Bytes; +use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; +use deno_core::futures::channel::mpsc; +use deno_core::futures::channel::oneshot; +use deno_core::futures::future::pending; +use deno_core::futures::future::select; +use deno_core::futures::future::Either; +use deno_core::futures::future::Pending; +use deno_core::futures::future::RemoteHandle; +use deno_core::futures::future::Shared; +use deno_core::futures::never::Never; +use deno_core::futures::pin_mut; +use deno_core::futures::ready; +use deno_core::futures::stream::Peekable; use deno_core::futures::FutureExt; -use deno_core::futures::Stream; use deno_core::futures::StreamExt; +use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; use deno_core::ByteString; +use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -21,33 +33,31 @@ use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; -use hyper::body::HttpBody; -use hyper::header::CONNECTION; -use hyper::header::SEC_WEBSOCKET_KEY; -use hyper::header::SEC_WEBSOCKET_VERSION; -use hyper::header::UPGRADE; -use hyper::http; +use deno_websocket::ws_create_server_stream; use hyper::server::conn::Http; -use hyper::service::Service as HyperService; +use hyper::service::Service; use hyper::Body; -use hyper::Method; use hyper::Request; use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::cmp::min; +use std::error::Error; use std::future::Future; +use std::io; +use std::mem::replace; +use std::mem::take; use std::net::SocketAddr; use std::pin::Pin; use std::rc::Rc; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; -use tokio::sync::oneshot; -use tokio_util::io::StreamReader; +use tokio::task::spawn_local; pub fn init() -> Extension { Extension::builder() @@ -56,11 +66,11 @@ pub fn init() -> Extension { "01_http.js", )) .ops(vec![ - ("op_http_request_next", op_async(op_http_request_next)), - ("op_http_request_read", op_async(op_http_request_read)), - ("op_http_response", op_async(op_http_response)), - ("op_http_response_write", op_async(op_http_response_write)), - ("op_http_response_close", op_async(op_http_response_close)), + ("op_http_accept", op_async(op_http_accept)), + ("op_http_read", op_async(op_http_read)), + ("op_http_write_headers", op_async(op_http_write_headers)), + ("op_http_write", op_async(op_http_write)), + ("op_http_shutdown", op_async(op_http_shutdown)), ( "op_http_websocket_accept_header", op_sync(op_http_websocket_accept_header), @@ -73,86 +83,247 @@ pub fn init() -> Extension { .build() } -struct ServiceInner { - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, +struct HttpConnResource { + addr: SocketAddr, + scheme: &'static str, + acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>, + closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>, + cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops. } -#[derive(Clone, Default)] -struct Service { - inner: Rc<RefCell<Option<ServiceInner>>>, - waker: Rc<deno_core::futures::task::AtomicWaker>, +impl HttpConnResource { + fn new<S>(io: S, scheme: &'static str, addr: SocketAddr) -> Self + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>(); + let service = HttpService::new(acceptors_rx); + + let conn_fut = Http::new() + .with_executor(LocalExecutor) + .serve_connection(io, service) + .with_upgrades(); + + // When the cancel handle is used, the connection shuts down gracefully. + // No new HTTP streams will be accepted, but existing streams will be able + // to continue operating and eventually shut down cleanly. + let cancel_handle = CancelHandle::new_rc(); + let shutdown_fut = never().or_cancel(&cancel_handle).fuse(); + + // A local task that polls the hyper connection future to completion. + let task_fut = async move { + pin_mut!(shutdown_fut); + pin_mut!(conn_fut); + let result = match select(conn_fut, shutdown_fut).await { + Either::Left((result, _)) => result, + Either::Right((_, mut conn_fut)) => { + conn_fut.as_mut().graceful_shutdown(); + conn_fut.await + } + }; + filter_enotconn(result).map_err(Arc::from) + }; + let (task_fut, closed_fut) = task_fut.remote_handle(); + let closed_fut = closed_fut.shared(); + spawn_local(task_fut); + + Self { + addr, + scheme, + acceptors_tx, + closed_fut, + cancel_handle, + } + } + + // Accepts a new incoming HTTP request. + async fn accept( + self: &Rc<Self>, + ) -> Result<Option<HttpStreamResource>, AnyError> { + let fut = async { + let (request_tx, request_rx) = oneshot::channel(); + let (response_tx, response_rx) = oneshot::channel(); + + let acceptor = HttpAcceptor::new(request_tx, response_rx); + self.acceptors_tx.unbounded_send(acceptor).ok()?; + + let request = request_rx.await.ok()?; + let stream = HttpStreamResource::new(self, request, response_tx); + Some(stream) + }; + + async { + match fut.await { + Some(stream) => Ok(Some(stream)), + // Return the connection error, if any. + None => self.closed().map_ok(|_| None).await, + } + } + .try_or_cancel(&self.cancel_handle) + .await + } + + /// A future that completes when this HTTP connection is closed or errors. + async fn closed(&self) -> Result<(), AnyError> { + self.closed_fut.clone().map_err(AnyError::from).await + } + + fn scheme(&self) -> &'static str { + self.scheme + } + + fn addr(&self) -> SocketAddr { + self.addr + } } -impl HyperService<Request<Body>> for Service { +impl Resource for HttpConnResource { + fn name(&self) -> Cow<str> { + "httpConn".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} + +/// Creates a new HttpConn resource which uses `io` as its transport. +pub fn http_create_conn_resource<S>( + state: &mut OpState, + io: S, + addr: SocketAddr, + scheme: &'static str, +) -> Result<ResourceId, AnyError> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let conn = HttpConnResource::new(io, scheme, addr); + let rid = state.resource_table.add(conn); + Ok(rid) +} + +/// An object that implements the `hyper::Service` trait, through which Hyper +/// delivers incoming HTTP requests. +struct HttpService { + acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>, +} + +impl HttpService { + fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self { + let acceptors_rx = acceptors_rx.peekable(); + Self { acceptors_rx } + } +} + +impl Service<Request<Body>> for HttpService { type Response = Response<Body>; - type Error = http::Error; - #[allow(clippy::type_complexity)] - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; + type Error = oneshot::Canceled; + type Future = oneshot::Receiver<Response<Body>>; fn poll_ready( &mut self, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - if self.inner.borrow().is_some() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + let acceptors_rx = Pin::new(&mut self.acceptors_rx); + let result = ready!(acceptors_rx.poll_peek(cx)) + .map(|_| ()) + .ok_or(oneshot::Canceled); + Poll::Ready(result) } - fn call(&mut self, req: Request<Body>) -> Self::Future { - let (resp_tx, resp_rx) = oneshot::channel(); - self.inner.borrow_mut().replace(ServiceInner { - request: req, - response_tx: resp_tx, - }); - - async move { - resp_rx.await.or_else(|_| - // Fallback dummy response in case sender was dropped due to closed conn - Response::builder() - .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) - .body(vec![].into())) - } - .boxed_local() + fn call(&mut self, request: Request<Body>) -> Self::Future { + let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap(); + acceptor.call(request) } } -type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; +/// A pair of one-shot channels which first transfer a HTTP request from the +/// Hyper service to the HttpConn resource, and then take the Response back to +/// the service. +struct HttpAcceptor { + request_tx: oneshot::Sender<Request<Body>>, + response_rx: oneshot::Receiver<Response<Body>>, +} -struct Conn { - scheme: &'static str, - addr: SocketAddr, - conn: Rc<RefCell<ConnFuture>>, +impl HttpAcceptor { + fn new( + request_tx: oneshot::Sender<Request<Body>>, + response_rx: oneshot::Receiver<Response<Body>>, + ) -> Self { + Self { + request_tx, + response_rx, + } + } + + fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> { + let Self { + request_tx, + response_rx, + } = self; + request_tx + .send(request) + .map(|_| response_rx) + .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver. + } } -struct ConnResource { - hyper_connection: Conn, - deno_service: Service, - cancel: CancelHandle, +/// A resource representing a single HTTP request/response stream. +struct HttpStreamResource { + conn: Rc<HttpConnResource>, + rd: AsyncRefCell<HttpRequestReader>, + wr: AsyncRefCell<HttpResponseWriter>, + cancel_handle: CancelHandle, } -impl ConnResource { - // TODO(ry) impl Future for ConnResource? - fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { - self - .hyper_connection - .conn - .borrow_mut() - .poll_unpin(cx) - .map_err(AnyError::from) +impl HttpStreamResource { + fn new( + conn: &Rc<HttpConnResource>, + request: Request<Body>, + response_tx: oneshot::Sender<Response<Body>>, + ) -> Self { + Self { + conn: conn.clone(), + rd: HttpRequestReader::Headers(request).into(), + wr: HttpResponseWriter::Headers(response_tx).into(), + cancel_handle: CancelHandle::new(), + } } } -impl Resource for ConnResource { +impl Resource for HttpStreamResource { fn name(&self) -> Cow<str> { - "httpConnection".into() + "httpStream".into() } fn close(self: Rc<Self>) { - self.cancel.cancel() + self.cancel_handle.cancel(); + } +} + +/// The read half of an HTTP stream. +enum HttpRequestReader { + Headers(Request<Body>), + Body(Peekable<Body>), + Closed, +} + +impl Default for HttpRequestReader { + fn default() -> Self { + Self::Closed + } +} + +/// The write half of an HTTP stream. +enum HttpResponseWriter { + Headers(oneshot::Sender<Response<Body>>), + Body(hyper::body::Sender), + Closed, +} + +impl Default for HttpResponseWriter { + fn default() -> Self { + Self::Closed } } @@ -160,9 +331,7 @@ impl Resource for ConnResource { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // request_rid: - Option<ResourceId>, - // response_sender_rid: + // stream_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -174,111 +343,40 @@ struct NextRequestResponse( String, ); -async fn op_http_request_next( +async fn op_http_accept( state: Rc<RefCell<OpState>>, - conn_rid: ResourceId, + rid: ResourceId, _: (), ) -> Result<Option<NextRequestResponse>, AnyError> { - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid)?; - - let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); - - poll_fn(|cx| { - conn_resource.deno_service.waker.register(cx.waker()); - - // Check if conn is open/close/errored - let (conn_closed, conn_result) = match conn_resource.poll(cx) { - Poll::Pending => (false, Ok(())), - Poll::Ready(Ok(())) => (true, Ok(())), - Poll::Ready(Err(e)) => { - if should_ignore_error(&e) { - (true, Ok(())) - } else { - (true, Err(e)) - } - } - }; - // Drop conn resource if closed - if conn_closed { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid); - - // Fail with err if unexpected conn error, early return None otherwise - return Poll::Ready(conn_result.map(|_| None)); - } + let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; - if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() { - let Conn { scheme, addr, .. } = conn_resource.hyper_connection; - let mut state = state.borrow_mut(); - let next = - prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?; - Poll::Ready(Ok(Some(next))) - } else { - Poll::Pending - } - }) - .try_or_cancel(cancel) - .await - .map_err(AnyError::from) -} + let stream = match conn.accept().await { + Ok(Some(stream)) => Rc::new(stream), + Ok(None) => return Ok(None), + Err(err) => return Err(err), + }; -fn prepare_next_request( - state: &mut OpState, - conn_rid: ResourceId, - request_resource: ServiceInner, - scheme: &'static str, - addr: SocketAddr, -) -> Result<NextRequestResponse, AnyError> { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - let headers = req_headers(&req); - let url = req_url(&req, scheme, addr)?; - - let is_websocket = is_websocket_request(&req); - let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD); - let has_body = - is_websocket || (can_have_body && req.size_hint().exact() != Some(0)); - - let maybe_request_rid = if has_body { - let request_rid = state.resource_table.add(RequestResource { - conn_rid, - inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), - cancel: CancelHandle::default(), - }); - Some(request_rid) - } else { - None + let rd = RcRef::map(&stream, |r| &r.rd).borrow().await; + let request = match &*rd { + HttpRequestReader::Headers(request) => request, + _ => unreachable!(), }; - let response_sender_rid = state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); - - Ok(NextRequestResponse( - maybe_request_rid, - response_sender_rid, - method, - headers, - url, - )) + let method = request.method().to_string(); + let headers = req_headers(request); + let url = req_url(request, conn.scheme(), conn.addr()); + + let stream_rid = state.borrow_mut().resource_table.add_rc(stream); + + let r = NextRequestResponse(stream_rid, method, headers, url); + Ok(Some(r)) } fn req_url( req: &hyper::Request<hyper::Body>, scheme: &'static str, addr: SocketAddr, -) -> Result<String, AnyError> { +) -> String { let host: Cow<str> = if let Some(auth) = req.uri().authority() { match addr.port() { 443 if scheme == "https" => Cow::Borrowed(auth.host()), @@ -288,12 +386,22 @@ fn req_url( } else if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) + match host.to_str() { + Ok(host) => Cow::Borrowed(host), + Err(_) => Cow::Owned( + host + .as_bytes() + .iter() + .cloned() + .map(char::from) + .collect::<String>(), + ), + } } else { Cow::Owned(addr.to_string()) }; let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - Ok([scheme, "://", &host, path].concat()) + [scheme, "://", &host, path].concat() } fn req_headers( @@ -327,68 +435,6 @@ fn req_headers( headers } -fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool { - req.version() == hyper::Version::HTTP_11 - && req.method() == hyper::Method::GET - && req.headers().contains_key(&SEC_WEBSOCKET_KEY) - && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13" - && header(req.headers(), &UPGRADE) - .split(|c| *c == b' ' || *c == b',') - .any(|token| token.eq_ignore_ascii_case(b"websocket")) - && header(req.headers(), &CONNECTION) - .split(|c| *c == b' ' || *c == b',') - .any(|token| token.eq_ignore_ascii_case(b"upgrade")) -} - -fn header<'a>( - h: &'a hyper::http::HeaderMap, - name: &hyper::header::HeaderName, -) -> &'a [u8] { - h.get(name) - .map(hyper::header::HeaderValue::as_bytes) - .unwrap_or_default() -} - -fn should_ignore_error(e: &AnyError) -> bool { - if let Some(e) = e.downcast_ref::<hyper::Error>() { - use std::error::Error; - if let Some(std_err) = e.source() { - if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return true; - } - } - } - } - false -} - -pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>( - state: &mut OpState, - io: IO, - addr: SocketAddr, - scheme: &'static str, -) -> Result<ResourceId, AnyError> { - let deno_service = Service::default(); - - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(io, deno_service.clone()) - .with_upgrades(); - let conn = Pin::new(Box::new(hyper_connection)); - let conn_resource = ConnResource { - hyper_connection: Conn { - scheme, - addr, - conn: Rc::new(RefCell::new(conn)), - }, - deno_service, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - Ok(rid) -} - // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Deserialize)] struct RespondArgs( @@ -400,27 +446,16 @@ struct RespondArgs( Vec<(ByteString, ByteString)>, ); -async fn op_http_response( +async fn op_http_write_headers( state: Rc<RefCell<OpState>>, args: RespondArgs, data: Option<StringOrBuffer>, -) -> Result<Option<ResourceId>, AnyError> { +) -> Result<(), AnyError> { let RespondArgs(rid, status, headers) = args; - - let response_sender = state + let stream = state .borrow_mut() .resource_table - .take::<ResponseSenderResource>(rid)?; - let response_sender = Rc::try_unwrap(response_sender) - .ok() - .expect("multiple op_http_respond ongoing"); - - let conn_rid = response_sender.conn_rid; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid)?; + .get::<HttpStreamResource>(rid)?; let mut builder = Response::builder().status(status); @@ -429,171 +464,138 @@ async fn op_http_response( builder = builder.header(key.as_ref(), value.as_ref()); } - let (maybe_response_body_rid, res) = if let Some(d) = data { - // If a body is passed, we use it, and don't return a body for streaming. - (None, builder.body(d.into_bytes().into())?) - } else { - // If no body is passed, we return a writer for streaming the body. - let (sender, body) = Body::channel(); - let res = builder.body(body)?; - - let response_body_rid = - state.borrow_mut().resource_table.add(ResponseBodyResource { - body: AsyncRefCell::new(sender), - conn_rid, - }); - - (Some(response_body_rid), res) - }; - - // oneshot::Sender::send(v) returns |v| on error, not an error object. - // The only failure mode is the receiver already having dropped its end - // of the channel. - if response_sender.sender.send(res).is_err() { - if let Some(rid) = maybe_response_body_rid { - let _ = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid); - } - return Err(type_error("internal communication error")); - } + let body: Response<Body>; + let new_wr: HttpResponseWriter; - let result = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => { - state.borrow_mut().resource_table.close(conn_rid).ok(); - Poll::Ready(x) + match data { + Some(data) => { + // If a buffer was passed, we use it to construct a response body. + body = builder.body(data.into_bytes().into())?; + new_wr = HttpResponseWriter::Closed; } - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - - if let Err(e) = result { - if let Some(rid) = maybe_response_body_rid { - let _ = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid); + None => { + // If no buffer was passed, the caller will stream the response body. + let (body_tx, body_rx) = Body::channel(); + body = builder.body(body_rx)?; + new_wr = HttpResponseWriter::Body(body_tx); } - return Err(e); } - if maybe_response_body_rid.is_none() { - conn_resource.deno_service.waker.wake(); + let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + let response_tx = match replace(&mut *old_wr, new_wr) { + HttpResponseWriter::Headers(response_tx) => response_tx, + _ => return Err(http_error("response headers already sent")), + }; + + match response_tx.send(body) { + Ok(_) => Ok(()), + Err(_) => { + stream.conn.closed().await?; + Err(http_error("connection closed while sending response")) + } } - Ok(maybe_response_body_rid) } -async fn op_http_response_close( +async fn op_http_write( state: Rc<RefCell<OpState>>, rid: ResourceId, - _: (), + buf: ZeroCopyBuf, ) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid)?; - - let conn_resource = state + let stream = state .borrow() .resource_table - .get::<ConnResource>(resource.conn_rid)?; - drop(resource); - - let r = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - conn_resource.deno_service.waker.wake(); - r -} - -async fn op_http_request_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - mut data: ZeroCopyBuf, -) -> Result<usize, AnyError> { - let resource = state - .borrow() - .resource_table - .get::<RequestResource>(rid as u32)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid)?; - - let mut inner = RcRef::map(resource.clone(), |r| &r.inner) - .borrow_mut() - .await; - - if let RequestOrStreamReader::Request(req) = &mut *inner { - let req = req.take().unwrap(); - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let reader = StreamReader::new(stream); - *inner = RequestOrStreamReader::StreamReader(reader); - }; - - let reader = match &mut *inner { - RequestOrStreamReader::StreamReader(reader) => reader, - _ => unreachable!(), - }; - - let cancel = RcRef::map(resource, |r| &r.cancel); - - let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + + loop { + let body_tx = match &mut *wr { + HttpResponseWriter::Body(body_tx) => body_tx, + HttpResponseWriter::Headers(_) => { + break Err(http_error("no response headers")) + } + HttpResponseWriter::Closed => { + break Err(http_error("response already completed")) + } + }; - poll_fn(|cx| { - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); + let bytes = Bytes::copy_from_slice(&buf[..]); + match body_tx.send_data(bytes).await { + Ok(_) => break Ok(()), + Err(err) => { + // Don't return "channel closed", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + assert!(err.is_closed()); + stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } } - - read_fut.poll_unpin(cx).map_err(AnyError::from) - }) - .await + } } -async fn op_http_response_write( +/// Gracefully closes the write half of the HTTP stream. Note that this does not +/// remove the HTTP stream resource from the resource table; it still has to be +/// closed with `Deno.core.close()`. +async fn op_http_shutdown( state: Rc<RefCell<OpState>>, rid: ResourceId, - data: ZeroCopyBuf, + _: (), ) -> Result<(), AnyError> { - let resource = state + let stream = state .borrow() .resource_table - .get::<ResponseBodyResource>(rid as u32)?; + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + take(&mut *wr); + Ok(()) +} - let conn_resource = state - .borrow() +async fn op_http_read( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + mut buf: ZeroCopyBuf, +) -> Result<usize, AnyError> { + let stream = state + .borrow_mut() .resource_table - .get::<ConnResource>(resource.conn_rid)?; - - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - - let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local(); - - poll_fn(|cx| { - let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); - - // Poll connection so the data is flushed - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); + .get::<HttpStreamResource>(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(body) => break body, + HttpRequestReader::Closed => return Ok(0), } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let body = request.into_body().peekable(); + *rd = HttpRequestReader::Body(body); + } + _ => unreachable!(), + }; + }; - r - }) - .await?; + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(buf.len(), chunk.len()); + buf[..len].copy_from_slice(&chunk.split_to(len)); + break Ok(len); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(0), + } + } + }; - Ok(()) + let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await } fn op_http_websocket_accept_header( @@ -613,86 +615,22 @@ async fn op_http_upgrade_websocket( rid: ResourceId, _: (), ) -> Result<ResourceId, AnyError> { - let req_resource = state + let stream = state .borrow_mut() .resource_table - .take::<RequestResource>(rid)?; - - let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; - - if let RequestOrStreamReader::Request(req) = inner.as_mut() { - let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; - let stream = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, - None, - ) - .await; - - let (ws_tx, ws_rx) = stream.split(); - let rid = - state - .borrow_mut() - .resource_table - .add(deno_websocket::WsStreamResource { - stream: deno_websocket::WebSocketStreamType::Server { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - }, - cancel: Default::default(), - }); - - Ok(rid) - } else { - Err(bad_resource_id()) - } -} - -type BytesStream = - Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; - -enum RequestOrStreamReader { - Request(Option<Request<hyper::Body>>), - StreamReader(StreamReader<BytesStream, bytes::Bytes>), -} - -struct RequestResource { - conn_rid: ResourceId, - inner: AsyncRefCell<RequestOrStreamReader>, - cancel: CancelHandle, -} - -impl Resource for RequestResource { - fn name(&self) -> Cow<str> { - "request".into() - } + .get::<HttpStreamResource>(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -struct ResponseSenderResource { - sender: oneshot::Sender<Response<Body>>, - conn_rid: ResourceId, -} - -impl Resource for ResponseSenderResource { - fn name(&self) -> Cow<str> { - "responseSender".into() - } -} - -struct ResponseBodyResource { - body: AsyncRefCell<hyper::body::Sender>, - conn_rid: ResourceId, -} + let request = match &mut *rd { + HttpRequestReader::Headers(request) => request, + _ => { + return Err(http_error("cannot upgrade because request body was used")) + } + }; -impl Resource for ResponseBodyResource { - fn name(&self) -> Cow<str> { - "responseBody".into() - } + let transport = hyper::upgrade::on(request).await?; + let ws_rid = ws_create_server_stream(&state, transport).await?; + Ok(ws_rid) } // Needed so hyper can use non Send futures @@ -705,6 +643,33 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); + spawn_local(fut); + } +} + +fn http_error(message: &'static str) -> AnyError { + custom_error("Http", message) +} + +/// Filters out the ever-surprising 'shutdown ENOTCONN' errors. +fn filter_enotconn( + result: Result<(), hyper::Error>, +) -> Result<(), hyper::Error> { + if result + .as_ref() + .err() + .and_then(|err| err.source()) + .and_then(|err| err.downcast_ref::<io::Error>()) + .filter(|err| err.kind() == io::ErrorKind::NotConnected) + .is_some() + { + Ok(()) + } else { + result } } + +/// Create a future that is forever pending. +fn never() -> Pending<Never> { + pending() +} |