 Cargo.lock                     |   2
 cli/bench/deno_http_native.js  |  17
 cli/bench/http.rs              |  20
 cli/dts/lib.deno.unstable.d.ts |  27
 cli/tests/unit/http_test.ts    | 203
 cli/tests/unit/unit_tests.ts   |   1
 cli/tokio_util.rs              |   3
 runtime/Cargo.toml             |   4
 runtime/errors.rs              |   5
 runtime/js/40_http.js          | 210
 runtime/js/90_deno_ns.js       |   2
 runtime/ops/http.rs            | 523
 runtime/ops/mod.rs             |   1
 runtime/ops/net.rs             |   6
 runtime/worker.rs              |   1
 15 files changed, 1020 insertions(+), 5 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 5a14c5b21..97ae4d21c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -653,6 +653,7 @@ name = "deno_runtime"
version = "0.10.1"
dependencies = [
"atty",
+ "bytes",
"deno_console",
"deno_core",
"deno_crypto",
@@ -684,6 +685,7 @@ dependencies = [
"test_util",
"tokio",
"tokio-rustls",
+ "tokio-util",
"trust-dns-proto",
"trust-dns-resolver",
"uuid",
diff --git a/cli/bench/deno_http_native.js b/cli/bench/deno_http_native.js
new file mode 100644
index 000000000..fa779be21
--- /dev/null
+++ b/cli/bench/deno_http_native.js
@@ -0,0 +1,17 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+const addr = Deno.args[0] || "127.0.0.1:4500";
+const [hostname, port] = addr.split(":");
+const listener = Deno.listen({ hostname, port: Number(port) });
+console.log("Server listening on", addr);
+
+const body = Deno.core.encode("Hello World");
+
+for await (const conn of listener) {
+ (async () => {
+ const requests = Deno.startHttp(conn);
+ for await (const { respondWith } of requests) {
+ respondWith(new Response(body));
+ }
+ })();
+}
diff --git a/cli/bench/http.rs b/cli/bench/http.rs
index 952f3f19b..690e26cf4 100644
--- a/cli/bench/http.rs
+++ b/cli/bench/http.rs
@@ -33,6 +33,7 @@ pub(crate) fn benchmark(
res.insert("deno_tcp".to_string(), deno_tcp(deno_exe)?);
// res.insert("deno_udp".to_string(), deno_udp(deno_exe)?);
res.insert("deno_http".to_string(), deno_http(deno_exe)?);
+ res.insert("deno_http_native".to_string(), deno_http_native(deno_exe)?);
// TODO(ry) deno_proxy disabled to make fetch() standards compliant.
// res.insert("deno_proxy".to_string(), deno_http_proxy(deno_exe) hyper_hello_exe))
res.insert(
@@ -200,6 +201,25 @@ fn deno_http(deno_exe: &str) -> Result<HttpBenchmarkResult> {
)
}
+fn deno_http_native(deno_exe: &str) -> Result<HttpBenchmarkResult> {
+ let port = get_port();
+ println!("http_benchmark testing DENO using native bindings.");
+ run(
+ &[
+ deno_exe,
+ "run",
+ "--allow-net",
+ "--reload",
+ "--unstable",
+ "cli/bench/deno_http_native.js",
+ &server_addr(port),
+ ],
+ port,
+ None,
+ None,
+ )
+}
+
#[allow(dead_code)]
fn deno_http_proxy(
deno_exe: &str,
diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts
index 0833c3301..9dbe6817f 100644
--- a/cli/dts/lib.deno.unstable.d.ts
+++ b/cli/dts/lib.deno.unstable.d.ts
@@ -1196,6 +1196,33 @@ declare namespace Deno {
bytesSentData: number;
bytesReceived: number;
}
+
+ export interface RequestEvent {
+ readonly request: Request;
+ respondWith(r: Response | Promise<Response>): void;
+ }
+
+ export interface HttpConn extends AsyncIterable<RequestEvent> {
+ readonly rid: number;
+
+ nextRequest(): Promise<RequestEvent | null>;
+ close(): void;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Parse HTTP requests from the given connection
+ *
+ * ```ts
+ * const httpConn = Deno.startHttp(conn);
+ * const { request, respondWith } = await httpConn.nextRequest();
+ * respondWith(new Response("Hello World"));
+ * ```
+ *
+ * If `httpConn.nextRequest()` encounters an error or returns `null` then
+ * the underlying HttpConn resource is closed automatically.
+ */
+ export function startHttp(conn: Conn): HttpConn;
}
declare function fetch(
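For reference, a minimal sketch of how this unstable API composes end to end with a streamed response body; it mirrors the bench script above and the unit tests below, and assumes `--unstable` plus `--allow-net` (the port is arbitrary):

```ts
// Sketch only: accept connections, hand each one to Deno.startHttp(), and
// answer every request with a streamed body. Port 8080 is an assumption.
const listener = Deno.listen({ port: 8080 });
for await (const conn of listener) {
  (async () => {
    const httpConn = Deno.startHttp(conn);
    for await (const { request, respondWith } of httpConn) {
      console.log(`${request.method} ${request.url}`);
      const body = new ReadableStream<Uint8Array>({
        start(controller) {
          controller.enqueue(new TextEncoder().encode("hello "));
          controller.enqueue(new TextEncoder().encode("world"));
          controller.close();
        },
      });
      respondWith(new Response(body));
    }
  })();
}
```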
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
new file mode 100644
index 000000000..fc8530142
--- /dev/null
+++ b/cli/tests/unit/http_test.ts
@@ -0,0 +1,203 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+import {
+ assert,
+ assertEquals,
+ assertThrowsAsync,
+ unitTest,
+} from "./test_util.ts";
+import { BufReader, BufWriter } from "../../../test_util/std/io/bufio.ts";
+import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";
+
+unitTest({ perms: { net: true } }, async function httpServerBasic() {
+ const promise = (async () => {
+ const listener = Deno.listen({ port: 4501 });
+ for await (const conn of listener) {
+ const httpConn = Deno.startHttp(conn);
+ for await (const { request, respondWith } of httpConn) {
+ assertEquals(await request.text(), "");
+ respondWith(new Response("Hello World"));
+ }
+ break;
+ }
+ })();
+
+ const resp = await fetch("http://127.0.0.1:4501/", {
+ headers: { "connection": "close" },
+ });
+ const text = await resp.text();
+ assertEquals(text, "Hello World");
+ await promise;
+});
+
+unitTest(
+ { perms: { net: true } },
+ async function httpServerStreamResponse() {
+ const stream = new TransformStream();
+ const writer = stream.writable.getWriter();
+ writer.write(new TextEncoder().encode("hello "));
+ writer.write(new TextEncoder().encode("world"));
+ writer.close();
+
+ const promise = (async () => {
+ const listener = Deno.listen({ port: 4501 });
+ const conn = await listener.accept();
+ const httpConn = Deno.startHttp(conn);
+ const evt = await httpConn.nextRequest();
+ assert(evt);
+ const { request, respondWith } = evt;
+ assert(!request.body);
+ await respondWith(new Response(stream.readable));
+ httpConn.close();
+ listener.close();
+ })();
+
+ const resp = await fetch("http://127.0.0.1:4501/");
+ const respBody = await resp.text();
+ assertEquals("hello world", respBody);
+ await promise;
+ },
+);
+
+unitTest(
+ { perms: { net: true } },
+ async function httpServerStreamRequest() {
+ const stream = new TransformStream();
+ const writer = stream.writable.getWriter();
+ writer.write(new TextEncoder().encode("hello "));
+ writer.write(new TextEncoder().encode("world"));
+ writer.close();
+
+ const promise = (async () => {
+ const listener = Deno.listen({ port: 4501 });
+ const conn = await listener.accept();
+ const httpConn = Deno.startHttp(conn);
+ const evt = await httpConn.nextRequest();
+ assert(evt);
+ const { request, respondWith } = evt;
+ const reqBody = await request.text();
+ assertEquals("hello world", reqBody);
+ await respondWith(new Response(""));
+
+ // TODO(ry) If we don't call httpConn.nextRequest() here we get "error sending
+ // request for url (https://localhost:4501/): connection closed before
+ // message completed".
+ assertEquals(await httpConn.nextRequest(), null);
+
+ listener.close();
+ })();
+
+ const resp = await fetch("http://127.0.0.1:4501/", {
+ body: stream.readable,
+ method: "POST",
+ headers: { "connection": "close" },
+ });
+
+ await resp.arrayBuffer();
+ await promise;
+ },
+);
+
+unitTest({ perms: { net: true } }, async function httpServerStreamDuplex() {
+ const promise = (async () => {
+ const listener = Deno.listen({ port: 4501 });
+ const conn = await listener.accept();
+ const httpConn = Deno.startHttp(conn);
+ const evt = await httpConn.nextRequest();
+ assert(evt);
+ const { request, respondWith } = evt;
+ assert(request.body);
+ await respondWith(new Response(request.body));
+ httpConn.close();
+ listener.close();
+ })();
+
+ const ts = new TransformStream();
+ const writable = ts.writable.getWriter();
+ const resp = await fetch("http://127.0.0.1:4501/", {
+ method: "POST",
+ body: ts.readable,
+ });
+ assert(resp.body);
+ const reader = resp.body.getReader();
+ await writable.write(new Uint8Array([1]));
+ const chunk1 = await reader.read();
+ assert(!chunk1.done);
+ assertEquals(chunk1.value, new Uint8Array([1]));
+ await writable.write(new Uint8Array([2]));
+ const chunk2 = await reader.read();
+ assert(!chunk2.done);
+ assertEquals(chunk2.value, new Uint8Array([2]));
+ await writable.close();
+ const chunk3 = await reader.read();
+ assert(chunk3.done);
+ await promise;
+});
+
+unitTest({ perms: { net: true } }, async function httpServerClose() {
+ const listener = Deno.listen({ port: 4501 });
+ const client = await Deno.connect({ port: 4501 });
+ const httpConn = Deno.startHttp(await listener.accept());
+ client.close();
+ const evt = await httpConn.nextRequest();
+ assertEquals(evt, null);
+ // Note httpConn is automatically closed when "done" is reached.
+ listener.close();
+});
+
+unitTest({ perms: { net: true } }, async function httpServerInvalidMethod() {
+ const listener = Deno.listen({ port: 4501 });
+ const client = await Deno.connect({ port: 4501 });
+ const httpConn = Deno.startHttp(await listener.accept());
+ await client.write(new Uint8Array([1, 2, 3]));
+ await assertThrowsAsync(
+ async () => {
+ await httpConn.nextRequest();
+ },
+ Deno.errors.Http,
+ "invalid HTTP method parsed",
+ );
+ // Note httpConn is automatically closed when it errors.
+ client.close();
+ listener.close();
+});
+
+unitTest(
+ { perms: { read: true, net: true } },
+ async function httpServerWithTls(): Promise<void> {
+ const hostname = "localhost";
+ const port = 4501;
+
+ const promise = (async () => {
+ const listener = Deno.listenTls({
+ hostname,
+ port,
+ certFile: "cli/tests/tls/localhost.crt",
+ keyFile: "cli/tests/tls/localhost.key",
+ });
+ const conn = await listener.accept();
+ const httpConn = Deno.startHttp(conn);
+ const evt = await httpConn.nextRequest();
+ assert(evt);
+ const { request, respondWith } = evt;
+ await respondWith(new Response("Hello World"));
+
+ // TODO(ry) If we don't call httpConn.nextRequest() here we get "error sending
+ // request for url (https://localhost:4501/): connection closed before
+ // message completed".
+ assertEquals(await httpConn.nextRequest(), null);
+
+ listener.close();
+ })();
+
+ const caData = Deno.readTextFileSync("cli/tests/tls/RootCA.pem");
+ const client = Deno.createHttpClient({ caData });
+ const resp = await fetch(`https://${hostname}:${port}/`, {
+ client,
+ headers: { "connection": "close" },
+ });
+ const respBody = await resp.text();
+ assertEquals("Hello World", respBody);
+ await promise;
+ client.close();
+ },
+);
diff --git a/cli/tests/unit/unit_tests.ts b/cli/tests/unit/unit_tests.ts
index a736e97ca..0dcbfe80b 100644
--- a/cli/tests/unit/unit_tests.ts
+++ b/cli/tests/unit/unit_tests.ts
@@ -30,6 +30,7 @@ import "./fs_events_test.ts";
import "./get_random_values_test.ts";
import "./globals_test.ts";
import "./headers_test.ts";
+import "./http_test.ts";
import "./internals_test.ts";
import "./io_test.ts";
import "./link_test.ts";
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs
index 5ee45325d..695b94802 100644
--- a/cli/tokio_util.rs
+++ b/cli/tokio_util.rs
@@ -20,5 +20,6 @@ where
F: std::future::Future<Output = R>,
{
let rt = create_basic_runtime();
- rt.block_on(future)
+ let local = tokio::task::LocalSet::new();
+ local.block_on(&rt, future)
}
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 864b6b7f3..eda9f2bf7 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -46,11 +46,12 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.7.1" }
deno_webgpu = { path = "../op_crates/webgpu", version = "0.3.1" }
atty = "0.2.14"
+bytes = "1"
dlopen = "0.1.8"
encoding_rs = "0.8.28"
filetime = "0.2.14"
http = "0.2.3"
-hyper = { version = "0.14.5", features = ["server"] }
+hyper = { version = "0.14.5", features = ["server", "stream", "http1", "http2", "runtime"] }
indexmap = "1.6.2"
lazy_static = "1.4.0"
libc = "0.2.93"
@@ -63,6 +64,7 @@ serde = { version = "1.0.125", features = ["derive"] }
sys-info = "0.8.0"
termcolor = "1.1.2"
tokio = { version = "1.4.0", features = ["full"] }
+tokio-util = { version = "0.6", features = ["io"] }
tokio-rustls = "0.22.0"
uuid = { version = "0.8.2", features = ["v4"] }
webpki = "0.21.4"
diff --git a/runtime/errors.rs b/runtime/errors.rs
index a8152b075..7bb109fb9 100644
--- a/runtime/errors.rs
+++ b/runtime/errors.rs
@@ -131,6 +131,10 @@ fn get_url_parse_error_class(_error: &url::ParseError) -> &'static str {
"URIError"
}
+fn get_hyper_error_class(_error: &hyper::Error) -> &'static str {
+ "Http"
+}
+
#[cfg(unix)]
fn get_nix_error_class(error: &nix::Error) -> &'static str {
use nix::errno::Errno::*;
@@ -156,6 +160,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
e.downcast_ref::<dlopen::Error>()
.map(get_dlopen_error_class)
})
+ .or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class))
.or_else(|| {
e.downcast_ref::<deno_core::Canceled>().map(|e| {
let io_err: io::Error = e.to_owned().into();
diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js
new file mode 100644
index 000000000..cfb015edd
--- /dev/null
+++ b/runtime/js/40_http.js
@@ -0,0 +1,210 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const { Request, dontValidateUrl, fastBody, Response } =
+ window.__bootstrap.fetch;
+ const { Headers } = window.__bootstrap.headers;
+ const errors = window.__bootstrap.errors.errors;
+ const core = window.Deno.core;
+ const { ReadableStream } = window.__bootstrap.streams;
+
+ function flatEntries(obj) {
+ const entries = [];
+ for (const key in obj) {
+ entries.push(key);
+ entries.push(obj[key]);
+ }
+ return entries;
+ }
+
+ function startHttp(conn) {
+ const rid = Deno.core.jsonOpSync("op_http_start", conn.rid);
+ return new HttpConn(rid);
+ }
+
+ class HttpConn {
+ #rid = 0;
+
+ constructor(rid) {
+ this.#rid = rid;
+ }
+
+ /** @returns {number} */
+ get rid() {
+ return this.#rid;
+ }
+
+ /** @returns {Promise<RequestEvent | null>} */
+ async nextRequest() {
+ let nextRequest;
+ try {
+ nextRequest = await Deno.core.jsonOpAsync(
+ "op_http_request_next",
+ this.#rid,
+ );
+ } catch (error) {
+ if (error instanceof errors.BadResource) {
+ return null;
+ } else if (error instanceof errors.Interrupted) {
+ return null;
+ }
+ throw error;
+ }
+ if (nextRequest === null) return null;
+
+ const [
+ requestBodyRid,
+ responseSenderRid,
+ method,
+ headersList,
+ url,
+ ] = nextRequest;
+
+ /** @type {ReadableStream<Uint8Array> | undefined} */
+ let body = undefined;
+ if (typeof requestBodyRid === "number") {
+ body = createRequestBodyStream(requestBodyRid);
+ }
+
+ const request = new Request(url, {
+ body,
+ method,
+ headers: new Headers(headersList),
+ [dontValidateUrl]: true,
+ });
+
+ const respondWith = createRespondWith(responseSenderRid, this.#rid);
+
+ return { request, respondWith };
+ }
+
+ /** @returns {void} */
+ close() {
+ core.close(this.#rid);
+ }
+
+ [Symbol.asyncIterator]() {
+ const httpConn = this;
+ return {
+ async next() {
+ const reqEvt = await httpConn.nextRequest();
+ if (reqEvt === null) return { value: undefined, done: true };
+ return { value: reqEvt, done: false };
+ },
+ };
+ }
+ }
+
+ function readRequest(requestRid, zeroCopyBuf) {
+ return Deno.core.jsonOpAsync(
+ "op_http_request_read",
+ requestRid,
+ zeroCopyBuf,
+ );
+ }
+
+ function respond(responseSenderRid, resp, zeroCopyBuf) {
+ return Deno.core.jsonOpSync("op_http_response", [
+ responseSenderRid,
+ resp.status ?? 200,
+ flatEntries(resp.headers ?? {}),
+ ], zeroCopyBuf);
+ }
+
+ function createRespondWith(responseSenderRid, connRid) {
+ return async function (resp) {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
+
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
+ // If the response body is a Uint8Array it will be sent synchronously
+ // in a single op; otherwise a "response body" resource will be
+ // created and the body will be streamed into it.
+ const body = resp[fastBody]();
+ let zeroCopyBuf;
+ if (body instanceof ArrayBuffer) {
+ zeroCopyBuf = new Uint8Array(body);
+ } else if (!body) {
+ zeroCopyBuf = new Uint8Array(0);
+ } else {
+ zeroCopyBuf = null;
+ }
+
+ const responseBodyRid = respond(
+ responseSenderRid,
+ resp,
+ zeroCopyBuf,
+ );
+
+ // If `respond` returns a responseBodyRid, we should stream the body
+ // to that resource.
+ if (typeof responseBodyRid === "number") {
+ if (!body || !(body instanceof ReadableStream)) {
+ throw new Error(
+ "internal error: recieved responseBodyRid, but response has no body or is not a stream",
+ );
+ }
+ for await (const chunk of body) {
+ const data = new Uint8Array(
+ chunk.buffer,
+ chunk.byteOffset,
+ chunk.byteLength,
+ );
+ await Deno.core.jsonOpAsync(
+ "op_http_response_write",
+ responseBodyRid,
+ data,
+ );
+ }
+
+ // Once all chunks are sent, and the request body is closed, we can close
+ // the response body.
+ await Deno.core.jsonOpAsync("op_http_response_close", responseBodyRid);
+ }
+ };
+ }
+
+ function createRequestBodyStream(requestBodyRid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await readRequest(
+ requestBodyRid,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(chunk.subarray(0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ core.close(requestBodyRid);
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error the stream.
+ controller.error(err);
+ controller.close();
+ core.close(requestBodyRid);
+ }
+ },
+ cancel() {
+ core.close(requestBodyRid);
+ },
+ });
+ }
+
+ window.__bootstrap.http = {
+ startHttp,
+ };
+})(this);
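A note on createRespondWith above: fixed bodies take the single-op fast path, while stream bodies are written chunk by chunk and respondWith only settles once the stream is drained. A rough sketch of the two paths as seen from a handler (the handler names are hypothetical, not part of the patch):

```ts
// Sketch: contrast the two respondWith paths.
async function handleFixed(evt: Deno.RequestEvent) {
  // Uint8Array/string bodies are serialized into the single op_http_response call.
  evt.respondWith(new Response("fixed body"));
}

async function handleStreaming(evt: Deno.RequestEvent) {
  const { readable, writable } = new TransformStream();
  // A ReadableStream body makes respondWith call op_http_response_write per
  // chunk and op_http_response_close at the end, so it settles only after
  // the writer side is closed.
  const responding = evt.respondWith(new Response(readable));
  const writer = writable.getWriter();
  await writer.write(new TextEncoder().encode("chunk 1"));
  await writer.write(new TextEncoder().encode("chunk 2"));
  await writer.close();
  await responding;
}
```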
diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js
index d820d0896..84eb69ef0 100644
--- a/runtime/js/90_deno_ns.js
+++ b/runtime/js/90_deno_ns.js
@@ -120,6 +120,7 @@
listen: __bootstrap.netUnstable.listen,
connect: __bootstrap.netUnstable.connect,
listenDatagram: __bootstrap.netUnstable.listenDatagram,
+ startHttp: __bootstrap.http.startHttp,
startTls: __bootstrap.tls.startTls,
fstatSync: __bootstrap.fs.fstatSync,
fstat: __bootstrap.fs.fstat,
@@ -132,5 +133,6 @@
utimeSync: __bootstrap.fs.utimeSync,
HttpClient: __bootstrap.fetch.HttpClient,
createHttpClient: __bootstrap.fetch.createHttpClient,
+ http: __bootstrap.http,
};
})(this);
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
new file mode 100644
index 000000000..9cf4ff9d5
--- /dev/null
+++ b/runtime/ops/http.rs
@@ -0,0 +1,523 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::ops::io::TcpStreamResource;
+use crate::ops::io::TlsServerStreamResource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::null_opbuf;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::FutureExt;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
+use deno_core::AsyncRefCell;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use hyper::body::HttpBody;
+use hyper::http;
+use hyper::server::conn::Connection;
+use hyper::server::conn::Http;
+use hyper::service::Service as HyperService;
+use hyper::Body;
+use hyper::Request;
+use hyper::Response;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncReadExt;
+use tokio::net::TcpStream;
+use tokio::sync::oneshot;
+use tokio_rustls::server::TlsStream;
+use tokio_util::io::StreamReader;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_http_start", op_http_start);
+
+ super::reg_json_async(rt, "op_http_request_next", op_http_request_next);
+ super::reg_json_async(rt, "op_http_request_read", op_http_request_read);
+
+ super::reg_json_sync(rt, "op_http_response", op_http_response);
+ super::reg_json_async(rt, "op_http_response_write", op_http_response_write);
+ super::reg_json_async(rt, "op_http_response_close", op_http_response_close);
+}
+
+struct ServiceInner {
+ request: Request<Body>,
+ response_tx: oneshot::Sender<Response<Body>>,
+}
+
+#[derive(Clone, Default)]
+struct Service {
+ inner: Rc<RefCell<Option<ServiceInner>>>,
+}
+
+impl HyperService<Request<Body>> for Service {
+ type Response = Response<Body>;
+ type Error = http::Error;
+ #[allow(clippy::type_complexity)]
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
+
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ if self.inner.borrow().is_some() {
+ Poll::Pending
+ } else {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let (resp_tx, resp_rx) = oneshot::channel();
+ self.inner.borrow_mut().replace(ServiceInner {
+ request: req,
+ response_tx: resp_tx,
+ });
+
+ async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
+ }
+}
+
+enum ConnType {
+ Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>),
+ Tls(Rc<RefCell<Connection<TlsStream<TcpStream>, Service, LocalExecutor>>>),
+}
+
+struct ConnResource {
+ hyper_connection: ConnType,
+ deno_service: Service,
+}
+
+impl ConnResource {
+ // TODO(ry) impl Future for ConnResource?
+ fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
+ match &self.hyper_connection {
+ ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
+ ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
+ }
+ .map_err(AnyError::from)
+ }
+}
+
+impl Resource for ConnResource {
+ fn name(&self) -> Cow<str> {
+ "httpConnection".into()
+ }
+}
+
+// We use a tuple instead of struct to avoid serialization overhead of the keys.
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct NextRequestResponse(
+ // request_body_rid:
+ Option<ResourceId>,
+ // response_sender_rid:
+ ResourceId,
+ // method:
+ String,
+ // headers:
+ Vec<(String, String)>,
+ // url:
+ String,
+);
+
+async fn op_http_request_next(
+ state: Rc<RefCell<OpState>>,
+ conn_rid: ResourceId,
+ _data: Option<ZeroCopyBuf>,
+) -> Result<Option<NextRequestResponse>, AnyError> {
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ poll_fn(|cx| {
+ let connection_closed = match conn_resource.poll(cx) {
+ Poll::Pending => false,
+ Poll::Ready(Ok(())) => {
+ // close ConnResource
+ state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid)
+ .unwrap();
+ true
+ }
+ Poll::Ready(Err(e)) => {
+ // TODO(ry) close RequestResource associated with connection
+ // TODO(ry) close ResponseBodyResource associated with connection
+ // close ConnResource
+ state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid)
+ .unwrap();
+
+ if should_ignore_error(&e) {
+ true
+ } else {
+ return Poll::Ready(Err(e));
+ }
+ }
+ };
+
+ if let Some(request_resource) =
+ conn_resource.deno_service.inner.borrow_mut().take()
+ {
+ let tx = request_resource.response_tx;
+ let req = request_resource.request;
+ let method = req.method().to_string();
+
+ let mut headers = Vec::with_capacity(req.headers().len());
+ for (name, value) in req.headers().iter() {
+ let name = name.to_string();
+ let value = value.to_str().unwrap_or("").to_string();
+ headers.push((name, value));
+ }
+
+ let url = req.uri().to_string();
+
+ let has_body = if let Some(exact_size) = req.size_hint().exact() {
+ exact_size > 0
+ } else {
+ true
+ };
+
+ let maybe_request_body_rid = if has_body {
+ let stream: BytesStream = Box::pin(req.into_body().map(|r| {
+ r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+ }));
+ let stream_reader = StreamReader::new(stream);
+ let mut state = state.borrow_mut();
+ let request_body_rid = state.resource_table.add(RequestBodyResource {
+ conn_rid,
+ reader: AsyncRefCell::new(stream_reader),
+ cancel: CancelHandle::default(),
+ });
+ Some(request_body_rid)
+ } else {
+ None
+ };
+
+ let mut state = state.borrow_mut();
+ let response_sender_rid =
+ state.resource_table.add(ResponseSenderResource {
+ sender: tx,
+ conn_rid,
+ });
+
+ Poll::Ready(Ok(Some(NextRequestResponse(
+ maybe_request_body_rid,
+ response_sender_rid,
+ method,
+ headers,
+ url,
+ ))))
+ } else if connection_closed {
+ Poll::Ready(Ok(None))
+ } else {
+ Poll::Pending
+ }
+ })
+ .await
+ .map_err(AnyError::from)
+}
+
+fn should_ignore_error(e: &AnyError) -> bool {
+ if let Some(e) = e.downcast_ref::<hyper::Error>() {
+ use std::error::Error;
+ if let Some(std_err) = e.source() {
+ if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
+ if io_err.kind() == std::io::ErrorKind::NotConnected {
+ return true;
+ }
+ }
+ }
+ }
+ false
+}
+
+fn op_http_start(
+ state: &mut OpState,
+ tcp_stream_rid: ResourceId,
+ _data: Option<ZeroCopyBuf>,
+) -> Result<ResourceId, AnyError> {
+ let deno_service = Service::default();
+
+ if let Some(resource_rc) = state
+ .resource_table
+ .take::<TcpStreamResource>(tcp_stream_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
+ let hyper_connection = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(tcp_stream, deno_service.clone());
+ let conn_resource = ConnResource {
+ hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
+ deno_service,
+ };
+ let rid = state.resource_table.add(conn_resource);
+ return Ok(rid);
+ }
+
+ if let Some(resource_rc) = state
+ .resource_table
+ .take::<TlsServerStreamResource>(tcp_stream_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tls_stream = read_half.unsplit(write_half);
+
+ let hyper_connection = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(tls_stream, deno_service.clone());
+ let conn_resource = ConnResource {
+ hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
+ deno_service,
+ };
+ let rid = state.resource_table.add(conn_resource);
+ return Ok(rid);
+ }
+
+ Err(bad_resource_id())
+}
+
+// We use a tuple instead of struct to avoid serialization overhead of the keys.
+#[derive(Deserialize)]
+struct RespondArgs(
+ // rid:
+ u32,
+ // status:
+ u16,
+ // headers:
+ Vec<String>,
+);
+
+fn op_http_response(
+ state: &mut OpState,
+ args: RespondArgs,
+ data: Option<ZeroCopyBuf>,
+) -> Result<Option<ResourceId>, AnyError> {
+ let rid = args.0;
+ let status = args.1;
+ let headers = args.2;
+
+ let response_sender = state
+ .resource_table
+ .take::<ResponseSenderResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let response_sender = Rc::try_unwrap(response_sender)
+ .ok()
+ .expect("multiple op_http_respond ongoing");
+
+ let mut builder = Response::builder().status(status);
+
+ debug_assert_eq!(headers.len() % 2, 0);
+ let headers_count = headers.len() / 2;
+ builder.headers_mut().unwrap().reserve(headers_count);
+ for i in 0..headers_count {
+ builder = builder.header(&headers[2 * i], &headers[2 * i + 1]);
+ }
+
+ let res;
+ let maybe_response_body_rid = if let Some(d) = data {
+ // If a body is passed, we use it, and don't return a body for streaming.
+ res = builder.body(Vec::from(&*d).into())?;
+ None
+ } else {
+ // If no body is passed, we return a writer for streaming the body.
+ let (sender, body) = Body::channel();
+ res = builder.body(body)?;
+
+ let response_body_rid = state.resource_table.add(ResponseBodyResource {
+ body: AsyncRefCell::new(sender),
+ cancel: CancelHandle::default(),
+ conn_rid: response_sender.conn_rid,
+ });
+
+ Some(response_body_rid)
+ };
+
+ // oneshot::Sender::send(v) returns |v| on error, not an error object.
+ // The only failure mode is the receiver already having dropped its end
+ // of the channel.
+ if response_sender.sender.send(res).is_err() {
+ return Err(type_error("internal communication error"));
+ }
+
+ Ok(maybe_response_body_rid)
+}
+
+async fn op_http_response_close(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _data: Option<ZeroCopyBuf>,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseBodyResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+ drop(resource);
+
+ poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => Poll::Ready(x),
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await
+}
+
+async fn op_http_request_read(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: Option<ZeroCopyBuf>,
+) -> Result<usize, AnyError> {
+ let mut data = data.ok_or_else(null_opbuf)?;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<RequestBodyResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
+
+ poll_fn(|cx| {
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ read_fut.poll_unpin(cx).map_err(AnyError::from)
+ })
+ .await
+}
+
+async fn op_http_response_write(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: Option<ZeroCopyBuf>,
+) -> Result<(), AnyError> {
+ let buf = data.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<ResponseBodyResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+
+ let mut send_data_fut = body
+ .send_data(Vec::from(&*buf).into())
+ .or_cancel(cancel)
+ .boxed_local();
+
+ poll_fn(|cx| {
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ send_data_fut.poll_unpin(cx).map_err(AnyError::from)
+ })
+ .await?
+ .unwrap(); // panic on send_data error
+
+ Ok(())
+}
+
+type BytesStream =
+ Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
+
+struct RequestBodyResource {
+ conn_rid: ResourceId,
+ reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
+ cancel: CancelHandle,
+}
+
+impl Resource for RequestBodyResource {
+ fn name(&self) -> Cow<str> {
+ "requestBody".into()
+ }
+}
+
+struct ResponseSenderResource {
+ sender: oneshot::Sender<Response<Body>>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseSenderResource {
+ fn name(&self) -> Cow<str> {
+ "responseSender".into()
+ }
+}
+
+struct ResponseBodyResource {
+ body: AsyncRefCell<hyper::body::Sender>,
+ cancel: CancelHandle,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseBodyResource {
+ fn name(&self) -> Cow<str> {
+ "responseBody".into()
+ }
+}
+
+// Needed so hyper can use non-Send futures
+#[derive(Clone)]
+struct LocalExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ tokio::task::spawn_local(fut);
+ }
+}
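For orientation, this is the op-level protocol the handlers above implement, as driven by runtime/js/40_http.js. It is a sketch of internal plumbing, not an API user code should call directly; `tcpConn` stands in for an accepted Deno.Conn:

```ts
// Start serving HTTP on an accepted TCP connection; returns the ConnResource rid.
const connRid = Deno.core.jsonOpSync("op_http_start", tcpConn.rid);

// Resolves with [requestBodyRid, responseSenderRid, method, headers, url],
// or null once the client closes the connection.
const next = await Deno.core.jsonOpAsync("op_http_request_next", connRid);
if (next !== null) {
  const [requestBodyRid, responseSenderRid, method, headersList, url] = next;

  // Passing a buffer sends the whole response in one op; passing null makes
  // op_http_response hand back a responseBodyRid for streaming.
  const responseBodyRid = Deno.core.jsonOpSync(
    "op_http_response",
    [responseSenderRid, 200, ["content-type", "text/plain"]],
    null,
  );
  if (typeof responseBodyRid === "number") {
    await Deno.core.jsonOpAsync(
      "op_http_response_write",
      responseBodyRid,
      new TextEncoder().encode("Hello World"),
    );
    await Deno.core.jsonOpAsync("op_http_response_close", responseBodyRid);
  }
}
```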
diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs
index 67eae27b2..d9bd2ba83 100644
--- a/runtime/ops/mod.rs
+++ b/runtime/ops/mod.rs
@@ -5,6 +5,7 @@ pub mod fetch;
pub mod file;
pub mod fs;
pub mod fs_events;
+pub mod http;
pub mod io;
pub mod net;
#[cfg(unix)]
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 48431ef22..4c38d2293 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -353,9 +353,9 @@ async fn op_connect(
}
}
-struct TcpListenerResource {
- listener: AsyncRefCell<TcpListener>,
- cancel: CancelHandle,
+pub struct TcpListenerResource {
+ pub listener: AsyncRefCell<TcpListener>,
+ pub cancel: CancelHandle,
}
impl Resource for TcpListenerResource {
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 982198d65..5db542b42 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -138,6 +138,7 @@ impl MainWorker {
);
ops::fs_events::init(js_runtime);
ops::fs::init(js_runtime);
+ ops::http::init(js_runtime);
ops::io::init(js_runtime);
ops::net::init(js_runtime);
ops::os::init(js_runtime);