summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/http_test.ts78
-rw-r--r--ext/http/01_http.js272
-rw-r--r--ext/http/lib.rs891
-rw-r--r--ext/websocket/lib.rs26
-rw-r--r--runtime/errors.rs5
-rw-r--r--runtime/ops/http.rs5
6 files changed, 661 insertions, 616 deletions
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index a6f80eb2c..c57b85441 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -7,6 +7,7 @@ import {
assert,
assertEquals,
assertRejects,
+ assertStrictEquals,
assertThrows,
deferred,
delay,
@@ -386,7 +387,7 @@ unitTest(
Deno.errors.Http,
"connection closed",
);
- // The error from `op_http_request_next` reroutes to `respondWith()`.
+ // The error from `op_http_accept` reroutes to `respondWith()`.
assertEquals(await nextRequestPromise, null);
listener.close();
})();
@@ -865,6 +866,7 @@ unitTest(
const writer = writable.getWriter();
async function writeResponse() {
+ await delay(50);
await writer.write(
new TextEncoder().encode(
"written to the writable side of a TransformStream",
@@ -1000,6 +1002,80 @@ unitTest(
},
);
+// https://github.com/denoland/deno/issues/12193
+unitTest(
+ { permissions: { net: true } },
+ async function httpConnConcurrentNextRequestCalls() {
+ const hostname = "localhost";
+ const port = 4501;
+
+ async function server() {
+ const listener = Deno.listen({ hostname, port });
+ const tcpConn = await listener.accept();
+ const httpConn = Deno.serveHttp(tcpConn);
+ const promises = new Array(10).fill(null).map(async (_, i) => {
+ const event = await httpConn.nextRequest();
+ assert(event);
+ const { pathname } = new URL(event.request.url);
+ assertStrictEquals(pathname, `/${i}`);
+ const response = new Response(`Response #${i}`);
+ await event.respondWith(response);
+ });
+ await Promise.all(promises);
+ httpConn.close();
+ listener.close();
+ }
+
+ async function client() {
+ for (let i = 0; i < 10; i++) {
+ const response = await fetch(`http://${hostname}:${port}/${i}`);
+ const body = await response.text();
+ assertStrictEquals(body, `Response #${i}`);
+ }
+ }
+
+ await Promise.all([server(), delay(100).then(client)]);
+ },
+);
+
+// https://github.com/denoland/deno/pull/12704
+// https://github.com/denoland/deno/pull/12732
+unitTest(
+ { permissions: { net: true } },
+ async function httpConnAutoCloseDelayedOnUpgrade() {
+ const hostname = "localhost";
+ const port = 4501;
+
+ async function server() {
+ const listener = Deno.listen({ hostname, port });
+ const tcpConn = await listener.accept();
+ const httpConn = Deno.serveHttp(tcpConn);
+
+ const event1 = await httpConn.nextRequest() as Deno.RequestEvent;
+ const event2Promise = httpConn.nextRequest();
+
+ const { socket, response } = Deno.upgradeWebSocket(event1.request);
+ socket.onmessage = (event) => socket.send(event.data);
+ event1.respondWith(response);
+
+ const event2 = await event2Promise;
+ assertStrictEquals(event2, null);
+
+ listener.close();
+ }
+
+ async function client() {
+ const socket = new WebSocket(`ws://${hostname}:${port}/`);
+ socket.onopen = () => socket.send("bla bla");
+ const { data } = await new Promise((res) => socket.onmessage = res);
+ assertStrictEquals(data, "bla bla");
+ socket.close();
+ }
+
+ await Promise.all([server(), client()]);
+ },
+);
+
unitTest(
{ permissions: { net: true } },
async function httpServerRespondNonAsciiUint8Array() {
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 9f05809f5..94f1a1051 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -27,6 +27,7 @@
Set,
SetPrototypeAdd,
SetPrototypeDelete,
+ SetPrototypeHas,
SetPrototypeValues,
StringPrototypeIncludes,
StringPrototypeToLowerCase,
@@ -42,6 +43,8 @@
class HttpConn {
#rid = 0;
+ #closed = false;
+
// This set holds resource ids of resources
// that were created during lifecycle of this request.
// When the connection is closed these resources should be closed
@@ -62,10 +65,11 @@
let nextRequest;
try {
nextRequest = await core.opAsync(
- "op_http_request_next",
+ "op_http_accept",
this.#rid,
);
} catch (error) {
+ this.close();
// A connection error seen here would cause disrupted responses to throw
// a generic `BadResource` error. Instead store this error and replace
// those with it.
@@ -79,26 +83,31 @@
}
throw error;
}
- if (nextRequest === null) return null;
+ if (nextRequest == null) {
+ // Work-around for servers (deno_std/http in particular) that call
+ // `nextRequest()` before upgrading a previous request which has a
+ // `connection: upgrade` header.
+ await null;
+
+ this.close();
+ return null;
+ }
const [
- requestRid,
- responseSenderRid,
+ streamRid,
method,
headersList,
url,
] = nextRequest;
+ SetPrototypeAdd(this.managedResources, streamRid);
/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
- if (typeof requestRid === "number") {
- SetPrototypeAdd(this.managedResources, requestRid);
- // There might be a body, but we don't expose it for GET/HEAD requests.
- // It will be closed automatically once the request has been handled and
- // the response has been sent.
- if (method !== "GET" && method !== "HEAD") {
- body = createRequestBodyStream(this, requestRid);
- }
+ // There might be a body, but we don't expose it for GET/HEAD requests.
+ // It will be closed automatically once the request has been handled and
+ // the response has been sent.
+ if (method !== "GET" && method !== "HEAD") {
+ body = createRequestBodyStream(streamRid);
}
const innerRequest = newInnerRequest(
@@ -111,22 +120,21 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");
- SetPrototypeAdd(this.managedResources, responseSenderRid);
- const respondWith = createRespondWith(
- this,
- responseSenderRid,
- requestRid,
- );
+ const respondWith = createRespondWith(this, streamRid);
return { request, respondWith };
}
/** @returns {void} */
close() {
- for (const rid of SetPrototypeValues(this.managedResources)) {
- core.tryClose(rid);
+ if (!this.#closed) {
+ this.#closed = true;
+ core.close(this.#rid);
+ for (const rid of SetPrototypeValues(this.managedResources)) {
+ SetPrototypeDelete(this.managedResources, rid);
+ core.close(rid);
+ }
}
- core.close(this.#rid);
}
[SymbolAsyncIterator]() {
@@ -136,97 +144,86 @@
async next() {
const reqEvt = await httpConn.nextRequest();
// Change with caution, current form avoids a v8 deopt
- return { value: reqEvt, done: reqEvt === null };
+ return { value: reqEvt ?? undefined, done: reqEvt === null };
},
};
}
}
- function readRequest(requestRid, zeroCopyBuf) {
- return core.opAsync(
- "op_http_request_read",
- requestRid,
- zeroCopyBuf,
- );
+ function readRequest(streamRid, buf) {
+ return core.opAsync("op_http_read", streamRid, buf);
}
- function createRespondWith(httpConn, responseSenderRid, requestRid) {
+ function createRespondWith(httpConn, streamRid) {
return async function respondWith(resp) {
- if (resp instanceof Promise) {
- resp = await resp;
- }
+ try {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
- if (!(resp instanceof Response)) {
- throw new TypeError(
- "First argument to respondWith must be a Response or a promise resolving to a Response.",
- );
- }
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
- const innerResp = toInnerResponse(resp);
-
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
- let respBody = null;
- if (innerResp.body !== null) {
- if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
- if (innerResp.body.streamOrStatic instanceof ReadableStream) {
- if (
- innerResp.body.length === null ||
- innerResp.body.source instanceof Blob
- ) {
- respBody = innerResp.body.stream;
- } else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) {
+ throw new TypeError("Body is unusable.");
+ }
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (
+ innerResp.body.length === null ||
+ innerResp.body.source instanceof Blob
+ ) {
+ respBody = innerResp.body.stream;
} else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
}
+ } else {
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
}
} else {
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
+ respBody = new Uint8Array(0);
}
- } else {
- respBody = new Uint8Array(0);
- }
+ const isStreamingResponseBody =
+ !(typeof respBody === "string" || respBody instanceof Uint8Array);
- SetPrototypeDelete(httpConn.managedResources, responseSenderRid);
- let responseBodyRid;
- try {
- responseBodyRid = await core.opAsync(
- "op_http_response",
- [
- responseSenderRid,
+ try {
+ await core.opAsync("op_http_write_headers", [
+ streamRid,
innerResp.status ?? 200,
innerResp.headerList,
- ],
- (respBody instanceof Uint8Array || typeof respBody === "string")
- ? respBody
- : null,
- );
- } catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (error instanceof BadResource && connError != null) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
- if (respBody !== null && respBody instanceof ReadableStream) {
- await respBody.cancel(error);
+ ], isStreamingResponseBody ? null : respBody);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
}
- throw error;
- }
- // If `respond` returns a responseBodyRid, we should stream the body
- // to that resource.
- if (responseBodyRid !== null) {
- SetPrototypeAdd(httpConn.managedResources, responseBodyRid);
- try {
+ if (isStreamingResponseBody) {
if (respBody === null || !(respBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
}
@@ -239,11 +236,7 @@
break;
}
try {
- await core.opAsync(
- "op_http_response_write",
- responseBodyRid,
- value,
- );
+ await core.opAsync("op_http_write", streamRid, value);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof BadResource && connError != null) {
@@ -254,61 +247,55 @@
throw error;
}
}
- } finally {
- // Once all chunks are sent, and the request body is closed, we can
- // close the response body.
- SetPrototypeDelete(httpConn.managedResources, responseBodyRid);
try {
- await core.opAsync("op_http_response_close", responseBodyRid);
- } catch { /* pass */ }
+ await core.opAsync("op_http_shutdown", streamRid);
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
}
- }
- const ws = resp[_ws];
- if (ws) {
- if (typeof requestRid !== "number") {
- throw new TypeError(
- "This request can not be upgraded to a websocket connection.",
+ const ws = resp[_ws];
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_http_upgrade_websocket",
+ streamRid,
);
- }
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
- const wsRid = await core.opAsync(
- "op_http_upgrade_websocket",
- requestRid,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+ httpConn.close();
- if (ws[_readyState] === WebSocket.CLOSING) {
- await core.opAsync("op_ws_close", { rid: wsRid });
+ if (ws[_readyState] === WebSocket.CLOSING) {
+ await core.opAsync("op_ws_close", { rid: wsRid });
- ws[_readyState] = WebSocket.CLOSED;
+ ws[_readyState] = WebSocket.CLOSED;
- const errEvent = new ErrorEvent("error");
- ws.dispatchEvent(errEvent);
+ const errEvent = new ErrorEvent("error");
+ ws.dispatchEvent(errEvent);
- const event = new CloseEvent("close");
- ws.dispatchEvent(event);
+ const event = new CloseEvent("close");
+ ws.dispatchEvent(event);
- core.tryClose(wsRid);
- } else {
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
+ core.tryClose(wsRid);
+ } else {
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
- ws[_eventLoop]();
+ ws[_eventLoop]();
+ }
+ }
+ } finally {
+ if (SetPrototypeHas(httpConn.managedResources, streamRid)) {
+ SetPrototypeDelete(httpConn.managedResources, streamRid);
+ core.close(streamRid);
}
- } else if (typeof requestRid === "number") {
- // Try to close "request" resource. It might have been already consumed,
- // but if it hasn't been we need to close it here to avoid resource
- // leak.
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.tryClose(requestRid);
}
};
}
- function createRequestBodyStream(httpConn, requestRid) {
+ function createRequestBodyStream(streamRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
@@ -316,32 +303,21 @@
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await readRequest(
- requestRid,
- chunk,
- );
+ const read = await readRequest(streamRid, chunk);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.close(requestRid);
}
},
- cancel() {
- SetPrototypeDelete(httpConn.managedResources, requestRid);
- core.close(requestRid);
- },
});
}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index aae6415cb..5a14f845f 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,17 +1,29 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use deno_core::error::bad_resource_id;
-use deno_core::error::type_error;
+use bytes::Bytes;
+use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
+use deno_core::futures::channel::mpsc;
+use deno_core::futures::channel::oneshot;
+use deno_core::futures::future::pending;
+use deno_core::futures::future::select;
+use deno_core::futures::future::Either;
+use deno_core::futures::future::Pending;
+use deno_core::futures::future::RemoteHandle;
+use deno_core::futures::future::Shared;
+use deno_core::futures::never::Never;
+use deno_core::futures::pin_mut;
+use deno_core::futures::ready;
+use deno_core::futures::stream::Peekable;
use deno_core::futures::FutureExt;
-use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
+use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
+use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Extension;
@@ -21,33 +33,31 @@ use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
-use hyper::body::HttpBody;
-use hyper::header::CONNECTION;
-use hyper::header::SEC_WEBSOCKET_KEY;
-use hyper::header::SEC_WEBSOCKET_VERSION;
-use hyper::header::UPGRADE;
-use hyper::http;
+use deno_websocket::ws_create_server_stream;
use hyper::server::conn::Http;
-use hyper::service::Service as HyperService;
+use hyper::service::Service;
use hyper::Body;
-use hyper::Method;
use hyper::Request;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::cmp::min;
+use std::error::Error;
use std::future::Future;
+use std::io;
+use std::mem::replace;
+use std::mem::take;
use std::net::SocketAddr;
use std::pin::Pin;
use std::rc::Rc;
+use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
-use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
-use tokio::sync::oneshot;
-use tokio_util::io::StreamReader;
+use tokio::task::spawn_local;
pub fn init() -> Extension {
Extension::builder()
@@ -56,11 +66,11 @@ pub fn init() -> Extension {
"01_http.js",
))
.ops(vec![
- ("op_http_request_next", op_async(op_http_request_next)),
- ("op_http_request_read", op_async(op_http_request_read)),
- ("op_http_response", op_async(op_http_response)),
- ("op_http_response_write", op_async(op_http_response_write)),
- ("op_http_response_close", op_async(op_http_response_close)),
+ ("op_http_accept", op_async(op_http_accept)),
+ ("op_http_read", op_async(op_http_read)),
+ ("op_http_write_headers", op_async(op_http_write_headers)),
+ ("op_http_write", op_async(op_http_write)),
+ ("op_http_shutdown", op_async(op_http_shutdown)),
(
"op_http_websocket_accept_header",
op_sync(op_http_websocket_accept_header),
@@ -73,86 +83,247 @@ pub fn init() -> Extension {
.build()
}
-struct ServiceInner {
- request: Request<Body>,
- response_tx: oneshot::Sender<Response<Body>>,
+struct HttpConnResource {
+ addr: SocketAddr,
+ scheme: &'static str,
+ acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
+ closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>,
+ cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops.
}
-#[derive(Clone, Default)]
-struct Service {
- inner: Rc<RefCell<Option<ServiceInner>>>,
- waker: Rc<deno_core::futures::task::AtomicWaker>,
+impl HttpConnResource {
+ fn new<S>(io: S, scheme: &'static str, addr: SocketAddr) -> Self
+ where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ {
+ let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>();
+ let service = HttpService::new(acceptors_rx);
+
+ let conn_fut = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(io, service)
+ .with_upgrades();
+
+ // When the cancel handle is used, the connection shuts down gracefully.
+ // No new HTTP streams will be accepted, but existing streams will be able
+ // to continue operating and eventually shut down cleanly.
+ let cancel_handle = CancelHandle::new_rc();
+ let shutdown_fut = never().or_cancel(&cancel_handle).fuse();
+
+ // A local task that polls the hyper connection future to completion.
+ let task_fut = async move {
+ pin_mut!(shutdown_fut);
+ pin_mut!(conn_fut);
+ let result = match select(conn_fut, shutdown_fut).await {
+ Either::Left((result, _)) => result,
+ Either::Right((_, mut conn_fut)) => {
+ conn_fut.as_mut().graceful_shutdown();
+ conn_fut.await
+ }
+ };
+ filter_enotconn(result).map_err(Arc::from)
+ };
+ let (task_fut, closed_fut) = task_fut.remote_handle();
+ let closed_fut = closed_fut.shared();
+ spawn_local(task_fut);
+
+ Self {
+ addr,
+ scheme,
+ acceptors_tx,
+ closed_fut,
+ cancel_handle,
+ }
+ }
+
+ // Accepts a new incoming HTTP request.
+ async fn accept(
+ self: &Rc<Self>,
+ ) -> Result<Option<HttpStreamResource>, AnyError> {
+ let fut = async {
+ let (request_tx, request_rx) = oneshot::channel();
+ let (response_tx, response_rx) = oneshot::channel();
+
+ let acceptor = HttpAcceptor::new(request_tx, response_rx);
+ self.acceptors_tx.unbounded_send(acceptor).ok()?;
+
+ let request = request_rx.await.ok()?;
+ let stream = HttpStreamResource::new(self, request, response_tx);
+ Some(stream)
+ };
+
+ async {
+ match fut.await {
+ Some(stream) => Ok(Some(stream)),
+ // Return the connection error, if any.
+ None => self.closed().map_ok(|_| None).await,
+ }
+ }
+ .try_or_cancel(&self.cancel_handle)
+ .await
+ }
+
+ /// A future that completes when this HTTP connection is closed or errors.
+ async fn closed(&self) -> Result<(), AnyError> {
+ self.closed_fut.clone().map_err(AnyError::from).await
+ }
+
+ fn scheme(&self) -> &'static str {
+ self.scheme
+ }
+
+ fn addr(&self) -> SocketAddr {
+ self.addr
+ }
}
-impl HyperService<Request<Body>> for Service {
+impl Resource for HttpConnResource {
+ fn name(&self) -> Cow<str> {
+ "httpConn".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel();
+ }
+}
+
+/// Creates a new HttpConn resource which uses `io` as its transport.
+pub fn http_create_conn_resource<S>(
+ state: &mut OpState,
+ io: S,
+ addr: SocketAddr,
+ scheme: &'static str,
+) -> Result<ResourceId, AnyError>
+where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+{
+ let conn = HttpConnResource::new(io, scheme, addr);
+ let rid = state.resource_table.add(conn);
+ Ok(rid)
+}
+
+/// An object that implements the `hyper::Service` trait, through which Hyper
+/// delivers incoming HTTP requests.
+struct HttpService {
+ acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>,
+}
+
+impl HttpService {
+ fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self {
+ let acceptors_rx = acceptors_rx.peekable();
+ Self { acceptors_rx }
+ }
+}
+
+impl Service<Request<Body>> for HttpService {
type Response = Response<Body>;
- type Error = http::Error;
- #[allow(clippy::type_complexity)]
- type Future =
- Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
+ type Error = oneshot::Canceled;
+ type Future = oneshot::Receiver<Response<Body>>;
fn poll_ready(
&mut self,
- _cx: &mut Context<'_>,
+ cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- if self.inner.borrow().is_some() {
- Poll::Pending
- } else {
- Poll::Ready(Ok(()))
- }
+ let acceptors_rx = Pin::new(&mut self.acceptors_rx);
+ let result = ready!(acceptors_rx.poll_peek(cx))
+ .map(|_| ())
+ .ok_or(oneshot::Canceled);
+ Poll::Ready(result)
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let (resp_tx, resp_rx) = oneshot::channel();
- self.inner.borrow_mut().replace(ServiceInner {
- request: req,
- response_tx: resp_tx,
- });
-
- async move {
- resp_rx.await.or_else(|_|
- // Fallback dummy response in case sender was dropped due to closed conn
- Response::builder()
- .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
- .body(vec![].into()))
- }
- .boxed_local()
+ fn call(&mut self, request: Request<Body>) -> Self::Future {
+ let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap();
+ acceptor.call(request)
}
}
-type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>;
+/// A pair of one-shot channels which first transfer a HTTP request from the
+/// Hyper service to the HttpConn resource, and then take the Response back to
+/// the service.
+struct HttpAcceptor {
+ request_tx: oneshot::Sender<Request<Body>>,
+ response_rx: oneshot::Receiver<Response<Body>>,
+}
-struct Conn {
- scheme: &'static str,
- addr: SocketAddr,
- conn: Rc<RefCell<ConnFuture>>,
+impl HttpAcceptor {
+ fn new(
+ request_tx: oneshot::Sender<Request<Body>>,
+ response_rx: oneshot::Receiver<Response<Body>>,
+ ) -> Self {
+ Self {
+ request_tx,
+ response_rx,
+ }
+ }
+
+ fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> {
+ let Self {
+ request_tx,
+ response_rx,
+ } = self;
+ request_tx
+ .send(request)
+ .map(|_| response_rx)
+ .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver.
+ }
}
-struct ConnResource {
- hyper_connection: Conn,
- deno_service: Service,
- cancel: CancelHandle,
+/// A resource representing a single HTTP request/response stream.
+struct HttpStreamResource {
+ conn: Rc<HttpConnResource>,
+ rd: AsyncRefCell<HttpRequestReader>,
+ wr: AsyncRefCell<HttpResponseWriter>,
+ cancel_handle: CancelHandle,
}
-impl ConnResource {
- // TODO(ry) impl Future for ConnResource?
- fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
- self
- .hyper_connection
- .conn
- .borrow_mut()
- .poll_unpin(cx)
- .map_err(AnyError::from)
+impl HttpStreamResource {
+ fn new(
+ conn: &Rc<HttpConnResource>,
+ request: Request<Body>,
+ response_tx: oneshot::Sender<Response<Body>>,
+ ) -> Self {
+ Self {
+ conn: conn.clone(),
+ rd: HttpRequestReader::Headers(request).into(),
+ wr: HttpResponseWriter::Headers(response_tx).into(),
+ cancel_handle: CancelHandle::new(),
+ }
}
}
-impl Resource for ConnResource {
+impl Resource for HttpStreamResource {
fn name(&self) -> Cow<str> {
- "httpConnection".into()
+ "httpStream".into()
}
fn close(self: Rc<Self>) {
- self.cancel.cancel()
+ self.cancel_handle.cancel();
+ }
+}
+
+/// The read half of an HTTP stream.
+enum HttpRequestReader {
+ Headers(Request<Body>),
+ Body(Peekable<Body>),
+ Closed,
+}
+
+impl Default for HttpRequestReader {
+ fn default() -> Self {
+ Self::Closed
+ }
+}
+
+/// The write half of an HTTP stream.
+enum HttpResponseWriter {
+ Headers(oneshot::Sender<Response<Body>>),
+ Body(hyper::body::Sender),
+ Closed,
+}
+
+impl Default for HttpResponseWriter {
+ fn default() -> Self {
+ Self::Closed
}
}
@@ -160,9 +331,7 @@ impl Resource for ConnResource {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
- // request_rid:
- Option<ResourceId>,
- // response_sender_rid:
+ // stream_rid:
ResourceId,
// method:
// This is a String rather than a ByteString because reqwest will only return
@@ -174,111 +343,40 @@ struct NextRequestResponse(
String,
);
-async fn op_http_request_next(
+async fn op_http_accept(
state: Rc<RefCell<OpState>>,
- conn_rid: ResourceId,
+ rid: ResourceId,
_: (),
) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(conn_rid)?;
-
- let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
-
- poll_fn(|cx| {
- conn_resource.deno_service.waker.register(cx.waker());
-
- // Check if conn is open/close/errored
- let (conn_closed, conn_result) = match conn_resource.poll(cx) {
- Poll::Pending => (false, Ok(())),
- Poll::Ready(Ok(())) => (true, Ok(())),
- Poll::Ready(Err(e)) => {
- if should_ignore_error(&e) {
- (true, Ok(()))
- } else {
- (true, Err(e))
- }
- }
- };
- // Drop conn resource if closed
- if conn_closed {
- // TODO(ry) close RequestResource associated with connection
- // TODO(ry) close ResponseBodyResource associated with connection
- // try to close ConnResource, but don't unwrap as it might
- // already be closed
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid);
-
- // Fail with err if unexpected conn error, early return None otherwise
- return Poll::Ready(conn_result.map(|_| None));
- }
+ let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
- if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() {
- let Conn { scheme, addr, .. } = conn_resource.hyper_connection;
- let mut state = state.borrow_mut();
- let next =
- prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?;
- Poll::Ready(Ok(Some(next)))
- } else {
- Poll::Pending
- }
- })
- .try_or_cancel(cancel)
- .await
- .map_err(AnyError::from)
-}
+ let stream = match conn.accept().await {
+ Ok(Some(stream)) => Rc::new(stream),
+ Ok(None) => return Ok(None),
+ Err(err) => return Err(err),
+ };
-fn prepare_next_request(
- state: &mut OpState,
- conn_rid: ResourceId,
- request_resource: ServiceInner,
- scheme: &'static str,
- addr: SocketAddr,
-) -> Result<NextRequestResponse, AnyError> {
- let tx = request_resource.response_tx;
- let req = request_resource.request;
- let method = req.method().to_string();
- let headers = req_headers(&req);
- let url = req_url(&req, scheme, addr)?;
-
- let is_websocket = is_websocket_request(&req);
- let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD);
- let has_body =
- is_websocket || (can_have_body && req.size_hint().exact() != Some(0));
-
- let maybe_request_rid = if has_body {
- let request_rid = state.resource_table.add(RequestResource {
- conn_rid,
- inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
- cancel: CancelHandle::default(),
- });
- Some(request_rid)
- } else {
- None
+ let rd = RcRef::map(&stream, |r| &r.rd).borrow().await;
+ let request = match &*rd {
+ HttpRequestReader::Headers(request) => request,
+ _ => unreachable!(),
};
- let response_sender_rid = state.resource_table.add(ResponseSenderResource {
- sender: tx,
- conn_rid,
- });
-
- Ok(NextRequestResponse(
- maybe_request_rid,
- response_sender_rid,
- method,
- headers,
- url,
- ))
+ let method = request.method().to_string();
+ let headers = req_headers(request);
+ let url = req_url(request, conn.scheme(), conn.addr());
+
+ let stream_rid = state.borrow_mut().resource_table.add_rc(stream);
+
+ let r = NextRequestResponse(stream_rid, method, headers, url);
+ Ok(Some(r))
}
fn req_url(
req: &hyper::Request<hyper::Body>,
scheme: &'static str,
addr: SocketAddr,
-) -> Result<String, AnyError> {
+) -> String {
let host: Cow<str> = if let Some(auth) = req.uri().authority() {
match addr.port() {
443 if scheme == "https" => Cow::Borrowed(auth.host()),
@@ -288,12 +386,22 @@ fn req_url(
} else if let Some(host) = req.uri().host() {
Cow::Borrowed(host)
} else if let Some(host) = req.headers().get("HOST") {
- Cow::Borrowed(host.to_str()?)
+ match host.to_str() {
+ Ok(host) => Cow::Borrowed(host),
+ Err(_) => Cow::Owned(
+ host
+ .as_bytes()
+ .iter()
+ .cloned()
+ .map(char::from)
+ .collect::<String>(),
+ ),
+ }
} else {
Cow::Owned(addr.to_string())
};
let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
- Ok([scheme, "://", &host, path].concat())
+ [scheme, "://", &host, path].concat()
}
fn req_headers(
@@ -327,68 +435,6 @@ fn req_headers(
headers
}
-fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool {
- req.version() == hyper::Version::HTTP_11
- && req.method() == hyper::Method::GET
- && req.headers().contains_key(&SEC_WEBSOCKET_KEY)
- && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13"
- && header(req.headers(), &UPGRADE)
- .split(|c| *c == b' ' || *c == b',')
- .any(|token| token.eq_ignore_ascii_case(b"websocket"))
- && header(req.headers(), &CONNECTION)
- .split(|c| *c == b' ' || *c == b',')
- .any(|token| token.eq_ignore_ascii_case(b"upgrade"))
-}
-
-fn header<'a>(
- h: &'a hyper::http::HeaderMap,
- name: &hyper::header::HeaderName,
-) -> &'a [u8] {
- h.get(name)
- .map(hyper::header::HeaderValue::as_bytes)
- .unwrap_or_default()
-}
-
-fn should_ignore_error(e: &AnyError) -> bool {
- if let Some(e) = e.downcast_ref::<hyper::Error>() {
- use std::error::Error;
- if let Some(std_err) = e.source() {
- if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
- if io_err.kind() == std::io::ErrorKind::NotConnected {
- return true;
- }
- }
- }
- }
- false
-}
-
-pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
- state: &mut OpState,
- io: IO,
- addr: SocketAddr,
- scheme: &'static str,
-) -> Result<ResourceId, AnyError> {
- let deno_service = Service::default();
-
- let hyper_connection = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(io, deno_service.clone())
- .with_upgrades();
- let conn = Pin::new(Box::new(hyper_connection));
- let conn_resource = ConnResource {
- hyper_connection: Conn {
- scheme,
- addr,
- conn: Rc::new(RefCell::new(conn)),
- },
- deno_service,
- cancel: CancelHandle::default(),
- };
- let rid = state.resource_table.add(conn_resource);
- Ok(rid)
-}
-
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(Deserialize)]
struct RespondArgs(
@@ -400,27 +446,16 @@ struct RespondArgs(
Vec<(ByteString, ByteString)>,
);
-async fn op_http_response(
+async fn op_http_write_headers(
state: Rc<RefCell<OpState>>,
args: RespondArgs,
data: Option<StringOrBuffer>,
-) -> Result<Option<ResourceId>, AnyError> {
+) -> Result<(), AnyError> {
let RespondArgs(rid, status, headers) = args;
-
- let response_sender = state
+ let stream = state
.borrow_mut()
.resource_table
- .take::<ResponseSenderResource>(rid)?;
- let response_sender = Rc::try_unwrap(response_sender)
- .ok()
- .expect("multiple op_http_respond ongoing");
-
- let conn_rid = response_sender.conn_rid;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(conn_rid)?;
+ .get::<HttpStreamResource>(rid)?;
let mut builder = Response::builder().status(status);
@@ -429,171 +464,138 @@ async fn op_http_response(
builder = builder.header(key.as_ref(), value.as_ref());
}
- let (maybe_response_body_rid, res) = if let Some(d) = data {
- // If a body is passed, we use it, and don't return a body for streaming.
- (None, builder.body(d.into_bytes().into())?)
- } else {
- // If no body is passed, we return a writer for streaming the body.
- let (sender, body) = Body::channel();
- let res = builder.body(body)?;
-
- let response_body_rid =
- state.borrow_mut().resource_table.add(ResponseBodyResource {
- body: AsyncRefCell::new(sender),
- conn_rid,
- });
-
- (Some(response_body_rid), res)
- };
-
- // oneshot::Sender::send(v) returns |v| on error, not an error object.
- // The only failure mode is the receiver already having dropped its end
- // of the channel.
- if response_sender.sender.send(res).is_err() {
- if let Some(rid) = maybe_response_body_rid {
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ResponseBodyResource>(rid);
- }
- return Err(type_error("internal communication error"));
- }
+ let body: Response<Body>;
+ let new_wr: HttpResponseWriter;
- let result = poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => {
- state.borrow_mut().resource_table.close(conn_rid).ok();
- Poll::Ready(x)
+ match data {
+ Some(data) => {
+ // If a buffer was passed, we use it to construct a response body.
+ body = builder.body(data.into_bytes().into())?;
+ new_wr = HttpResponseWriter::Closed;
}
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await;
-
- if let Err(e) = result {
- if let Some(rid) = maybe_response_body_rid {
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ResponseBodyResource>(rid);
+ None => {
+ // If no buffer was passed, the caller will stream the response body.
+ let (body_tx, body_rx) = Body::channel();
+ body = builder.body(body_rx)?;
+ new_wr = HttpResponseWriter::Body(body_tx);
}
- return Err(e);
}
- if maybe_response_body_rid.is_none() {
- conn_resource.deno_service.waker.wake();
+ let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
+ let response_tx = match replace(&mut *old_wr, new_wr) {
+ HttpResponseWriter::Headers(response_tx) => response_tx,
+ _ => return Err(http_error("response headers already sent")),
+ };
+
+ match response_tx.send(body) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ stream.conn.closed().await?;
+ Err(http_error("connection closed while sending response"))
+ }
}
- Ok(maybe_response_body_rid)
}
-async fn op_http_response_close(
+async fn op_http_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
- _: (),
+ buf: ZeroCopyBuf,
) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .take::<ResponseBodyResource>(rid)?;
-
- let conn_resource = state
+ let stream = state
.borrow()
.resource_table
- .get::<ConnResource>(resource.conn_rid)?;
- drop(resource);
-
- let r = poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await;
- conn_resource.deno_service.waker.wake();
- r
-}
-
-async fn op_http_request_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- mut data: ZeroCopyBuf,
-) -> Result<usize, AnyError> {
- let resource = state
- .borrow()
- .resource_table
- .get::<RequestResource>(rid as u32)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)?;
-
- let mut inner = RcRef::map(resource.clone(), |r| &r.inner)
- .borrow_mut()
- .await;
-
- if let RequestOrStreamReader::Request(req) = &mut *inner {
- let req = req.take().unwrap();
- let stream: BytesStream = Box::pin(req.into_body().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
- let reader = StreamReader::new(stream);
- *inner = RequestOrStreamReader::StreamReader(reader);
- };
-
- let reader = match &mut *inner {
- RequestOrStreamReader::StreamReader(reader) => reader,
- _ => unreachable!(),
- };
-
- let cancel = RcRef::map(resource, |r| &r.cancel);
-
- let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
+ .get::<HttpStreamResource>(rid)?;
+ let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
+
+ loop {
+ let body_tx = match &mut *wr {
+ HttpResponseWriter::Body(body_tx) => body_tx,
+ HttpResponseWriter::Headers(_) => {
+ break Err(http_error("no response headers"))
+ }
+ HttpResponseWriter::Closed => {
+ break Err(http_error("response already completed"))
+ }
+ };
- poll_fn(|cx| {
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
+ let bytes = Bytes::copy_from_slice(&buf[..]);
+ match body_tx.send_data(bytes).await {
+ Ok(_) => break Ok(()),
+ Err(err) => {
+ // Don't return "channel closed", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ assert!(err.is_closed());
+ stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
}
-
- read_fut.poll_unpin(cx).map_err(AnyError::from)
- })
- .await
+ }
}
-async fn op_http_response_write(
+/// Gracefully closes the write half of the HTTP stream. Note that this does not
+/// remove the HTTP stream resource from the resource table; it still has to be
+/// closed with `Deno.core.close()`.
+async fn op_http_shutdown(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
- data: ZeroCopyBuf,
+ _: (),
) -> Result<(), AnyError> {
- let resource = state
+ let stream = state
.borrow()
.resource_table
- .get::<ResponseBodyResource>(rid as u32)?;
+ .get::<HttpStreamResource>(rid)?;
+ let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
+ take(&mut *wr);
+ Ok(())
+}
- let conn_resource = state
- .borrow()
+async fn op_http_read(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ mut buf: ZeroCopyBuf,
+) -> Result<usize, AnyError> {
+ let stream = state
+ .borrow_mut()
.resource_table
- .get::<ConnResource>(resource.conn_rid)?;
-
- let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
-
- let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local();
-
- poll_fn(|cx| {
- let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
-
- // Poll connection so the data is flushed
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
+ .get::<HttpStreamResource>(rid)?;
+ let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
+
+ let body = loop {
+ match &mut *rd {
+ HttpRequestReader::Headers(_) => {}
+ HttpRequestReader::Body(body) => break body,
+ HttpRequestReader::Closed => return Ok(0),
}
+ match take(&mut *rd) {
+ HttpRequestReader::Headers(request) => {
+ let body = request.into_body().peekable();
+ *rd = HttpRequestReader::Body(body);
+ }
+ _ => unreachable!(),
+ };
+ };
- r
- })
- .await?;
+ let fut = async {
+ let mut body = Pin::new(body);
+ loop {
+ match body.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(buf.len(), chunk.len());
+ buf[..len].copy_from_slice(&chunk.split_to(len));
+ break Ok(len);
+ }
+ Some(_) => match body.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(0),
+ }
+ }
+ };
- Ok(())
+ let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle);
+ fut.try_or_cancel(cancel_handle).await
}
fn op_http_websocket_accept_header(
@@ -613,86 +615,22 @@ async fn op_http_upgrade_websocket(
rid: ResourceId,
_: (),
) -> Result<ResourceId, AnyError> {
- let req_resource = state
+ let stream = state
.borrow_mut()
.resource_table
- .take::<RequestResource>(rid)?;
-
- let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await;
-
- if let RequestOrStreamReader::Request(req) = inner.as_mut() {
- let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?;
- let stream =
- deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
- upgraded,
- deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
- None,
- )
- .await;
-
- let (ws_tx, ws_rx) = stream.split();
- let rid =
- state
- .borrow_mut()
- .resource_table
- .add(deno_websocket::WsStreamResource {
- stream: deno_websocket::WebSocketStreamType::Server {
- rx: AsyncRefCell::new(ws_rx),
- tx: AsyncRefCell::new(ws_tx),
- },
- cancel: Default::default(),
- });
-
- Ok(rid)
- } else {
- Err(bad_resource_id())
- }
-}
-
-type BytesStream =
- Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
-
-enum RequestOrStreamReader {
- Request(Option<Request<hyper::Body>>),
- StreamReader(StreamReader<BytesStream, bytes::Bytes>),
-}
-
-struct RequestResource {
- conn_rid: ResourceId,
- inner: AsyncRefCell<RequestOrStreamReader>,
- cancel: CancelHandle,
-}
-
-impl Resource for RequestResource {
- fn name(&self) -> Cow<str> {
- "request".into()
- }
+ .get::<HttpStreamResource>(rid)?;
+ let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-struct ResponseSenderResource {
- sender: oneshot::Sender<Response<Body>>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseSenderResource {
- fn name(&self) -> Cow<str> {
- "responseSender".into()
- }
-}
-
-struct ResponseBodyResource {
- body: AsyncRefCell<hyper::body::Sender>,
- conn_rid: ResourceId,
-}
+ let request = match &mut *rd {
+ HttpRequestReader::Headers(request) => request,
+ _ => {
+ return Err(http_error("cannot upgrade because request body was used"))
+ }
+ };
-impl Resource for ResponseBodyResource {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
+ let transport = hyper::upgrade::on(request).await?;
+ let ws_rid = ws_create_server_stream(&state, transport).await?;
+ Ok(ws_rid)
}
// Needed so hyper can use non Send futures
@@ -705,6 +643,33 @@ where
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
- tokio::task::spawn_local(fut);
+ spawn_local(fut);
+ }
+}
+
+fn http_error(message: &'static str) -> AnyError {
+ custom_error("Http", message)
+}
+
+/// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
+fn filter_enotconn(
+ result: Result<(), hyper::Error>,
+) -> Result<(), hyper::Error> {
+ if result
+ .as_ref()
+ .err()
+ .and_then(|err| err.source())
+ .and_then(|err| err.downcast_ref::<io::Error>())
+ .filter(|err| err.kind() == io::ErrorKind::NotConnected)
+ .is_some()
+ {
+ Ok(())
+ } else {
+ result
}
}
+
+/// Create a future that is forever pending.
+fn never() -> Pending<Never> {
+ pending()
+}
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index d469b5aaf..ba626a45a 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -34,12 +34,13 @@ use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::TlsConnector;
+use tokio_tungstenite::client_async;
use tokio_tungstenite::tungstenite::{
handshake::client::Response, protocol::frame::coding::CloseCode,
- protocol::CloseFrame, Message,
+ protocol::CloseFrame, protocol::Role, Message,
};
use tokio_tungstenite::MaybeTlsStream;
-use tokio_tungstenite::{client_async, WebSocketStream};
+use tokio_tungstenite::WebSocketStream;
pub use tokio_tungstenite; // Re-export tokio_tungstenite
@@ -72,6 +73,27 @@ pub enum WebSocketStreamType {
},
}
+pub async fn ws_create_server_stream(
+ state: &Rc<RefCell<OpState>>,
+ transport: hyper::upgrade::Upgraded,
+) -> Result<ResourceId, AnyError> {
+ let ws_stream =
+ WebSocketStream::from_raw_socket(transport, Role::Server, None).await;
+ let (ws_tx, ws_rx) = ws_stream.split();
+
+ let ws_resource = WsStreamResource {
+ stream: WebSocketStreamType::Server {
+ tx: AsyncRefCell::new(ws_tx),
+ rx: AsyncRefCell::new(ws_rx),
+ },
+ cancel: Default::default(),
+ };
+
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let rid = resource_table.add(ws_resource);
+ Ok(rid)
+}
+
pub struct WsStreamResource {
pub stream: WebSocketStreamType,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
diff --git a/runtime/errors.rs b/runtime/errors.rs
index fe6e71193..1491161d3 100644
--- a/runtime/errors.rs
+++ b/runtime/errors.rs
@@ -17,6 +17,7 @@ use deno_fetch::reqwest;
use std::env;
use std::error::Error;
use std::io;
+use std::sync::Arc;
fn get_dlopen_error_class(error: &dlopen::Error) -> &'static str {
use dlopen::Error::*;
@@ -164,6 +165,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
})
.or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class))
.or_else(|| {
+ e.downcast_ref::<Arc<hyper::Error>>()
+ .map(|e| get_hyper_error_class(&**e))
+ })
+ .or_else(|| {
e.downcast_ref::<deno_core::Canceled>().map(|e| {
let io_err: io::Error = e.to_owned().into();
get_io_error_class(&io_err)
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
index 683dc1a57..fddac9261 100644
--- a/runtime/ops/http.rs
+++ b/runtime/ops/http.rs
@@ -6,6 +6,7 @@ use deno_core::op_sync;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::ResourceId;
+use deno_http::http_create_conn_resource;
use deno_net::io::TcpStreamResource;
use deno_net::ops_tls::TlsStreamResource;
@@ -29,7 +30,7 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner();
let tcp_stream = read_half.reunite(write_half)?;
let addr = tcp_stream.local_addr()?;
- return deno_http::start_http(state, tcp_stream, addr, "http");
+ return http_create_conn_resource(state, tcp_stream, addr, "http");
}
if let Ok(resource_rc) = state
@@ -41,7 +42,7 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner();
let tls_stream = read_half.reunite(write_half);
let addr = tls_stream.get_ref().0.local_addr()?;
- return deno_http::start_http(state, tls_stream, addr, "https");
+ return http_create_conn_resource(state, tls_stream, addr, "https");
}
Err(bad_resource_id())