summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2021-11-09 12:10:21 +0100
committerGitHub <noreply@github.com>2021-11-09 12:10:21 +0100
commit75793baae83123f890442c5d32e3dd38eb18ce1c (patch)
tree0d14bd5edbe28c3beebe9f0944437a89e0e3f724 /ext/http/lib.rs
parent31fde9deba6d4ca55293d60a030babd8d4ce12af (diff)
Revert "refactor(ext/http): rewrite hyper integration and fix bug (#12332)" (#12704)
This reverts commit 5b1e537446454f6332de44adbeb6a15ff072c2fa.
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs891
1 files changed, 463 insertions, 428 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 5a14f845f..aae6415cb 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,29 +1,17 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use bytes::Bytes;
-use deno_core::error::custom_error;
+use deno_core::error::bad_resource_id;
+use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::channel::mpsc;
-use deno_core::futures::channel::oneshot;
-use deno_core::futures::future::pending;
-use deno_core::futures::future::select;
-use deno_core::futures::future::Either;
-use deno_core::futures::future::Pending;
-use deno_core::futures::future::RemoteHandle;
-use deno_core::futures::future::Shared;
-use deno_core::futures::never::Never;
-use deno_core::futures::pin_mut;
-use deno_core::futures::ready;
-use deno_core::futures::stream::Peekable;
+use deno_core::futures::future::poll_fn;
use deno_core::futures::FutureExt;
+use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
-use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
-use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Extension;
@@ -33,31 +21,33 @@ use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
-use deno_websocket::ws_create_server_stream;
+use hyper::body::HttpBody;
+use hyper::header::CONNECTION;
+use hyper::header::SEC_WEBSOCKET_KEY;
+use hyper::header::SEC_WEBSOCKET_VERSION;
+use hyper::header::UPGRADE;
+use hyper::http;
use hyper::server::conn::Http;
-use hyper::service::Service;
+use hyper::service::Service as HyperService;
use hyper::Body;
+use hyper::Method;
use hyper::Request;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
-use std::cmp::min;
-use std::error::Error;
use std::future::Future;
-use std::io;
-use std::mem::replace;
-use std::mem::take;
use std::net::SocketAddr;
use std::pin::Pin;
use std::rc::Rc;
-use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
-use tokio::task::spawn_local;
+use tokio::sync::oneshot;
+use tokio_util::io::StreamReader;
pub fn init() -> Extension {
Extension::builder()
@@ -66,11 +56,11 @@ pub fn init() -> Extension {
"01_http.js",
))
.ops(vec![
- ("op_http_accept", op_async(op_http_accept)),
- ("op_http_read", op_async(op_http_read)),
- ("op_http_write_headers", op_async(op_http_write_headers)),
- ("op_http_write", op_async(op_http_write)),
- ("op_http_shutdown", op_async(op_http_shutdown)),
+ ("op_http_request_next", op_async(op_http_request_next)),
+ ("op_http_request_read", op_async(op_http_request_read)),
+ ("op_http_response", op_async(op_http_response)),
+ ("op_http_response_write", op_async(op_http_response_write)),
+ ("op_http_response_close", op_async(op_http_response_close)),
(
"op_http_websocket_accept_header",
op_sync(op_http_websocket_accept_header),
@@ -83,247 +73,86 @@ pub fn init() -> Extension {
.build()
}
-struct HttpConnResource {
- addr: SocketAddr,
- scheme: &'static str,
- acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
- closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>,
- cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops.
+struct ServiceInner {
+ request: Request<Body>,
+ response_tx: oneshot::Sender<Response<Body>>,
}
-impl HttpConnResource {
- fn new<S>(io: S, scheme: &'static str, addr: SocketAddr) -> Self
- where
- S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
- {
- let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>();
- let service = HttpService::new(acceptors_rx);
-
- let conn_fut = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(io, service)
- .with_upgrades();
-
- // When the cancel handle is used, the connection shuts down gracefully.
- // No new HTTP streams will be accepted, but existing streams will be able
- // to continue operating and eventually shut down cleanly.
- let cancel_handle = CancelHandle::new_rc();
- let shutdown_fut = never().or_cancel(&cancel_handle).fuse();
-
- // A local task that polls the hyper connection future to completion.
- let task_fut = async move {
- pin_mut!(shutdown_fut);
- pin_mut!(conn_fut);
- let result = match select(conn_fut, shutdown_fut).await {
- Either::Left((result, _)) => result,
- Either::Right((_, mut conn_fut)) => {
- conn_fut.as_mut().graceful_shutdown();
- conn_fut.await
- }
- };
- filter_enotconn(result).map_err(Arc::from)
- };
- let (task_fut, closed_fut) = task_fut.remote_handle();
- let closed_fut = closed_fut.shared();
- spawn_local(task_fut);
-
- Self {
- addr,
- scheme,
- acceptors_tx,
- closed_fut,
- cancel_handle,
- }
- }
-
- // Accepts a new incoming HTTP request.
- async fn accept(
- self: &Rc<Self>,
- ) -> Result<Option<HttpStreamResource>, AnyError> {
- let fut = async {
- let (request_tx, request_rx) = oneshot::channel();
- let (response_tx, response_rx) = oneshot::channel();
-
- let acceptor = HttpAcceptor::new(request_tx, response_rx);
- self.acceptors_tx.unbounded_send(acceptor).ok()?;
-
- let request = request_rx.await.ok()?;
- let stream = HttpStreamResource::new(self, request, response_tx);
- Some(stream)
- };
-
- async {
- match fut.await {
- Some(stream) => Ok(Some(stream)),
- // Return the connection error, if any.
- None => self.closed().map_ok(|_| None).await,
- }
- }
- .try_or_cancel(&self.cancel_handle)
- .await
- }
-
- /// A future that completes when this HTTP connection is closed or errors.
- async fn closed(&self) -> Result<(), AnyError> {
- self.closed_fut.clone().map_err(AnyError::from).await
- }
-
- fn scheme(&self) -> &'static str {
- self.scheme
- }
-
- fn addr(&self) -> SocketAddr {
- self.addr
- }
+#[derive(Clone, Default)]
+struct Service {
+ inner: Rc<RefCell<Option<ServiceInner>>>,
+ waker: Rc<deno_core::futures::task::AtomicWaker>,
}
-impl Resource for HttpConnResource {
- fn name(&self) -> Cow<str> {
- "httpConn".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel_handle.cancel();
- }
-}
-
-/// Creates a new HttpConn resource which uses `io` as its transport.
-pub fn http_create_conn_resource<S>(
- state: &mut OpState,
- io: S,
- addr: SocketAddr,
- scheme: &'static str,
-) -> Result<ResourceId, AnyError>
-where
- S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
-{
- let conn = HttpConnResource::new(io, scheme, addr);
- let rid = state.resource_table.add(conn);
- Ok(rid)
-}
-
-/// An object that implements the `hyper::Service` trait, through which Hyper
-/// delivers incoming HTTP requests.
-struct HttpService {
- acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>,
-}
-
-impl HttpService {
- fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self {
- let acceptors_rx = acceptors_rx.peekable();
- Self { acceptors_rx }
- }
-}
-
-impl Service<Request<Body>> for HttpService {
+impl HyperService<Request<Body>> for Service {
type Response = Response<Body>;
- type Error = oneshot::Canceled;
- type Future = oneshot::Receiver<Response<Body>>;
+ type Error = http::Error;
+ #[allow(clippy::type_complexity)]
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(
&mut self,
- cx: &mut Context<'_>,
+ _cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- let acceptors_rx = Pin::new(&mut self.acceptors_rx);
- let result = ready!(acceptors_rx.poll_peek(cx))
- .map(|_| ())
- .ok_or(oneshot::Canceled);
- Poll::Ready(result)
+ if self.inner.borrow().is_some() {
+ Poll::Pending
+ } else {
+ Poll::Ready(Ok(()))
+ }
}
- fn call(&mut self, request: Request<Body>) -> Self::Future {
- let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap();
- acceptor.call(request)
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let (resp_tx, resp_rx) = oneshot::channel();
+ self.inner.borrow_mut().replace(ServiceInner {
+ request: req,
+ response_tx: resp_tx,
+ });
+
+ async move {
+ resp_rx.await.or_else(|_|
+ // Fallback dummy response in case sender was dropped due to closed conn
+ Response::builder()
+ .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
+ .body(vec![].into()))
+ }
+ .boxed_local()
}
}
-/// A pair of one-shot channels which first transfer a HTTP request from the
-/// Hyper service to the HttpConn resource, and then take the Response back to
-/// the service.
-struct HttpAcceptor {
- request_tx: oneshot::Sender<Request<Body>>,
- response_rx: oneshot::Receiver<Response<Body>>,
-}
-
-impl HttpAcceptor {
- fn new(
- request_tx: oneshot::Sender<Request<Body>>,
- response_rx: oneshot::Receiver<Response<Body>>,
- ) -> Self {
- Self {
- request_tx,
- response_rx,
- }
- }
+type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>;
- fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> {
- let Self {
- request_tx,
- response_rx,
- } = self;
- request_tx
- .send(request)
- .map(|_| response_rx)
- .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver.
- }
+struct Conn {
+ scheme: &'static str,
+ addr: SocketAddr,
+ conn: Rc<RefCell<ConnFuture>>,
}
-/// A resource representing a single HTTP request/response stream.
-struct HttpStreamResource {
- conn: Rc<HttpConnResource>,
- rd: AsyncRefCell<HttpRequestReader>,
- wr: AsyncRefCell<HttpResponseWriter>,
- cancel_handle: CancelHandle,
+struct ConnResource {
+ hyper_connection: Conn,
+ deno_service: Service,
+ cancel: CancelHandle,
}
-impl HttpStreamResource {
- fn new(
- conn: &Rc<HttpConnResource>,
- request: Request<Body>,
- response_tx: oneshot::Sender<Response<Body>>,
- ) -> Self {
- Self {
- conn: conn.clone(),
- rd: HttpRequestReader::Headers(request).into(),
- wr: HttpResponseWriter::Headers(response_tx).into(),
- cancel_handle: CancelHandle::new(),
- }
+impl ConnResource {
+ // TODO(ry) impl Future for ConnResource?
+ fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
+ self
+ .hyper_connection
+ .conn
+ .borrow_mut()
+ .poll_unpin(cx)
+ .map_err(AnyError::from)
}
}
-impl Resource for HttpStreamResource {
+impl Resource for ConnResource {
fn name(&self) -> Cow<str> {
- "httpStream".into()
+ "httpConnection".into()
}
fn close(self: Rc<Self>) {
- self.cancel_handle.cancel();
- }
-}
-
-/// The read half of an HTTP stream.
-enum HttpRequestReader {
- Headers(Request<Body>),
- Body(Peekable<Body>),
- Closed,
-}
-
-impl Default for HttpRequestReader {
- fn default() -> Self {
- Self::Closed
- }
-}
-
-/// The write half of an HTTP stream.
-enum HttpResponseWriter {
- Headers(oneshot::Sender<Response<Body>>),
- Body(hyper::body::Sender),
- Closed,
-}
-
-impl Default for HttpResponseWriter {
- fn default() -> Self {
- Self::Closed
+ self.cancel.cancel()
}
}
@@ -331,7 +160,9 @@ impl Default for HttpResponseWriter {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
- // stream_rid:
+ // request_rid:
+ Option<ResourceId>,
+ // response_sender_rid:
ResourceId,
// method:
// This is a String rather than a ByteString because reqwest will only return
@@ -343,40 +174,111 @@ struct NextRequestResponse(
String,
);
-async fn op_http_accept(
+async fn op_http_request_next(
state: Rc<RefCell<OpState>>,
- rid: ResourceId,
+ conn_rid: ResourceId,
_: (),
) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(conn_rid)?;
+
+ let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
+
+ poll_fn(|cx| {
+ conn_resource.deno_service.waker.register(cx.waker());
+
+ // Check if conn is open/close/errored
+ let (conn_closed, conn_result) = match conn_resource.poll(cx) {
+ Poll::Pending => (false, Ok(())),
+ Poll::Ready(Ok(())) => (true, Ok(())),
+ Poll::Ready(Err(e)) => {
+ if should_ignore_error(&e) {
+ (true, Ok(()))
+ } else {
+ (true, Err(e))
+ }
+ }
+ };
+ // Drop conn resource if closed
+ if conn_closed {
+ // TODO(ry) close RequestResource associated with connection
+ // TODO(ry) close ResponseBodyResource associated with connection
+ // try to close ConnResource, but don't unwrap as it might
+ // already be closed
+ let _ = state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid);
+
+ // Fail with err if unexpected conn error, early return None otherwise
+ return Poll::Ready(conn_result.map(|_| None));
+ }
- let stream = match conn.accept().await {
- Ok(Some(stream)) => Rc::new(stream),
- Ok(None) => return Ok(None),
- Err(err) => return Err(err),
- };
+ if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() {
+ let Conn { scheme, addr, .. } = conn_resource.hyper_connection;
+ let mut state = state.borrow_mut();
+ let next =
+ prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?;
+ Poll::Ready(Ok(Some(next)))
+ } else {
+ Poll::Pending
+ }
+ })
+ .try_or_cancel(cancel)
+ .await
+ .map_err(AnyError::from)
+}
- let rd = RcRef::map(&stream, |r| &r.rd).borrow().await;
- let request = match &*rd {
- HttpRequestReader::Headers(request) => request,
- _ => unreachable!(),
+fn prepare_next_request(
+ state: &mut OpState,
+ conn_rid: ResourceId,
+ request_resource: ServiceInner,
+ scheme: &'static str,
+ addr: SocketAddr,
+) -> Result<NextRequestResponse, AnyError> {
+ let tx = request_resource.response_tx;
+ let req = request_resource.request;
+ let method = req.method().to_string();
+ let headers = req_headers(&req);
+ let url = req_url(&req, scheme, addr)?;
+
+ let is_websocket = is_websocket_request(&req);
+ let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD);
+ let has_body =
+ is_websocket || (can_have_body && req.size_hint().exact() != Some(0));
+
+ let maybe_request_rid = if has_body {
+ let request_rid = state.resource_table.add(RequestResource {
+ conn_rid,
+ inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
+ cancel: CancelHandle::default(),
+ });
+ Some(request_rid)
+ } else {
+ None
};
- let method = request.method().to_string();
- let headers = req_headers(request);
- let url = req_url(request, conn.scheme(), conn.addr());
-
- let stream_rid = state.borrow_mut().resource_table.add_rc(stream);
-
- let r = NextRequestResponse(stream_rid, method, headers, url);
- Ok(Some(r))
+ let response_sender_rid = state.resource_table.add(ResponseSenderResource {
+ sender: tx,
+ conn_rid,
+ });
+
+ Ok(NextRequestResponse(
+ maybe_request_rid,
+ response_sender_rid,
+ method,
+ headers,
+ url,
+ ))
}
fn req_url(
req: &hyper::Request<hyper::Body>,
scheme: &'static str,
addr: SocketAddr,
-) -> String {
+) -> Result<String, AnyError> {
let host: Cow<str> = if let Some(auth) = req.uri().authority() {
match addr.port() {
443 if scheme == "https" => Cow::Borrowed(auth.host()),
@@ -386,22 +288,12 @@ fn req_url(
} else if let Some(host) = req.uri().host() {
Cow::Borrowed(host)
} else if let Some(host) = req.headers().get("HOST") {
- match host.to_str() {
- Ok(host) => Cow::Borrowed(host),
- Err(_) => Cow::Owned(
- host
- .as_bytes()
- .iter()
- .cloned()
- .map(char::from)
- .collect::<String>(),
- ),
- }
+ Cow::Borrowed(host.to_str()?)
} else {
Cow::Owned(addr.to_string())
};
let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
- [scheme, "://", &host, path].concat()
+ Ok([scheme, "://", &host, path].concat())
}
fn req_headers(
@@ -435,6 +327,68 @@ fn req_headers(
headers
}
+fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool {
+ req.version() == hyper::Version::HTTP_11
+ && req.method() == hyper::Method::GET
+ && req.headers().contains_key(&SEC_WEBSOCKET_KEY)
+ && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13"
+ && header(req.headers(), &UPGRADE)
+ .split(|c| *c == b' ' || *c == b',')
+ .any(|token| token.eq_ignore_ascii_case(b"websocket"))
+ && header(req.headers(), &CONNECTION)
+ .split(|c| *c == b' ' || *c == b',')
+ .any(|token| token.eq_ignore_ascii_case(b"upgrade"))
+}
+
+fn header<'a>(
+ h: &'a hyper::http::HeaderMap,
+ name: &hyper::header::HeaderName,
+) -> &'a [u8] {
+ h.get(name)
+ .map(hyper::header::HeaderValue::as_bytes)
+ .unwrap_or_default()
+}
+
+fn should_ignore_error(e: &AnyError) -> bool {
+ if let Some(e) = e.downcast_ref::<hyper::Error>() {
+ use std::error::Error;
+ if let Some(std_err) = e.source() {
+ if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
+ if io_err.kind() == std::io::ErrorKind::NotConnected {
+ return true;
+ }
+ }
+ }
+ }
+ false
+}
+
+pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
+ state: &mut OpState,
+ io: IO,
+ addr: SocketAddr,
+ scheme: &'static str,
+) -> Result<ResourceId, AnyError> {
+ let deno_service = Service::default();
+
+ let hyper_connection = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(io, deno_service.clone())
+ .with_upgrades();
+ let conn = Pin::new(Box::new(hyper_connection));
+ let conn_resource = ConnResource {
+ hyper_connection: Conn {
+ scheme,
+ addr,
+ conn: Rc::new(RefCell::new(conn)),
+ },
+ deno_service,
+ cancel: CancelHandle::default(),
+ };
+ let rid = state.resource_table.add(conn_resource);
+ Ok(rid)
+}
+
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(Deserialize)]
struct RespondArgs(
@@ -446,16 +400,27 @@ struct RespondArgs(
Vec<(ByteString, ByteString)>,
);
-async fn op_http_write_headers(
+async fn op_http_response(
state: Rc<RefCell<OpState>>,
args: RespondArgs,
data: Option<StringOrBuffer>,
-) -> Result<(), AnyError> {
+) -> Result<Option<ResourceId>, AnyError> {
let RespondArgs(rid, status, headers) = args;
- let stream = state
+
+ let response_sender = state
.borrow_mut()
.resource_table
- .get::<HttpStreamResource>(rid)?;
+ .take::<ResponseSenderResource>(rid)?;
+ let response_sender = Rc::try_unwrap(response_sender)
+ .ok()
+ .expect("multiple op_http_respond ongoing");
+
+ let conn_rid = response_sender.conn_rid;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(conn_rid)?;
let mut builder = Response::builder().status(status);
@@ -464,138 +429,171 @@ async fn op_http_write_headers(
builder = builder.header(key.as_ref(), value.as_ref());
}
- let body: Response<Body>;
- let new_wr: HttpResponseWriter;
+ let (maybe_response_body_rid, res) = if let Some(d) = data {
+ // If a body is passed, we use it, and don't return a body for streaming.
+ (None, builder.body(d.into_bytes().into())?)
+ } else {
+ // If no body is passed, we return a writer for streaming the body.
+ let (sender, body) = Body::channel();
+ let res = builder.body(body)?;
- match data {
- Some(data) => {
- // If a buffer was passed, we use it to construct a response body.
- body = builder.body(data.into_bytes().into())?;
- new_wr = HttpResponseWriter::Closed;
- }
- None => {
- // If no buffer was passed, the caller will stream the response body.
- let (body_tx, body_rx) = Body::channel();
- body = builder.body(body_rx)?;
- new_wr = HttpResponseWriter::Body(body_tx);
- }
- }
+ let response_body_rid =
+ state.borrow_mut().resource_table.add(ResponseBodyResource {
+ body: AsyncRefCell::new(sender),
+ conn_rid,
+ });
- let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- let response_tx = match replace(&mut *old_wr, new_wr) {
- HttpResponseWriter::Headers(response_tx) => response_tx,
- _ => return Err(http_error("response headers already sent")),
+ (Some(response_body_rid), res)
};
- match response_tx.send(body) {
- Ok(_) => Ok(()),
- Err(_) => {
- stream.conn.closed().await?;
- Err(http_error("connection closed while sending response"))
+ // oneshot::Sender::send(v) returns |v| on error, not an error object.
+ // The only failure mode is the receiver already having dropped its end
+ // of the channel.
+ if response_sender.sender.send(res).is_err() {
+ if let Some(rid) = maybe_response_body_rid {
+ let _ = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseBodyResource>(rid);
}
+ return Err(type_error("internal communication error"));
}
-}
-async fn op_http_write(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- buf: ZeroCopyBuf,
-) -> Result<(), AnyError> {
- let stream = state
- .borrow()
- .resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
-
- loop {
- let body_tx = match &mut *wr {
- HttpResponseWriter::Body(body_tx) => body_tx,
- HttpResponseWriter::Headers(_) => {
- break Err(http_error("no response headers"))
- }
- HttpResponseWriter::Closed => {
- break Err(http_error("response already completed"))
- }
- };
-
- let bytes = Bytes::copy_from_slice(&buf[..]);
- match body_tx.send_data(bytes).await {
- Ok(_) => break Ok(()),
- Err(err) => {
- // Don't return "channel closed", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- assert!(err.is_closed());
- stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
- }
+ let result = poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => {
+ state.borrow_mut().resource_table.close(conn_rid).ok();
+ Poll::Ready(x)
+ }
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await;
+
+ if let Err(e) = result {
+ if let Some(rid) = maybe_response_body_rid {
+ let _ = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseBodyResource>(rid);
}
+ return Err(e);
}
+
+ if maybe_response_body_rid.is_none() {
+ conn_resource.deno_service.waker.wake();
+ }
+ Ok(maybe_response_body_rid)
}
-/// Gracefully closes the write half of the HTTP stream. Note that this does not
-/// remove the HTTP stream resource from the resource table; it still has to be
-/// closed with `Deno.core.close()`.
-async fn op_http_shutdown(
+async fn op_http_response_close(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_: (),
) -> Result<(), AnyError> {
- let stream = state
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseBodyResource>(rid)?;
+
+ let conn_resource = state
.borrow()
.resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- take(&mut *wr);
- Ok(())
+ .get::<ConnResource>(resource.conn_rid)?;
+ drop(resource);
+
+ let r = poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => Poll::Ready(x),
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await;
+ conn_resource.deno_service.waker.wake();
+ r
}
-async fn op_http_read(
+async fn op_http_request_read(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
- mut buf: ZeroCopyBuf,
+ mut data: ZeroCopyBuf,
) -> Result<usize, AnyError> {
- let stream = state
- .borrow_mut()
+ let resource = state
+ .borrow()
.resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
-
- let body = loop {
- match &mut *rd {
- HttpRequestReader::Headers(_) => {}
- HttpRequestReader::Body(body) => break body,
- HttpRequestReader::Closed => return Ok(0),
- }
- match take(&mut *rd) {
- HttpRequestReader::Headers(request) => {
- let body = request.into_body().peekable();
- *rd = HttpRequestReader::Body(body);
- }
- _ => unreachable!(),
- };
+ .get::<RequestResource>(rid as u32)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)?;
+
+ let mut inner = RcRef::map(resource.clone(), |r| &r.inner)
+ .borrow_mut()
+ .await;
+
+ if let RequestOrStreamReader::Request(req) = &mut *inner {
+ let req = req.take().unwrap();
+ let stream: BytesStream = Box::pin(req.into_body().map(|r| {
+ r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+ }));
+ let reader = StreamReader::new(stream);
+ *inner = RequestOrStreamReader::StreamReader(reader);
};
- let fut = async {
- let mut body = Pin::new(body);
- loop {
- match body.as_mut().peek_mut().await {
- Some(Ok(chunk)) if !chunk.is_empty() => {
- let len = min(buf.len(), chunk.len());
- buf[..len].copy_from_slice(&chunk.split_to(len));
- break Ok(len);
- }
- Some(_) => match body.as_mut().next().await.unwrap() {
- Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
- },
- None => break Ok(0),
- }
- }
+ let reader = match &mut *inner {
+ RequestOrStreamReader::StreamReader(reader) => reader,
+ _ => unreachable!(),
};
- let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle);
- fut.try_or_cancel(cancel_handle).await
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+
+ let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
+
+ poll_fn(|cx| {
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ read_fut.poll_unpin(cx).map_err(AnyError::from)
+ })
+ .await
+}
+
+async fn op_http_response_write(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<ResponseBodyResource>(rid as u32)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)?;
+
+ let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+
+ let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local();
+
+ poll_fn(|cx| {
+ let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
+
+ // Poll connection so the data is flushed
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ r
+ })
+ .await?;
+
+ Ok(())
}
fn op_http_websocket_accept_header(
@@ -615,22 +613,86 @@ async fn op_http_upgrade_websocket(
rid: ResourceId,
_: (),
) -> Result<ResourceId, AnyError> {
- let stream = state
+ let req_resource = state
.borrow_mut()
.resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
+ .take::<RequestResource>(rid)?;
+
+ let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await;
+
+ if let RequestOrStreamReader::Request(req) = inner.as_mut() {
+ let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?;
+ let stream =
+ deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
+ upgraded,
+ deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
+ None,
+ )
+ .await;
+
+ let (ws_tx, ws_rx) = stream.split();
+ let rid =
+ state
+ .borrow_mut()
+ .resource_table
+ .add(deno_websocket::WsStreamResource {
+ stream: deno_websocket::WebSocketStreamType::Server {
+ rx: AsyncRefCell::new(ws_rx),
+ tx: AsyncRefCell::new(ws_tx),
+ },
+ cancel: Default::default(),
+ });
+
+ Ok(rid)
+ } else {
+ Err(bad_resource_id())
+ }
+}
- let request = match &mut *rd {
- HttpRequestReader::Headers(request) => request,
- _ => {
- return Err(http_error("cannot upgrade because request body was used"))
- }
- };
+type BytesStream =
+ Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
+
+enum RequestOrStreamReader {
+ Request(Option<Request<hyper::Body>>),
+ StreamReader(StreamReader<BytesStream, bytes::Bytes>),
+}
+
+struct RequestResource {
+ conn_rid: ResourceId,
+ inner: AsyncRefCell<RequestOrStreamReader>,
+ cancel: CancelHandle,
+}
- let transport = hyper::upgrade::on(request).await?;
- let ws_rid = ws_create_server_stream(&state, transport).await?;
- Ok(ws_rid)
+impl Resource for RequestResource {
+ fn name(&self) -> Cow<str> {
+ "request".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+struct ResponseSenderResource {
+ sender: oneshot::Sender<Response<Body>>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseSenderResource {
+ fn name(&self) -> Cow<str> {
+ "responseSender".into()
+ }
+}
+
+struct ResponseBodyResource {
+ body: AsyncRefCell<hyper::body::Sender>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseBodyResource {
+ fn name(&self) -> Cow<str> {
+ "responseBody".into()
+ }
}
// Needed so hyper can use non Send futures
@@ -643,33 +705,6 @@ where
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
- spawn_local(fut);
- }
-}
-
-fn http_error(message: &'static str) -> AnyError {
- custom_error("Http", message)
-}
-
-/// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
-fn filter_enotconn(
- result: Result<(), hyper::Error>,
-) -> Result<(), hyper::Error> {
- if result
- .as_ref()
- .err()
- .and_then(|err| err.source())
- .and_then(|err| err.downcast_ref::<io::Error>())
- .filter(|err| err.kind() == io::ErrorKind::NotConnected)
- .is_some()
- {
- Ok(())
- } else {
- result
+ tokio::task::spawn_local(fut);
}
}
-
-/// Create a future that is forever pending.
-fn never() -> Pending<Never> {
- pending()
-}